sensordatastore.cpp 8.44 KB
Newer Older
1
2
3
4
5
6
7
/*
 * sensordatastore.cpp
 *
 *  Created on: Jul 24, 2013
 *      Author: Axel Auweter
 */

8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * @mainpage
 * The DCDBLib library is a dynamic runtime library providing
 * functions to initialize and access the DCDB data store. It
 * is being used by the CollectAgent to handle insertion of
 * data and can be used by tools responsible for data analysis.
 *
 * Its main class is the SensorDataStore class which provides
 * functions to connect to the data store, initialize an empty
 * data base and to (TODO) retrieve data.
 *
 * For its internal handling, SensorDataStore relies on the
 * SensorDataStoreImpl class (which hides all private member
 * functions belonging to the SensorDataStore class from the
 * header that is used by programmers who link against this
 * library). Raw database functionality is abstracted into the
 * CassandraBackend class (to easy switching to other
 * key-value style databases in the future).
 *
27
 * To use the library in your client application, simply
28
29
30
31
 * include the sensordatastore.h header file and initialize
 * an object of the SensorDataStore class.
 */

Axel Auweter's avatar
Axel Auweter committed
32
33
34
35
36
#include <string>
#include <iostream>

#include <stdint.h>

37
38
#include "sensordatastore_internal.h"

39
40
41
42
43
44
/**
 * @details
 * This function serializes a SensorId object into a
 * big-endian 128-bit character array represented as
 * std::string.
 */
Axel Auweter's avatar
Axel Auweter committed
45
std::string SensorDataStoreImpl::sidConvert(SensorId *sid)
46
47
{
    uint64_t ll[2];
48
49
    ll[0] = Endian::hostToBE(sid->raw[0]);
    ll[1] = Endian::hostToBE(sid->raw[1]);
Axel Auweter's avatar
Axel Auweter committed
50
    return std::string((char*)ll, 16);
51
52
}

53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * @details
 * The conversion of a MQTT message topic to a SensorId
 * object is performed by byte-wise scanning of the string,
 * and skipping of all characters except for those in the
 * range [0-9,a-f,A-F]. Each character is then turned from
 * hex string into its binary representation an OR'ed into
 * the 128-bit raw fields of the SensorId object.
 *
 * Applications should not call this function directly, but
 * use the topicToSid function provided by the
 * SensorDataStore class.
 */
Axel Auweter's avatar
Axel Auweter committed
66
bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
67
68
69
70
71
{
    uint64_t pos = 0;
    const char* buf = topic.c_str();
    sid->raw[0] = 0;
    sid->raw[1] = 0;
72
    while (*buf && pos < 128) {
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
        if (*buf >= '0' && *buf <= '9') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
            pos += 4;
        }
        else if (*buf >= 'A' && *buf <= 'F') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
            pos += 4;
        }
        else if (*buf >= 'a' && *buf <= 'f') {
            sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
            pos += 4;
        }
        buf++;
    }
    return pos == 128;
}

90
91
92
93
94
95
96
97
98
99
100
/**
 * @details
 * Initialization of the SensorDataStore happens in three
 * steps:
 *  - Establish Connection
 *  - Check if all keyspaces are present and create them if not
 *  - Check if all column families are present and create them if not
 *
 *  Applications should not call this function directly, but
 *  use the init function provideed by the SensorDataStore class.
 */
Axel Auweter's avatar
Axel Auweter committed
101
void SensorDataStoreImpl::init(std::string hostname, int port) {
102

103
104
105
106
  /*
   * Open the connection to the Cassandra database and
   * create the necessary keyspaces and column families.
   */
107
108
109
  if (!csBackend->connect(hostname, port)) {
      exit(EXIT_FAILURE);
  }
110

111
  /* Keyspace and column family for published sensors */
112
113
114
115
116
  /* FIXME: We should have a good way to determine the number of replicas here */
  if (!csBackend->existsKeyspace(CONFIG_KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << CONFIG_KEYSPACE_NAME << "...\n";
      csBackend->createKeyspace(CONFIG_KEYSPACE_NAME, 1);
  }
117

118
  csBackend->selectKeyspace(CONFIG_KEYSPACE_NAME);
119

120
121
122
123
  if (!(csBackend->getActiveKeyspace().compare(CONFIG_KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << CONFIG_KEYSPACE_NAME << "\n";
      exit(EXIT_FAILURE);
  }
124

125
126
127
  if (!csBackend->existsColumnFamily(CF_PUBLISHEDSENSORS)) {
      std::cout << "Creating Column Familiy " CF_PUBLISHEDSENSORS "...\n";
      csBackend->createColumnFamily(CF_PUBLISHEDSENSORS,
Axel Auweter's avatar
Axel Auweter committed
128
          "name varchar, pattern varchar, scaling_factor double, unit varchar",
129
130
131
          "name",
          "COMPACT STORAGE AND CACHING = all");
  }
132

133
134
135
136
137
  /* Keyspace and column family for raw sensor data */
  if (!csBackend->existsKeyspace(KEYSPACE_NAME)) {
      std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
      csBackend->createKeyspace(KEYSPACE_NAME, 1);
  }
138

139
  csBackend->selectKeyspace(KEYSPACE_NAME);
140

141
142
143
144
  if (!(csBackend->getActiveKeyspace().compare(KEYSPACE_NAME) == 0)) {
      std::cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
      exit(EXIT_FAILURE);
  }
145

146
147
148
149
150
151
152
  if (!csBackend->existsColumnFamily(CF_SENSORDATA)) {
      std::cout << "Creating Column Familiy " CF_SENSORDATA "...\n";
      csBackend->createColumnFamily(CF_SENSORDATA,
          "sid blob, ts bigint, value bigint",
          "sid, ts",
          "COMPACT STORAGE");
  }
153
154
155

  /* Prepare for optimized insertions */
  csBackend->prepareInsert();
156
157
}

158
159
160
161
162
163
164
165
166
167
168
169
170
171
/**
 * @details
 * To insert a sensor reading, the Rsvd field of the SensorId must
 * be filled with a time component that ensures that the maximum
 * number of 2^32 columns per key is not exceeded while still
 * allowing relatively easy retrieval of data.
 *
 * We achieve this by using a "week-stamp" (i.e. number of weeks
 * since Unix epoch) within the Rsvd field of the SensorId before
 * calling the Cassandra Backend to do the raw insert.
 *
 * Applications should not call this function directly, but
 * use the insert function provided by the SensorDataStore class.
 */
172
173
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
174
#if 0
175
176
177
  std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif

178
  /* Calculate and insert week number */
179
  uint16_t week = ts / 604800000000000;
180
181
182
  sid->dsid.rsvd = week;

  /* Insert into Cassandra */
183
  csBackend->insert(sidConvert(sid), ts, value);
184
185
}

186
187
188
189
190
/**
 * @details
 * This constructor only sets the internal csBackend variable to
 * the externally provided CassandraBackend object.
 */
191
SensorDataStoreImpl::SensorDataStoreImpl(CassandraBackend *csb)
192
{
193
  csBackend = csb;
194
195
}

196
197
198
199
/**
 * @details
 * Due to the simplicity of the class, the destructor is left empty.
 */
200
201
202
203
SensorDataStoreImpl::~SensorDataStoreImpl()
{
}

204
205
206
207
208
209
210
/**
 * @details
 * The main task of this function is to ensure that the
 * SensorDataStore object owns an instance of SensorDataStoreImpl.
 * Once this is ensured, the actual initialization work is
 * performed by the init function of SensorDataStoreImpl.
 */
Axel Auweter's avatar
Axel Auweter committed
211
void SensorDataStore::init(std::string hostname, int port)
212
213
214
{
    /* Allocate new SensorDataStoreImpl Object if necessary */
    if (!impl) {
215
        impl = new SensorDataStoreImpl(csBackend);
216
217
    }
    
218
    /* Call the Impl class init function */
219
220
221
    impl->init(hostname, port);
}

222
223
224
225
226
227
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
228
229
230
231
232
bool SensorDataStore::topicToSid(SensorId* sid, std::string topic)
{
    return impl->topicToSid(sid, topic);
}

233
234
235
236
237
238
/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
239
240
241
242
243
void SensorDataStore::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
    impl->insert(sid, ts, value);
}

244
245
246
247
248
249
/**
 * @details
 * This constructor for SensorDataStore acts similarly to the
 * standard constructor by calling the standard constructor
 * with "localhost" as the hostname and 9160 as the port number.
 */
250
251
SensorDataStore::SensorDataStore()
{
252
    csBackend = new CassandraBackend();
253
    impl = nullptr;
254
    init("localhost", 9042);
255
256
}

257
258
259
260
261
262
263
264
265
266
267
/**
 * @details
 * This constructor for SensorDataStore objects initializes
 * the object and connects to the database at the given
 * hostname on the given port.
 *
 * For this, the constructor allocates a CassandraBackend
 * object that is then used by SensorDataStoreImpl for doing
 * the raw database accesses.
 */
SensorDataStore::SensorDataStore(std::string hostname, int port)
268
{
269
    csBackend = new CassandraBackend();
270
271
272
273
    impl = nullptr;
    init(hostname, port);
}

274
275
276
277
278
/**
 * @details
 * The SensorDataStore desctructor deallocates the
 * SensorDataStoreImpl and CassandraBackend objects.
 */
279
280
SensorDataStore::~SensorDataStore()
{
281
282
283
284
285
286
  /* Clean up... */
  if (impl)
    delete impl;
  if (csBackend)
    delete csBackend;

287
}