collectagent.cpp 16.3 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41

42
#include "configuration.h"
43
#include "simplemqttserver.h"
44
#include "messaging.h"
45
#include "abrt.h"
46
#include "dcdbdaemon.h"
47

48
49
#include "sensorcache.h"

50
51
52
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

53
using namespace std;
54
55

int keepRunning;
56
bool statistics;
57
58
uint64_t msgCtr;
uint64_t pmsgCtr;
59
DCDB::Connection* dcdbConn;
60
DCDB::SensorDataStore *mySensorDataStore;
61
DCDB::SensorConfig *mySensorConfig;
62
DCDB::SensorCache mySensorCache;
63
DCDB::SCError err;
64

65
/* Normal termination (SIGINT, CTRL+C) */
66
67
void sigHandler(int sig)
{
68
69
70
  keepRunning = 0;
}

71
72
73
74
75
76
/*
 * Crash.
 *
 * Installed by main() for the signals it treats as critical; hands control
 * to abrt() with EXIT_FAILURE and the SIGNAL source marker.
 *
 * sig: number of the delivered signal (not used).
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

77
78
79
80
81
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
82
    std::ostringstream data;
83
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
84

85
86
    //try getting the latest value 
    try {
87
88
89
90
91
92
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

      uint64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

117
118
119
120
  }
};


121
/*
 * Callback registered with the MQTT server; invoked for every received message.
 *
 * PUBLISH messages whose topic starts with the DCDB_MAP keyword carry a
 * sensor name in their payload and are turned into a topic-to-name mapping
 * via SensorConfig::publishSensor(). All other PUBLISH messages carry sensor
 * readings, which are written to the data store and to the sensor cache.
 * Messages that are not PUBLISH are only counted.
 *
 * msg: the received MQTT message.
 *
 * Returns 0 if the message was accepted (or is not a PUBLISH) and 1 if it
 * was rejected (empty mapping payload, publishSensor() error, malformed
 * payload or undecodable topic).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

  uint64_t len;

  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a const reference before taking c_str(): calling
      // c_str() directly on the call expression would leave a dangling
      // pointer if getTopic() returns its string by value. The reference
      // keeps the string alive for the rest of this scope either way.
      const std::string& topicStr = msg->getTopic();
      const char *topic = topicStr.c_str();

      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
      if (strlen(topic) > DCDB_MAP_LEN && strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              cout << "Empty topic-to-name mapping message received\n";
              return 1;
          }

          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  std::cout << "Invalid sensor topic : " << topicStr << std::endl;
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  std::cout << "Invalid sensor public name: " << sensorName << std::endl;
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  std::cout << "Cannot reach sensor data store." << std::endl;
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((uint64_t *) msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              // From here on len describes the locally built record, not the wire payload.
              len = sizeof(uint64_t) * 2;
          }
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
              cout << "Message malformed\n";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              // Every record goes both to the persistent store and to the cache.
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  mySensorDataStore->insert(&sid, payload[i].timestamp, payload[i].value);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              //mySensorCache.dump();
          }
#if 1
          else {
              cout << "Wrong topic format: " << topicStr << "\n";
              return 1;
          }
#endif
      }
  }
  return 0;
}

221
222
223



224
225
226
227
/*
 * Print usage information
 */
void usage() {
228
  globalCA_t& defaults = Configuration("").getGlobal();
229
230
231
232
233
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
234
  cout << "  collectagent [-m<host>] [-r<host>] [-c<host>] [-C<interval>] [-u<username>] [-p<password>] [-t<ttl>] [-d] [-s] <path/to/configfiles/>" << endl;
235
236
237
238
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
239
240
241
242
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
  cout << "  -C<interval>  Cache interval in [s]   [default: " << defaults.cacheInterval << "]" << endl;
Michael Ott's avatar
Michael Ott committed
243
244
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
245
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
246
  cout << endl;
Michael Ott's avatar
Michael Ott committed
247
  cout << "  -d            Daemonize" << endl;
248
  cout << "  -s            Print message stats" <<endl;
Michael Ott's avatar
Michael Ott committed
249
  cout << "  -h            This help page" << endl;
250
  cout << endl;
251
252
253
}

int main(int argc, char* const argv[]) {
254
255

  try{
256

257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
      const char* opts = "m:r:c:C:u:p:t:dDsh";

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
          cout << "Failed to read global configuration!" << endl;
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
289

290
      /* Parse command line */
291
      std::string listenHost, cassandraHost, restApiHost;
292
      std::string listenPort, cassandraPort, restApiPort;
293
294
295

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
296
          switch(ret) {
297
              case 'm':
298
                  settings.mqttListenAddress = optarg;
299
                  break;
300
              case 'r':
301
                  settings.restListenAddress = optarg;
302
                  break;
303
              case 'c':
304
305
306
307
                  settings.cassandraSettings.address = optarg;
                  break;
              case 'C':
                  settings.cacheInterval = stoul(optarg);
308
                  break;
Michael Ott's avatar
Michael Ott committed
309
              case 'u':
310
                  settings.cassandraSettings.username = optarg;
311
                  break;
Michael Ott's avatar
Michael Ott committed
312
              case 'p': {
313
314
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
315
316
317
318
319
320
321
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
322
              case 't':
323
                  settings.cassandraSettings.ttl = stoul(optarg);
324
                  break;
325
              case 'd':
326
              case 'D':
327
                  settings.daemonize = 1;
328
                  break;
329
              case 's':
330
                  settings.statistics = 1;
331
                  break;
332
              case 'h':
333
334
335
              default:
                  usage();
                  exit(EXIT_FAILURE);
336
337
338
          }
      }

339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
      /*
       * Catch SIGINT signals to allow for proper server shutdowns.
       */
      signal(SIGINT, sigHandler);

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);
      signal(SIGTERM, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

355
356
357
      /*
       * Parse hostnames for port specifications
       */
358
      listenHost = string(settings.mqttListenAddress);
359
360
361
362
363
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
364
        listenPort = LISTENPORT;
365
      }
366
367

      cassandraHost = string(settings.cassandraSettings.address);
368
369
370
371
372
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
373
        cassandraPort = CASSANDRAPORT;
374
      }
375
376

      restApiHost = string(settings.restListenAddress);
377
378
379
380
381
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
382
        restApiPort = RESTAPIPORT;
383
384
      }

385
386
      // Setting the size of the sensor cache
      mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
387

388
389
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
390

Axel Auweter's avatar
Axel Auweter committed
391
      if (!dcdbConn->connect()) {
392
393
394
395
396
397
398
          std::cout << "Cannot connect to Cassandra!" << std::endl;
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
399
      dcdbConn->initSchema();
400
401
402
403

      /*
       * Allocate the SensorDataStore.
       */
404
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
405
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
406
407
408

      /*
       * Set TTL for data store inserts if TTL > 0.
409
       */
410
411
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
412

413
414
415
      /*
       * Start the MQTT Message Server.
       */
416
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
417
      
418
419
420
      ms.setMessageCallback(mqttCallback);
      ms.start();

421
422
      cout << "MQTT Server running..." << std::endl;
      
423
424
425
426
427
428
429
430
431
432
433
434
435
436
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

      cout << "HTTP Server running..." << std::endl;

437
438
439
440
441
442
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
443
444
      
      cout << "Collect Agent running..." << std::endl;
445

446
447
      while(keepRunning) {
          gettimeofday(&start, NULL);
448
          sleep(60);
449
450
451
452
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
453
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
454
          if (settings.statistics) {
455
456
              cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
          }
457
          msgCtr = 0;
458
          pmsgCtr = 0;
459
460
      }

461
      cout << "Stopping...\n";
462
463

      ms.stop();
464
      cout << "MQTT Server stopped..." << std::endl;
465
466
      httpServer.stop();
      httpThread.join();
467
      cout << "HTTP Server stopped..." << std::endl;
468
      delete mySensorDataStore;
469
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
470
471
      dcdbConn->disconnect();
      delete dcdbConn;
472
      cout << "Collect Agent closed. Bye bye..." << std::endl;
473
  }
474
475
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
476
      abrt(EXIT_FAILURE, INTERR);
477
478
  }

479
  return EXIT_SUCCESS;
480
}
481
482