collectagent.cpp 24 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include <dcdb/sensor.h>
43
#include "version.h"
44

45
#include "configuration.h"
46
#include "simplemqttserver.h"
47
#include "messaging.h"
48
#include "abrt.h"
49
#include "dcdbdaemon.h"
50
#include "sensorcache.h"
51
52
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
53

54
55
56
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

57
using namespace std;
58
59

int keepRunning;
60
bool statistics;
61
62
uint64_t msgCtr;
uint64_t pmsgCtr;
63
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
64
SensorCache mySensorCache;
65
AnalyticsController* analyticsController;
66
DCDB::Connection* dcdbConn;
67
DCDB::SensorDataStore *mySensorDataStore;
68
69
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
70
QueryEngine& queryEngine = QueryEngine::getInstance();
71
logger_t lg;
72

73
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
    std::string topic;
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
        output = entry.getView(startTs, endTs, buffer, rel, true);
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
122
123
}

124
/* Normal termination (SIGINT, CTRL+C) */
125
126
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
132
133
134
  keepRunning = 0;
}

135
136
137
138
139
140
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

141
142
143
144
145
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
146
    std::ostringstream data;
147
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
148

149
150
    //try getting the latest value 
    try {
151
152
153
154
155
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

Alessio Netti's avatar
Alessio Netti committed
156
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

181
182
183
184
  }
};


185
int mqttCallback(SimpleMQTTMessage *msg)
186
187
188
189
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
190
  msgCtr++;
191
192
193
  if (msg->isPublish())
    pmsgCtr++;

194
  uint64_t len;
195
196
197
  /*
   * Decode the message and put into the database.
   */
198
  if (msg->isPublish()) {
199
200
201
202
      const char *topic = msg->getTopic().c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
203
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
204
          if ((len = msg->getPayloadLength()) == 0) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
205
              LOG(error) << "Empty topic-to-name mapping message received";
206
207
              return 1;
          }
208

209
210
211
212
213
214
          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
215
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
216
217
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
Alessio Netti's avatar
Logging    
Alessio Netti committed
218
                  LOG(error) << "Invalid sensor public name: " << sensorName;
219
220
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
221
                  LOG(error) << "Cannot reach sensor data store.";
222
223
224
225
226
227
228
229
230
231
232
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
233
              payload->value = *((int64_t *) msg->getPayload());
234
235
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
236
          }
237
238
239
240
241
242
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
243
              LOG(error) << "Message malformed";
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
266
#endif
267
	      std::list<DCDB::SensorDataStoreReading> readings;
268
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
269
270
		  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
		  readings.push_back(r);
271
272
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
273
	      mySensorDataStore->insertBatch(readings);
274
	      readingCtr+= readings.size();
275

276
              //mySensorCache.dump();
277
          }
278
#if 1
279
280
281
282
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
283
#endif
284
      }
285
  }
286
  return 0;
287
288
}

289
290
291



292
293
294
295
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
296
297
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
298
299
300
301
302
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
303
  cout << "  collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
304
305
306
307
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
308
  cout << "  -a <string>   Auto-publish pattern    [default: none]" << endl;
309
310
311
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
312
313
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
314
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
315
316
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
317
  cout << endl;
Michael Ott's avatar
Michael Ott committed
318
  cout << "  -d            Daemonize" << endl;
319
  cout << "  -s            Print message stats" <<endl;
320
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
321
  cout << "  -h            This help page" << endl;
322
  cout << endl;
323
324
325
}

int main(int argc, char* const argv[]) {
326
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
327
    bool validateConfig = false;
328
329

  try{
330

331
332
333
334
335
336
337
338
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
339
      const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

356
357
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
358

359
360
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
361
          LOG(fatal) << "Failed to read global configuration!";
362
363
364
365
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
366

367
      /* Parse command line */
368
      std::string listenHost, cassandraHost, restApiHost;
369
      std::string listenPort, cassandraPort, restApiPort;
370
371
372

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
373
          switch(ret) {
374
375
376
              case 'a':
                  settings.pluginSettings.sensorPattern = optarg;
                  break;
377
              case 'm':
378
                  settings.mqttListenAddress = optarg;
379
                  break;
380
              case 'r':
381
                  settings.restListenAddress = optarg;
382
                  break;
383
              case 'c':
384
385
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
386
              case 'u':
387
                  settings.cassandraSettings.username = optarg;
388
                  break;
Michael Ott's avatar
Michael Ott committed
389
              case 'p': {
390
391
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
392
393
394
395
396
397
398
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
399
              case 't':
400
                  settings.cassandraSettings.ttl = stoul(optarg);
401
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
402
              case 'v':
403
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
404
                  break;
405
              case 'd':
406
              case 'D':
407
                  settings.daemonize = 1;
408
                  break;
409
              case 's':
410
                  settings.statistics = 1;
411
                  break;
412
413
414
              case 'x':
                  validateConfig = true;
                  break;
415
              case 'h':
416
417
418
              default:
                  usage();
                  exit(EXIT_FAILURE);
419
420
421
          }
      }

422
      auto fileSink = setupFileLogger(settings.pluginSettings.tempdir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
423
424
425
426
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

427
      /*
Alessio Netti's avatar
Alessio Netti committed
428
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
429
430
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
431
      signal(SIGTERM, sigHandler);
432
433
434
435
436
437
438
439
440
441
442

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

443
444
445
      /*
       * Parse hostnames for port specifications
       */
446
      listenHost = string(settings.mqttListenAddress);
447
448
449
450
451
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
452
        listenPort = LISTENPORT;
453
      }
454
455

      cassandraHost = string(settings.cassandraSettings.address);
456
457
458
459
460
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
461
        cassandraPort = CASSANDRAPORT;
462
      }
463
464

      restApiHost = string(settings.restListenAddress);
465
466
467
468
469
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
470
        restApiPort = RESTAPIPORT;
471
472
      }

473
      // Setting the size of the sensor cache
474
      // Conversion from milliseconds to nanoseconds
475
      mySensorCache.setMaxHistory(uint64_t(settings.pluginSettings.cacheInterval) * 1000000);
476

477
478
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
479
480
481
482
483
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

484

Axel Auweter's avatar
Axel Auweter committed
485
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
486
          LOG(fatal) << "Cannot connect to Cassandra!";
487
488
489
490
491
492
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
493
      dcdbConn->initSchema();
494

Alessio Netti's avatar
Alessio Netti committed
495

496
497
498
      /*
       * Allocate the SensorDataStore.
       */
499
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
500
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
501
502
503

      /*
       * Set TTL for data store inserts if TTL > 0.
504
       */
505
506
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
507
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
508

509
510
511
512
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
513
      queryEngine.setQueryCallback(sensorQueryCallback);
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561

      LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenAddress;
      LOG(info) << "    CacheInterval:      " << int(settings.pluginSettings.cacheInterval/1000) << " [s]";
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
      LOG(info) << "    Write-Dir:          " << settings.pluginSettings.tempdir;
      LOG(info) << "    Hierarchy:          " << (settings.hierarchy!="" ? settings.hierarchy : "none");
      LOG(info) << (validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");

      LOG(info) << "Cassandra Driver Settings:";
      LOG(info) << "    Address:            " << settings.cassandraSettings.address;
      LOG(info) << "    TTL:                " << settings.cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << settings.cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << settings.cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << settings.cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << settings.cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << settings.cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Username:           " << settings.cassandraSettings.username;
	  LOG(info) << "    Password:           " << settings.cassandraSettings.password;
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
      LOG(info) << "    REST Server: " << settings.restListenAddress;

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

      if (validateConfig)
          return EXIT_SUCCESS;
      else
          analyticsController->start();

562
563
564
      /*
       * Start the MQTT Message Server.
       */
565
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
566
      
567
568
569
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
570
      LOG(info) << "MQTT Server running...";
571
      
572
573
574
575
576
577
578
579
580
581
582
583
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
584
      LOG(info) <<  "HTTP Server running...";
585

586
587
588
589
590
591
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
592
593
594
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
595

Alessio Netti's avatar
Alessio Netti committed
596
597
598
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

599
      LOG(info) << "Collect Agent running...";
600
601
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
602
603
604
605
606
607
608
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

609
          sleep(60);
610
611
612
613
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
614
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
615
          if (settings.statistics && keepRunning) {
616
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
617
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
618
          }
619
          msgCtr = 0;
620
          pmsgCtr = 0;
621
	  readingCtr = 0;
622
623
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
624
      LOG(info) << "Stopping...";
625
      analyticsController->stop();
626
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
627
      LOG(info) << "MQTT Server stopped...";
628
629
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
630
      LOG(info) << "HTTP Server stopped...";
631
      delete mySensorDataStore;
632
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
633
634
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
635
      LOG(info) << "Collect Agent closed. Bye bye...";
636
  }
637
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
638
      LOG(fatal) << "Exception: " << e.what();
639
      abrt(EXIT_FAILURE, INTERR);
640
641
  }

642
  return EXIT_SUCCESS;
643
}
644
645