Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 25.5 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is a intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/sensorconfig.h>
58
#include <dcdb/version.h>
59
#include <dcdb/sensor.h>
60
#include "version.h"
61

62
#include "CARestAPI.h"
63
#include "configuration.h"
64
#include "simplemqttserver.h"
65
#include "messaging.h"
66
#include "abrt.h"
67
#include "dcdbdaemon.h"
68
#include "sensorcache.h"
69
70
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
71

72
73
74
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

75
using namespace std;
76
77

int keepRunning;
78
bool statistics;
79
80
uint64_t msgCtr;
uint64_t pmsgCtr;
81
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
82
SensorCache mySensorCache;
83
AnalyticsController* analyticsController;
84
DCDB::Connection* dcdbConn;
85
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
86
DCDB::JobDataStore *myJobDataStore;
87
DCDB::SensorConfig *mySensorConfig;
88
MetadataStore *metadataStore;
89
DCDB::SCError err;
90
QueryEngine& queryEngine = QueryEngine::getInstance();
91
logger_t lg;
92

93
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
94
95
96
97
98
99
100
101
102
103
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
104
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
105
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
106
107
108
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
109
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
114
115
116
117
118
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
119
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
120
    }
121
    return true;
Alessio Netti's avatar
Alessio Netti committed
122
123
}

124
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
125
126
127
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
128
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
129
    // Getting the topic of the queried sensor from the Navigator
130
    // If not found, we try to use the input name as topic
131
132
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
133
    } catch(const std::domain_error& e) {}
134
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
135
    // Creating a SID to perform the query
136
137
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
138
        return false;
139
    }
140
141
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
142
143
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
144
            return true;
145
        }
146
147
148
149
150
151
152
153
154
155
156
157
158
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
159
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
160
161
        if(results.empty()) {
            --queryEngine.access;
162
            return false;
163
        }
164
165
166
167
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
168
            buffer.push_back(reading);
169
170
171
        }
    }
    catch(const std::exception& e) {
172
        --queryEngine.access;
173
        return false;
174
    }
175
    --queryEngine.access;
176
    return true;
177
178
}

179
/* Normal termination (SIGINT, CTRL+C) */
180
181
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
182
183
184
185
186
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
187
188
189
  keepRunning = 0;
}

190
191
192
193
194
195
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

196
int mqttCallback(SimpleMQTTMessage *msg)
197
198
199
200
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
201
  msgCtr++;
202
203
204
  if (msg->isPublish())
    pmsgCtr++;

205
  uint64_t len;
206
207
208
  /*
   * Decode the message and put into the database.
   */
209
  if (msg->isPublish()) {
210
      const char *topic = msg->getTopic().c_str();
211
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
212
213
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
214
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
215
          if ((len = msg->getPayloadLength()) == 0) {
216
              LOG(error) << "Empty sensor publish message received!";
217
218
              return 1;
          }
219

220
          string payload((char *) msg->getPayload(), len);
221
222
223
224
225
226
227
228
229
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
230
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
Alessio Netti's avatar
Alessio Netti committed
231
              err = mySensorConfig->publishSensor(ps);
232
              metadataStore->store(sm.pattern, sm);
233
234
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
235
          }
236
237
238
239

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
240
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
241
242
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
243
                  LOG(error) << "Invalid sensor public name.";
244
245
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
246
                  LOG(error) << "Cannot reach sensor data store.";
247
248
249
250
251
252
253
254
255
256
257
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
258
              payload->value = *((int64_t *) msg->getPayload());
259
260
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
261
          }
262
263
264
265
266
267
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
268
              LOG(error) << "Message malformed";
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
291
#endif
292
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
293
294
295
296
297
298
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
299
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
300
	      readingCtr+= readings.size();
301

302
              //mySensorCache.dump();
303
          }
304
#if 1
305
306
307
308
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
309
#endif
310
      }
311
  }
312
  return 0;
313
314
}

315
316
317



318
319
320
321
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
322
  Configuration config("", "collectagent.conf");
323
324
325
326
327
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
328
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
329
330
331
332
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
333
334
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
335
336
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
337
338
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
339
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
340
  cout << endl;
Michael Ott's avatar
Michael Ott committed
341
  cout << "  -d            Daemonize" << endl;
342
  cout << "  -s            Print message stats" <<endl;
343
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
344
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
345
  cout << "  -h            This help page" << endl;
346
  cout << endl;
347
348
349
}

int main(int argc, char* const argv[]) {
350
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
351
352

  try{
353

354
355
356
357
358
359
360
361
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
362
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

379
380
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
381

Alessio Netti's avatar
Alessio Netti committed
382
383
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
384
          LOG(fatal) << "Failed to read global configuration!";
385
386
387
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
388
389
390
391
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
392
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
393
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
394
      
395
396
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
397
          switch(ret) {
398
              case 'a':
399
                  pluginSettings.autoPublish = true;
400
                  break;
401
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
402
403
404
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
405
                  break;
406
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
407
408
409
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
410
                  break;
Michael Ott's avatar
Michael Ott committed
411
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
412
                  cassandraSettings.username = optarg;
413
                  break;
Michael Ott's avatar
Michael Ott committed
414
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
415
                  cassandraSettings.password = optarg;
416
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
417
418
419
420
421
422
423
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
424
              case 't':
Alessio Netti's avatar
Alessio Netti committed
425
                  cassandraSettings.ttl = stoul(optarg);
426
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
427
              case 'v':
428
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
429
                  break;
430
              case 'd':
431
              case 'D':
432
                  settings.daemonize = 1;
433
                  break;
434
              case 's':
435
                  settings.statistics = 1;
436
                  break;
437
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
438
                  settings.validateConfig = true;
439
                  break;
440
              case 'h':
441
442
443
              default:
                  usage();
                  exit(EXIT_FAILURE);
444
445
446
          }
      }

447
448
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
449
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
450
451
452
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
453
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
454
455
456
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
457

458
      /*
Alessio Netti's avatar
Alessio Netti committed
459
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
460
461
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
462
      signal(SIGTERM, sigHandler);
463
464
465
466
467
468
469
470
471
472

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
473
      
474
      // Setting the size of the sensor cache
475
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
476
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
477

478
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
479
480
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
481
482
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
483
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
484
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
485
      
Axel Auweter's avatar
Axel Auweter committed
486
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
487
          LOG(fatal) << "Cannot connect to Cassandra!";
488
489
490
491
492
493
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
494
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
495
      
496
497
498
      /*
       * Allocate the SensorDataStore.
       */
499
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
500
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
501
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
502
503
504

      /*
       * Set TTL for data store inserts if TTL > 0.
505
       */
Alessio Netti's avatar
Alessio Netti committed
506
507
508
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
509

510
511
512
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
513
514
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
515
      SensorMetadata sBuf;
516
      for (const auto &s : publicSensors)
517
518
519
520
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              metadataStore->store(sBuf.pattern, sBuf);
          }
521
          
522
523
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
524
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
525
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
526
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
527
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
528
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
529
      queryEngine.setJobQueryCallback(jobQueryCallback);
530
      if(!analyticsController->initialize(settings, argv[argc - 1]))
531
532
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
533
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
534
535
536
537
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
538
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
539
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
540
541
542
543
544
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
545
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
546
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
547
548
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
549

Alessio Netti's avatar
Alessio Netti committed
550
551
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
552
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
553
      
554
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
555
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
556
557
558
559
560
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
561
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
562
563
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
564
565
566
567
#else
      LOG(info) << "    Username and password not printed.";
#endif

568
569
570
571
572
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
573
574
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
575
      }
576
577
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
578
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
579
580
581
582
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
583
      if (settings.validateConfig)
584
585
586
587
          return EXIT_SUCCESS;
      else
          analyticsController->start();

588
589
590
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
591
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
592
      
593
594
595
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
596
      LOG(info) << "MQTT Server running...";
597
      
598
599
600
      /*
       * Start the HTTP Server for the REST API
       */
601
      CARestAPI* httpsServer = nullptr;
602
      if (restAPISettings.enabled) {
603
604
605
606
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
607
      }
608

609
610
611
612
613
614
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
615
616
617
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
618

Alessio Netti's avatar
Alessio Netti committed
619
620
621
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

622
      LOG(info) << "Collect Agent running...";
623
624
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
625
626
627
628
629
630
631
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

632
          sleep(60);
633
634
635
636
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
637
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
638
          if (settings.statistics && keepRunning) {
639
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
640
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
641
          }
642
          msgCtr = 0;
643
          pmsgCtr = 0;
644
	  readingCtr = 0;
645
646
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
647
      LOG(info) << "Stopping...";
648
      analyticsController->stop();
649
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
650
      LOG(info) << "MQTT Server stopped...";
651
652
653
654
655
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
656
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
657
      delete myJobDataStore;
658
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
659
660
      dcdbConn->disconnect();
      delete dcdbConn;
661
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
662
      LOG(info) << "Collect Agent closed. Bye bye...";
663
  }
664
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
665
      LOG(fatal) << "Exception: " << e.what();
666
      abrt(EXIT_FAILURE, INTERR);
667
668
  }

669
  return EXIT_SUCCESS;
670
}
671
672