//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//============================================================================

#include <cstdlib>
#include <cstring>
#include <exception>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include <sys/socket.h>
#include <netinet/in.h>
#include <signal.h>
#include <stdint.h>
#include <sys/time.h>
#include <unistd.h>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>

#include "cassandra/Cassandra.h"
#include "simplemqttserver.h"

#include "mosquitto.h"

using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;

/* Cassandra Thrift client; created in main() and used by mqttCallback(). */
CassandraClient *myClient;

/*
 * Main-loop run flag. It is cleared from the SIGINT handler, so it has to be
 * a volatile sig_atomic_t: that is the only object type a signal handler may
 * portably write to.
 */
volatile sig_atomic_t keepRunning;

/*
 * Statistics counters: all messages / PUBLISH messages seen since the last
 * report. Incremented in mqttCallback(), read and reset in main() without
 * any synchronization.
 */
uint64_t msgCtr;
uint64_t pmsgCtr;
/* All identifier types below are byte-packed (no padding). */
#pragma pack(push,1)

/*
 * 64-bit device location, accessible either as one raw word or as eight
 * one-byte fields. The fields are listed in memory order, so on a
 * little-endian host knc_id overlays the least significant byte of raw and
 * datacenter_id the most significant one.
 */
union DeviceLocation {
  uint64_t raw;
  struct {
    uint8_t knc_id;
    uint8_t bnc_id;
    uint8_t bic_id;
    uint8_t chassis_id;
    uint8_t rack_id_lsb;
    uint8_t rack_id_msb;
    uint8_t cluster_id;
    uint8_t datacenter_id;
  };
};

/*
 * 64 bits split into a 16-bit sensor id and a 48-bit device id.
 * (Exact bit placement of bit-fields is up to the compiler.)
 */
struct DeviceSensorId {
  uint64_t sensor_id    : 16;
  uint64_t device_id    : 48;
};

/*
 * Full 128-bit sensor identifier: two raw 64-bit words, or a
 * DeviceLocation (overlaying raw[0]) followed by a DeviceSensorId
 * (overlaying raw[1]).
 */
union SensorId {
  uint64_t raw[2];
  struct {
    DeviceLocation dl;
    DeviceSensorId dsid;
  };
};

#pragma pack(pop)


void sigHandler(int sig)
{
77
78
79
  keepRunning = 0;
}

bool topicToSid(SensorId* sid, string topic)
{
  uint64_t pos = 0;
  const char* buf = topic.c_str();
  sid->raw[0] = 0;
  sid->raw[1] = 0;
  while (*buf) {
      if (*buf >= '0' && *buf <= '9') {
          sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
          pos += 4;
      }
      else if (*buf >= 'A' && *buf <= 'F') {
          sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
          pos += 4;
      }
      else if (*buf >= 'a' && *buf <= 'f') {
          sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
          pos += 4;
      }
      buf++;
  }
  return pos == 128;
}

string sidConvert(SensorId *sid)
{
  uint64_t ll[2];
  ll[0] = __builtin_bswap64(sid->raw[0]);
  ll[1]= __builtin_bswap64(sid->raw[1]);
  return string((char*)ll, 16);
}

/*
 * Serialize a 64-bit integer into an 8-byte binary string, most significant
 * byte first.
 *
 * The bytes are produced by shifting rather than by byte-swapping the
 * in-memory representation, so the result does not depend on the host byte
 * order or on compiler-specific builtins.
 *
 * n: value to serialize.
 *
 * Returns a std::string of length 8 (may contain NUL bytes).
 */
std::string int64Convert(uint64_t n)
{
  char bytes[8];

  /* Fill back to front: the least significant byte goes last. */
  for (int i = 7; i >= 0; --i) {
      bytes[i] = static_cast<char>(n & 0xff);
      n >>= 8;
  }

  return std::string(bytes, sizeof(bytes));
}

/*
 * Message callback registered with the SimpleMQTTServer in main().
 *
 * Updates the message statistics and, for PUBLISH messages whose topic
 * decodes to a sensor id, inserts one column into the "sensordata" column
 * family:
 *   row key      = 16-byte sensor id
 *   column name  = current time since 1970-01-01 in nanoseconds, 8 bytes
 *   column value = first 8 payload bytes read as a host-order uint64_t,
 *                  stored via int64Convert()
 *   timestamp    = the same time in microseconds
 *
 * msg: the received message. The callback deletes it before returning.
 *
 * Exceptions derived from std::exception (which includes the Thrift and
 * Cassandra ones) are reported on stdout and not propagated.
 */
void mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  const bool isPublish = msg->isPublish();
  if (isPublish)
    pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  try {
#if 0
      msg->dump();
#endif
      if (isPublish) {
          ColumnParent cparent;
          cparent.column_family = "sensordata";

          /* Wall-clock time since the Unix epoch, in nanoseconds. */
          boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
          boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
          boost::posix_time::time_duration diff = now - epoch;
          uint64_t ts = diff.total_nanoseconds();

          SensorId sid;
          const void* payloadPtr = msg->getPayload();

          if (payloadPtr == NULL) {
              cout << "Ignoring PUBLISH message without payload\n";
          }
          else if (topicToSid(&sid,msg->getTopic())) {
#if 0
              cout << "Topic decode successful:"
                  << "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
                  << "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
                  << "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
                  << "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
                  << "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
                  << "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
                  << "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
                  << "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
                  << "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
                  << "\ndevice_id: " << hex << sid.dsid.device_id
                  << "\nsensor_id: " << hex << sid.dsid.sensor_id
                  << "\n";
#endif

              /*
               * Copy the reading out with memcpy: the payload buffer is not
               * guaranteed to be aligned for uint64_t.
               * NOTE(review): this still assumes the payload holds at least
               * 8 bytes; the message's length accessor is not visible here,
               * so add a length check once it is confirmed.
               */
              uint64_t reading = 0;
              memcpy(&reading, payloadPtr, sizeof(reading));

              Column c;
              c.name = int64Convert(ts);
              c.value = int64Convert(reading);
              c.__isset.value = true;
              c.timestamp = ts/1000;  /* nanoseconds -> microseconds */
              c.__isset.timestamp = true;

              myClient->insert(sidConvert(&sid), cparent, c, ConsistencyLevel::ONE);
          }
      }
  }
  /*
   * Thrift throws its exceptions by value, so they must be caught by
   * reference; a handler taking a pointer would never match.
   */
  catch(const TTransportException& te){
      cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
  }
  catch(const InvalidRequestException& ire){
      cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
  }
  catch(const NotFoundException& nfe){
      cout << "NF Exception: " <<  nfe.what() << "\n";
  }
  catch(const std::exception& e){
      /* Anything else (e.g. other Cassandra errors): report, keep serving. */
      cout << "Exception: " << e.what() << "\n";
  }

  delete msg;
}

int main(void) {
208

209
210
211
212
213
214
215
216
217
218
  boost::shared_ptr<TSocket> sock;
  boost::shared_ptr<TTransport> tr;
  boost::shared_ptr<TProtocol> prot;

  std::string clusterName;

  sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
  tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
  prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));

219
220
221
222
  /*
   * Open the connection to the Cassandra database and
   * create the necessary keyspace and column family.
   */
223
  try {
224
      myClient = new CassandraClient(prot);
225
      tr->open();
226

227
      start:
228
      myClient->describe_cluster_name(clusterName);
229
      cout << "Cluster name: " << clusterName << "\n";
230

231
      int dcdbKeyspace = -1;
232
      cout << "Keyspaces:\n";
233
      std::vector<KsDef> keySpaces;
234
      myClient->describe_keyspaces(keySpaces);
235
      for (unsigned int i=0; i<keySpaces.size(); i++) {
236
          cout << "   [" << i << "]: " << keySpaces[i].name << "\n";
237
238
239
          if(keySpaces[i].name == "dcdb") {
              dcdbKeyspace = i;
          }
240
241
242
      }

      CqlResult res;
243
244
245
      std::string query;

      if (dcdbKeyspace<0) {
246
          cout << "Creating dcdb keyspace...\n";
247
          query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
248
          cout << "Sending CQL statement: " << query.c_str();
249
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
250
251
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
252
253
254
          goto start;
      }
      else {
255
          cout << "Using existing keyspace dcdb...\n";
256
257
258
      }

      query = "USE dcdb;";
259
      cout << "Sending CQL statement: " << query;
260
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
261
      cout << "  --> Success!\n";
262
263

      int sensordataCf = -1;
264
      cout << "Column families in dcdb:\n";
265
      for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
266
          cout << "   [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
267
268
269
270
271
272
          if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
              sensordataCf = i;
          }
      }

      if (sensordataCf<0) {
273
          cout << "Creating sensordata column familiy...\n";
274
          query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
275
          cout << "Sending CQL statement: " << query;
276
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
277
278
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
279
280
281
          goto start;
      }
      else {
282
          cout << "Using existing sensordata column familiy.\n";
283
      }
284
285
286
287
288
289
290
291
292
293
  }
  catch(TTransportException *te){
      cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
  }
  catch(InvalidRequestException *ire){
      cout << "IRE Exception: " << ire->what() << "[" << ire->why << "]\n";
  }
  catch(NotFoundException *nfe){
      cout << "NF Exception: " <<  nfe->what() << "\n";
  }
294

295
296
  /* Catch SIGINT signals */
  signal(SIGINT, sigHandler);
297
298


299
  try{
300
301
302
303
      keepRunning = 1;
      timeval start, end;
      double elapsed;

304
305
306
307
308
309
      SimpleMQTTServer ms;
      ms.setMessageCallback(mqttCallback);
      ms.start();

      cout << "Server running...\n";

310
311
      while(keepRunning) {
          gettimeofday(&start, NULL);
312
          sleep(60);
313
314
315
316
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
317
318
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
319
          msgCtr = 0;
320
          pmsgCtr = 0;
321
322
      }

323
      cout << "Stopping...\n";
324
325
326

      ms.stop();
  }
327
328
  catch (exception *e) {
      cout << "Exception: " << e->what() << "\n";
329
330
331
332
  }

  return 0;
}