//================================================================================ // Name : sensordatastore.cpp // Author : Axel Auweter // Copyright : Leibniz Supercomputing Centre // Description : C++ API implementation for inserting and querying DCDB sensor data. //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2016 Leibniz Supercomputing Centre // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA //================================================================================ /** * @mainpage * The libdcdb library is a dynamic runtime library providing * functions to initialize and access the DCDB data store. It * is being used by the CollectAgent to handle insertion of * data and can be used by tools responsible for data analysis. * * Its main class is the DCDB::SensorDataStore class which * provides functions to connect to the data store, initialize * an empty data base and to retrieve data. * * For its internal handling, DCDB::SensorDataStore relies on * the DCDB::SensorDataStoreImpl class (which hides all private * member functions belonging to the SensorDataStore class from * the header that is used by programmers who link against this * library). Raw database functionality is abstracted into the * CassandraBackend class (to easy switching to other * key-value style databases in the future). * * To use the library in your client application, simply * include the sensordatastore.h header file and initialize * an object of the SensorDataStore class. */ #include #include #include #include #include "cassandra.h" #include "dcdb/sensordatastore.h" #include "sensordatastore_internal.h" #include "dcdb/connection.h" #include "dcdbglobals.h" #include "dcdbendian.h" using namespace DCDB; /** * @details * Since we want high-performance inserts, we prepare the * insert CQL query in advance and only bind it on the actual * insert. */ void SensorDataStoreImpl::prepareInsert(uint64_t ttl) { CassError rc = CASS_OK; CassFuture* future = NULL; const char* query; /* * Free the old prepared if necessary. */ if (preparedInsert) { cass_prepared_free(preparedInsert); } char *queryBuf = NULL; if (ttl == 0) { query = "INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);"; } else { queryBuf = (char*)malloc(256); snprintf(queryBuf, 256, "INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?) USING TTL %" PRIu64 " ;", ttl); query = queryBuf; } future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } else { preparedInsert = cass_future_get_prepared(future); } cass_future_free(future); if (queryBuf) { free(queryBuf); } } /** * @details * To insert a sensor reading, the Rsvd field of the SensorId must * be filled with a time component that ensures that the maximum * number of 2^32 columns per key is not exceeded while still * allowing relatively easy retrieval of data. * * We achieve this by using a "week-stamp" (i.e. number of weeks * since Unix epoch) within the Rsvd field of the SensorId before * calling the Cassandra Backend to do the raw insert. * * Applications should not call this function directly, but * use the insert function provided by the SensorDataStore class. */ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value) { #if 0 std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl; #endif /* Calculate and insert week number */ uint16_t week = ts / 604800000000000; sid->setRsvd(week); /* Insert into Cassandra */ const std::string key = sid->serialize(); CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; statement = cass_prepared_bind(preparedInsert); cass_statement_bind_bytes_by_name(statement, "sid", (cass_byte_t*)(key.c_str()), 16); cass_statement_bind_int64_by_name(statement, "ts", ts); cass_statement_bind_int64_by_name(statement, "value", value); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } cass_future_free(future); cass_statement_free(statement); } /** * @details * This function updates the prepared statement for inserts * with the new TTL value. */ void SensorDataStoreImpl::setTTL(uint64_t ttl) { prepareInsert(ttl); } /** * @details * This function issues a regular query to the data store * and creates a SensorDataStoreReading object for each * entry which is stored in the result list. */ void SensorDataStoreImpl::query(std::list& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) { CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; const CassPrepared* prepared = nullptr; std::string query = std::string("SELECT ts,"); if (aggregate == AGGREGATE_NONE) { query.append("value"); } else { query.append(AggregateString[aggregate] + std::string("(value) as value")); } query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;"); future = cass_session_prepare(session, query.c_str()); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return; } prepared = cass_future_get_prepared(future); cass_future_free(future); const std::string key = sid.serialize(); #if 0 std::cout << "Query: " << query << std::endl << "sid: " << sid.toString() << " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl; #endif statement = cass_prepared_bind(prepared); cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16); cass_statement_bind_int64(statement, 1, start.getRaw()); cass_statement_bind_int64(statement, 2, end.getRaw()); future = cass_session_execute(session, statement); cass_future_wait(future); if (cass_future_error_code(future) == CASS_OK) { const CassResult* cresult = cass_future_get_result(future); CassIterator* rows = cass_iterator_from_result(cresult); SensorDataStoreReading entry; while (cass_iterator_next(rows)) { const CassRow* row = cass_iterator_get_row(rows); cass_int64_t ts, value; cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts); cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value); entry.sensorId = sid; entry.timeStamp = (uint64_t)ts; entry.value = (int64_t)value; result.push_back(entry); #if 0 if (localtime) { t.convertToLocal(); } if (raw) { std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl; } else { std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl; } #endif } cass_iterator_free(rows); cass_result_free(cresult); } cass_statement_free(statement); cass_future_free(future); cass_prepared_free(prepared); } /** * @details * This function issues a regular query to the data store * and calls cbFunc for every reading. */ void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) { CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; const CassPrepared* prepared = nullptr; std::string query = std::string("SELECT ts,"); if (aggregate == AGGREGATE_NONE) { query.append("value"); } else { query.append(AggregateString[aggregate] + std::string("(value) as value")); } query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;"); future = cass_session_prepare(session, query.c_str()); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return; } prepared = cass_future_get_prepared(future); cass_future_free(future); const std::string key = sid.serialize(); statement = cass_prepared_bind(prepared); cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16); cass_statement_bind_int64(statement, 1, start.getRaw()); cass_statement_bind_int64(statement, 2, end.getRaw()); future = cass_session_execute(session, statement); cass_future_wait(future); if (cass_future_error_code(future) == CASS_OK) { const CassResult* cresult = cass_future_get_result(future); CassIterator* rows = cass_iterator_from_result(cresult); SensorDataStoreReading entry; while (cass_iterator_next(rows)) { const CassRow* row = cass_iterator_get_row(rows); cass_int64_t ts, value; cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts); cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value); entry.sensorId = sid; entry.timeStamp = (uint64_t)ts; entry.value = (int64_t)value; cbFunc(entry, userData); } cass_iterator_free(rows); cass_result_free(cresult); } cass_statement_free(statement); cass_future_free(future); cass_prepared_free(prepared); } /** * @details * This function generates an integrated value of the time series * by first querying for the result set list using query() and then * summing up the result. */ SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end) { std::list queryResult; /* Issue a standard query */ query(queryResult, sid, start, end, AGGREGATE_NONE); /* Check if at least 2 readings in result */ if (queryResult.size() < 2) return SDS_EMPTYSET; /* Integrate the result */ result = 0; SensorDataStoreReading prev; for (std::list::iterator it = queryResult.begin(); it != queryResult.end(); it++) { if (!(it == queryResult.begin())) { SensorDataStoreReading cur = *it; /* Calculate average between two readings */ int64_t avg = (cur.value + prev.value) / 2; /* Calculate time difference */ uint64_t dt = cur.timeStamp.getRaw() - prev.timeStamp.getRaw(); /* Sum up (with lousy attempt to keep it numerically stable - should probably use double instead) */ if (dt > 10000000000) { /* dt > 10s => convert dt to s first */ dt /= 1000000000; result += avg * dt; } else { /* dt < 10s => multiply first */ avg *= dt; result += avg / 1000000000; } } prev = *it; } return SDS_OK; } /** * @details * This function deletes all data from the sensordata store * that is older than weekStamp-1 weeks. */ void SensorDataStoreImpl::truncBeforeWeek(uint16_t weekStamp) { /* List of rows that should be deleted */ std::list deleteList; /* Query the database to collect all rows */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT DISTINCT sid FROM " KEYSPACE_NAME "." CF_SENSORDATA ";"; statement = cass_statement_new(query, 0); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return; } const CassResult* result = cass_future_get_result(future); cass_future_free(future); CassIterator* iterator = cass_iterator_from_result(result); /* Iterate over all rows and filter out those, that are too old */ while (cass_iterator_next(iterator)) { const CassRow* row = cass_iterator_get_row(iterator); const cass_byte_t* res; size_t res_len; cass_value_get_bytes(cass_row_get_column_by_name(row, "sid"), &res, &res_len); uint64_t raw[2]; raw[0] = Endian::BEToHost(((uint64_t*)res)[0]); raw[1] = Endian::BEToHost(((uint64_t*)res)[1]); SensorId sensor; sensor.setRaw(raw); /* Check if the sensorId's rsvd field is smaller than the weekStamp */ if (sensor.getRsvd() < weekStamp) { deleteList.push_back(sensor); } } cass_result_free(result); cass_iterator_free(iterator); cass_statement_free(statement); /* Now iterate over all entries in the deleteList and delete them */ for (std::list::iterator it = deleteList.begin(); it != deleteList.end(); it++) { deleteRow(*it); } } /** * @details * Deleting entire rows is rather efficient compared to deleting individual columns. */ void SensorDataStoreImpl::deleteRow(SensorId& sid) { CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; const CassPrepared* prepared = nullptr; const char* query = "DELETE FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ?;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return; } prepared = cass_future_get_prepared(future); cass_future_free(future); const std::string key = sid.serialize(); statement = cass_prepared_bind(prepared); cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16); future = cass_session_execute(session, statement); cass_future_wait(future); cass_statement_free(statement); cass_future_free(future); cass_prepared_free(prepared); } /** * @details * This constructor sets the internal connection variable to * the externally provided Connection object and also * retrieves the CassSession pointer of the connection. */ SensorDataStoreImpl::SensorDataStoreImpl(Connection* conn) { connection = conn; session = connection->getSessionHandle(); preparedInsert = nullptr; prepareInsert(0); } /** * @details * Due to the simplicity of the class, the destructor is left empty. */ SensorDataStoreImpl::~SensorDataStoreImpl() { connection = nullptr; session = nullptr; if (preparedInsert) { cass_prepared_free(preparedInsert); } } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ void SensorDataStore::insert(SensorId* sid, uint64_t ts, int64_t value) { impl->insert(sid, ts, value); } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ void SensorDataStore::setTTL(uint64_t ttl) { impl->setTTL(ttl); } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ void SensorDataStore::query(std::list& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) { impl->query(result, sid, start, end, aggregate); } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) { return impl->queryCB(cbFunc, userData, sid, start, end, aggregate); } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ SDSQueryResult SensorDataStore::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end) { return impl->querySum(result, sid, start, end); } /** * @details * Instead of doing the actual work, this function simply * forwards to the insert function of the SensorDataStoreImpl * class. */ void SensorDataStore::truncBeforeWeek(uint16_t weekStamp) { return impl->truncBeforeWeek(weekStamp); } /** * @details * This constructor allocates the implementation class which * holds the actual implementation of the class functionality. */ SensorDataStore::SensorDataStore(Connection* conn) { impl = new SensorDataStoreImpl(conn); } /** * @details * The SensorDataStore desctructor deallocates the * SensorDataStoreImpl and CassandraBackend objects. */ SensorDataStore::~SensorDataStore() { /* Clean up... */ if (impl) delete impl; }