collectagent.cpp 10.4 KB
Newer Older
1
2
3
4
5
6
7
8
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
// Description : As of now this is some rudimentary bad code to talk to the DB
//============================================================================

9
10
11
#include <cstdlib>

#include <sstream>
12
13
14

#include <sys/socket.h>
#include <netinet/in.h>
15
#include <signal.h>
16
17
#include <sys/time.h>

18
#include <boost/date_time/posix_time/posix_time.hpp>
19

20
21
22
23
24
25
26
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>

27
#include "cassandra/Cassandra.h"
28
#include "simplemqttserver.h"
29

30
31
#include "mosquitto.h"

32
using namespace std;
33
34
35
36
37
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;

38
39
40
41
42
43
44
45
46
CassandraClient *myClient;

int keepRunning;
int msgCtr;

void sigHandler(int sig) {
  keepRunning = 0;
}

47
static inline string convert(void* ptr, size_t len) {
48
49
50
51
52
53
54
55
56
57
58
59
60
  switch (len) {
  case 4:
//    *((uint32_t*)ptr) = __builtin_bswap32(*((uint32_t*)ptr));
    break;
  case 8:
//    *((uint64_t*)ptr) = __builtin_bswap64(*((uint64_t*)ptr));
    break;
  default:
    // Do nothing and hope for the best...
    break;
  }
  string res((char*)ptr, len);
  return res;
61
62
}

63
64
65
66
67
68
void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
  msgCtr++;
  try {
      CqlResult res;
      std::string query;
      std::stringstream datestr, sidstr;
69
#if 0
70
71
72
      time_t t = time(NULL);
      struct tm *now = localtime(&t);
      datestr << "'" << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
73
74
                << "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
                << (now->tm_min) << ":" << (now->tm_sec) << "'";
75
76
77
78
79
80
#else
      boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
      boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
      boost::posix_time::time_duration diff = now - epoch;
      //datestr << diff.total_nanoseconds();
#endif
81

82
83
84
      //sidstr << msgCtr;

#if 0
85
86
      query = "INSERT INTO sensordata (sid, ts, value) VALUES ( " + sidstr.str() + ", " + datestr.str() + ", 3.141 );";
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
87
88
89
90
91
92
93
#else
      ColumnParent cparent;
      cparent.column_family = "sensordata";
      Column c;
      string key, name, value;
      uint64_t ts = diff.total_nanoseconds();
      uint32_t sid = msgCtr;
94

95
96
      key = convert(&sid, sizeof(msgCtr));
      name = convert(&ts, sizeof(ts));
97

98
99
100
101
102
103
104
      float tmp = 3.141;
      value = convert(&tmp, 4);
      c.name = name;
      c.value = value;
      c.__isset.value = true;
      c.timestamp = ts/1000;
      c.__isset.timestamp = true;
105

106
107
      myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
#endif
108

109
110
  }
  catch(TTransportException te){
111
      cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
112
  }catch(InvalidRequestException ire){
113
      cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
114
  }catch(NotFoundException nfe){
115
      cout << "NF Exception: " <<  nfe.what() << "\n";
116
117
118
  }
}

119
#if 0
120
121
122
123
124
125
126
127
128
129
130
131
int main(void) {
  boost::shared_ptr<TSocket> sock;
  boost::shared_ptr<TTransport> tr;
  boost::shared_ptr<TProtocol> prot;

  std::string clusterName;

  sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
  tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
  prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));

  try {
132
      myClient = new CassandraClient(prot);
133
      tr->open();
134

135
      start:
136
      myClient->describe_cluster_name(clusterName);
137
      cout << "Cluster name: " << clusterName << "\n";
138

139
140

      int dcdbKeyspace = -1;
141
      cout << "Keyspaces:\n";
142
      std::vector<KsDef> keySpaces;
143
      myClient->describe_keyspaces(keySpaces);
144
      for (unsigned int i=0; i<keySpaces.size(); i++) {
145
          cout << "   [" << i << "]: " << keySpaces[i].name << "\n";
146
147
148
          if(keySpaces[i].name == "dcdb") {
              dcdbKeyspace = i;
          }
149
150
151
      }

      CqlResult res;
152
153
154
      std::string query;

      if (dcdbKeyspace<0) {
155
          cout << "Creating dcdb keyspace...\n";
156
          query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
157
          cout << "Sending CQL statement: " << query.c_str();
158
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
159
160
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
161
162
163
          goto start;
      }
      else {
164
          cout << "Using existing keyspace dcdb...\n";
165
166
167
      }

      query = "USE dcdb;";
168
      cout << "Sending CQL statement: " << query;
169
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
170
      cout << "  --> Success!\n";
171
172

      int sensordataCf = -1;
173
      cout << "Column families in dcdb:\n";
174
      for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
175
          cout << "   [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
176
177
178
179
180
181
          if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
              sensordataCf = i;
          }
      }

      if (sensordataCf<0) {
182
          cout << "Creating sensordata column familiy...\n";
183
          query = "CREATE TABLE sensordata ( sid int, ts bigint, value float, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
184
          cout << "Sending CQL statement: " << query;
185
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
186
187
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
188
189
190
          goto start;
      }
      else {
191
          cout << "Using existing sensordata column familiy.\n";
192
193
194
195
196
      }

      /* Should have the keyspace and the column familiy in the system now, subscribe to local mqtt broker */
      int mosqMajor, mosqMinor, mosqRevision;
      mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
197
      cout << "Initializing Mosquitto Library Version " << mosqMajor << "." << mosqMinor << "." << mosqRevision << "\n";
198
199
200
201
202
203
204
205
206
207
208
      mosquitto_lib_init();

      /* Init mosquitto struct */
      struct mosquitto* mosq;
      mosq = mosquitto_new("CollectAgent", false, NULL);
      if (!mosq) {
          perror(NULL);
          exit(EXIT_FAILURE);
      }

      /* Connect to the broker */
209
210
      cout << "Connecting to broker...";
      cout.flush();
211
212
213
214
      if (mosquitto_connect(mosq, "localhost", 1883, 1000) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not connect to host");
          exit(EXIT_FAILURE);
      }
215
      cout << " Done.\n";
216
217
218
219
220

      /* Catch SIGINT signals */
      signal(SIGINT, sigHandler);

      /* Subscribe to anything */
221
222
      cout << "Subscribing to anything on the broker (Pattern #)...";
      cout.flush();
223
224
225
226
      if (mosquitto_subscribe(mosq, NULL, "#", 0) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not subscribe");
          exit(EXIT_FAILURE);
      }
227
      cout << " Done.\n";
228
229

      /* Set the callback for mosquitto */
230
231
      cout << "Configuring message callback...";
      cout.flush();
232
      mosquitto_message_callback_set(mosq, mqttOnMessage);
233
      cout << " Done.\n";
234
235

      /* Here comes the main loop */
236
237
      cout << "Starting mqtt loop thread...";
      cout.flush();
238
239
240
241
      if (mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not start mosquitto thread");
          exit(EXIT_FAILURE);
      }
242
      cout << " Done.\n";
243
244
245
246
247
248
249
250
251
252
253
254

      keepRunning = 1;
      timeval start, end;
      double elapsed;

      while(keepRunning) {
          gettimeofday(&start, NULL);
          sleep(10);
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
255
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second\n";
256
257
258
          msgCtr = 0;
      }

259
      cout << "Cleaning up...";
260
261
262
263
264
265
266
267

      /* Stop the mosquitto loop */
      mosquitto_loop_stop(mosq, true);
      mosquitto_disconnect(mosq);

      /* Disconnect from Cassandra */
      tr->close();

268
      cout << " Done.\n";
269

270
#if 0
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
      int resB = 0;
      while (resB < keySpaces[0].cf_defs.size()) {
          query = "SELECT * FROM " + keySpaces[0].cf_defs[resB].name + ";";
          printf("Sending CQL statement: %s...", query.c_str());
          myClient.execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
          printf(" Done.\nResult:\n");
          int resC = 0;
          while (resC < res.rows.size()) {
              int resD = 0;
              while (resD < res.rows[resC].columns.size()) {
                  printf("%s [%lu]:", res.rows[resC].columns[resD].name.c_str(),
                      res.rows[resC].columns[resD].value.length());
                  int resE = 0;
                  while (resE < res.rows[resC].columns[resD].value.length()) {
                      printf(" %02x", res.rows[resC].columns[resD].value.c_str()[resE] & 0xff);
                      resE++;
                  }
                  printf("\n");
                  resD++;
              }
              resC++;
          }
          resB++;
      }
295
#endif
296
297
  }
  catch(TTransportException te){
298
299
300
301
302
303
304
      cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
  }
  catch(InvalidRequestException ire){
      cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
  }
  catch(NotFoundException nfe){
      cout << "NF Exception: " <<  nfe.what() << "\n";
305
306
307
  }
  return EXIT_SUCCESS;
}
308
309
#else

310
311
312
313
314
315
316
void mqttCallback(SimpleMQTTMessage *msg)
{
  cout << "Hello from mqttCallback!\n";
  msg->dump();
  delete msg;
}

317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
int main(void) {

  MQTTFixedHeader mh;
  mh.bits.type = MQTT_CONNECT;
  mh.bits.qos = 2;
  mh.bits.dup = 1;
  mh.bits.retain = 1;
  mh.bits.remaining_length[0] = 0xff;
  mh.bits.remaining_length[1] = 0xee;
  mh.bits.remaining_length[2] = 0xdd;
  mh.bits.remaining_length[3] = 0xcc;

  printf("mh.raw: 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x\n", mh.raw[0], mh.raw[1], mh.raw[2], mh.raw[3], mh.raw[4]);

  try{
      SimpleMQTTServer ms;
333
      ms.setMessageCallback(mqttCallback);
334
335
336
337
338
339
340
341
342
343
344
345
346
      ms.start();
      sleep(60);
      ms.stop();
  }
  catch (std::exception e) {
      cout << "Exception: " << e.what() << "\n";
  }

  return 0;
}


#endif