sensordatastore.cpp 31.8 KB
Newer Older
1 2 3
//================================================================================
// Name        : sensordatastore.cpp
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5 6 7 8 9 10
// Copyright   : Leibniz Supercomputing Centre
// Description : C++ API implementation for inserting and querying DCDB sensor data.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//================================================================================
27

28 29
/**
 * @page libdcdb libdcdb
 * The libdcdb library is a dynamic runtime library providing
 * functions to initialize and access the DCDB data store. It
 * is being used by the CollectAgent to handle insertion of
 * data and can be used by tools responsible for data analysis.
 *
 * Its main class is the DCDB::SensorDataStore class which
 * provides functions to connect to the data store, initialize
 * an empty database and to retrieve data.
 *
 * For its internal handling, DCDB::SensorDataStore relies on
 * the DCDB::SensorDataStoreImpl class (which hides all private
 * member functions belonging to the SensorDataStore class from
 * the header that is used by programmers who link against this
 * library). Raw database functionality is abstracted into the
 * CassandraBackend class (to ease switching to other
 * key-value style databases in the future).
 *
 * To use the library in your client application, simply
 * include the sensordatastore.h header file and initialize
 * an object of the SensorDataStore class.
 */

Axel Auweter's avatar
Axel Auweter committed
52 53
#include <string>
#include <iostream>
54 55
#include <cstdint>
#include <cinttypes>
Axel Auweter's avatar
Axel Auweter committed
56

57
#include "cassandra.h"
Axel Auweter's avatar
Axel Auweter committed
58

59
#include "dcdb/sensordatastore.h"
60
#include "sensordatastore_internal.h"
61
#include "dcdb/connection.h"
62 63 64
#include "dcdbglobals.h"

#include "dcdbendian.h"
65

66 67
using namespace DCDB;

68 69 70 71 72 73 74 75 76 77 78 79
/* Creates a reading whose members are left default-constructed. */
SensorDataStoreReading::SensorDataStoreReading() {}

/* Creates a reading for sensor sid with raw time stamp ts and the given value. */
SensorDataStoreReading::SensorDataStoreReading(SensorId& sid, uint64_t ts, int64_t value) {
  sensorId = sid;
  timeStamp = TimeStamp(ts);
  /* The parameter shadows the member, hence the explicit this-> */
  this->value = value;
}

/* Nothing to release: the reading owns no external resources in this file. */
SensorDataStoreReading::~SensorDataStoreReading() {}

80 81 82 83 84 85 86 87
/**
 * @details
 * Since we want high-performance inserts, we prepare the
 * insert CQL query in advance and only bind it on the actual
 * insert.
 */
void SensorDataStoreImpl::prepareInsert(uint64_t ttl)
{
88 89 90 91 92 93 94 95
    CassError rc = CASS_OK;
    CassFuture* future = NULL;
    const char* query;
    
    /*
    * Free the old prepared if necessary.
    */
    if (preparedInsert) {
96
      cass_prepared_free(preparedInsert);
97 98 99 100 101 102 103 104
    }
    
    query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL ? ;";
    future = cass_session_prepare(session, query);
    cass_future_wait(future);
    
    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
105
      connection->printError(future);
106
    } else {
107
      preparedInsert = cass_future_get_prepared(future);
108 109
    }
    cass_future_free(future);
110

111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128
    // Preparing the insert statement without a TTL clause
    if (preparedInsert_noTTL) {
        cass_prepared_free(preparedInsert_noTTL);
    }
    
    query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?);";
    future = cass_session_prepare(session, query);
    cass_future_wait(future);
    
    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
    } else {
        preparedInsert_noTTL = cass_future_get_prepared(future);
    }
    cass_future_free(future);
        
    defaultTTL = ttl;
129 130
}

131 132 133 134 135 136 137 138 139 140 141 142 143 144
/**
 * @details
 * To insert a sensor reading, the Rsvd field of the SensorId must
 * be filled with a time component that ensures that the maximum
 * number of 2^32 columns per key is not exceeded while still
 * allowing relatively easy retrieval of data.
 *
 * We achieve this by using a "week-stamp" (i.e. number of weeks
 * since Unix epoch) within the Rsvd field of the SensorId before
 * calling the Cassandra Backend to do the raw insert.
 *
 * Applications should not call this function directly, but
 * use the insert function provided by the SensorDataStore class.
 */
145
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl)
146
{
147
#if 0
148 149 150
  std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif

151
  /* Calculate and insert week number */
152
  uint16_t week = ts / 604800000000000;
Axel Auweter's avatar
Axel Auweter committed
153
  sid->setRsvd(week);
154 155

  int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
156
  
157
  CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
158 159
  cass_statement_bind_string_by_name(statement, "sid", sid->getId().c_str());
  cass_statement_bind_int16_by_name(statement, "ws", week);
160 161
  cass_statement_bind_int64_by_name(statement, "ts", ts);
  cass_statement_bind_int64_by_name(statement, "value", value);
162 163
  if(ttlReal>0)
      cass_statement_bind_int32(statement, 4, ttlReal);
164
  
165 166
  CassFuture* future = cass_session_execute(session, statement);
  cass_statement_free(statement);
167

168
  /* Don't wait for the future, just free it to make the call truly asynchronous */
169
  cass_future_free(future);
170 171
}

172 173
void SensorDataStoreImpl::insert(SensorDataStoreReading& reading, int64_t ttl) {
  /* Unpack the reading and forward to the pointer-based overload */
  SensorId* const sid = &reading.sensorId;
  const uint64_t rawTs = reading.timeStamp.getRaw();
  insert(sid, rawTs, reading.value, ttl);
}

176
void SensorDataStoreImpl::insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl) {
177 178
  CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
  
179 180
  int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
  
181 182 183 184 185 186
  for (auto r: readings) {
    /* Calculate and insert week number */
    uint16_t week = r.timeStamp.getRaw() / 604800000000000;
    r.sensorId.setRsvd(week);
    
    /* Add insert statement to batch */
187
    CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
188 189
    cass_statement_bind_string_by_name(statement, "sid", r.sensorId.getId().c_str());
    cass_statement_bind_int16_by_name(statement, "ws", week);
190 191
    cass_statement_bind_int64_by_name(statement, "ts", r.timeStamp.getRaw());
    cass_statement_bind_int64_by_name(statement, "value", r.value);
192 193
    if(ttlReal>0)
        cass_statement_bind_int32(statement, 4, ttlReal);
194 195 196 197 198 199 200
    cass_batch_add_statement(batch, statement);
    cass_statement_free(statement);
  }
  
  /* Execute batch */
  CassFuture *future = cass_session_execute_batch(session, batch);
  cass_batch_free(batch);
Alessio Netti's avatar
Alessio Netti committed
201 202 203

  if(debugLog)
    cass_future_set_callback(future, DataStoreImpl_on_result, NULL);
204
  
205
  /* Don't wait for the future, just free it to make the call truly asynchronous */
206 207 208
  cass_future_free(future);
}

209 210
/**
 * @details
211 212
 * This function updates the prepared statement for inserts
 * with the new TTL value.
213
 */
214
void SensorDataStoreImpl::setTTL(uint64_t ttl)
215
{
216
  prepareInsert(ttl);
217 218
}

Alessio Netti's avatar
Alessio Netti committed
219 220 221 222 223 224 225 226 227
/**
 * @brief Enables or disables logging of Cassandra insert errors
 * @param dl        true to enable logging, false otherwise
 */
void SensorDataStoreImpl::setDebugLog(bool dl)
{
    debugLog = dl;
}

Axel Auweter's avatar
Axel Auweter committed
228 229 230 231
/**
 * @details
 * This function issues a regular query to the data store
 * and creates a SensorDataStoreReading object for each
232
 * entry which is stored in the result list.
Axel Auweter's avatar
Axel Auweter committed
233
 */
234
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
Axel Auweter's avatar
Axel Auweter committed
235 236 237 238
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
  const CassPrepared* prepared = nullptr;
239 240 241 242 243 244 245
  
  std::string query = std::string("SELECT ts,");
  if (aggregate == AGGREGATE_NONE) {
    query.append("value");
  } else {
    query.append(AggregateString[aggregate] + std::string("(value) as value"));
  }
246
  query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
247
  future = cass_session_prepare(session, query.c_str());
Axel Auweter's avatar
Axel Auweter committed
248 249 250 251 252 253 254 255 256 257 258
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  prepared = cass_future_get_prepared(future);
  cass_future_free(future);
259
  
260
#if 0
261
  std::cout << "Query: " << query << std::endl << "sid: " << sid.toString() << " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl;
262
#endif
263

Axel Auweter's avatar
Axel Auweter committed
264
  statement = cass_prepared_bind(prepared);
265
  cass_statement_set_paging_size(statement, PAGING_SIZE);
266 267 268 269 270
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());
  cass_statement_bind_int64(statement, 2, start.getRaw());
  cass_statement_bind_int64(statement, 3, end.getRaw());
  
271 272 273 274
  bool morePages = false;
  do {
      future = cass_session_execute(session, statement);
      cass_future_wait(future);
Axel Auweter's avatar
Axel Auweter committed
275

276 277 278
      if (cass_future_error_code(future) == CASS_OK) {
          const CassResult *cresult = cass_future_get_result(future);
          CassIterator *rows = cass_iterator_from_result(cresult);
Axel Auweter's avatar
Axel Auweter committed
279

280
          SensorDataStoreReading entry;
281

282 283
          while (cass_iterator_next(rows)) {
              const CassRow *row = cass_iterator_get_row(rows);
Axel Auweter's avatar
Axel Auweter committed
284

285 286 287
              cass_int64_t ts, value;
              cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
              cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
Axel Auweter's avatar
Axel Auweter committed
288

289 290 291
              entry.sensorId = sid;
              entry.timeStamp = (uint64_t) ts;
              entry.value = (int64_t) value;
Axel Auweter's avatar
Axel Auweter committed
292

293
              result.push_back(entry);
Axel Auweter's avatar
Axel Auweter committed
294
#if 0
295 296 297 298 299 300 301 302 303
              if (localtime) {
                  t.convertToLocal();
              }
              if (raw) {
                  std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
              }
              else {
                  std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
              }
Axel Auweter's avatar
Axel Auweter committed
304
#endif
305 306 307 308 309 310 311 312 313
          }
          
          if((morePages = cass_result_has_more_pages(cresult)))
              cass_statement_set_paging_state(statement, cresult);

          cass_iterator_free(rows);
          cass_result_free(cresult);
      } else {
          morePages = false;
Axel Auweter's avatar
Axel Auweter committed
314
      }
315 316 317 318
      
      cass_future_free(future);
  } 
  while(morePages);
Axel Auweter's avatar
Axel Auweter committed
319 320 321 322 323

  cass_statement_free(statement);
  cass_prepared_free(prepared);
}

324 325 326 327 328 329 330
/**
 * @details
 * This function issues a regular query to the data store,
 * queries an arbitrary number of sensors simultaneously
 * and creates a SensorDataStoreReading object for each
 * entry which is stored in the result list.
 */
331
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, bool storeSids) {
332 333 334 335 336 337 338 339
    if(sids.empty())
        return;
    
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;

340
    std::string query = std::string(storeSids ? "SELECT sid,ts," : "SELECT ts,");
341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374
    if (aggregate == AGGREGATE_NONE) {
        query.append("value");
    } else {
        query.append(AggregateString[aggregate] + std::string("(value) as value"));
    }
    query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts >= ? AND ts <= ? ;");
    future = cass_session_prepare(session, query.c_str());
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);

    auto sidIt = sids.begin();
    size_t sidCtr = 0;

    while(sidIt != sids.end()) {
        CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, sids.size());

        sidCtr = 0;
        // Breaking up the original list of sids in chunks
        while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) {
            cass_collection_append_string(cassList, sidIt->getId().c_str());
            ++sidIt;
            ++sidCtr;
        }

        statement = cass_prepared_bind(prepared);
375
        cass_statement_set_paging_size(statement, PAGING_SIZE);
376 377 378 379 380
        cass_statement_bind_collection(statement, 0, cassList);
        cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
        cass_statement_bind_int64(statement, 2, start.getRaw());
        cass_statement_bind_int64(statement, 3, end.getRaw());

381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407
        bool morePages = false;
        do {
            future = cass_session_execute(session, statement);
            cass_future_wait(future);
    
            if (cass_future_error_code(future) == CASS_OK) {
                const CassResult *cresult = cass_future_get_result(future);
                CassIterator *rows = cass_iterator_from_result(cresult);
                SensorDataStoreReading entry;
                cass_int64_t ts, value;
                const char *name;
                size_t name_len;
    
                while (cass_iterator_next(rows)) {
                    const CassRow *row = cass_iterator_get_row(rows);
    
                    cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
                    cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
                    entry.timeStamp = (uint64_t) ts;
                    entry.value = (int64_t) value;
                    
                    if(storeSids) {
                        cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
                        entry.sensorId = SensorId(std::string(name, name_len));
                    }
                    
                    result.push_back(entry);
408
#if 0
409 410 411 412 413 414 415 416 417
                    if (localtime) {
                      t.convertToLocal();
                  }
                  if (raw) {
                      std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
                  }
                  else {
                      std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
                  }
418
#endif
419 420 421 422 423 424 425 426 427
                }
    
                if((morePages = cass_result_has_more_pages(cresult)))
                    cass_statement_set_paging_state(statement, cresult);
                
                cass_iterator_free(rows);
                cass_result_free(cresult);
            } else {
                morePages = false;
428
            }
429 430 431
    
            cass_future_free(future);

432
        }
433
        while(morePages);
434 435 436 437 438 439 440 441

        cass_statement_free(statement);
        cass_collection_free(cassList);
    }
    
    cass_prepared_free(prepared);
}

442 443 444 445 446 447 448
/**
 * @details
 * This function performs a fuzzy query on the datastore,
 * picking a single sensor readings that is closest to
 * the one given as input
 */
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns) {
449
    /* Find the readings before time t */
450 451 452 453
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;
454 455
    const char* queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";

456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476
    future = cass_session_prepare(session, queryBefore);
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);

    statement = cass_prepared_bind(prepared);
    cass_statement_bind_string(statement, 0, sid.getId().c_str());
    cass_statement_bind_int16(statement, 1, sid.getRsvd());
    cass_statement_bind_int64(statement, 2, ts.getRaw());

    future = cass_session_execute(session, statement);
    cass_future_wait(future);
    SensorDataStoreReading r;
477

478 479 480
    if (cass_future_error_code(future) == CASS_OK) {
        const CassResult* cresult = cass_future_get_result(future);
        CassIterator* rows = cass_iterator_from_result(cresult);
481

482 483 484 485 486 487 488
        while (cass_iterator_next(rows)) {
            const CassRow* row = cass_iterator_get_row(rows);

            cass_int64_t tsInt, value;
            cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
            cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);

489
            if(ts.getRaw() - (uint64_t)tsInt < tol_ns) {
490 491 492
                r.sensorId = sid;
                r.timeStamp = (uint64_t) tsInt;
                r.value = (int64_t) value;
493
                result.push_back(r);
494 495 496 497 498 499 500 501 502
            }
        }
        cass_iterator_free(rows);
        cass_result_free(cresult);
    }

    cass_statement_free(statement);
    cass_future_free(future);
    cass_prepared_free(prepared);
503
}
504

505 506 507 508 509 510
/**
 * @details
 * This function performs a fuzzy query on the datastore,
 * picking readings from a set of sensors that are closest to
 * the timestamp given as input
 */
511
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns, bool storeSids) {
512 513 514 515 516 517 518 519
    if(sids.empty())
        return;

    /* Find the readings before time t */
    CassError rc = CASS_OK;
    CassStatement *statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared *prepared = nullptr;
Alessio Netti's avatar
Alessio Netti committed
520 521 522 523 524
    const char *queryBefore;
    if(storeSids) 
        queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1";
    else
        queryBefore = "SELECT ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1";
525 526

    future = cass_session_prepare(session, queryBefore);
527 528 529 530 531 532 533 534 535 536 537
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);
538 539 540 541 542 543 544 545 546 547 548 549 550
    
    auto sidIt = sids.begin();
    size_t sidCtr = 0;
    
    while(sidIt != sids.end()) {
        CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, QUERY_GROUP_LIMIT);
        sidCtr = 0;
        // Breaking up the original list of sids in chunks
        while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) {
            cass_collection_append_string(cassList, sidIt->getId().c_str());
            ++sidIt;
            ++sidCtr;
        }
551

552 553 554 555 556
        statement = cass_prepared_bind(prepared);
        cass_statement_set_paging_size(statement, -1);
        cass_statement_bind_collection(statement, 0, cassList);
        cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
        cass_statement_bind_int64(statement, 2, ts.getRaw());
557

558 559 560
        future = cass_session_execute(session, statement);
        cass_future_wait(future);
        SensorDataStoreReading r;
561

562 563 564
        if (cass_future_error_code(future) == CASS_OK) {
            const CassResult *cresult = cass_future_get_result(future);
            CassIterator *rows = cass_iterator_from_result(cresult);
565
            cass_int64_t tsInt, value;
566 567 568 569 570 571 572 573
            const char *name;
            size_t name_len;

            while (cass_iterator_next(rows)) {
                const CassRow *row = cass_iterator_get_row(rows);

                cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
                cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
574
                
575
                if (ts.getRaw() - (uint64_t) tsInt < tol_ns) {
576 577 578 579
                    if(storeSids) {
                        cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
                        r.sensorId = SensorId(std::string(name, name_len));
                    }
580 581 582 583
                    r.timeStamp = (uint64_t) tsInt;
                    r.value = (int64_t) value;
                    result.push_back(r);
                }
584
            }
585 586
            cass_iterator_free(rows);
            cass_result_free(cresult);
587 588
        }

589 590 591
        cass_statement_free(statement);
        cass_future_free(future);
        cass_collection_free(cassList);
592 593
    }
    
594
    cass_prepared_free(prepared);
595 596
}

597

598 599 600 601 602
/**
 * @details
 * This function issues a regular query to the data store
 * and calls cbFunc for every reading.
 */
603
void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
604 605 606 607 608
{
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
  const CassPrepared* prepared = nullptr;
609 610 611 612 613 614 615
  
  std::string query = std::string("SELECT ts,");
  if (aggregate == AGGREGATE_NONE) {
    query.append("value");
  } else {
    query.append(AggregateString[aggregate] + std::string("(value) as value"));
  }
616
  query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
617
  future = cass_session_prepare(session, query.c_str());
618 619 620 621 622 623 624 625 626 627 628
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  prepared = cass_future_get_prepared(future);
  cass_future_free(future);
629
  
630
  statement = cass_prepared_bind(prepared);
631 632 633 634
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());
  cass_statement_bind_int64(statement, 2, start.getRaw());
  cass_statement_bind_int64(statement, 3, end.getRaw());
635 636 637 638 639 640 641 642 643 644 645 646 647 648 649 650 651 652 653 654 655 656 657 658 659 660 661 662 663 664 665 666

  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  if (cass_future_error_code(future) == CASS_OK) {
      const CassResult* cresult = cass_future_get_result(future);
      CassIterator* rows = cass_iterator_from_result(cresult);

      SensorDataStoreReading entry;

      while (cass_iterator_next(rows)) {
          const CassRow* row = cass_iterator_get_row(rows);

          cass_int64_t ts, value;
          cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
          cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);

          entry.sensorId = sid;
          entry.timeStamp = (uint64_t)ts;
          entry.value = (int64_t)value;

          cbFunc(entry, userData);
      }
      cass_iterator_free(rows);
      cass_result_free(cresult);
  }

  cass_statement_free(statement);
  cass_future_free(future);
  cass_prepared_free(prepared);
}

667 668 669 670 671 672
/**
 * @details
 * This function generates an integrated value of the time series
 * by first querying for the result set list using query() and then
 * summing up the result.
 */
673
SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end)
674 675 676 677
{
  std::list<SensorDataStoreReading> queryResult;

  /* Issue a standard query */
678
  query(queryResult, sid, start, end, AGGREGATE_NONE);
679

Axel Auweter's avatar
Axel Auweter committed
680 681 682 683
  /* Check if at least 2 readings in result */
  if (queryResult.size() < 2)
    return SDS_EMPTYSET;

684 685 686 687 688 689 690 691 692 693 694 695 696 697 698 699 700 701 702 703 704 705 706 707 708 709 710 711
  /* Integrate the result */
  result = 0;

  SensorDataStoreReading prev;
  for (std::list<SensorDataStoreReading>::iterator it = queryResult.begin(); it != queryResult.end(); it++) {
      if (!(it == queryResult.begin())) {
          SensorDataStoreReading cur = *it;

          /* Calculate average between two readings */
          int64_t avg = (cur.value + prev.value) / 2;

          /* Calculate time difference */
          uint64_t dt = cur.timeStamp.getRaw() - prev.timeStamp.getRaw();

          /* Sum up (with lousy attempt to keep it numerically stable - should probably use double instead) */
          if (dt > 10000000000) {
              /* dt > 10s => convert dt to s first */
              dt /= 1000000000;
              result += avg * dt;
          }
          else {
              /* dt < 10s => multiply first */
              avg *= dt;
              result += avg / 1000000000;
          }
      }
      prev = *it;
  }
Axel Auweter's avatar
Axel Auweter committed
712
  return SDS_OK;
713 714
}

715 716 717 718 719 720 721 722 723 724 725 726 727 728
/**
 * @details
 * This function deletes all data from the sensordata store
 * that is older than weekStamp-1 weeks.
 */
void SensorDataStoreImpl::truncBeforeWeek(uint16_t weekStamp)
{
  /* List of rows that should be deleted */
  std::list<SensorId> deleteList;

  /* Query the database to collect all rows */
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
729
  const char* query = "SELECT DISTINCT sid,ws FROM " KEYSPACE_NAME "." CF_SENSORDATA ";";
730 731 732 733 734 735 736 737 738 739 740 741 742 743 744 745 746 747 748

  statement = cass_statement_new(query, 0);
  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      connection->printError(future);
      return;
  }

  const CassResult* result = cass_future_get_result(future);
  cass_future_free(future);

  CassIterator* iterator = cass_iterator_from_result(result);

  /* Iterate over all rows and filter out those, that are too old */
  while (cass_iterator_next(iterator)) {
      const CassRow* row = cass_iterator_get_row(iterator);
749
      const char* res;
750
      size_t       res_len;
751 752 753 754
      int16_t     res_ws;
      cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &res, &res_len);
      cass_value_get_int16(cass_row_get_column_by_name(row, "ws"), &res_ws);
      
755
      SensorId sensor;
756 757 758
      std::string id(res, res_len);
      sensor.setId(id);
      sensor.setRsvd(res_ws);
759 760 761 762 763 764 765 766 767 768 769 770 771 772 773 774 775 776 777 778 779 780 781 782 783 784

      /* Check if the sensorId's rsvd field is smaller than the weekStamp */
      if (sensor.getRsvd() < weekStamp) {
          deleteList.push_back(sensor);
      }
  }
  cass_result_free(result);
  cass_iterator_free(iterator);
  cass_statement_free(statement);

  /* Now iterate over all entries in the deleteList and delete them */
  for (std::list<SensorId>::iterator it = deleteList.begin(); it != deleteList.end(); it++) {
      deleteRow(*it);
  }
}

/**
 * @details
 * Deleting entire rows is rather efficient compared to deleting individual columns.
 * The row is addressed by the sensor's id and the week-stamp stored in
 * its Rsvd field. The call waits for the delete to finish; errors while
 * preparing or executing the statement are printed through the connection.
 *
 * @param sid  Sensor id and week-stamp (Rsvd field) of the row to delete.
 */
void SensorDataStoreImpl::deleteRow(SensorId& sid)
{
  const char* query = "DELETE FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? and ws = ?;";

  CassFuture* future = cass_session_prepare(session, query);
  cass_future_wait(future);

  if (cass_future_error_code(future) != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  const CassPrepared* prepared = cass_future_get_prepared(future);
  cass_future_free(future);

  CassStatement* statement = cass_prepared_bind(prepared);
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());

  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  /* Report a failed delete instead of ignoring it */
  if (cass_future_error_code(future) != CASS_OK) {
    connection->printError(future);
  }

  cass_statement_free(statement);
  cass_future_free(future);
  cass_prepared_free(prepared);
}

812 813
/**
 * @details
814
 * This constructor sets the internal connection variable to
815
 * the externally provided Connection object and also
816
 * retrieves the CassSession pointer of the connection.
817
 */
818
SensorDataStoreImpl::SensorDataStoreImpl(Connection* conn)
819
{
820 821
  connection = conn;
  session = connection->getSessionHandle();
Alessio Netti's avatar
Alessio Netti committed
822
  debugLog = false;
823
  defaultTTL = 0;
Axel Auweter's avatar
Axel Auweter committed
824 825

  preparedInsert = nullptr;
826 827
  preparedInsert_noTTL = nullptr;
  prepareInsert(defaultTTL);
828 829
}

830 831
/**
 * @details
832
 * Due to the simplicity of the class, the destructor is left empty.
833
 */
834
SensorDataStoreImpl::~SensorDataStoreImpl()
835
{
836
  connection = nullptr;
Axel Auweter's avatar
Axel Auweter committed
837 838 839 840
  session = nullptr;
  if (preparedInsert) {
      cass_prepared_free(preparedInsert);
  }
841 842 843
  if (preparedInsert_noTTL) {
      cass_prepared_free(preparedInsert_noTTL);
  }
844 845
}

846 847 848 849 850 851
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
852
void SensorDataStore::insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl)
853
{
854
    impl->insert(sid, ts, value, ttl);
855 856
}

857
/**
 * @details
 * This function does no work of its own; the reading and the TTL
 * are handed unchanged to the insert function of the
 * implementation object.
 */
void SensorDataStore::insert(SensorDataStoreReading& reading, int64_t ttl) {
  impl->insert(reading, ttl);
}

862 863
/**
 * @details
 * This function does no work of its own; the list of readings and
 * the TTL are handed unchanged to the insertBatch function of the
 * implementation object.
 */
void SensorDataStore::insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl)
{
  impl->insertBatch(readings, ttl);
}

866 867
/**
 * @details
868 869 870
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
871
 */
872
void SensorDataStore::setTTL(uint64_t ttl)
873
{
874
    impl->setTTL(ttl);
875 876
}

Alessio Netti's avatar
Alessio Netti committed
877 878 879 880 881 882 883 884 885
/**
 * @brief Enables or disables logging of Cassandra insert errors
 * @details
 * The flag is forwarded unchanged to the setDebugLog function of
 * the implementation object.
 * @param dl        true to enable logging, false otherwise
 */
void SensorDataStore::setDebugLog(bool dl) {
    impl->setDebugLog(dl);
}

Axel Auweter's avatar
Axel Auweter committed
886 887 888 889 890 891
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
892
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
Axel Auweter's avatar
Axel Auweter committed
893
{
894
    impl->query(result, sid, start, end, aggregate);
Axel Auweter's avatar
Axel Auweter committed
895 896
}

897 898 899 900 901 902
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
903
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, bool storeSids)
904
{
905
    impl->query(result, sids, start, end, aggregate, storeSids);
906 907
}

908 909 910 911 912 913 914 915 916 917
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards all arguments to the single-sensor fuzzyQuery function
 * of the implementation object.
 */
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns)
{
    impl->fuzzyQuery(result, sid, ts, tol_ns);
}

918 919 920 921 922 923
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
924 925
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns, bool storeSids) {
    impl->fuzzyQuery(result, sids, ts, tol_ns, storeSids);
926 927
}

928 929 930 931 932 933
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
934
void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
935
{
936
    return impl->queryCB(cbFunc, userData, sid, start, end, aggregate);
937 938
}

939 940 941 942 943 944
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
945
SDSQueryResult SensorDataStore::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end)
946
{
Axel Auweter's avatar
Axel Auweter committed
947
    return impl->querySum(result, sid, start, end);
948 949
}

950 951 952 953 954 955 956 957 958 959 960
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards the week stamp to the truncBeforeWeek function of the
 * implementation object.
 */
void SensorDataStore::truncBeforeWeek(uint16_t weekStamp) {
    impl->truncBeforeWeek(weekStamp);
}

961 962
/**
 * @details
963 964
 * This constructor allocates the implementation class which
 * holds the actual implementation of the class functionality.
965
 */
966
SensorDataStore::SensorDataStore(Connection* conn)
967
{
968
    impl = new SensorDataStoreImpl(conn);
969 970
}

971 972 973 974 975
/**
 * @details
 * The SensorDataStore desctructor deallocates the
 * SensorDataStoreImpl and CassandraBackend objects.
 */
976 977
SensorDataStore::~SensorDataStore()
{
978 979 980
  /* Clean up... */
  if (impl)
    delete impl;
981
}