Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 24.3 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/sensorconfig.h>
58
#include <dcdb/version.h>
59
#include <dcdb/sensor.h>
60
#include "version.h"
61

62
#include "CARestAPI.h"
63
#include "configuration.h"
64
#include "simplemqttserver.h"
65
#include "messaging.h"
66
#include "abrt.h"
67
#include "dcdbdaemon.h"
68
#include "sensorcache.h"
69
70
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
71

72
73
74
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

75
using namespace std;
76
77

int keepRunning;
78
bool statistics;
79
80
uint64_t msgCtr;
uint64_t pmsgCtr;
81
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
82
SensorCache mySensorCache;
83
AnalyticsController* analyticsController;
84
DCDB::Connection* dcdbConn;
85
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
86
DCDB::JobDataStore *myJobDataStore;
87
88
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
89
QueryEngine& queryEngine = QueryEngine::getInstance();
90
logger_t lg;
91

Alessio Netti's avatar
Alessio Netti committed
92
93
94
95
96
97
98
99
100
101
102
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
103
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
Alessio Netti's avatar
Alessio Netti committed
104
105
106
107
108
109
110
111
112
113
        if(err != JD_OK) return NULL;
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
        if(err != JD_OK) return NULL;
        tempList.push_back(tempData);
    }
    
    if(!buffer)
        buffer = new std::vector<qeJobData>();
114
    //buffer->clear();
Alessio Netti's avatar
Alessio Netti committed
115
116
117
118
119
120
121
122
123
124
125
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
        buffer->push_back(tempQeData);
    }
    return buffer;
}

126
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
127
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
128
    // Getting the topic of the queried sensor from the Navigator
129
130
131
132
133
134
135
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
136
    // Creating a SID to perform the query
137
138
139
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
140
141
        // Counting the number of elements in the buffer before accessing the cache
        size_t elCtr = (buffer==nullptr) ? 0 : buffer->size();
142
        output = entry.getView(startTs, endTs, buffer, rel);
143
        if (output->size() > elCtr)
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
159
        // Dealing with allocations that may have been performed by the cache search
160
161
        if(!output)
            output = (buffer==nullptr) ? new std::vector<reading_t>() : buffer;
162
163
164
165
166
167
168
169
170
171
172
173
174
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
175
176
}

177
/* Normal termination (SIGINT, CTRL+C) */
178
179
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
180
181
182
183
184
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
185
186
187
  keepRunning = 0;
}

188
189
190
191
192
193
/*
 * Crash handler.
 *
 * Installed by main() for SIGABRT and SIGSEGV. The signal number itself is
 * not inspected; the handler only forwards to abrt() with the SIGNAL reason.
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

194
/**
 * @brief Message callback registered with the SimpleMQTTServer in main().
 *
 * Counts the message for the statistics and, for PUBLISH messages, either
 *  - publishes a topic-to-name mapping (topic starts with DCDB_MAP, the
 *    payload carries the sensor name), or
 *  - decodes the payload into readings, stores them in the sensor cache and
 *    inserts them into the sensor data store.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was processed (or is not a PUBLISH message),
 *             1 if it was rejected (empty/malformed payload, invalid topic
 *             or sensor name, data store not reachable).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;

  // Only PUBLISH messages carry data that has to be decoded and stored
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  // Bind the topic to a reference instead of keeping the result of c_str():
  // this stays valid for the whole function even if getTopic() returns a
  // temporary string.
  const std::string& topic = msg->getTopic();

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix before publishing.
  // We use strncmp as it is the most efficient way to do it
  if (strncmp(topic.c_str(), DCDB_MAP, DCDB_MAP_LEN) == 0) {
      const uint64_t nameLen = msg->getPayloadLength();
      if (nameLen == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), nameLen);
      // The result is kept in a local variable rather than in the file-scope
      // "err", so that concurrent invocations of this callback cannot
      // overwrite each other's result.
      const DCDB::SCError rc = mySensorConfig->publishSensor(sensorName.c_str(), topic.c_str() + DCDB_MAP_LEN);

      // PublishSensor does most of the error checking for us
      switch (rc) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topic;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  /*
   * Decode the message and put into the database.
   */
  mqttPayload buf, *payload;
  uint64_t len = msg->getPayloadLength();

  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
      // Copy the value out with memcpy: the payload buffer is not guaranteed
      // to be suitably aligned for a direct int64_t access.
      int64_t rawValue;
      memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));
      payload = &buf;
      payload->value = rawValue;
      payload->timestamp = Messaging::calculateTimestamp();
      len = sizeof(uint64_t) * 2;
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
      payload = (mqttPayload *) msg->getPayload();
  }
  //...otherwise this message is malformed -> ignore...
  else {
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(topic)) {
      // Reported through the logger like all other errors of this callback
      LOG(error) << "Wrong topic format: " << topic;
      return 1;
  }

#if 0
  cout << "Topic decode successful:" << endl
      << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
      << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
      << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
      << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

  cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
  for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
    cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
  }
  cout << endl;
#endif

  // Number of (timestamp, value) pairs contained in the payload
  const uint64_t numReadings = len / sizeof(mqttPayload);

  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
      readings.push_back(r);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  //mySensorCache.dump();

  return 0;
}

299
300
301



302
303
304
305
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
306
  Configuration config("", "collectagent.conf");
307
308
309
310
311
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
312
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
313
314
315
316
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
317
318
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
319
320
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
321
322
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
323
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
324
  cout << endl;
Michael Ott's avatar
Michael Ott committed
325
  cout << "  -d            Daemonize" << endl;
326
  cout << "  -s            Print message stats" <<endl;
327
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
328
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
329
  cout << "  -h            This help page" << endl;
330
  cout << endl;
331
332
333
}

int main(int argc, char* const argv[]) {
334
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
335
336

  try{
337

338
339
340
341
342
343
344
345
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
346
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

363
364
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
365

Alessio Netti's avatar
Alessio Netti committed
366
367
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
368
          LOG(fatal) << "Failed to read global configuration!";
369
370
371
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
372
373
374
375
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
376
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
377
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
378
      
379
380
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
381
          switch(ret) {
382
              case 'a':
383
                  pluginSettings.autoPublish = true;
384
                  break;
385
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
386
387
388
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
389
                  break;
390
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
391
392
393
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
394
                  break;
Michael Ott's avatar
Michael Ott committed
395
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
396
                  cassandraSettings.username = optarg;
397
                  break;
Michael Ott's avatar
Michael Ott committed
398
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
399
                  cassandraSettings.password = optarg;
400
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
401
402
403
404
405
406
407
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
408
              case 't':
Alessio Netti's avatar
Alessio Netti committed
409
                  cassandraSettings.ttl = stoul(optarg);
410
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
411
              case 'v':
412
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
413
                  break;
414
              case 'd':
415
              case 'D':
416
                  settings.daemonize = 1;
417
                  break;
418
              case 's':
419
                  settings.statistics = 1;
420
                  break;
421
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
422
                  settings.validateConfig = true;
423
                  break;
424
              case 'h':
425
426
427
              default:
                  usage();
                  exit(EXIT_FAILURE);
428
429
430
          }
      }

431
432
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
433
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
434
435
436
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
437
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
438
439
440
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
441

442
      /*
Alessio Netti's avatar
Alessio Netti committed
443
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
444
445
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
446
      signal(SIGTERM, sigHandler);
447
448
449
450
451
452
453
454
455
456

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
457
      
458
      // Setting the size of the sensor cache
459
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
460
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
461

462
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
463
464
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
465
466
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
467
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
468
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
469
      
Axel Auweter's avatar
Axel Auweter committed
470
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
471
          LOG(fatal) << "Cannot connect to Cassandra!";
472
473
474
475
476
477
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
478
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
479
      
480
481
482
      /*
       * Allocate the SensorDataStore.
       */
483
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
484
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
485
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
486
487
488

      /*
       * Set TTL for data store inserts if TTL > 0.
489
       */
Alessio Netti's avatar
Alessio Netti committed
490
491
492
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
493

494
495
496
497
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
Alessio Netti's avatar
Alessio Netti committed
498
499
      queryEngine.setFilter(analyticsSettings.filter);
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
500
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
501
      queryEngine.setJobQueryCallback(jobQueryCallback);
502

Alessio Netti's avatar
Alessio Netti committed
503
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
504
505
506
507
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
508
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
509
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
510
511
512
513
514
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
515
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
516
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
517
518
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
519

Alessio Netti's avatar
Alessio Netti committed
520
521
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
522
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
523
      
524
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
525
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
526
527
528
529
530
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
531
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
532
533
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
534
535
536
537
538
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Micha Mueller's avatar
Micha Mueller committed
539
      LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
Alessio Netti's avatar
Alessio Netti committed
540
541
542
543
544
545
546
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
547
548
549
550
551
552
553
554

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
555
      if (settings.validateConfig)
556
557
558
559
          return EXIT_SUCCESS;
      else
          analyticsController->start();

560
561
562
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
563
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
564
      
565
566
567
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
568
      LOG(info) << "MQTT Server running...";
569
      
570
571
572
      /*
       * Start the HTTP Server for the REST API
       */
573
      CARestAPI httpsServer(restAPISettings, &mySensorCache, analyticsController);
574
      config.readRestAPIUsers(&httpsServer);
575
      httpsServer.start();
576

Alessio Netti's avatar
Logging    
Alessio Netti committed
577
      LOG(info) <<  "HTTP Server running...";
578

579
580
581
582
583
584
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
585
586
587
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
588

Alessio Netti's avatar
Alessio Netti committed
589
590
591
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

592
      LOG(info) << "Collect Agent running...";
593
594
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
595
596
597
598
599
600
601
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

602
          sleep(60);
603
604
605
606
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
607
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
608
          if (settings.statistics && keepRunning) {
609
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
610
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
611
          }
612
          msgCtr = 0;
613
          pmsgCtr = 0;
614
	  readingCtr = 0;
615
616
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
617
      LOG(info) << "Stopping...";
618
      analyticsController->stop();
619
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
620
      LOG(info) << "MQTT Server stopped...";
621
      httpsServer.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
622
      LOG(info) << "HTTP Server stopped...";
623
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
624
      delete myJobDataStore;
625
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
626
627
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
628
      LOG(info) << "Collect Agent closed. Bye bye...";
629
  }
630
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
631
      LOG(fatal) << "Exception: " << e.what();
632
      abrt(EXIT_FAILURE, INTERR);
633
634
  }

635
  return EXIT_SUCCESS;
636
}
637
638