sensordatastore.cpp 32 KB
Newer Older
1
2
3
//================================================================================
// Name        : sensordatastore.cpp
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
// Copyright   : Leibniz Supercomputing Centre
// Description : C++ API implementation for inserting and querying DCDB sensor data.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//================================================================================
27

28
29
/**
 * @page libdcdb libdcdb
Michael Ott's avatar
Michael Ott committed
30
 * The libdcdb library is a dynamic runtime library providing
31
32
33
34
 * functions to initialize and access the DCDB data store. It
 * is being used by the CollectAgent to handle insertion of
 * data and can be used by tools responsible for data analysis.
 *
35
36
37
 * Its main class is the DCDB::SensorDataStore class which
 * provides functions to connect to the data store, initialize
 * an empty data base and to retrieve data.
38
 *
39
40
41
42
 * For its internal handling, DCDB::SensorDataStore relies on
 * the DCDB::SensorDataStoreImpl class (which hides all private
 * member functions belonging to the SensorDataStore class from
 * the header that is used by programmers who link against this
43
44
45
46
 * library). Raw database functionality is abstracted into the
 * CassandraBackend class (to easy switching to other
 * key-value style databases in the future).
 *
47
 * To use the library in your client application, simply
48
49
50
51
 * include the sensordatastore.h header file and initialize
 * an object of the SensorDataStore class.
 */

Axel Auweter's avatar
Axel Auweter committed
52
53
#include <string>
#include <iostream>
54
55
#include <cstdint>
#include <cinttypes>
Axel Auweter's avatar
Axel Auweter committed
56

57
#include "cassandra.h"
Axel Auweter's avatar
Axel Auweter committed
58

59
#include "dcdb/sensordatastore.h"
60
#include "sensordatastore_internal.h"
61
#include "dcdb/connection.h"
62
63
64
#include "dcdbglobals.h"

#include "dcdbendian.h"
65

66
67
using namespace DCDB;

68
69
70
SensorDataStoreReading::SensorDataStoreReading() {
}

71
SensorDataStoreReading::SensorDataStoreReading(const SensorId& sid, const uint64_t ts, const int64_t value) {
72
73
74
75
76
77
78
79
  this->sensorId = sid;
  this->timeStamp = TimeStamp(ts);
  this->value = value;
}

SensorDataStoreReading::~SensorDataStoreReading() {
}

80
81
82
83
84
85
/**
 * @details
 * Since we want high-performance inserts, we prepare the
 * insert CQL query in advance and only bind it on the actual
 * insert.
 */
86
void SensorDataStoreImpl::prepareInsert(const uint64_t ttl)
87
{
88
89
90
91
92
93
94
95
    CassError rc = CASS_OK;
    CassFuture* future = NULL;
    const char* query;
    
    /*
    * Free the old prepared if necessary.
    */
    if (preparedInsert) {
96
      cass_prepared_free(preparedInsert);
97
98
99
100
101
102
103
104
    }
    
    query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL ? ;";
    future = cass_session_prepare(session, query);
    cass_future_wait(future);
    
    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
105
      connection->printError(future);
106
    } else {
107
      preparedInsert = cass_future_get_prepared(future);
108
109
    }
    cass_future_free(future);
110

111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
    // Preparing the insert statement without a TTL clause
    if (preparedInsert_noTTL) {
        cass_prepared_free(preparedInsert_noTTL);
    }
    
    query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?);";
    future = cass_session_prepare(session, query);
    cass_future_wait(future);
    
    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
    } else {
        preparedInsert_noTTL = cass_future_get_prepared(future);
    }
    cass_future_free(future);
        
    defaultTTL = ttl;
129
130
}

131
132
133
134
135
136
137
138
139
140
141
142
143
144
/**
 * @details
 * To insert a sensor reading, the Rsvd field of the SensorId must
 * be filled with a time component that ensures that the maximum
 * number of 2^32 columns per key is not exceeded while still
 * allowing relatively easy retrieval of data.
 *
 * We achieve this by using a "week-stamp" (i.e. number of weeks
 * since Unix epoch) within the Rsvd field of the SensorId before
 * calling the Cassandra Backend to do the raw insert.
 *
 * Applications should not call this function directly, but
 * use the insert function provided by the SensorDataStore class.
 */
145
void SensorDataStoreImpl::insert(const SensorId& sid, const uint64_t ts, const int64_t value, const int64_t ttl)
146
{
147
#if 0
148
149
150
  std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif

151
  /* Calculate and insert week number */
152
  uint16_t week = ts / 604800000000000;
153
  int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
154
  std::string sidStr = sid.getId();
155
  
156
  CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
157
  cass_statement_bind_string_by_name(statement, "sid", sidStr.c_str());
158
  cass_statement_bind_int16_by_name(statement, "ws", week);
159
160
  cass_statement_bind_int64_by_name(statement, "ts", ts);
  cass_statement_bind_int64_by_name(statement, "value", value);
161
162
  if(ttlReal>0)
      cass_statement_bind_int32(statement, 4, ttlReal);
163
  
164
165
  CassFuture* future = cass_session_execute(session, statement);
  cass_statement_free(statement);
166

167
  /* Don't wait for the future, just free it to make the call truly asynchronous */
168
  cass_future_free(future);
169
170
}

171
172
void SensorDataStoreImpl::insert(const SensorDataStoreReading& reading, const int64_t ttl) {
  insert(reading.sensorId, reading.timeStamp.getRaw(), reading.value, ttl);
173
174
}

175
void SensorDataStoreImpl::insertBatch(const std::list<SensorDataStoreReading>& readings, const int64_t ttl) {
176
177
  CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
  
178
179
  int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
  
180
  for (auto r: readings) {
181
    /* Calculate week number */
182
183
184
    uint16_t week = r.timeStamp.getRaw() / 604800000000000;
    
    /* Add insert statement to batch */
185
    CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
186
187
    cass_statement_bind_string_by_name(statement, "sid", r.sensorId.getId().c_str());
    cass_statement_bind_int16_by_name(statement, "ws", week);
188
189
    cass_statement_bind_int64_by_name(statement, "ts", r.timeStamp.getRaw());
    cass_statement_bind_int64_by_name(statement, "value", r.value);
190
191
    if(ttlReal>0)
        cass_statement_bind_int32(statement, 4, ttlReal);
192
193
194
195
196
197
198
    cass_batch_add_statement(batch, statement);
    cass_statement_free(statement);
  }
  
  /* Execute batch */
  CassFuture *future = cass_session_execute_batch(session, batch);
  cass_batch_free(batch);
Alessio Netti's avatar
Alessio Netti committed
199
200
201

  if(debugLog)
    cass_future_set_callback(future, DataStoreImpl_on_result, NULL);
202
  
203
  /* Don't wait for the future, just free it to make the call truly asynchronous */
204
205
206
  cass_future_free(future);
}

207
208
/**
 * @details
209
210
 * This function updates the prepared statement for inserts
 * with the new TTL value.
211
 */
212
void SensorDataStoreImpl::setTTL(const uint64_t ttl)
213
{
214
  prepareInsert(ttl);
215
216
}

Alessio Netti's avatar
Alessio Netti committed
217
218
219
220
/**
 * @brief Enables or disables logging of Cassandra insert errors
 * @param dl        true to enable logging, false otherwise
 */
221
void SensorDataStoreImpl::setDebugLog(const bool dl)
Alessio Netti's avatar
Alessio Netti committed
222
223
224
225
{
    debugLog = dl;
}

Axel Auweter's avatar
Axel Auweter committed
226
227
228
229
/**
 * @details
 * This function issues a regular query to the data store
 * and creates a SensorDataStoreReading object for each
230
 * entry which is stored in the result list.
Axel Auweter's avatar
Axel Auweter committed
231
 */
232
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const SensorId& sid, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate) {
Axel Auweter's avatar
Axel Auweter committed
233
234
235
236
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
  const CassPrepared* prepared = nullptr;
237
238
239
240
241
242
243
  
  std::string query = std::string("SELECT ts,");
  if (aggregate == AGGREGATE_NONE) {
    query.append("value");
  } else {
    query.append(AggregateString[aggregate] + std::string("(value) as value"));
  }
244
  query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
245
  future = cass_session_prepare(session, query.c_str());
Axel Auweter's avatar
Axel Auweter committed
246
247
248
249
250
251
252
253
254
255
256
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  prepared = cass_future_get_prepared(future);
  cass_future_free(future);
257
  
258
#if 0
259
  std::cout << "Query: " << query << std::endl << "sid: " << sid.toString() << " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl;
260
#endif
261

Axel Auweter's avatar
Axel Auweter committed
262
  statement = cass_prepared_bind(prepared);
263
  cass_statement_set_paging_size(statement, PAGING_SIZE);
264
265
266
267
268
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());
  cass_statement_bind_int64(statement, 2, start.getRaw());
  cass_statement_bind_int64(statement, 3, end.getRaw());
  
269
270
271
272
  bool morePages = false;
  do {
      future = cass_session_execute(session, statement);
      cass_future_wait(future);
Axel Auweter's avatar
Axel Auweter committed
273

274
275
276
      if (cass_future_error_code(future) == CASS_OK) {
          const CassResult *cresult = cass_future_get_result(future);
          CassIterator *rows = cass_iterator_from_result(cresult);
Axel Auweter's avatar
Axel Auweter committed
277

278
          SensorDataStoreReading entry;
279

280
281
          while (cass_iterator_next(rows)) {
              const CassRow *row = cass_iterator_get_row(rows);
Axel Auweter's avatar
Axel Auweter committed
282

283
284
285
              cass_int64_t ts, value;
              cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
              cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
Axel Auweter's avatar
Axel Auweter committed
286

287
288
289
              entry.sensorId = sid;
              entry.timeStamp = (uint64_t) ts;
              entry.value = (int64_t) value;
Axel Auweter's avatar
Axel Auweter committed
290

291
              result.push_back(entry);
Axel Auweter's avatar
Axel Auweter committed
292
#if 0
293
294
295
296
297
298
299
300
301
              if (localtime) {
                  t.convertToLocal();
              }
              if (raw) {
                  std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
              }
              else {
                  std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
              }
Axel Auweter's avatar
Axel Auweter committed
302
#endif
303
304
305
306
307
308
309
310
311
          }
          
          if((morePages = cass_result_has_more_pages(cresult)))
              cass_statement_set_paging_state(statement, cresult);

          cass_iterator_free(rows);
          cass_result_free(cresult);
      } else {
          morePages = false;
Axel Auweter's avatar
Axel Auweter committed
312
      }
313
314
315
316
      
      cass_future_free(future);
  } 
  while(morePages);
Axel Auweter's avatar
Axel Auweter committed
317
318
319

  cass_statement_free(statement);
  cass_prepared_free(prepared);
320
  result.reverse();
Axel Auweter's avatar
Axel Auweter committed
321
322
}

323
324
325
326
327
328
329
/**
 * @details
 * This function issues a regular query to the data store,
 * queries an arbitrary number of sensors simultaneously
 * and creates a SensorDataStoreReading object for each
 * entry which is stored in the result list.
 */
330
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const std::list<SensorId>& sids, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate, const bool storeSids) {
331
332
333
334
335
336
337
338
    if(sids.empty())
        return;
    
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;

339
    std::string query = std::string(storeSids ? "SELECT sid,ts," : "SELECT ts,");
340
341
342
343
344
    if (aggregate == AGGREGATE_NONE) {
        query.append("value");
    } else {
        query.append(AggregateString[aggregate] + std::string("(value) as value"));
    }
345
    query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
346
347
348
349
350
351
352
353
354
355
356
357
358
    future = cass_session_prepare(session, query.c_str());
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);

359
360
    // Paged asynchronous queries require keeping track of statements
    std::list<std::pair<CassStatement*, CassFuture*>> futures;
361
362
363
    auto sidIt = sids.begin();
    size_t sidCtr = 0;

364
365
    // Limiting the amount of concurrent requests with small queues
    uint32_t realGroupLimit = connection->getQueueSizeIo()/10 < QUERY_GROUP_LIMIT ? connection->getQueueSizeIo()/10 : QUERY_GROUP_LIMIT;
366

367
368
    while(sidIt != sids.end()) {
        futures.clear();
369
        sidCtr = 0;
370
        
371
        // Breaking up the original list of sids in chunks
372
373
374
375
376
377
378
379
380
        while(sidIt != sids.end() && sidCtr < realGroupLimit) {
            statement = cass_prepared_bind(prepared);
            cass_statement_set_paging_size(statement, PAGING_SIZE);
            cass_statement_bind_string(statement, 0, sidIt->getId().c_str());
            cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
            cass_statement_bind_int64(statement, 2, start.getRaw());
            cass_statement_bind_int64(statement, 3, end.getRaw());
            futures.push_back(std::pair<CassStatement*, CassFuture*>(statement, cass_session_execute(session, statement)));
            
381
382
383
384
            ++sidIt;
            ++sidCtr;
        }

385
        do {
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
            // Keeps track of outstanding pages from current queries
            std::list<std::pair<CassStatement *, CassFuture *>> futurePages;
            for (auto &fut : futures) {
                bool morePages = false;
                cass_future_wait(fut.second);

                if (cass_future_error_code(fut.second) == CASS_OK) {
                    const CassResult *cresult = cass_future_get_result(fut.second);
                    CassIterator *rows = cass_iterator_from_result(cresult);
                    SensorDataStoreReading entry;
                    cass_int64_t ts, value;
                    const char *name;
                    size_t name_len;

                    while (cass_iterator_next(rows)) {
                        const CassRow *row = cass_iterator_get_row(rows);

                        cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
                        cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
                        entry.timeStamp = (uint64_t) ts;
                        entry.value = (int64_t) value;

                        if (storeSids) {
                            cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
                            entry.sensorId = SensorId(std::string(name, name_len));
                        }

                        result.push_back(entry);
414
#if 0
415
416
417
418
419
420
421
422
423
                        if (localtime) {
                          t.convertToLocal();
                      }
                      if (raw) {
                          std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
                      }
                      else {
                          std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
                      }
424
#endif
425
426
427
428
429
430
431
432
433
434
435
436
437
438
                    }
                    
                    if ((morePages = cass_result_has_more_pages(cresult))) {
                        cass_statement_set_paging_state(fut.first, cresult);
                        futurePages.push_back(std::pair<CassStatement *, CassFuture *>(fut.first, cass_session_execute(session, fut.first)));
                    } else {
                        cass_statement_free(fut.first);
                    }

                    cass_iterator_free(rows);
                    cass_result_free(cresult);
                } else {
                    cass_statement_free(fut.first);
                    morePages = false;
439
                }
440
                cass_future_free(fut.second);
441
            }
442
443
444
445
            futures.clear();
            futures = futurePages;
            futurePages.clear();
        } while(!futures.empty());
446
447
448
    }
    
    cass_prepared_free(prepared);
449
    result.reverse();
450
451
}

452
453
454
455
456
457
/**
 * @details
 * This function performs a fuzzy query on the datastore,
 * picking a single sensor readings that is closest to
 * the one given as input
 */
458
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, const SensorId& sid, const TimeStamp& ts, const uint64_t tol_ns) {
459
    /* Find the readings before time t */
460
461
462
463
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;
464
465
    const char* queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";

466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
    future = cass_session_prepare(session, queryBefore);
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);

    statement = cass_prepared_bind(prepared);
    cass_statement_bind_string(statement, 0, sid.getId().c_str());
    cass_statement_bind_int16(statement, 1, sid.getRsvd());
    cass_statement_bind_int64(statement, 2, ts.getRaw());

    future = cass_session_execute(session, statement);
    cass_future_wait(future);
    SensorDataStoreReading r;
487

488
489
490
    if (cass_future_error_code(future) == CASS_OK) {
        const CassResult* cresult = cass_future_get_result(future);
        CassIterator* rows = cass_iterator_from_result(cresult);
491

492
493
494
495
496
497
498
        while (cass_iterator_next(rows)) {
            const CassRow* row = cass_iterator_get_row(rows);

            cass_int64_t tsInt, value;
            cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
            cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);

499
            if(ts.getRaw() - (uint64_t)tsInt < tol_ns) {
500
501
502
                r.sensorId = sid;
                r.timeStamp = (uint64_t) tsInt;
                r.value = (int64_t) value;
503
                result.push_back(r);
504
505
506
507
508
509
510
511
512
            }
        }
        cass_iterator_free(rows);
        cass_result_free(cresult);
    }

    cass_statement_free(statement);
    cass_future_free(future);
    cass_prepared_free(prepared);
513
    result.reverse();
514
}
515

516
517
518
519
520
521
/**
 * @details
 * This function performs a fuzzy query on the datastore,
 * picking readings from a set of sensors that are closest to
 * the timestamp given as input
 */
522
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, const std::list<SensorId>& sids, const TimeStamp& ts, const uint64_t tol_ns, const bool storeSids) {
523
524
525
526
527
528
529
530
    if(sids.empty())
        return;

    /* Find the readings before time t */
    CassError rc = CASS_OK;
    CassStatement *statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared *prepared = nullptr;
Alessio Netti's avatar
Alessio Netti committed
531
532
    const char *queryBefore;
    if(storeSids) 
533
        queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
Alessio Netti's avatar
Alessio Netti committed
534
    else
535
        queryBefore = "SELECT ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
536
537

    future = cass_session_prepare(session, queryBefore);
538
539
540
541
542
543
544
545
546
547
548
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);
549
    
550
    std::list<CassFuture*> futures;
551
552
553
    auto sidIt = sids.begin();
    size_t sidCtr = 0;
    
554
555
556
    // Limiting the amount of concurrent requests with small queues
    uint32_t realGroupLimit = connection->getQueueSizeIo()/10 < QUERY_GROUP_LIMIT ? connection->getQueueSizeIo()/10 : QUERY_GROUP_LIMIT;
    
557
558
    while(sidIt != sids.end()) {
        sidCtr = 0;
559
560
        futures.clear();
        
561
        // Breaking up the original list of sids in chunks
562
563
564
565
566
567
568
569
570
        while(sidIt != sids.end() && sidCtr < realGroupLimit) {
            statement = cass_prepared_bind(prepared);
            cass_statement_set_paging_size(statement, -1);
            cass_statement_bind_string(statement, 0, sidIt->getId().c_str());
            cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
            cass_statement_bind_int64(statement, 2, ts.getRaw());
            futures.push_back(cass_session_execute(session, statement));
            cass_statement_free(statement);
            
571
572
573
            ++sidIt;
            ++sidCtr;
        }
574
        
575
        SensorDataStoreReading r;
576
577
578
579
580
581
582
583
        for(auto& fut : futures) {
            cass_future_wait(fut);
            if (cass_future_error_code(fut) == CASS_OK) {
                const CassResult *cresult = cass_future_get_result(fut);
                CassIterator *rows = cass_iterator_from_result(cresult);
                cass_int64_t tsInt, value;
                const char *name;
                size_t name_len;
584

585
586
                while (cass_iterator_next(rows)) {
                    const CassRow *row = cass_iterator_get_row(rows);
587

588
589
                    cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
                    cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
590

591
592
593
594
595
596
597
598
                    if (ts.getRaw() - (uint64_t) tsInt < tol_ns) {
                        if(storeSids) {
                            cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
                            r.sensorId = SensorId(std::string(name, name_len));
                        }
                        r.timeStamp = (uint64_t) tsInt;
                        r.value = (int64_t) value;
                        result.push_back(r);
599
                    }
600
                }
601
602
603
                
                cass_iterator_free(rows);
                cass_result_free(cresult);
604
            }
605
            cass_future_free(fut);
606
607
        }
    }
608
    cass_prepared_free(prepared);
609
    result.reverse();
610
611
}

612

613
614
615
616
617
/**
 * @details
 * This function issues a regular query to the data store
 * and calls cbFunc for every reading.
 */
618
void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, const SensorId& sid, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate)
619
620
621
622
623
{
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
  const CassPrepared* prepared = nullptr;
624
625
626
627
628
629
630
  
  std::string query = std::string("SELECT ts,");
  if (aggregate == AGGREGATE_NONE) {
    query.append("value");
  } else {
    query.append(AggregateString[aggregate] + std::string("(value) as value"));
  }
631
  query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
632
  future = cass_session_prepare(session, query.c_str());
633
634
635
636
637
638
639
640
641
642
643
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  prepared = cass_future_get_prepared(future);
  cass_future_free(future);
644
  
645
  statement = cass_prepared_bind(prepared);
646
  cass_statement_set_paging_size(statement, PAGING_SIZE);
647
648
649
650
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());
  cass_statement_bind_int64(statement, 2, start.getRaw());
  cass_statement_bind_int64(statement, 3, end.getRaw());
651

652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
  bool morePages = false;
  do {
      future = cass_session_execute(session, statement);
      cass_future_wait(future);
    
      if (cass_future_error_code(future) == CASS_OK) {
          const CassResult* cresult = cass_future_get_result(future);
          CassIterator* rows = cass_iterator_from_result(cresult);
    
          SensorDataStoreReading entry;
    
          while (cass_iterator_next(rows)) {
              const CassRow* row = cass_iterator_get_row(rows);
    
              cass_int64_t ts, value;
              cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
              cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
    
              entry.sensorId = sid;
              entry.timeStamp = (uint64_t)ts;
              entry.value = (int64_t)value;
    
              cbFunc(entry, userData);
          }
676

677
678
679
680
681
682
683
684
          if((morePages = cass_result_has_more_pages(cresult)))
              cass_statement_set_paging_state(statement, cresult);
          
          cass_iterator_free(rows);
          cass_result_free(cresult);
      } else {
          morePages = false;
      }
685

686
      cass_future_free(future);
687
688

  }
689
  while(morePages);
690
691
692
693
694

  cass_statement_free(statement);
  cass_prepared_free(prepared);
}

695
696
697
698
699
/**
 * @details
 * This function deletes all data from the sensordata store
 * that is older than weekStamp-1 weeks.
 */
700
void SensorDataStoreImpl::truncBeforeWeek(const uint16_t weekStamp)
701
702
703
704
705
706
707
708
{
  /* List of rows that should be deleted */
  std::list<SensorId> deleteList;

  /* Query the database to collect all rows */
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
709
  const char* query = "SELECT DISTINCT sid,ws FROM " KEYSPACE_NAME "." CF_SENSORDATA ";";
710
711
712
713
714
715
716
717

  statement = cass_statement_new(query, 0);
  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      connection->printError(future);
718
      cass_future_free(future);
719
720
721
722
723
724
725
726
727
728
729
      return;
  }

  const CassResult* result = cass_future_get_result(future);
  cass_future_free(future);

  CassIterator* iterator = cass_iterator_from_result(result);

  /* Iterate over all rows and filter out those, that are too old */
  while (cass_iterator_next(iterator)) {
      const CassRow* row = cass_iterator_get_row(iterator);
730
      const char* res;
731
      size_t       res_len;
732
733
734
735
      int16_t     res_ws;
      cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &res, &res_len);
      cass_value_get_int16(cass_row_get_column_by_name(row, "ws"), &res_ws);
      
736
      SensorId sensor;
737
738
739
      std::string id(res, res_len);
      sensor.setId(id);
      sensor.setRsvd(res_ws);
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759

      /* Check if the sensorId's rsvd field is smaller than the weekStamp */
      if (sensor.getRsvd() < weekStamp) {
          deleteList.push_back(sensor);
      }
  }
  cass_result_free(result);
  cass_iterator_free(iterator);
  cass_statement_free(statement);

  /* Now iterate over all entries in the deleteList and delete them */
  for (std::list<SensorId>::iterator it = deleteList.begin(); it != deleteList.end(); it++) {
      deleteRow(*it);
  }
}

/**
 * @details
 * Deleting entire rows is rather efficient compared to deleting individual columns.
 */
760
void SensorDataStoreImpl::deleteRow(const SensorId& sid)
761
762
763
764
765
{
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
  const CassPrepared* prepared = nullptr;
766
  const char* query = "DELETE FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? and ws = ?;";
767
768
769
770
771
772
773
774
775
776
777
778
779

  future = cass_session_prepare(session, query);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    cass_future_free(future);
    return;
  }

  prepared = cass_future_get_prepared(future);
  cass_future_free(future);
780
  
781
  statement = cass_prepared_bind(prepared);
782
783
  cass_statement_bind_string(statement, 0, sid.getId().c_str());
  cass_statement_bind_int16(statement, 1, sid.getRsvd());
784
785
786
787
788
789
790
791
792

  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  cass_statement_free(statement);
  cass_future_free(future);
  cass_prepared_free(prepared);
}

793
794
/**
 * @details
795
 * This constructor sets the internal connection variable to
796
 * the externally provided Connection object and also
797
 * retrieves the CassSession pointer of the connection.
798
 */
799
SensorDataStoreImpl::SensorDataStoreImpl(Connection* conn)
800
{
801
802
  connection = conn;
  session = connection->getSessionHandle();
Alessio Netti's avatar
Alessio Netti committed
803
  debugLog = false;
804
  defaultTTL = 0;
Axel Auweter's avatar
Axel Auweter committed
805
806

  preparedInsert = nullptr;
807
808
  preparedInsert_noTTL = nullptr;
  prepareInsert(defaultTTL);
809
810
}

811
812
/**
 * @details
813
 * Due to the simplicity of the class, the destructor is left empty.
814
 */
815
SensorDataStoreImpl::~SensorDataStoreImpl()
816
{
817
  connection = nullptr;
Axel Auweter's avatar
Axel Auweter committed
818
819
820
821
  session = nullptr;
  if (preparedInsert) {
      cass_prepared_free(preparedInsert);
  }
822
823
824
  if (preparedInsert_noTTL) {
      cass_prepared_free(preparedInsert_noTTL);
  }
825
826
}

827
828
829
830
831
832
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
833
void SensorDataStore::insert(const SensorId& sid, const uint64_t ts, const int64_t value, const int64_t ttl)
834
{
835
    impl->insert(sid, ts, value, ttl);
836
837
}

838
void SensorDataStore::insert(const SensorDataStoreReading& reading, const int64_t ttl)
839
{
840
  impl->insert(reading, ttl);
841
842
}

843
void SensorDataStore::insertBatch(const std::list<SensorDataStoreReading>& readings, const int64_t ttl) {
844
  impl->insertBatch(readings, ttl);
845
846
}

847
848
/**
 * @details
849
850
851
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
852
 */
853
void SensorDataStore::setTTL(const uint64_t ttl)
854
{
855
    impl->setTTL(ttl);
856
857
}

Alessio Netti's avatar
Alessio Netti committed
858
859
860
861
/**
 * @brief Enables or disables logging of Cassandra insert errors
 * @param dl        true to enable logging, false otherwise
 */
862
void SensorDataStore::setDebugLog(const bool dl)
Alessio Netti's avatar
Alessio Netti committed
863
864
865
866
{
    impl->setDebugLog(dl);
}

Axel Auweter's avatar
Axel Auweter committed
867
868
869
870
871
872
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
873
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, const SensorId& sid, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate)
Axel Auweter's avatar
Axel Auweter committed
874
{
875
    impl->query(result, sid, start, end, aggregate);
Axel Auweter's avatar
Axel Auweter committed
876
877
}

878
879
880
881
882
883
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
884
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, const std::list<SensorId>& sids, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate, const bool storeSids)
885
{
886
    impl->query(result, sids, start, end, aggregate, storeSids);
887
888
}

889
890
891
892
893
894
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
895
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, const SensorId& sid, const TimeStamp& ts, const uint64_t tol_ns) {
896
897
898
    impl->fuzzyQuery(result, sid, ts, tol_ns);
}

899
900
901
902
903
904
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
905
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, const std::list<SensorId>& sids, const TimeStamp& ts, const uint64_t tol_ns, const bool storeSids) {
906
    impl->fuzzyQuery(result, sids, ts, tol_ns, storeSids);
907
908
}

909
910
911
912
913
914
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
915
void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, const SensorId& sid, const TimeStamp& start, const TimeStamp& end, const QueryAggregate aggregate)
916
{
917
    return impl->queryCB(cbFunc, userData, sid, start, end, aggregate);
918
919
}

920
921
922
923
924
925
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
926
void SensorDataStore::truncBeforeWeek(const uint16_t weekStamp)
927
928
929
930
{
    return impl->truncBeforeWeek(weekStamp);
}

931
932
/**
 * @details
933
934
 * This constructor allocates the implementation class which
 * holds the actual implementation of the class functionality.
935
 */
936
SensorDataStore::SensorDataStore(Connection* conn)
937
{
938
    impl = new SensorDataStoreImpl(conn);
939
940
}

941
942
943
944
945
/**
 * @details
 * The SensorDataStore desctructor deallocates the
 * SensorDataStoreImpl and CassandraBackend objects.
 */
946
947
SensorDataStore::~SensorDataStore()
{
948
949
950
  /* Clean up... */
  if (impl)
    delete impl;
951
}