CollectAgent.cpp 8.42 KB
Newer Older
1
2
3
4
5
6
7
8
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
// Description : As of now this is some rudimentary bad code to talk to the DB
//============================================================================

9
//#include <cstdio>
10
11
12
#include <cstdlib>

#include <sstream>
13
14
15

#include <sys/socket.h>
#include <netinet/in.h>
16
#include <signal.h>
17
18
#include <sys/time.h>

19

20
21
22
23
24
25
26
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>

27
#include "cassandra/Cassandra.h"
28

29
30
#include "mosquitto.h"

31
using namespace std;
32
33
34
35
36
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
CassandraClient *myClient;

int keepRunning;
int msgCtr;

void sigHandler(int sig) {
  keepRunning = 0;
}

void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
  msgCtr++;
  try {
      CqlResult res;
      std::string query;
      std::stringstream datestr, sidstr;
      time_t t = time(NULL);
      struct tm *now = localtime(&t);

      datestr << "'" << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
            << "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
            << (now->tm_min) << ":" << (now->tm_sec) << "'";
      sidstr << msgCtr;

      query = "INSERT INTO sensordata (sid, ts, value) VALUES ( " + sidstr.str() + ", " + datestr.str() + ", 3.141 );";
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
  }
  catch(TTransportException te){
64
      cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
65
  }catch(InvalidRequestException ire){
66
      cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
67
  }catch(NotFoundException nfe){
68
      cout << "NF Exception: " <<  nfe.what() << "\n";
69
70
71
72
  }
}


73
74
75
76
77
78
79
80
81
82
83
84
int main(void) {
  boost::shared_ptr<TSocket> sock;
  boost::shared_ptr<TTransport> tr;
  boost::shared_ptr<TProtocol> prot;

  std::string clusterName;

  sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
  tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
  prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));

  try {
85
      myClient = new CassandraClient(prot);
86
      tr->open();
87
88
89

    start:
      myClient->describe_cluster_name(clusterName);
90
      cout << "Cluster name: " << clusterName << "\n";
91

92
93

      int dcdbKeyspace = -1;
94
      cout << "Keyspaces:\n";
95
      std::vector<KsDef> keySpaces;
96
      myClient->describe_keyspaces(keySpaces);
97
      for (unsigned int i=0; i<keySpaces.size(); i++) {
98
          cout << "   [" << i << "]: " << keySpaces[i].name << "\n";
99
100
101
          if(keySpaces[i].name == "dcdb") {
              dcdbKeyspace = i;
          }
102
103
104
      }

      CqlResult res;
105
106
107
      std::string query;

      if (dcdbKeyspace<0) {
108
          cout << "Creating dcdb keyspace...\n";
109
          query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
110
          cout << "Sending CQL statement: " << query.c_str();
111
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
112
113
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
114
115
116
          goto start;
      }
      else {
117
          cout << "Using existing keyspace dcdb...\n";
118
119
120
      }

      query = "USE dcdb;";
121
      cout << "Sending CQL statement: " << query;
122
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
123
      cout << "  --> Success!\n";
124
125

      int sensordataCf = -1;
126
      cout << "Column families in dcdb:\n";
127
      for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
128
          cout << "   [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
129
130
131
132
133
134
          if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
              sensordataCf = i;
          }
      }

      if (sensordataCf<0) {
135
          cout << "Creating sensordata column familiy...\n";
136
          query = "CREATE TABLE sensordata ( sid int, ts timestamp, value float, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
137
          cout << "Sending CQL statement: " << query;
138
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
139
140
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
141
142
143
          goto start;
      }
      else {
144
          cout << "Using existing sensordata column familiy.\n";
145
146
147
148
149
      }

      /* Should have the keyspace and the column familiy in the system now, subscribe to local mqtt broker */
      int mosqMajor, mosqMinor, mosqRevision;
      mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
150
      cout << "Initializing Mosquitto Library Version " << mosqMajor << "." << mosqMinor << "." << mosqRevision << "\n";
151
152
153
154
155
156
157
158
159
160
161
      mosquitto_lib_init();

      /* Init mosquitto struct */
      struct mosquitto* mosq;
      mosq = mosquitto_new("CollectAgent", false, NULL);
      if (!mosq) {
          perror(NULL);
          exit(EXIT_FAILURE);
      }

      /* Connect to the broker */
162
163
      cout << "Connecting to broker...";
      cout.flush();
164
165
166
167
      if (mosquitto_connect(mosq, "localhost", 1883, 1000) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not connect to host");
          exit(EXIT_FAILURE);
      }
168
      cout << " Done.\n";
169
170
171
172
173

      /* Catch SIGINT signals */
      signal(SIGINT, sigHandler);

      /* Subscribe to anything */
174
175
      cout << "Subscribing to anything on the broker (Pattern #)...";
      cout.flush();
176
177
178
179
      if (mosquitto_subscribe(mosq, NULL, "#", 0) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not subscribe");
          exit(EXIT_FAILURE);
      }
180
      cout << " Done.\n";
181
182

      /* Set the callback for mosquitto */
183
184
      cout << "Configuring message callback...";
      cout.flush();
185
      mosquitto_message_callback_set(mosq, mqttOnMessage);
186
      cout << " Done.\n";
187
188

      /* Here comes the main loop */
189
190
      cout << "Starting mqtt loop thread...";
      cout.flush();
191
192
193
194
      if (mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS) {
          perror("\nCould not start mosquitto thread");
          exit(EXIT_FAILURE);
      }
195
      cout << " Done.\n";
196
197
198
199
200
201
202
203
204
205
206
207

      keepRunning = 1;
      timeval start, end;
      double elapsed;

      while(keepRunning) {
          gettimeofday(&start, NULL);
          sleep(10);
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
208
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second\n";
209
210
211
          msgCtr = 0;
      }

212
      cout << "Cleaning up...";
213
214
215
216
217
218
219
220

      /* Stop the mosquitto loop */
      mosquitto_loop_stop(mosq, true);
      mosquitto_disconnect(mosq);

      /* Disconnect from Cassandra */
      tr->close();

221
      cout << " Done.\n";
222

223
#if 0
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
      int resB = 0;
      while (resB < keySpaces[0].cf_defs.size()) {
          query = "SELECT * FROM " + keySpaces[0].cf_defs[resB].name + ";";
          printf("Sending CQL statement: %s...", query.c_str());
          myClient.execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
          printf(" Done.\nResult:\n");
          int resC = 0;
          while (resC < res.rows.size()) {
              int resD = 0;
              while (resD < res.rows[resC].columns.size()) {
                  printf("%s [%lu]:", res.rows[resC].columns[resD].name.c_str(),
                      res.rows[resC].columns[resD].value.length());
                  int resE = 0;
                  while (resE < res.rows[resC].columns[resD].value.length()) {
                      printf(" %02x", res.rows[resC].columns[resD].value.c_str()[resE] & 0xff);
                      resE++;
                  }
                  printf("\n");
                  resD++;
              }
              resC++;
          }
          resB++;
      }
248
#endif
249
250
  }
  catch(TTransportException te){
251
252
253
254
255
256
257
      cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
  }
  catch(InvalidRequestException ire){
      cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
  }
  catch(NotFoundException nfe){
      cout << "NF Exception: " <<  nfe.what() << "\n";
258
259
260
  }
  return EXIT_SUCCESS;
}