sensordatastore.cpp 7.28 KB
Newer Older
1
2
3
4
5
6
7
/*
 * sensordatastore.cpp
 *
 *  Created on: Jul 24, 2013
 *      Author: Axel Auweter
 */

8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * @mainpage
 * The DCDBLib library is a dynamic runtime library providing
 * functions to initialize and access the DCDB data store. It
 * is being used by the CollectAgent to handle insertion of
 * data and can be used by tools responsible for data analysis.
 *
 * Its main class is the SensorDataStore class which provides
 * functions to connect to the data store, initialize an empty
 * data base and to (TODO) retrieve data.
 *
 * For its internal handling, SensorDataStore relies on the
 * SensorDataStoreImpl class (which hides all private member
 * functions belonging to the SensorDataStore class from the
 * header that is used by programmers who link against this
 * library). Raw database functionality is abstracted into the
 * CassandraBackend class (to easy switching to other
 * key-value style databases in the future).
 *
27
 * To use the library in your client application, simply
28
29
30
31
 * include the sensordatastore.h header file and initialize
 * an object of the SensorDataStore class.
 */

Axel Auweter's avatar
Axel Auweter committed
32
33
#include <string>
#include <iostream>
34
35
#include <cstdint>
#include <cinttypes>
Axel Auweter's avatar
Axel Auweter committed
36

37
#include "cassandra.h"
Axel Auweter's avatar
Axel Auweter committed
38

39
#include "sensordatastore.h"
40
#include "sensordatastore_internal.h"
41
42
43
44
#include "connection.h"
#include "dcdbglobals.h"

#include "dcdbendian.h"
45

46
47
48
49
50
51
/**
 * @details
 * This function serializes a SensorId object into a
 * big-endian 128-bit character array represented as
 * std::string.
 */
Axel Auweter's avatar
Axel Auweter committed
52
std::string SensorDataStoreImpl::sidConvert(SensorId *sid)
53
54
{
    uint64_t ll[2];
55
56
    ll[0] = Endian::hostToBE(sid->raw[0]);
    ll[1] = Endian::hostToBE(sid->raw[1]);
Axel Auweter's avatar
Axel Auweter committed
57
    return std::string((char*)ll, 16);
58
59
}

60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
/**
 * @details
 * Since we want high-performance inserts, we prepare the
 * insert CQL query in advance and only bind it on the actual
 * insert.
 */
void SensorDataStoreImpl::prepareInsert(uint64_t ttl)
{
  CassError rc = CASS_OK;
  CassFuture* future = NULL;
  const char* query;

  /*
   * Free the old prepared if necessary.
   */
  if (preparedInsert) {
      cass_prepared_free(preparedInsert);
  }

  char *queryBuf = NULL;
  if (ttl == 0) {
      query = "INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);";
  }
  else {
      queryBuf = (char*)malloc(256);
      snprintf(queryBuf, 256, "INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?) USING TTL %" PRIu64 " ;", ttl);
      query = queryBuf;
  }

  future = cass_session_prepare(session, query);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      connection->printError(future);
  } else {
      preparedInsert = cass_future_get_prepared(future);
  }

  cass_future_free(future);
  if (queryBuf) {
      free(queryBuf);
  }
}


106
107
108
109
110
111
112
113
114
115
116
117
118
/**
 * @details
 * The conversion of a MQTT message topic to a SensorId
 * object is performed by byte-wise scanning of the string,
 * and skipping of all characters except for those in the
 * range [0-9,a-f,A-F]. Each character is then turned from
 * hex string into its binary representation an OR'ed into
 * the 128-bit raw fields of the SensorId object.
 *
 * Applications should not call this function directly, but
 * use the topicToSid function provided by the
 * SensorDataStore class.
 */
Axel Auweter's avatar
Axel Auweter committed
119
bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
120
121
122
123
124
{
    uint64_t pos = 0;
    const char* buf = topic.c_str();
    sid->raw[0] = 0;
    sid->raw[1] = 0;
125
    while (*buf && pos < 128) {
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
        if (*buf >= '0' && *buf <= '9') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
            pos += 4;
        }
        else if (*buf >= 'A' && *buf <= 'F') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
            pos += 4;
        }
        else if (*buf >= 'a' && *buf <= 'f') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
            pos += 4;
        }
        buf++;
    }
    return pos == 128;
}

143
144
145
146
147
148
149
150
151
152
153
154
155
156
/**
 * @details
 * To insert a sensor reading, the Rsvd field of the SensorId must
 * be filled with a time component that ensures that the maximum
 * number of 2^32 columns per key is not exceeded while still
 * allowing relatively easy retrieval of data.
 *
 * We achieve this by using a "week-stamp" (i.e. number of weeks
 * since Unix epoch) within the Rsvd field of the SensorId before
 * calling the Cassandra Backend to do the raw insert.
 *
 * Applications should not call this function directly, but
 * use the insert function provided by the SensorDataStore class.
 */
157
158
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
159
#if 0
160
161
162
  std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif

163
  /* Calculate and insert week number */
164
  uint16_t week = ts / 604800000000000;
165
166
167
  sid->dsid.rsvd = week;

  /* Insert into Cassandra */
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
  std::string key = sidConvert(sid);

  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;

  statement = cass_prepared_bind(preparedInsert);

  cass_statement_bind_bytes_by_name(statement, "sid", (cass_byte_t*)(key.c_str()), 16);
  cass_statement_bind_int64_by_name(statement, "ts", ts);
  cass_statement_bind_int64_by_name(statement, "value", value);

  future = cass_session_execute(session, statement);

  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      connection->printError(future);
  }

  cass_future_free(future);
  cass_statement_free(statement);
191
192
}

193
194
/**
 * @details
195
196
 * This function updates the prepared statement for inserts
 * with the new TTL value.
197
 */
198
void SensorDataStoreImpl::setTTL(uint64_t ttl)
199
{
200
  prepareInsert(ttl);
201
202
}

203
204
/**
 * @details
205
206
207
 * This constructor sets the internal connection variable to
 * the externally provided DCDBConnection object and also
 * retrieves the CassSession pointer of the connection.
208
 */
209
SensorDataStoreImpl::SensorDataStoreImpl(DCDBConnection* conn)
210
{
211
212
  connection = conn;
  session = connection->getSessionHandle();
213
214
}

215
216
/**
 * @details
217
 * Due to the simplicity of the class, the destructor is left empty.
218
 */
219
SensorDataStoreImpl::~SensorDataStoreImpl()
220
{
221
  connection = nullptr;
222
223
}

224
225
226
227
228
229
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
230
231
232
233
234
bool SensorDataStore::topicToSid(SensorId* sid, std::string topic)
{
    return impl->topicToSid(sid, topic);
}

235
236
237
238
239
240
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
241
242
243
244
245
void SensorDataStore::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
    impl->insert(sid, ts, value);
}

246
247
/**
 * @details
248
249
250
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
251
 */
252
void SensorDataStore::setTTL(uint64_t ttl)
253
{
254
    impl->setTTL(ttl);
255
256
}

257
258
/**
 * @details
259
260
 * This constructor allocates the implementation class which
 * holds the actual implementation of the class functionality.
261
 */
262
SensorDataStore::SensorDataStore(DCDBConnection* conn)
263
{
264
    impl = new SensorDataStoreImpl(conn);
265
266
}

267
268
269
270
271
/**
 * @details
 * The SensorDataStore desctructor deallocates the
 * SensorDataStoreImpl and CassandraBackend objects.
 */
272
273
SensorDataStore::~SensorDataStore()
{
274
275
276
  /* Clean up... */
  if (impl)
    delete impl;
277
}