Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 24.3 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include <dcdb/sensor.h>
43
#include "version.h"
44

45
#include "configuration.h"
46
#include "simplemqttserver.h"
47
#include "messaging.h"
48
#include "abrt.h"
49
#include "dcdbdaemon.h"
50
#include "sensorcache.h"
51
52
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
53

54
55
56
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

57
using namespace std;
58
59

int keepRunning;
60
bool statistics;
61
62
uint64_t msgCtr;
uint64_t pmsgCtr;
63
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
64
SensorCache mySensorCache;
65
AnalyticsController* analyticsController;
66
DCDB::Connection* dcdbConn;
67
DCDB::SensorDataStore *mySensorDataStore;
68
69
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
70
QueryEngine& queryEngine = QueryEngine::getInstance();
71
logger_t lg;
72

73
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
74
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
75
    // Getting the topic of the queried sensor from the Navigator
76
77
78
79
80
81
82
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
83
    // Creating a SID to perform the query
84
85
86
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
Alessio Netti's avatar
Alessio Netti committed
87
88
        // getView is called with live=false to drop strict staleness checks
        output = entry.getView(startTs, endTs, buffer, rel, false);
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
105
        // Dealing with allocations that may have been performed by the cache search
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
126
127
}

128
/* Normal termination (SIGINT, CTRL+C) */
129
130
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
131
132
133
134
135
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
136
137
138
  keepRunning = 0;
}

139
140
141
142
143
144
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

145
146
147
148
149
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
150
    std::ostringstream data;
151
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
152

153
154
    //try getting the latest value 
    try {
155
156
157
158
159
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

Alessio Netti's avatar
Alessio Netti committed
160
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

185
186
187
188
  }
};


189
int mqttCallback(SimpleMQTTMessage *msg)
190
191
192
193
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
194
  msgCtr++;
195
196
197
  if (msg->isPublish())
    pmsgCtr++;

198
  uint64_t len;
199
200
201
  /*
   * Decode the message and put into the database.
   */
202
  if (msg->isPublish()) {
203
204
205
206
      const char *topic = msg->getTopic().c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
207
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
208
          if ((len = msg->getPayloadLength()) == 0) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
209
              LOG(error) << "Empty topic-to-name mapping message received";
210
211
              return 1;
          }
212

213
214
215
216
217
218
          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
219
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
220
221
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
Alessio Netti's avatar
Logging    
Alessio Netti committed
222
                  LOG(error) << "Invalid sensor public name: " << sensorName;
223
224
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
225
                  LOG(error) << "Cannot reach sensor data store.";
226
227
228
229
230
231
232
233
234
235
236
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
237
              payload->value = *((int64_t *) msg->getPayload());
238
239
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
240
          }
241
242
243
244
245
246
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
247
              LOG(error) << "Message malformed";
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
270
#endif
271
	      std::list<DCDB::SensorDataStoreReading> readings;
272
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
273
274
		  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
		  readings.push_back(r);
275
276
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
277
	      mySensorDataStore->insertBatch(readings);
278
	      readingCtr+= readings.size();
279

280
              //mySensorCache.dump();
281
          }
282
#if 1
283
284
285
286
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
287
#endif
288
      }
289
  }
290
  return 0;
291
292
}

293
294
295



296
297
298
299
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
300
301
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
302
303
304
305
306
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
307
  cout << "  collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
308
309
310
311
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
312
  cout << "  -a <string>   Auto-publish pattern    [default: none]" << endl;
313
314
315
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
316
317
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
318
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
319
320
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
321
  cout << endl;
Michael Ott's avatar
Michael Ott committed
322
  cout << "  -d            Daemonize" << endl;
323
  cout << "  -s            Print message stats" <<endl;
324
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
325
  cout << "  -h            This help page" << endl;
326
  cout << endl;
327
328
329
}

int main(int argc, char* const argv[]) {
330
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
331
    bool validateConfig = false;
332
333

  try{
334

335
336
337
338
339
340
341
342
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
343
      const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

360
361
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
362

363
364
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
365
          LOG(fatal) << "Failed to read global configuration!";
366
367
368
369
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
370

371
      /* Parse command line */
372
      std::string listenHost, cassandraHost, restApiHost;
373
      std::string listenPort, cassandraPort, restApiPort;
374
375
376

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
377
          switch(ret) {
378
379
380
              case 'a':
                  settings.pluginSettings.sensorPattern = optarg;
                  break;
381
              case 'm':
382
                  settings.mqttListenAddress = optarg;
383
                  break;
384
              case 'r':
385
                  settings.restListenAddress = optarg;
386
                  break;
387
              case 'c':
388
389
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
390
              case 'u':
391
                  settings.cassandraSettings.username = optarg;
392
                  break;
Michael Ott's avatar
Michael Ott committed
393
              case 'p': {
394
395
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
396
397
398
399
400
401
402
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
403
              case 't':
404
                  settings.cassandraSettings.ttl = stoul(optarg);
405
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
406
              case 'v':
407
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
408
                  break;
409
              case 'd':
410
              case 'D':
411
                  settings.daemonize = 1;
412
                  break;
413
              case 's':
414
                  settings.statistics = 1;
415
                  break;
416
417
418
              case 'x':
                  validateConfig = true;
                  break;
419
              case 'h':
420
421
422
              default:
                  usage();
                  exit(EXIT_FAILURE);
423
424
425
          }
      }

426
      auto fileSink = setupFileLogger(settings.pluginSettings.tempdir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
427
428
429
430
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

431
      /*
Alessio Netti's avatar
Alessio Netti committed
432
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
433
434
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
435
      signal(SIGTERM, sigHandler);
436
437
438
439
440
441
442
443
444
445
446

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

447
448
449
      /*
       * Parse hostnames for port specifications
       */
450
      listenHost = string(settings.mqttListenAddress);
451
452
453
454
455
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
456
        listenPort = LISTENPORT;
457
      }
458
459

      cassandraHost = string(settings.cassandraSettings.address);
460
461
462
463
464
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
465
        cassandraPort = CASSANDRAPORT;
466
      }
467
468

      restApiHost = string(settings.restListenAddress);
469
470
471
472
473
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
474
        restApiPort = RESTAPIPORT;
475
476
      }

477
      // Setting the size of the sensor cache
478
      // Conversion from milliseconds to nanoseconds
479
      mySensorCache.setMaxHistory(uint64_t(settings.pluginSettings.cacheInterval) * 1000000);
480

481
482
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
483
484
485
486
487
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

488

Axel Auweter's avatar
Axel Auweter committed
489
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
490
          LOG(fatal) << "Cannot connect to Cassandra!";
491
492
493
494
495
496
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
497
      dcdbConn->initSchema();
498

Alessio Netti's avatar
Alessio Netti committed
499

500
501
502
      /*
       * Allocate the SensorDataStore.
       */
503
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
504
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
505
506
507

      /*
       * Set TTL for data store inserts if TTL > 0.
508
       */
509
510
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
511
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
512

513
514
515
516
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
517
      queryEngine.setQueryCallback(sensorQueryCallback);
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565

      LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenAddress;
      LOG(info) << "    CacheInterval:      " << int(settings.pluginSettings.cacheInterval/1000) << " [s]";
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
      LOG(info) << "    Write-Dir:          " << settings.pluginSettings.tempdir;
      LOG(info) << "    Hierarchy:          " << (settings.hierarchy!="" ? settings.hierarchy : "none");
      LOG(info) << (validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");

      LOG(info) << "Cassandra Driver Settings:";
      LOG(info) << "    Address:            " << settings.cassandraSettings.address;
      LOG(info) << "    TTL:                " << settings.cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << settings.cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << settings.cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << settings.cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << settings.cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << settings.cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Username:           " << settings.cassandraSettings.username;
	  LOG(info) << "    Password:           " << settings.cassandraSettings.password;
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
      LOG(info) << "    REST Server: " << settings.restListenAddress;

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

      if (validateConfig)
          return EXIT_SUCCESS;
      else
          analyticsController->start();

566
567
568
      /*
       * Start the MQTT Message Server.
       */
569
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
570
      
571
572
573
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
574
      LOG(info) << "MQTT Server running...";
575
      
576
577
578
579
580
581
582
583
584
585
586
587
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
588
      LOG(info) <<  "HTTP Server running...";
589

590
591
592
593
594
595
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
596
597
598
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
599

Alessio Netti's avatar
Alessio Netti committed
600
601
602
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

603
      LOG(info) << "Collect Agent running...";
604
605
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
606
607
608
609
610
611
612
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

613
          sleep(60);
614
615
616
617
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
618
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
619
          if (settings.statistics && keepRunning) {
620
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
621
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
622
          }
623
          msgCtr = 0;
624
          pmsgCtr = 0;
625
	  readingCtr = 0;
626
627
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
628
      LOG(info) << "Stopping...";
629
      analyticsController->stop();
630
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
631
      LOG(info) << "MQTT Server stopped...";
632
633
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
634
      LOG(info) << "HTTP Server stopped...";
635
      delete mySensorDataStore;
636
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
637
638
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
639
      LOG(info) << "Collect Agent closed. Bye bye...";
640
  }
641
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
642
      LOG(fatal) << "Exception: " << e.what();
643
      abrt(EXIT_FAILURE, INTERR);
644
645
  }

646
  return EXIT_SUCCESS;
647
}
648
649