collectagent.cpp 7.47 KB
Newer Older
1
2
3
4
5
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//============================================================================

9
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <signal.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <string>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>

#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"

27
using namespace std;
28
29

int keepRunning;
30
bool statistics;
31
32
uint64_t msgCtr;
uint64_t pmsgCtr;
33
DCDB::SensorDataStore *mySensorDataStore;
34
std::string listenHost, cassandraHost, ttl;
35

36
/* Normal termination (SIGINT, CTRL+C) */
37
38
void sigHandler(int sig)
{
39
40
41
  keepRunning = 0;
}

42
43
44
45
46
47
/*
 * Crash handler.
 *
 * Hands over to abrt() with EXIT_FAILURE and the SIGNAL reason code.
 * The signal number itself is not forwarded.
 */
void abrtHandler(int sig)
{
  (void)sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

48
49
50
51
52
/*
 * Callback for every message received by the MQTT server.
 *
 * Updates the message counters and, for PUBLISH messages, decodes the
 * payload and stores the reading(s) in mySensorDataStore under the
 * SensorId derived from the message topic.
 *
 * Accepted payload layouts:
 *   - exactly sizeof(uint64_t) bytes: a bare value; the timestamp is
 *     generated here via Messaging::calculateTimestamp().
 *   - a positive multiple of sizeof(mqttPayload) bytes: one or more
 *     (value, timestamp) records. Every record is stored (previously
 *     only the first one was, the rest were silently dropped).
 * Any other payload length is treated as malformed and ignored.
 *
 * The payload bytes are copied out with memcpy instead of being read
 * through a cast pointer, so no alignment of the receive buffer is assumed.
 * NOTE(review): this assumes mqttPayload is a plain (trivially copyable)
 * struct - confirm against its declaration.
 *
 * The message is deleted on every path before returning.
 */
void mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;

  /* Only PUBLISH messages are decoded; everything else is just counted. */
  if (!msg->isPublish()) {
      delete msg;
      return;
  }
  pmsgCtr++;

  /*
   * Classify the payload.
   */
  const bool valueOnly = (msg->getPayloadLength() == sizeof(uint64_t));
  size_t readingCount = 0;
  uint64_t localTs = 0;

  //In the 64 bit message case, the collect agent provides a timestamp
  if (valueOnly) {
      readingCount = 1;
      localTs = Messaging::calculateTimestamp();
  }

  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((msg->getPayloadLength() % sizeof(mqttPayload) == 0) && (msg->getPayloadLength() > 0)) {
      readingCount = msg->getPayloadLength() / sizeof(mqttPayload);
  }

  //...otherwise this message is malformed -> ignore...
  else {
      delete msg;
      return;
  }

  const void *rawPayload = msg->getPayload();
  const unsigned char *payloadBytes = static_cast<const unsigned char*>(rawPayload);

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record(s) in the database.
   */
  DCDB::SensorId sid;
  if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
      cout << "Topic decode successful:"
          << "\nRaw: " << hex << setw(16) << setfill('0') << sid.raw[0] << " " << hex << setw(16) << setfill('0') << sid.raw[1]
          << "\ndatacenter_id: " << hex << ((sid.dl & 0xFF00000000000000) >> 56)
          << "\ncluster_id:    " << hex << ((sid.dl & 0x00FF000000000000) >> 48)
          << "\nrack_id:       " << hex << ((sid.dl & 0x0000FFFF00000000) >> 32)
          << "\nchassis_id:    " << hex << ((sid.dl & 0x00000000FF000000) >> 24)
          << "\nbic_id:        " << hex << ((sid.dl & 0x0000000000FF0000) >> 16)
          << "\nbmc_id:        " << hex << ((sid.dl & 0x000000000000FF00) >> 8)
          << "\nknc_id:        " << hex << ((sid.dl & 0x00000000000000FF))
          << "\ndevice_id:     " << hex << sid.dsid.device_id
          << "\nreserved:      " << hex << sid.dsid.rsvd
          << "\nsensor_number: " << hex << sid.dsid.sensor_number
          << "\n";
#endif
      if (valueOnly) {
          uint64_t val;
          std::memcpy(&val, payloadBytes, sizeof(val));
          mySensorDataStore->insert(&sid, localTs, val);
      }
      else {
          for (size_t i = 0; i < readingCount; ++i) {
              mqttPayload reading;
              std::memcpy(&reading, payloadBytes + i * sizeof(mqttPayload), sizeof(reading));

              uint64_t val = reading.value;
              uint64_t ts = reading.timestamp;
              mySensorDataStore->insert(&sid, ts, val);
          }
      }
  }
  else {
      cout << "Wrong topic format: " << msg->getTopic() << "\n";
  }

  delete msg;
}

124
125
126
127
/*
 * Print usage information
 *
 * Writes the command line synopsis and a short description of every
 * option to standard output.
 */
void usage() {
  printf("Usage: collectagent [-D] [-s] [-l <host>] [-h <host>] [-t <ttl>]\n");
  printf("Collectagent will accept remote connections by listening to the\n");
  printf("specified listen address (-l <host>) at port 1883 (default MQTT port).\n");
  printf("It will also connect to Cassandra at the specified address (-h <host>).\n");
  printf("The default <host> is localhost/127.0.0.1.\n");
  printf("If the -t option is specified, data will be inserted with the specified\n");
  printf("TTL in seconds.\n");
  printf("If the -D option is specified, CollectAgent will run as daemon.\n");
  printf("With the -s option, CollectAgent will print message statistics.\n\n");
}

int main(int argc, char* const argv[]) {
140
141

  try{
142
      /*
143
       * Catch SIGINT signals to allow for proper server shutdowns.
144
       */
145
      signal(SIGINT, sigHandler);
146

147
148
149
150
151
152
153
      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);
      signal(SIGTERM, abrtHandler);

154
      /* Parse command line */
155
      int ret;
156
157
      listenHost="localhost";
      cassandraHost="127.0.0.1";
158
      ttl="0";
159
160
      statistics = false;
      while ((ret=getopt(argc, argv, "h:l:t:Ds?"))!=-1) {
161
          switch(ret) {
162
              case 'h':
163
164
165
166
                  cassandraHost = optarg;
                  break;
              case 'l':
                  listenHost = optarg;
167
                  break;
168
169
170
              case 't':
                  ttl = optarg;
                  break;
171
              case 'D':
172
                  dcdbdaemon();
173
                  break;
174
175
176
              case 's':
                  statistics = true;
                  break;
177
178
179
180
              case '?':
              default:
                  usage();
                  exit(EXIT_FAILURE);
181
182
183
          }
      }

184
      /*
185
186
       * Allocate and initialize connection to Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
187
      std::string sdHost = cassandraHost;
188
189
      DCDB::Connection* dcdbConn;
      dcdbConn = new DCDB::Connection(sdHost, 9042);
190

Axel Auweter's avatar
Axel Auweter committed
191
      if (!dcdbConn->connect()) {
192
193
194
195
196
197
198
          std::cout << "Cannot connect to Cassandra!" << std::endl;
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
199
      dcdbConn->initSchema();
200
201
202
203

      /*
       * Allocate the SensorDataStore.
       */
204
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
205
206
207

      /*
       * Set TTL for data store inserts if TTL > 0.
208
       */
209
210
211
212
213
214
      uint64_t ttlInt;
      std::istringstream ttlParser(ttl);
      if (!(ttlParser >> ttlInt)) {
          std::cout << "Invalid TTL!" << std::endl;
          exit(EXIT_FAILURE);
      }
215
216
217
      if (ttlInt) {
        mySensorDataStore->setTTL(ttlInt);
      }
218

219
220
221
      /*
       * Start the MQTT Message Server.
       */
222
      SimpleMQTTServer ms(listenHost,"1883");
223
      
224
225
226
227
228
      ms.setMessageCallback(mqttCallback);
      ms.start();

      cout << "Server running...\n";

229
230
231
232
233
234
235
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;

236
237
      while(keepRunning) {
          gettimeofday(&start, NULL);
238
          sleep(60);
239
240
241
242
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
243
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
244
245
246
          if (statistics) {
              cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
          }
247
          msgCtr = 0;
248
          pmsgCtr = 0;
249
250
      }

251
      cout << "Stopping...\n";
252
253

      ms.stop();
254
      delete mySensorDataStore;
Axel Auweter's avatar
Axel Auweter committed
255
256
      dcdbConn->disconnect();
      delete dcdbConn;
257
  }
258
259
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
260
      abrt(EXIT_FAILURE, INTERR);
261
262
  }

263
  return EXIT_SUCCESS;
264
}
265
266