//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/sensorconfig.h>
58
#include <dcdb/version.h>
59
#include <dcdb/sensor.h>
60
#include "version.h"
61

62
#include "CARestAPI.h"
63
#include "configuration.h"
64
#include "simplemqttserver.h"
65
#include "messaging.h"
66
#include "abrt.h"
67
#include "dcdbdaemon.h"
68
#include "sensorcache.h"
69
70
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
71

72
73
74
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

75
using namespace std;
76
77

int keepRunning;
78
bool statistics;
79
80
uint64_t msgCtr;
uint64_t pmsgCtr;
81
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
82
SensorCache mySensorCache;
83
AnalyticsController* analyticsController;
84
DCDB::Connection* dcdbConn;
85
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
86
DCDB::JobDataStore *myJobDataStore;
87
88
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
89
QueryEngine& queryEngine = QueryEngine::getInstance();
90
logger_t lg;
91

92
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
93
94
95
96
97
98
99
100
101
102
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
103
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
104
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
105
106
107
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
108
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
109
110
111
112
113
114
115
116
117
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
118
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
119
    }
120
    return true;
Alessio Netti's avatar
Alessio Netti committed
121
122
}

123
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
124
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
125
    // Getting the topic of the queried sensor from the Navigator
126
    // If not found, we try to use the input name as topic
127
128
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
129
    } catch(const std::domain_error& e) {}
130
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
131
    // Creating a SID to perform the query
132
133
    if(!sid.mqttTopicConvert(topic))
        return false;
134
135
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
136
137
        if (entry.getView(startTs, endTs, buffer, rel))
            return true;
138
139
140
141
142
143
144
145
146
147
148
149
150
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
151
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
152
153
        if(results.empty())
            return false;
154
155
156
157
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
158
            buffer.push_back(reading);
159
160
161
        }
    }
    catch(const std::exception& e) {
162
        return false;
163
    }
164
    return true;
165
166
}

167
/* Normal termination (SIGINT, CTRL+C) */
168
169
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
170
171
172
173
174
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
175
176
177
  keepRunning = 0;
}

178
179
180
181
182
183
/*
 * Crash handler: forwards to abrt() with EXIT_FAILURE and the SIGNAL source
 * code. The received signal number itself is not used.
 * NOTE(review): abrt() is declared elsewhere; presumably it prints a
 * backtrace and terminates the process - confirm against abrt.h.
 */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

184
/**
 * @brief Message callback registered with the MQTT server.
 *
 * Updates the message statistics counters and, for PUBLISH messages, either
 *  - publishes a sensor name / metadata mapping (topic starts with DCDB_MAP,
 *    respectively DCDB_MET), or
 *  - decodes the payload into readings, stores them in the sensor cache and
 *    inserts them into the sensor data store.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was handled (or was not a PUBLISH message),
 *             1 if the message was rejected (empty or malformed payload,
 *             invalid topic, or a sensor publishing error).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  // Keep the topic string alive for as long as the raw pointer is used: if
  // getTopic() returns by value, binding to a const reference extends the
  // temporary's lifetime; if it returns a reference this is a plain alias.
  const std::string& topicStr = msg->getTopic();
  const char *topic = topicStr.c_str();

  // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix.
  // We use strncmp as it is the most efficient way to do it
  if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
      const uint64_t len = msg->getPayloadLength();
      if (len == 0) {
          LOG(error) << "Empty sensor publish message received!";
          return 1;
      }

      string payload((char *) msg->getPayload(), len);

      // Result of the publish operation. Kept local instead of using a
      // file-level variable so that concurrent invocations of this callback
      // cannot overwrite each other's result.
      DCDB::SCError pubErr;

      //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
      if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
          SensorMetadata sm;
          try {
              sm.parseJSON(payload);
          } catch (const std::exception &e) {
              LOG(error) << "Invalid metadata packed received!";
              return 1;
          }
          DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm, std::string(topic + DCDB_MET_LEN));
          pubErr = mySensorConfig->publishSensor(ps);
      } else {
          pubErr = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
      }

      // PublishSensor does most of the error checking for us
      switch (pubErr) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topicStr;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name.";
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
  } else {
      mqttPayload buf, *payload;

      uint64_t len = msg->getPayloadLength();
      //In the 64 bit message case, the collect agent provides a timestamp
      if (len == sizeof(uint64_t)) {
          payload = &buf;
          payload->value = *((int64_t *) msg->getPayload());
          payload->timestamp = Messaging::calculateTimestamp();
          len = sizeof(uint64_t) * 2;
      }
      //...otherwise it just retrieves it from the MQTT message payload.
      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
          payload = (mqttPayload *) msg->getPayload();
      }
      //...otherwise this message is malformed -> ignore...
      else {
          LOG(error) << "Message malformed";
          return 1;
      }

      /*
       * Check if we can decode the message topic
       * into a valid SensorId. If successful, store
       * the record in the database.
       */
      DCDB::SensorId sid;
      if (!sid.mqttTopicConvert(msg->getTopic())) {
          // Reported through the logger like every other error in this callback
          LOG(error) << "Wrong topic format: " << topicStr;
          return 1;
      }

      const uint64_t numReadings = len / sizeof(mqttPayload);
      std::list<DCDB::SensorDataStoreReading> readings;
      for (uint64_t i = 0; i < numReadings; i++) {
          DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
          readings.push_back(r);
          mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
      }
      mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
      mySensorDataStore->insertBatch(readings);
      readingCtr += readings.size();
  }
  return 0;
}

302
303
304



305
306
307
308
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
309
  Configuration config("", "collectagent.conf");
310
311
312
313
314
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
315
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
316
317
318
319
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
320
321
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
322
323
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
324
325
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
326
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
327
  cout << endl;
Michael Ott's avatar
Michael Ott committed
328
  cout << "  -d            Daemonize" << endl;
329
  cout << "  -s            Print message stats" <<endl;
330
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
331
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
332
  cout << "  -h            This help page" << endl;
333
  cout << endl;
334
335
336
}

int main(int argc, char* const argv[]) {
337
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
338
339

  try{
340

341
342
343
344
345
346
347
348
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
349
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

366
367
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
368

Alessio Netti's avatar
Alessio Netti committed
369
370
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
371
          LOG(fatal) << "Failed to read global configuration!";
372
373
374
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
375
376
377
378
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
379
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
380
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
381
      
382
383
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
384
          switch(ret) {
385
              case 'a':
386
                  pluginSettings.autoPublish = true;
387
                  break;
388
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
389
390
391
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
392
                  break;
393
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
394
395
396
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
397
                  break;
Michael Ott's avatar
Michael Ott committed
398
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
399
                  cassandraSettings.username = optarg;
400
                  break;
Michael Ott's avatar
Michael Ott committed
401
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
402
                  cassandraSettings.password = optarg;
403
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
404
405
406
407
408
409
410
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
411
              case 't':
Alessio Netti's avatar
Alessio Netti committed
412
                  cassandraSettings.ttl = stoul(optarg);
413
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
414
              case 'v':
415
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
416
                  break;
417
              case 'd':
418
              case 'D':
419
                  settings.daemonize = 1;
420
                  break;
421
              case 's':
422
                  settings.statistics = 1;
423
                  break;
424
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
425
                  settings.validateConfig = true;
426
                  break;
427
              case 'h':
428
429
430
              default:
                  usage();
                  exit(EXIT_FAILURE);
431
432
433
          }
      }

434
435
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
436
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
437
438
439
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
440
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
441
442
443
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
444

445
      /*
Alessio Netti's avatar
Alessio Netti committed
446
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
447
448
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
449
      signal(SIGTERM, sigHandler);
450
451
452
453
454
455
456
457
458
459

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
460
      
461
      // Setting the size of the sensor cache
462
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
463
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
464

465
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
466
467
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
468
469
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
470
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
471
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
472
      
Axel Auweter's avatar
Axel Auweter committed
473
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
474
          LOG(fatal) << "Cannot connect to Cassandra!";
475
476
477
478
479
480
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
481
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
482
      
483
484
485
      /*
       * Allocate the SensorDataStore.
       */
486
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
487
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
488
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
489
490
491

      /*
       * Set TTL for data store inserts if TTL > 0.
492
       */
Alessio Netti's avatar
Alessio Netti committed
493
494
495
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
496

497
498
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
Alessio Netti's avatar
Alessio Netti committed
499
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
500
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
501
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
502
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
503
      queryEngine.setJobQueryCallback(jobQueryCallback);
504
505
506
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
507
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
508
509
510
511
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
512
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
513
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
514
515
516
517
518
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
519
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
520
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
521
522
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
523

Alessio Netti's avatar
Alessio Netti committed
524
525
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
526
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
527
      
528
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
529
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
530
531
532
533
534
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
535
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
536
537
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
538
539
540
541
#else
      LOG(info) << "    Username and password not printed.";
#endif

542
543
544
545
546
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
547
548
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
549
      }
550
551
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
552
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
553
554
555
556
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
557
      if (settings.validateConfig)
558
559
560
561
          return EXIT_SUCCESS;
      else
          analyticsController->start();

562
563
564
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
565
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
566
      
567
568
569
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
570
      LOG(info) << "MQTT Server running...";
571
      
572
573
574
      /*
       * Start the HTTP Server for the REST API
       */
575
      CARestAPI* httpsServer = nullptr;
576
577
578
579
580
581
      if (restAPISettings.enabled) {
	  httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
	  config.readRestAPIUsers(httpsServer);
	  httpsServer->start();
	  LOG(info) <<  "HTTP Server running...";
      }
582

583
584
585
586
587
588
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
589
590
591
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
592

Alessio Netti's avatar
Alessio Netti committed
593
594
595
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

596
      LOG(info) << "Collect Agent running...";
597
598
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
599
600
601
602
603
604
605
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

606
          sleep(60);
607
608
609
610
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
611
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
612
          if (settings.statistics && keepRunning) {
613
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
614
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
615
          }
616
          msgCtr = 0;
617
          pmsgCtr = 0;
618
	  readingCtr = 0;
619
620
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
621
      LOG(info) << "Stopping...";
622
      analyticsController->stop();
623
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
624
      LOG(info) << "MQTT Server stopped...";
625
626
627
628
629
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
630
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
631
      delete myJobDataStore;
632
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
633
634
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
635
      LOG(info) << "Collect Agent closed. Bye bye...";
636
  }
637
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
638
      LOG(fatal) << "Exception: " << e.what();
639
      abrt(EXIT_FAILURE, INTERR);
640
641
  }

642
  return EXIT_SUCCESS;
643
}
644
645