//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
#include <cstdlib>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <string>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include <dcdb/sensor.h>
#include "version.h"

#include "CARestAPI.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"
#include "sensorcache.h"
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"

#define __STDC_FORMAT_MACROS
#include <inttypes.h>

54
using namespace std;
55
56

int keepRunning;
57
bool statistics;
58
59
uint64_t msgCtr;
uint64_t pmsgCtr;
60
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
61
SensorCache mySensorCache;
62
AnalyticsController* analyticsController;
63
DCDB::Connection* dcdbConn;
64
DCDB::SensorDataStore *mySensorDataStore;
65
66
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
67
QueryEngine& queryEngine = QueryEngine::getInstance();
68
logger_t lg;
69

70
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
71
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
72
    // Getting the topic of the queried sensor from the Navigator
73
74
75
76
77
78
79
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
80
    // Creating a SID to perform the query
81
82
83
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
84
        output = entry.getView(startTs, endTs, buffer, rel);
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
101
        // Dealing with allocations that may have been performed by the cache search
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
122
123
}

124
/* Normal termination (SIGINT, CTRL+C) */
125
126
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
132
133
134
  keepRunning = 0;
}

135
136
137
138
139
140
/*
 * Crash handler (installed by main() for SIGABRT and SIGSEGV).
 * The signal number itself is not used; control is handed to abrt().
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

141
/*
 * Callback invoked by the MQTT server for every received message.
 *
 * Updates the message statistics and, for PUBLISH messages, either
 *  - registers a topic-to-name mapping (topic starts with DCDB_MAP, payload
 *    carries the sensor name), or
 *  - decodes the payload into sensor readings, stores them in the sensor
 *    cache and inserts them into the sensor data store.
 *
 * Returns 0 if the message was handled (or was not a PUBLISH message) and 1 if
 * it was rejected (empty/malformed payload, invalid topic or name, data store
 * unreachable).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  // Bind the topic to a reference first, so that the C string below stays
  // valid even if getTopic() returns its string by value.
  const std::string& topicStr = msg->getTopic();
  const char *topic = topicStr.c_str();
  uint64_t len = msg->getPayloadLength();

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix before publishing the mapping.
  // We use strncmp as it is the most efficient way to do it
  if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
      if (len == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), len);
      err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

      // PublishSensor does most of the error checking for us
      switch (err) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << msg->getTopic();
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  mqttPayload buf, *payload;

  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
      // Copy the value out instead of dereferencing a cast pointer: the
      // payload buffer is not guaranteed to be aligned for int64_t.
      int64_t rawValue;
      memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));
      buf.value = rawValue;
      buf.timestamp = Messaging::calculateTimestamp();
      payload = &buf;
      // From here on the message is treated as exactly one full reading
      len = sizeof(mqttPayload);
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len > 0) && (len % sizeof(mqttPayload) == 0)) {
      payload = (mqttPayload *) msg->getPayload();
  }
  //...otherwise this message is malformed -> ignore...
  else {
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(msg->getTopic())) {
      LOG(error) << "Wrong topic format: " << msg->getTopic();
      return 1;
  }

#if 0
  cout << "Topic decode successful:" << endl
      << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
      << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
      << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
      << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

  cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
  for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
    cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
  }
  cout << endl;
#endif

  // Every reading goes both into the local cache and into the batch that is
  // inserted into the data store.
  const uint64_t numReadings = len / sizeof(mqttPayload);
  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
      readings.push_back(r);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  //mySensorCache.dump();
  return 0;
}

246
247
248



249
250
251
252
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
253
  Configuration config("", "collectagent.conf");
254
255
256
257
258
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
Alessio Netti's avatar
Alessio Netti committed
259
  cout << "  collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
260
261
262
263
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
264
  cout << "  -a <string>   Auto-publish pattern    [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
265
266
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
267
268
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
269
270
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
271
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
272
  cout << endl;
Michael Ott's avatar
Michael Ott committed
273
  cout << "  -d            Daemonize" << endl;
274
  cout << "  -s            Print message stats" <<endl;
275
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
276
  cout << "  -h            This help page" << endl;
277
  cout << endl;
278
279
280
}

int main(int argc, char* const argv[]) {
281
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
282
283

  try{
284

285
286
287
288
289
290
291
292
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
293
      const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

310
311
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
312

Alessio Netti's avatar
Alessio Netti committed
313
314
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
315
          LOG(fatal) << "Failed to read global configuration!";
316
317
318
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
319
320
321
322
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
323
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
324
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
325
      
326
327
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
328
          switch(ret) {
329
              case 'a':
Alessio Netti's avatar
Alessio Netti committed
330
                  pluginSettings.sensorPattern = optarg;
331
                  break;
332
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
333
334
335
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
336
                  break;
337
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
338
339
340
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
341
                  break;
Michael Ott's avatar
Michael Ott committed
342
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
343
                  cassandraSettings.username = optarg;
344
                  break;
Michael Ott's avatar
Michael Ott committed
345
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
346
                  cassandraSettings.password = optarg;
347
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
348
349
350
351
352
353
354
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
355
              case 't':
Alessio Netti's avatar
Alessio Netti committed
356
                  cassandraSettings.ttl = stoul(optarg);
357
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
358
              case 'v':
359
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
360
                  break;
361
              case 'd':
362
              case 'D':
363
                  settings.daemonize = 1;
364
                  break;
365
              case 's':
366
                  settings.statistics = 1;
367
                  break;
368
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
369
                  settings.validateConfig = true;
370
                  break;
371
              case 'h':
372
373
374
              default:
                  usage();
                  exit(EXIT_FAILURE);
375
376
377
          }
      }

378
379
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
380
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
381
382
383
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
384
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
385
386
387
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
388

389
      /*
Alessio Netti's avatar
Alessio Netti committed
390
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
391
392
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
393
      signal(SIGTERM, sigHandler);
394
395
396
397
398
399
400
401
402
403

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
404
      
405
      // Setting the size of the sensor cache
406
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
407
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
408

409
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
410
411
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
412
413
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
414
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
415
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
416
      
Axel Auweter's avatar
Axel Auweter committed
417
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
418
          LOG(fatal) << "Cannot connect to Cassandra!";
419
420
421
422
423
424
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
425
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
426
      
427
428
429
      /*
       * Allocate the SensorDataStore.
       */
430
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
431
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
432
433
434

      /*
       * Set TTL for data store inserts if TTL > 0.
435
       */
Alessio Netti's avatar
Alessio Netti committed
436
437
438
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
439

440
441
442
443
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
444
      queryEngine.setQueryCallback(sensorQueryCallback);
445

Alessio Netti's avatar
Alessio Netti committed
446
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
447
448
449
450
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
451
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
452
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
453
454
455
456
457
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
458
459
460
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
461

Alessio Netti's avatar
Alessio Netti committed
462
463
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
464
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
465
      
466
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
467
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
468
469
470
471
472
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
473
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
474
475
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
476
477
478
479
480
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Micha Mueller's avatar
Micha Mueller committed
481
      LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
Alessio Netti's avatar
Alessio Netti committed
482
483
484
485
486
487
488
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
489
490
491
492
493
494
495
496

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
497
      if (settings.validateConfig)
498
499
500
501
          return EXIT_SUCCESS;
      else
          analyticsController->start();

502
503
504
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
505
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
506
      
507
508
509
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
510
      LOG(info) << "MQTT Server running...";
511
      
512
513
514
      /*
       * Start the HTTP Server for the REST API
       */
515
      CARestAPI httpsServer(restAPISettings, &mySensorCache, analyticsController);
516
      config.readRestAPIUsers(&httpsServer);
517
      httpsServer.start();
518

Alessio Netti's avatar
Logging    
Alessio Netti committed
519
      LOG(info) <<  "HTTP Server running...";
520

521
522
523
524
525
526
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
527
528
529
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
530

Alessio Netti's avatar
Alessio Netti committed
531
532
533
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

534
      LOG(info) << "Collect Agent running...";
535
536
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
537
538
539
540
541
542
543
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

544
          sleep(60);
545
546
547
548
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
549
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
550
          if (settings.statistics && keepRunning) {
551
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
552
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
553
          }
554
          msgCtr = 0;
555
          pmsgCtr = 0;
556
	  readingCtr = 0;
557
558
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
559
      LOG(info) << "Stopping...";
560
      analyticsController->stop();
561
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
562
      LOG(info) << "MQTT Server stopped...";
563
      httpsServer.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
564
      LOG(info) << "HTTP Server stopped...";
565
      delete mySensorDataStore;
566
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
567
568
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
569
      LOG(info) << "Collect Agent closed. Bye bye...";
570
  }
571
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
572
      LOG(fatal) << "Exception: " << e.what();
573
      abrt(EXIT_FAILURE, INTERR);
574
575
  }

576
  return EXIT_SUCCESS;
577
}
578
579