//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
29
#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <signal.h>
#include <sys/time.h>
#include <unistd.h>

#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>

#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"
#include "sensorcache.h"

50
using namespace std;
51
52

int keepRunning;
53
bool statistics;
54
55
uint64_t msgCtr;
uint64_t pmsgCtr;
56
DCDB::SensorDataStore *mySensorDataStore;
57
DCDB::SensorCache mySensorCache;
58

59
/* Normal termination (SIGINT, CTRL+C) */
60
61
void sigHandler(int sig)
{
62
63
64
  keepRunning = 0;
}

65
66
67
68
69
70
/*
 * Crash handler.
 *
 * Hands over to abrt() with EXIT_FAILURE and the SIGNAL source tag.
 *
 * sig: number of the delivered signal (not evaluated).
 */
void abrtHandler(int sig)
{
  (void)sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

71
72
73
74
75
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
76
77
78
79

    DCDB::SensorId sid;
    sid.mqttTopicConvert(request.destination);
    std::ostringstream data;
80
81

    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
82
83
    //try getting the latest value 
    try {
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
      uint64_t val = mySensorCache.getSensor(sid);

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

109
110
111
112
  }
};


113
114
115
116
117
/*
 * Callback for messages received by the MQTT server.
 *
 * Counts the message for the statistics and, for PUBLISH messages, decodes
 * the payload and stores the readings in the database and the sensor cache.
 *
 * Accepted payloads:
 *   - exactly sizeof(uint64_t) bytes: a bare value; the collect agent
 *     provides the timestamp itself.
 *   - a non-zero multiple of sizeof(mqttPayload): one or more
 *     (value, timestamp) readings, all of which are stored.
 * Any other payload length is malformed and silently ignored.
 *
 * msg: the received message. The callback frees it on every path.
 */
void mqttCallback(SimpleMQTTMessage *msg)
{
  /* Take ownership so that early returns and exceptions cannot leak msg. */
  std::unique_ptr<SimpleMQTTMessage> msgOwner(msg);

  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
    return;
  pmsgCtr++;

  /*
   * Classify the payload before touching it.
   */
  const size_t length = msg->getPayloadLength();
  const bool bareValue = (length == sizeof(uint64_t));
  const bool readings = (length > 0) && (length % sizeof(mqttPayload) == 0);
  if (!bareValue && !readings)
    return;  /* malformed -> ignore */

  /* The payload is copied out with memcpy instead of being read through a
   * cast pointer, as the buffer is not known to be suitably aligned. */
  const unsigned char *bytes = reinterpret_cast<const unsigned char*>(msg->getPayload());

  /* In the 64 bit message case, the collect agent provides a timestamp. */
  uint64_t val = 0;
  uint64_t ts = 0;
  if (bareValue) {
      std::memcpy(&val, bytes, sizeof(val));
      ts = Messaging::calculateTimestamp();
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record(s) in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(msg->getTopic())) {
      cout << "Wrong topic format: " << msg->getTopic() << "\n";
      return;
  }

  if (bareValue) {
      mySensorDataStore->insert(&sid, ts, val);
      mySensorCache.storeSensor(sid, ts, val);
      return;
  }

  /* ...otherwise value and timestamp come from the MQTT message payload,
   * one mqttPayload record per reading. */
  const size_t count = length / sizeof(mqttPayload);
  for (size_t i = 0; i < count; ++i) {
      mqttPayload reading;
      std::memcpy(&reading, bytes + i * sizeof(mqttPayload), sizeof(reading));

      val = reading.value;
      ts = reading.timestamp;
      mySensorDataStore->insert(&sid, ts, val);
      mySensorCache.storeSensor(sid, ts, val);
  }
}

191
192
193



194
195
196
197
/*
 * Print usage information.
 *
 * Describes every option accepted by the getopt() string in main(),
 * including -r for the REST API and the <host>:<port> notation.
 */
void usage() {
  printf("Usage: collectagent [-D] [-s] [-l <host>] [-h <host>] [-r <host>] [-t <ttl>]\n");
  printf("Collectagent will accept remote connections by listening to the\n");
  printf("specified listen address (-l <host>) at port 1883 (default MQTT port).\n");
  printf("It will also connect to cassandra to the specified address (-h <host>).\n");
  printf("The default <host> is localhost/127.0.0.1.\n");
  printf("The REST API is served at the specified address (-r <host>) at port 8080.\n");
  printf("A different port may be given as <host>:<port> for -l, -h and -r.\n");
  printf("If the -t option is specified, data will be inserted with the specified\n");
  printf("TTL in seconds.\n");
  printf("If the -D option is specified, CollectAgent will run as daemon.\n");
  printf("With the -s option, CollectAgent will print message statistics.\n\n");
}

int main(int argc, char* const argv[]) {
210
211

  try{
212
      /*
213
       * Catch SIGINT signals to allow for proper server shutdowns.
214
       */
215
      signal(SIGINT, sigHandler);
216

217
218
219
220
221
222
223
      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);
      signal(SIGTERM, abrtHandler);

224
      /* Parse command line */
225
      int ret;
226
227
      std::string listenHost, cassandraHost, restApiHost, ttl;
      std::string listenPort, cassandraPort, restApiPort;
228
229
      listenHost="localhost";
      cassandraHost="127.0.0.1";
230
      restApiHost="0:0:0:0";
231
      ttl="0";
232
      statistics = false;
233
      while ((ret=getopt(argc, argv, "h:l:r:t:Ds?"))!=-1) {
234
          switch(ret) {
235
              case 'h':
236
237
238
239
                  cassandraHost = optarg;
                  break;
              case 'l':
                  listenHost = optarg;
240
                  break;
241
242
243
              case 'r':
                  restApiHost = optarg;
                  break;
244
245
246
              case 't':
                  ttl = optarg;
                  break;
247
              case 'D':
248
                  dcdbdaemon();
249
                  break;
250
251
252
              case 's':
                  statistics = true;
                  break;
253
254
255
256
              case '?':
              default:
                  usage();
                  exit(EXIT_FAILURE);
257
258
259
          }
      }

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
      /*
       * Parse hostnames for port specifications
       */
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
        listenPort = "1883";
      }
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
        cassandraPort = "9042";
      }
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
        restApiPort = "8080";
      }


286
      /*
287
288
       * Allocate and initialize connection to Cassandra.
       */
289
      DCDB::Connection* dcdbConn;
290
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()));
291

Axel Auweter's avatar
Axel Auweter committed
292
      if (!dcdbConn->connect()) {
293
294
295
296
297
298
299
          std::cout << "Cannot connect to Cassandra!" << std::endl;
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
300
      dcdbConn->initSchema();
301
302
303
304

      /*
       * Allocate the SensorDataStore.
       */
305
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
306
307
308

      /*
       * Set TTL for data store inserts if TTL > 0.
309
       */
310
311
312
313
314
315
      uint64_t ttlInt;
      std::istringstream ttlParser(ttl);
      if (!(ttlParser >> ttlInt)) {
          std::cout << "Invalid TTL!" << std::endl;
          exit(EXIT_FAILURE);
      }
316
317
318
      if (ttlInt) {
        mySensorDataStore->setTTL(ttlInt);
      }
319

320
321
322
      /*
       * Start the MQTT Message Server.
       */
323
      SimpleMQTTServer ms(listenHost, listenPort);
324
      
325
326
327
      ms.setMessageCallback(mqttCallback);
      ms.start();

328
329
      cout << "MQTT Server running..." << std::endl;
      
330
331
332
333
334
335
336
337
338
339
340
341
342
343
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

      cout << "HTTP Server running..." << std::endl;

344
345
346
347
348
349
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
350
351
      
      cout << "Collect Agent running..." << std::endl;
352

353
354
      while(keepRunning) {
          gettimeofday(&start, NULL);
355
          sleep(60);
356
357
358
359
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
360
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
361
362
363
          if (statistics) {
              cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
          }
364
          msgCtr = 0;
365
          pmsgCtr = 0;
366
367
      }

368
      cout << "Stopping...\n";
369
370

      ms.stop();
371
      cout << "MQTT Server stopped..." << std::endl;
372
373
      httpServer.stop();
      httpThread.join();
374
      cout << "HTTP Server stopped..." << std::endl;
375
      delete mySensorDataStore;
Axel Auweter's avatar
Axel Auweter committed
376
377
      dcdbConn->disconnect();
      delete dcdbConn;
378
      cout << "Collect Agent closed. Bye bye..." << std::endl;
379
  }
380
381
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
382
      abrt(EXIT_FAILURE, INTERR);
383
384
  }

385
  return EXIT_SUCCESS;
386
}
387
388