Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 23.7 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is a intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/sensorconfig.h>
58
#include <dcdb/version.h>
59
#include <dcdb/sensor.h>
60
#include "version.h"
61

62
#include "CARestAPI.h"
63
#include "configuration.h"
64
#include "simplemqttserver.h"
65
#include "messaging.h"
66
#include "abrt.h"
67
#include "dcdbdaemon.h"
68
#include "sensorcache.h"
69
70
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
71

72
73
74
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

75
using namespace std;
76
77

int keepRunning;
78
bool statistics;
79
80
uint64_t msgCtr;
uint64_t pmsgCtr;
81
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
82
SensorCache mySensorCache;
83
AnalyticsController* analyticsController;
84
DCDB::Connection* dcdbConn;
85
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
86
DCDB::JobDataStore *myJobDataStore;
87
88
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
89
QueryEngine& queryEngine = QueryEngine::getInstance();
90
logger_t lg;
91

92
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
93
94
95
96
97
98
99
100
101
102
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
103
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
104
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
105
106
107
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
108
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
109
110
111
112
113
114
115
116
117
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
118
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
119
    }
120
    return true;
Alessio Netti's avatar
Alessio Netti committed
121
122
}

123
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
124
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
125
    // Getting the topic of the queried sensor from the Navigator
126
127
128
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
129
        return false;
130
131
    }
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
132
    // Creating a SID to perform the query
133
134
135
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
136
137
        if (entry.getView(startTs, endTs, buffer, rel))
            return true;
138
139
140
141
142
143
144
145
146
147
148
149
150
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
151
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 10000000000);
152
153
        if(results.empty())
            return false;
154
155
156
157
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
158
            buffer.push_back(reading);
159
160
161
        }
    }
    catch(const std::exception& e) {
162
        return false;
163
    }
164
    return true;
165
166
}

167
/* Normal termination (SIGINT, CTRL+C) */
168
169
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
170
171
172
173
174
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
175
176
177
  keepRunning = 0;
}

178
179
180
181
182
183
/**
 * @brief Handler for crash signals; main() installs it for SIGABRT and SIGSEGV.
 *
 * @details Delegates to abrt() with EXIT_FAILURE and the SIGNAL source tag.
 *
 * @param sig  Number of the received signal (not used).
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

184
/**
 * @brief Message callback registered with the MQTT server in main().
 *
 * @details Counts every message for the statistics output. PUBLISH messages
 *          are additionally decoded:
 *          - If the topic starts with the DCDB_MAP keyword, the payload is
 *            taken as the sensor's public name and the sensor is published
 *            through SensorConfig::publishSensor().
 *          - Otherwise the payload is interpreted as sensor readings, which
 *            are stored in the sensor cache and inserted into the sensor
 *            data store.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was processed (or is not a PUBLISH message),
 *             1 if it was rejected as empty, malformed or invalid.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
    /*
     * Increment msgCtr/pmsgCtr for statistics.
     */
    msgCtr++;
    if (!msg->isPublish())
        return 0;
    pmsgCtr++;

    // Bind the topic to a reference before taking c_str(): calling c_str()
    // directly on the result of getTopic() would leave a dangling pointer if
    // getTopic() returns its string by value.
    const std::string& topicStr = msg->getTopic();
    const char *topic = topicStr.c_str();

    // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
    // sensor's name. In that case the keyword is skipped when passing the topic on.
    if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
        const uint64_t len = msg->getPayloadLength();
        if (len == 0) {
            LOG(error) << "Empty topic-to-name mapping message received";
            return 1;
        }

        string sensorName((char *) msg->getPayload(), len);
        // NOTE(review): err is a file-scope variable; if this callback can run
        // on several message threads at once this write is unsynchronized - confirm.
        err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

        // publishSensor does most of the error checking for us
        switch (err) {
            case DCDB::SC_INVALIDPATTERN:
                LOG(error) << "Invalid sensor topic : " << topicStr;
                return 1;
            case DCDB::SC_INVALIDPUBLICNAME:
                LOG(error) << "Invalid sensor public name: " << sensorName;
                return 1;
            case DCDB::SC_INVALIDSESSION:
                LOG(error) << "Cannot reach sensor data store.";
                return 1;
            default:
                break;
        }
        return 0;
    }

    /*
     * Regular sensor data message: determine where the readings are located
     * and how many of them the payload contains.
     */
    mqttPayload buf, *payload;
    uint64_t numReadings;
    const uint64_t len = msg->getPayloadLength();

    if (len == sizeof(uint64_t)) {
        // In the 64 bit message case only a value was sent and the collect
        // agent provides the timestamp itself.
        buf.value = *((int64_t *) msg->getPayload());
        buf.timestamp = Messaging::calculateTimestamp();
        payload = &buf;
        // Exactly one reading, independent of the size of mqttPayload.
        numReadings = 1;
    } else if ((len > 0) && (len % sizeof(mqttPayload) == 0)) {
        //...otherwise the readings are taken directly from the MQTT message payload.
        payload = (mqttPayload *) msg->getPayload();
        numReadings = len / sizeof(mqttPayload);
    } else {
        //...otherwise this message is malformed -> ignore...
        LOG(error) << "Message malformed";
        return 1;
    }

    /*
     * Check if we can decode the message topic into a valid SensorId.
     * Only then the readings are stored.
     */
    DCDB::SensorId sid;
    if (!sid.mqttTopicConvert(topicStr)) {
        // Reported through the logger like every other error in this callback.
        LOG(error) << "Wrong topic format: " << topicStr;
        return 1;
    }

#if 0
    cout << "Topic decode successful:" << endl
        << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
        << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
        << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
        << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

    cout << "Payload ("  << numReadings << " messages):"<< endl;
    for (uint64_t i=0; i<numReadings; i++) {
      cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
    }
    cout << endl;
#endif

    // Every reading goes into the cache and into the batch for the data store.
    std::list<DCDB::SensorDataStoreReading> readings;
    for (uint64_t i = 0; i < numReadings; i++) {
        DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
        readings.push_back(r);
        mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
    }
    mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
    mySensorDataStore->insertBatch(readings);
    readingCtr += readings.size();

    //mySensorCache.dump();
    return 0;
}

289
290
291



292
293
294
295
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
296
  Configuration config("", "collectagent.conf");
297
298
299
300
301
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
302
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
303
304
305
306
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
307
308
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
309
310
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
311
312
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
313
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
314
  cout << endl;
Michael Ott's avatar
Michael Ott committed
315
  cout << "  -d            Daemonize" << endl;
316
  cout << "  -s            Print message stats" <<endl;
317
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
318
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
319
  cout << "  -h            This help page" << endl;
320
  cout << endl;
321
322
323
}

int main(int argc, char* const argv[]) {
324
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
325
326

  try{
327

328
329
330
331
332
333
334
335
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
336
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

353
354
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
355

Alessio Netti's avatar
Alessio Netti committed
356
357
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
358
          LOG(fatal) << "Failed to read global configuration!";
359
360
361
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
362
363
364
365
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
366
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
367
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
368
      
369
370
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
371
          switch(ret) {
372
              case 'a':
373
                  pluginSettings.autoPublish = true;
374
                  break;
375
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
376
377
378
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
379
                  break;
380
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
381
382
383
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
384
                  break;
Michael Ott's avatar
Michael Ott committed
385
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
386
                  cassandraSettings.username = optarg;
387
                  break;
Michael Ott's avatar
Michael Ott committed
388
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
389
                  cassandraSettings.password = optarg;
390
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
391
392
393
394
395
396
397
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
398
              case 't':
Alessio Netti's avatar
Alessio Netti committed
399
                  cassandraSettings.ttl = stoul(optarg);
400
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
401
              case 'v':
402
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
403
                  break;
404
              case 'd':
405
              case 'D':
406
                  settings.daemonize = 1;
407
                  break;
408
              case 's':
409
                  settings.statistics = 1;
410
                  break;
411
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
412
                  settings.validateConfig = true;
413
                  break;
414
              case 'h':
415
416
417
              default:
                  usage();
                  exit(EXIT_FAILURE);
418
419
420
          }
      }

421
422
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
423
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
424
425
426
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
427
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
428
429
430
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
431

432
      /*
Alessio Netti's avatar
Alessio Netti committed
433
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
434
435
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
436
      signal(SIGTERM, sigHandler);
437
438
439
440
441
442
443
444
445
446

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
447
      
448
      // Setting the size of the sensor cache
449
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
450
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
451

452
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
453
454
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
455
456
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
457
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
458
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
459
      
Axel Auweter's avatar
Axel Auweter committed
460
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
461
          LOG(fatal) << "Cannot connect to Cassandra!";
462
463
464
465
466
467
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
468
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
469
      
470
471
472
      /*
       * Allocate the SensorDataStore.
       */
473
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
474
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
475
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
476
477
478

      /*
       * Set TTL for data store inserts if TTL > 0.
479
       */
Alessio Netti's avatar
Alessio Netti committed
480
481
482
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
483

484
485
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
Alessio Netti's avatar
Alessio Netti committed
486
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
487
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
488
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
489
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
490
      queryEngine.setJobQueryCallback(jobQueryCallback);
491
492
493
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
494
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
495
496
497
498
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
499
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
500
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
501
502
503
504
505
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
506
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
507
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
508
509
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
510

Alessio Netti's avatar
Alessio Netti committed
511
512
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
513
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
514
      
515
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
516
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
517
518
519
520
521
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
522
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
523
524
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
525
526
527
528
529
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Micha Mueller's avatar
Micha Mueller committed
530
      LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
Alessio Netti's avatar
Alessio Netti committed
531
532
533
534
535
536
537
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
538
539
540

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
541
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
542
543
544
545
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
546
      if (settings.validateConfig)
547
548
549
550
          return EXIT_SUCCESS;
      else
          analyticsController->start();

551
552
553
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
554
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
555
      
556
557
558
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
559
      LOG(info) << "MQTT Server running...";
560
      
561
562
563
      /*
       * Start the HTTP Server for the REST API
       */
564
      CARestAPI httpsServer(restAPISettings, &mySensorCache, analyticsController);
565
      config.readRestAPIUsers(&httpsServer);
566
      httpsServer.start();
567

Alessio Netti's avatar
Logging    
Alessio Netti committed
568
      LOG(info) <<  "HTTP Server running...";
569

570
571
572
573
574
575
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
576
577
578
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
579

Alessio Netti's avatar
Alessio Netti committed
580
581
582
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

583
      LOG(info) << "Collect Agent running...";
584
585
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
586
587
588
589
590
591
592
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

593
          sleep(60);
594
595
596
597
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
598
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
599
          if (settings.statistics && keepRunning) {
600
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
601
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
602
          }
603
          msgCtr = 0;
604
          pmsgCtr = 0;
605
	  readingCtr = 0;
606
607
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
608
      LOG(info) << "Stopping...";
609
      analyticsController->stop();
610
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
611
      LOG(info) << "MQTT Server stopped...";
612
      httpsServer.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
613
      LOG(info) << "HTTP Server stopped...";
614
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
615
      delete myJobDataStore;
616
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
617
618
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
619
      LOG(info) << "Collect Agent closed. Bye bye...";
620
  }
621
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
622
      LOG(fatal) << "Exception: " << e.what();
623
      abrt(EXIT_FAILURE, INTERR);
624
625
  }

626
  return EXIT_SUCCESS;
627
}
628
629