//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#define __STDC_FORMAT_MACROS
#include <inttypes.h>
#include <signal.h>
#include <sys/time.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <list>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/uri.hpp>
#include <boost/network/utils/thread_pool.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include "version.h"

#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"

#include "sensorcache.h"

using namespace std;

int keepRunning;
58
bool statistics;
59
60
uint64_t msgCtr;
uint64_t pmsgCtr;
61
uint64_t readingCtr;
62
DCDB::Connection* dcdbConn;
63
DCDB::SensorDataStore *mySensorDataStore;
64
DCDB::SensorConfig *mySensorConfig;
65
DCDB::SensorCache mySensorCache;
66
DCDB::SCError err;
67
logger_t lg;
68

69
/* Normal termination (SIGINT, CTRL+C) */
70
71
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
72
73
74
75
76
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
77
78
79
  keepRunning = 0;
}

80
81
82
83
84
85
/*
 * Crash handler, installed by main() for SIGABRT and SIGSEGV.
 * Delegates to abrt() with the SIGNAL reason code; the signal number itself
 * is not needed.
 */
void abrtHandler(int /* sig */)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
91
    std::ostringstream data;
92
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
93

94
95
    //try getting the latest value 
    try {
96
97
98
99
100
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

101
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

126
127
128
129
  }
};


130
int mqttCallback(SimpleMQTTMessage *msg)
131
132
133
134
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
135
  msgCtr++;
136
137
138
  if (msg->isPublish())
    pmsgCtr++;

139
  uint64_t len;
140
141
142
  /*
   * Decode the message and put into the database.
   */
143
  if (msg->isPublish()) {
144
145
146
147
      const char *topic = msg->getTopic().c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
148
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
149
          if ((len = msg->getPayloadLength()) == 0) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
150
              LOG(error) << "Empty topic-to-name mapping message received";
151
152
              return 1;
          }
153

154
155
156
157
158
159
          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
160
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
161
162
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
Alessio Netti's avatar
Logging    
Alessio Netti committed
163
                  LOG(error) << "Invalid sensor public name: " << sensorName;
164
165
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
166
                  LOG(error) << "Cannot reach sensor data store.";
167
168
169
170
171
172
173
174
175
176
177
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
178
              payload->value = *((int64_t *) msg->getPayload());
179
180
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
181
          }
182
183
184
185
186
187
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
188
              LOG(error) << "Message malformed";
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
211
#endif
212
	      std::list<DCDB::SensorDataStoreReading> readings;
213
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
214
215
		  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
		  readings.push_back(r);
216
217
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
218
	      mySensorDataStore->insertBatch(readings);
219
	      readingCtr+= readings.size();
220

221
              //mySensorCache.dump();
222
          }
223
#if 1
224
225
226
227
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
228
#endif
229
      }
230
  }
231
  return 0;
232
233
}

234
235
236



237
238
239
240
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
241
242
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
243
244
245
246
247
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
248
  cout << "  collectagent [-m<host>] [-r<host>] [-c<host>] [-C<interval>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] [-d] [-s] <path/to/configfiles/>" << endl;
249
250
251
252
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
253
254
255
256
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
  cout << "  -C<interval>  Cache interval in [s]   [default: " << defaults.cacheInterval << "]" << endl;
Michael Ott's avatar
Michael Ott committed
257
258
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
259
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
260
261
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
262
  cout << endl;
Michael Ott's avatar
Michael Ott committed
263
  cout << "  -d            Daemonize" << endl;
264
  cout << "  -s            Print message stats" <<endl;
Michael Ott's avatar
Michael Ott committed
265
  cout << "  -h            This help page" << endl;
266
  cout << endl;
267
268
269
}

int main(int argc, char* const argv[]) {
270
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
271
272

  try{
273

274
275
276
277
278
279
280
281
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
Alessio Netti's avatar
Logging    
Alessio Netti committed
282
      const char* opts = "m:r:c:C:u:p:t:v:dDsh";
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

299
300
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
301

302
303
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
304
          LOG(fatal) << "Failed to read global configuration!";
305
306
307
308
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
309

310
      /* Parse command line */
311
      std::string listenHost, cassandraHost, restApiHost;
312
      std::string listenPort, cassandraPort, restApiPort;
313
314
315

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
316
          switch(ret) {
317
              case 'm':
318
                  settings.mqttListenAddress = optarg;
319
                  break;
320
              case 'r':
321
                  settings.restListenAddress = optarg;
322
                  break;
323
              case 'c':
324
325
326
327
                  settings.cassandraSettings.address = optarg;
                  break;
              case 'C':
                  settings.cacheInterval = stoul(optarg);
328
                  break;
Michael Ott's avatar
Michael Ott committed
329
              case 'u':
330
                  settings.cassandraSettings.username = optarg;
331
                  break;
Michael Ott's avatar
Michael Ott committed
332
              case 'p': {
333
334
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
335
336
337
338
339
340
341
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
342
              case 't':
343
                  settings.cassandraSettings.ttl = stoul(optarg);
344
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
345
              case 'v':
346
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
347
                  break;
348
              case 'd':
349
              case 'D':
350
                  settings.daemonize = 1;
351
                  break;
352
              case 's':
353
                  settings.statistics = 1;
354
                  break;
355
              case 'h':
356
357
358
              default:
                  usage();
                  exit(EXIT_FAILURE);
359
360
361
          }
      }

362
      auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
363
364
365
366
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

367
      /*
Alessio Netti's avatar
Alessio Netti committed
368
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
369
370
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
371
      signal(SIGTERM, sigHandler);
372
373
374
375
376
377
378
379
380
381
382

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

383
384
385
      /*
       * Parse hostnames for port specifications
       */
386
      listenHost = string(settings.mqttListenAddress);
387
388
389
390
391
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
392
        listenPort = LISTENPORT;
393
      }
394
395

      cassandraHost = string(settings.cassandraSettings.address);
396
397
398
399
400
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
401
        cassandraPort = CASSANDRAPORT;
402
      }
403
404

      restApiHost = string(settings.restListenAddress);
405
406
407
408
409
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
410
        restApiPort = RESTAPIPORT;
411
412
      }

413
414
      // Setting the size of the sensor cache
      mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
415

416
417
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
418
419
420
421
422
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

423

Axel Auweter's avatar
Axel Auweter committed
424
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
425
          LOG(fatal) << "Cannot connect to Cassandra!";
426
427
428
429
430
431
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
432
      dcdbConn->initSchema();
433

Alessio Netti's avatar
Alessio Netti committed
434

435
436
437
      /*
       * Allocate the SensorDataStore.
       */
438
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
439
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
440
441
442

      /*
       * Set TTL for data store inserts if TTL > 0.
443
       */
444
445
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
446
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
447

448
449
450
      /*
       * Start the MQTT Message Server.
       */
451
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
452
      
453
454
455
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
456
      LOG(info) << "MQTT Server running...";
457
      
458
459
460
461
462
463
464
465
466
467
468
469
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
470
      LOG(info) <<  "HTTP Server running...";
471

472
473
474
475
476
477
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
478
479
480
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
481

Alessio Netti's avatar
Alessio Netti committed
482
483
484
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

485
      LOG(info) << "Collect Agent running...";
486
487
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
488
489
490
491
492
493
494
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

495
          sleep(60);
496
497
498
499
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
500
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
501
          if (settings.statistics && keepRunning) {
502
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
503
          }
504
          msgCtr = 0;
505
          pmsgCtr = 0;
506
	  readingCtr = 0;
507
508
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
509
      LOG(info) << "Stopping...";
510
511

      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
512
      LOG(info) << "MQTT Server stopped...";
513
514
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
515
      LOG(info) << "HTTP Server stopped...";
516
      delete mySensorDataStore;
517
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
518
519
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
520
      LOG(info) << "Collect Agent closed. Bye bye...";
521
  }
522
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
523
      LOG(fatal) << "Exception: " << e.what();
524
      abrt(EXIT_FAILURE, INTERR);
525
526
  }

527
  return EXIT_SUCCESS;
528
}
529
530