collectagent.cpp 6.48 KB
Newer Older
1
2
3
4
5
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
//============================================================================

9
10
#include <cstdlib>
#include <sstream>
11
12
13

#include <sys/socket.h>
#include <netinet/in.h>
14
#include <signal.h>
15
16
#include <sys/time.h>

17
#include <boost/date_time/posix_time/posix_time.hpp>
18

19
20
21
22
23
24
25
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>

26
#include "cassandra/Cassandra.h"
27
#include "simplemqttserver.h"
28

29
30
#include "mosquitto.h"

31
using namespace std;
32
33
34
35
36
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;

37
38
39
CassandraClient *myClient;

int keepRunning;
40
41
uint64_t msgCtr;
uint64_t pmsgCtr;
42

43
44
void sigHandler(int sig)
{
45
46
47
  keepRunning = 0;
}

48
49
50
51
52
void mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
53
  msgCtr++;
54
55
56
57
58
59
  if (msg->isPublish())
    pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
60
  try {
61
62
63
64
      ColumnParent cparent;
      Column c;
      string key, name, value;

65
66
67
68
69
70
71
      boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
      boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
      boost::posix_time::time_duration diff = now - epoch;

      cparent.column_family = "sensordata";
      uint64_t ts = diff.total_nanoseconds();
      uint32_t sid = msgCtr;
72

Axel Auweter's avatar
Axel Auweter committed
73
74
      msg->dump();

75
      if (msg->isPublish()) {
76
#if 0
77
78
79
80
81
82
83
84
85
86
87
88
          key = convert(&sid, sizeof(msgCtr));
          name = convert(&ts, sizeof(ts));

          float tmp = 3.141;
          value = convert(&tmp, 4);
          c.name = name;
          c.value = value;
          c.__isset.value = true;
          c.timestamp = ts/1000;
          c.__isset.timestamp = true;

          myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
89
#endif
90
      }
91
  }
92
93
  catch(TTransportException *te){
      cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
94
  }
95
96
97
98
99
100
101
102
  catch(InvalidRequestException *ire){
      cout << "IRE Exception: " << ire->what() << "[" << ire->why << "]\n";
  }
  catch(NotFoundException *nfe){
      cout << "NF Exception: " <<  nfe->what() << "\n";
  }

  delete msg;
103
104
}

105
int main(void) {
106

107
108
109
110
111
112
113
114
115
116
  boost::shared_ptr<TSocket> sock;
  boost::shared_ptr<TTransport> tr;
  boost::shared_ptr<TProtocol> prot;

  std::string clusterName;

  sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
  tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
  prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));

117
118
119
120
  /*
   * Open the connection to the Cassandra database and
   * create the necessary keyspace and column family.
   */
121
  try {
122
      myClient = new CassandraClient(prot);
123
      tr->open();
124

125
      start:
126
      myClient->describe_cluster_name(clusterName);
127
      cout << "Cluster name: " << clusterName << "\n";
128

129
      int dcdbKeyspace = -1;
130
      cout << "Keyspaces:\n";
131
      std::vector<KsDef> keySpaces;
132
      myClient->describe_keyspaces(keySpaces);
133
      for (unsigned int i=0; i<keySpaces.size(); i++) {
134
          cout << "   [" << i << "]: " << keySpaces[i].name << "\n";
135
136
137
          if(keySpaces[i].name == "dcdb") {
              dcdbKeyspace = i;
          }
138
139
140
      }

      CqlResult res;
141
142
143
      std::string query;

      if (dcdbKeyspace<0) {
144
          cout << "Creating dcdb keyspace...\n";
145
          query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
146
          cout << "Sending CQL statement: " << query.c_str();
147
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
148
149
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
150
151
152
          goto start;
      }
      else {
153
          cout << "Using existing keyspace dcdb...\n";
154
155
156
      }

      query = "USE dcdb;";
157
      cout << "Sending CQL statement: " << query;
158
      myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
159
      cout << "  --> Success!\n";
160
161

      int sensordataCf = -1;
162
      cout << "Column families in dcdb:\n";
163
      for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
164
          cout << "   [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
165
166
167
168
169
170
          if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
              sensordataCf = i;
          }
      }

      if (sensordataCf<0) {
171
          cout << "Creating sensordata column familiy...\n";
Axel Auweter's avatar
Axel Auweter committed
172
          query = "CREATE TABLE sensordata ( sid bigint, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
173
          cout << "Sending CQL statement: " << query;
174
          myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
175
176
          cout << "  --> Success!\n";
          cout << "Starting over...\n\n";
177
178
179
          goto start;
      }
      else {
180
          cout << "Using existing sensordata column familiy.\n";
181
      }
182
183
184
185
186
187
188
189
190
191
  }
  catch(TTransportException *te){
      cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
  }
  catch(InvalidRequestException *ire){
      cout << "IRE Exception: " << ire->what() << "[" << ire->why << "]\n";
  }
  catch(NotFoundException *nfe){
      cout << "NF Exception: " <<  nfe->what() << "\n";
  }
192

193
194
  /* Catch SIGINT signals */
  signal(SIGINT, sigHandler);
195
196


197
  try{
198
199
200
201
      keepRunning = 1;
      timeval start, end;
      double elapsed;

202
203
204
205
206
207
      SimpleMQTTServer ms;
      ms.setMessageCallback(mqttCallback);
      ms.start();

      cout << "Server running...\n";

208
209
      while(keepRunning) {
          gettimeofday(&start, NULL);
210
          sleep(60);
211
212
213
214
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
215
216
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
217
          msgCtr = 0;
218
          pmsgCtr = 0;
219
220
      }

221
      cout << "Stopping...\n";
222
223
224

      ms.stop();
  }
225
226
  catch (exception *e) {
      cout << "Exception: " << e->what() << "\n";
227
228
229
230
  }

  return 0;
}