Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 23.7 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
10
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
#include <cstdlib>
28
#include <signal.h>
29
30
#include <unistd.h>
#include <string>
31

32
#include <boost/date_time/posix_time/posix_time.hpp>
33

34
#include <dcdb/connection.h>
35
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
36
#include <dcdb/jobdatastore.h>
37
#include <dcdb/sensorconfig.h>
38
#include <dcdb/version.h>
39
#include <dcdb/sensor.h>
40
#include "version.h"
41

42
#include "CARestAPI.h"
43
#include "configuration.h"
44
#include "simplemqttserver.h"
45
#include "messaging.h"
46
#include "abrt.h"
47
#include "dcdbdaemon.h"
48
#include "sensorcache.h"
49
50
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
51

52
53
54
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

55
using namespace std;
56
57

// ---------------------------------------------------------------------------
// Process-wide state of the CollectAgent, shared by main(), the signal
// handlers and the MQTT / query callbacks defined in this file.
//
//   keepRunning         - main loop flag: main() sets it to 1 and loops while
//                         it is non-zero; sigHandler() resets it to 0.
//   statistics          - not referenced in the visible part of this file
//                         (main() reads settings.statistics instead).
//                         NOTE(review): possibly unused, confirm.
//   msgCtr              - number of messages seen by mqttCallback() since
//                         main() last reset the counters.
//   pmsgCtr             - subset of msgCtr for which isPublish() was true.
//   readingCtr          - number of readings handed to insertBatch() since
//                         the last reset.
//   mySensorCache       - sensor cache filled by mqttCallback() and read by
//                         sensorQueryCallback().
//   analyticsController - created, started and stopped by main().
//   dcdbConn            - database connection created by main() and used to
//                         build the data stores below.
//   mySensorDataStore, myJobDataStore, mySensorConfig
//                       - allocated in main() on top of dcdbConn and deleted
//                         there on shutdown.
//   err                 - result of the last publishSensor() call made by
//                         mqttCallback().
//   queryEngine         - reference to the QueryEngine singleton.
//   lg                  - logger object; presumably the one picked up by the
//                         LOG() macro. NOTE(review): confirm.
// ---------------------------------------------------------------------------
int keepRunning;
58
bool statistics;
59
60
uint64_t msgCtr;
uint64_t pmsgCtr;
61
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
62
SensorCache mySensorCache;
63
AnalyticsController* analyticsController;
64
DCDB::Connection* dcdbConn;
65
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
66
DCDB::JobDataStore *myJobDataStore;
67
68
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
69
QueryEngine& queryEngine = QueryEngine::getInstance();
70
logger_t lg;
71

Alessio Netti's avatar
Alessio Netti committed
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        err = myJobDataStore->getJobsInIntervalExcl(tempList, start, end);
        if(err != JD_OK) return NULL;
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
        if(err != JD_OK) return NULL;
        tempList.push_back(tempData);
    }
    
    if(!buffer)
        buffer = new std::vector<qeJobData>();
94
    //buffer->clear();
Alessio Netti's avatar
Alessio Netti committed
95
96
97
98
99
100
101
102
103
104
105
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
        buffer->push_back(tempQeData);
    }
    return buffer;
}

106
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
107
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
108
    // Getting the topic of the queried sensor from the Navigator
109
110
111
112
113
114
115
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
116
    // Creating a SID to perform the query
117
118
119
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
120
121
        // Counting the number of elements in the buffer before accessing the cache
        size_t elCtr = (buffer==nullptr) ? 0 : buffer->size();
122
        output = entry.getView(startTs, endTs, buffer, rel);
123
        if (output->size() > elCtr)
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
139
        // Dealing with allocations that may have been performed by the cache search
140
141
        if(!output)
            output = (buffer==nullptr) ? new std::vector<reading_t>() : buffer;
142
143
144
145
146
147
148
149
150
151
152
153
154
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
155
156
}

157
/* Normal termination (SIGINT, CTRL+C) */
158
159
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
160
161
162
163
164
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
165
166
167
  keepRunning = 0;
}

168
169
170
171
172
173
/*
 * Crash handler: forwards to abrt() with EXIT_FAILURE and the SIGNAL source.
 * The signal number itself is not evaluated.
 */
void abrtHandler(int sig) {
  static_cast<void>(sig);
  abrt(EXIT_FAILURE, SIGNAL);
}

174
/*
 * Callback registered with the SimpleMQTTServer; processes one MQTT message.
 *
 * Every message increments msgCtr; PUBLISH messages additionally increment
 * pmsgCtr and are decoded:
 *  - If the topic starts with the DCDB_MAP keyword, the payload is taken as
 *    the public name of the sensor and the mapping between that name and the
 *    remainder of the topic is published via SensorConfig::publishSensor().
 *  - Otherwise the payload is interpreted as sensor readings. A payload of
 *    exactly 8 bytes is a bare value that gets timestamped here; a payload
 *    that is a non-zero multiple of sizeof(mqttPayload) is a batch of
 *    (value, timestamp) records. The readings are stored in the sensor cache
 *    and inserted into the sensor data store.
 *
 * msg  The received message
 *
 * Returns 0 on success (and for non-PUBLISH messages), 1 if the message is
 * empty, malformed or carries an invalid topic / sensor name.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  // Bind the topic to a reference before taking c_str(): should getTopic()
  // return by value, the temporary stays alive for the whole function and
  // the raw pointer below cannot dangle.
  const std::string& topicStr = msg->getTopic();
  const char *topic = topicStr.c_str();

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix.
  if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
      const uint64_t len = msg->getPayloadLength();
      if (len == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), len);
      // Evaluate a local copy of the result: the global err is still updated
      // as before, but it may be overwritten if this callback runs
      // concurrently on several message threads.
      const DCDB::SCError rc = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);
      err = rc;

      // PublishSensor does most of the error checking for us
      switch (rc) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topicStr;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  /*
   * Decode the message and put into the database.
   */
  mqttPayload buf, *payload;
  uint64_t numReadings;
  const uint64_t len = msg->getPayloadLength();

  if (len == sizeof(uint64_t)) {
      //In the 64 bit message case, the collect agent provides a timestamp.
      //memcpy avoids an unaligned / type-punned read of the network buffer.
      int64_t rawValue;
      memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));
      payload = &buf;
      payload->value = rawValue;
      payload->timestamp = Messaging::calculateTimestamp();
      numReadings = 1;
  } else if ((len > 0) && (len % sizeof(mqttPayload) == 0)) {
      //...otherwise it just retrieves it from the MQTT message payload.
      payload = (mqttPayload *) msg->getPayload();
      numReadings = len / sizeof(mqttPayload);
  } else {
      //...otherwise this message is malformed -> ignore...
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(topicStr)) {
      LOG(error) << "Wrong topic format: " << topicStr;
      return 1;
  }

  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
      readings.push_back(r);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  return 0;
}

279
280
281



282
283
284
285
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
286
  Configuration config("", "collectagent.conf");
287
288
289
290
291
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
292
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
293
294
295
296
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
297
298
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
299
300
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
301
302
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
303
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
304
  cout << endl;
Michael Ott's avatar
Michael Ott committed
305
  cout << "  -d            Daemonize" << endl;
306
  cout << "  -s            Print message stats" <<endl;
307
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
308
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
309
  cout << "  -h            This help page" << endl;
310
  cout << endl;
311
312
313
}

int main(int argc, char* const argv[]) {
314
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
315
316

  try{
317

318
319
320
321
322
323
324
325
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
326
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

343
344
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
345

Alessio Netti's avatar
Alessio Netti committed
346
347
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
348
          LOG(fatal) << "Failed to read global configuration!";
349
350
351
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
352
353
354
355
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
356
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
357
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
358
      
359
360
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
361
          switch(ret) {
362
              case 'a':
363
                  pluginSettings.autoPublish = true;
364
                  break;
365
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
366
367
368
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
369
                  break;
370
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
371
372
373
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
374
                  break;
Michael Ott's avatar
Michael Ott committed
375
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
376
                  cassandraSettings.username = optarg;
377
                  break;
Michael Ott's avatar
Michael Ott committed
378
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
379
                  cassandraSettings.password = optarg;
380
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
381
382
383
384
385
386
387
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
388
              case 't':
Alessio Netti's avatar
Alessio Netti committed
389
                  cassandraSettings.ttl = stoul(optarg);
390
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
391
              case 'v':
392
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
393
                  break;
394
              case 'd':
395
              case 'D':
396
                  settings.daemonize = 1;
397
                  break;
398
              case 's':
399
                  settings.statistics = 1;
400
                  break;
401
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
402
                  settings.validateConfig = true;
403
                  break;
404
              case 'h':
405
406
407
              default:
                  usage();
                  exit(EXIT_FAILURE);
408
409
410
          }
      }

411
412
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
413
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
414
415
416
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
417
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
418
419
420
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
421

422
      /*
Alessio Netti's avatar
Alessio Netti committed
423
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
424
425
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
426
      signal(SIGTERM, sigHandler);
427
428
429
430
431
432
433
434
435
436

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
437
      
438
      // Setting the size of the sensor cache
439
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
440
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
441

442
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
443
444
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
445
446
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
447
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
448
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
449
      
Axel Auweter's avatar
Axel Auweter committed
450
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
451
          LOG(fatal) << "Cannot connect to Cassandra!";
452
453
454
455
456
457
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
458
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
459
      
460
461
462
      /*
       * Allocate the SensorDataStore.
       */
463
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
464
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
465
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
466
467
468

      /*
       * Set TTL for data store inserts if TTL > 0.
469
       */
Alessio Netti's avatar
Alessio Netti committed
470
471
472
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
473

474
475
476
477
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
Alessio Netti's avatar
Alessio Netti committed
478
479
      queryEngine.setFilter(analyticsSettings.filter);
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
480
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
481
      queryEngine.setJobQueryCallback(jobQueryCallback);
482

Alessio Netti's avatar
Alessio Netti committed
483
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
484
485
486
487
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
488
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
489
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
490
491
492
493
494
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
495
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
496
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
497
498
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
499

Alessio Netti's avatar
Alessio Netti committed
500
501
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
502
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
503
      
504
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
505
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
506
507
508
509
510
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
511
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
512
513
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
514
515
516
517
518
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Micha Mueller's avatar
Micha Mueller committed
519
      LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
Alessio Netti's avatar
Alessio Netti committed
520
521
522
523
524
525
526
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
527
528
529
530
531
532
533
534

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
535
      if (settings.validateConfig)
536
537
538
539
          return EXIT_SUCCESS;
      else
          analyticsController->start();

540
541
542
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
543
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
544
      
545
546
547
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
548
      LOG(info) << "MQTT Server running...";
549
      
550
551
552
      /*
       * Start the HTTP Server for the REST API
       */
553
      CARestAPI httpsServer(restAPISettings, &mySensorCache, analyticsController);
554
      config.readRestAPIUsers(&httpsServer);
555
      httpsServer.start();
556

Alessio Netti's avatar
Logging    
Alessio Netti committed
557
      LOG(info) <<  "HTTP Server running...";
558

559
560
561
562
563
564
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
565
566
567
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
568

Alessio Netti's avatar
Alessio Netti committed
569
570
571
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

572
      LOG(info) << "Collect Agent running...";
573
574
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
575
576
577
578
579
580
581
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

582
          sleep(60);
583
584
585
586
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
587
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
588
          if (settings.statistics && keepRunning) {
589
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
590
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
591
          }
592
          msgCtr = 0;
593
          pmsgCtr = 0;
594
	  readingCtr = 0;
595
596
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
597
      LOG(info) << "Stopping...";
598
      analyticsController->stop();
599
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
600
      LOG(info) << "MQTT Server stopped...";
601
      httpsServer.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
602
      LOG(info) << "HTTP Server stopped...";
603
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
604
      delete myJobDataStore;
605
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
606
607
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
608
      LOG(info) << "Collect Agent closed. Bye bye...";
609
  }
610
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
611
      LOG(fatal) << "Exception: " << e.what();
612
      abrt(EXIT_FAILURE, INTERR);
613
614
  }

615
  return EXIT_SUCCESS;
616
}
617
618