Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 25.2 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53
#include <boost/algorithm/string/trim.hpp>
54

55
#include <dcdb/connection.h>
56
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
57
#include <dcdb/jobdatastore.h>
58
#include <dcdb/sensorconfig.h>
59
#include <dcdb/version.h>
60
#include <dcdb/sensor.h>
61
#include "version.h"
62

63
#include "CARestAPI.h"
64
#include "configuration.h"
65
#include "simplemqttserver.h"
66
#include "messaging.h"
67
#include "abrt.h"
68
#include "dcdbdaemon.h"
69
#include "sensorcache.h"
70
71
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
72

73
74
75
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

76
using namespace std;
77
78

int keepRunning;
79
bool statistics;
80
81
uint64_t msgCtr;
uint64_t pmsgCtr;
82
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
83
SensorCache mySensorCache;
84
AnalyticsController* analyticsController;
85
DCDB::Connection* dcdbConn;
86
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
87
DCDB::JobDataStore *myJobDataStore;
88
DCDB::SensorConfig *mySensorConfig;
89
MetadataStore *metadataStore;
90
DCDB::SCError err;
91
QueryEngine& queryEngine = QueryEngine::getInstance();
92
logger_t lg;
93

94
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
95
96
97
98
99
100
101
102
103
104
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
105
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
106
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
107
108
109
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
110
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
111
112
113
114
115
116
117
118
119
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
120
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
121
    }
122
    return true;
Alessio Netti's avatar
Alessio Netti committed
123
124
}

125
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
126
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
127
    // Getting the topic of the queried sensor from the Navigator
128
    // If not found, we try to use the input name as topic
129
130
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
131
    } catch(const std::domain_error& e) {}
132
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
133
    // Creating a SID to perform the query
134
135
    if(!sid.mqttTopicConvert(topic))
        return false;
136
137
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
138
139
        if (entry.getView(startTs, endTs, buffer, rel))
            return true;
140
141
142
143
144
145
146
147
148
149
150
151
152
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
153
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
154
155
        if(results.empty())
            return false;
156
157
158
159
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
160
            buffer.push_back(reading);
161
162
163
        }
    }
    catch(const std::exception& e) {
164
        return false;
165
    }
166
    return true;
167
168
}

169
/* Normal termination (SIGINT, CTRL+C) */
170
171
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
172
173
174
175
176
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
177
178
179
  keepRunning = 0;
}

180
181
182
183
184
185
/**
 * Crash handler.
 *
 * Installed for SIGABRT and SIGSEGV in main(); forwards to abrt() with
 * EXIT_FAILURE and the SIGNAL source tag. The signal number is not evaluated.
 */
void abrtHandler(int sig)
{
  (void) sig; // intentionally unused
  abrt(EXIT_FAILURE, SIGNAL);
}

186
/**
 * @brief Message callback registered with the MQTT server in main().
 *
 * @details Updates the message counters and processes PUBLISH messages:
 *          - If the topic starts with DCDB_MAP, the message publishes a sensor.
 *            The payload carries the sensor's public name or, if the topic
 *            starts with DCDB_MET, a JSON metadata packet.
 *          - Otherwise the payload is decoded into one or more readings which
 *            are stored in the sensor cache and inserted into the sensor data
 *            store.
 *          Messages that are not PUBLISH messages are only counted.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was processed (or needed no processing),
 *             1 if it was rejected.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

  uint64_t len;
  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a const reference before taking c_str(): should
      // getTopic() return by value, this keeps the string alive for as long
      // as the raw pointer is in use; if it returns a reference nothing changes.
      const std::string& topicStr = msg->getTopic();
      const char *topic = topicStr.c_str();
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
      // sensor's name. In that case the message publishes a sensor and the keyword is filtered out of the prefix.
      // We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty sensor publish message received!";
              return 1;
          }

          string payload((char *) msg->getPayload(), len);
          // Status of the publish operation. Kept local instead of using the
          // file-scope err variable, as the callback is presumably invoked
          // from several message threads at once.
          DCDB::SCError publishStatus;
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
              publishStatus = mySensorConfig->publishSensor(ps);
              // The metadata is stored regardless of the publish status
              metadataStore->store(sm.pattern, sm);
          } else {
              // Plain mapping message: payload is the public name, the topic
              // without the DCDB_MAP prefix is the sensor pattern
              publishStatus = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
          }

          // PublishSensor does most of the error checking for us
          switch (publishStatus) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name.";
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((int64_t *) msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              // From here on len describes exactly the one reading held in buf
              len = sizeof(mqttPayload);
          }
          //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
          //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (!sid.mqttTopicConvert(msg->getTopic())) {
              // Reported through the logger like all other errors in this function
              LOG(error) << "Wrong topic format: " << msg->getTopic();
              return 1;
          }
#if 0
          cout << "Topic decode successful:" << endl
              << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
              << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
              << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
              << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

          cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
          for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
            cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
          }
          cout << endl;
#endif
          // Every reading goes into the batch for the data store and into the cache
          const uint64_t numReadings = len / sizeof(mqttPayload);
          std::list<DCDB::SensorDataStoreReading> readings;
          for (uint64_t i = 0; i < numReadings; i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
          // The TTL for the insert is looked up in the metadata store by topic
          mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
          readingCtr += readings.size();

          //mySensorCache.dump();
      }
  }
  return 0;
}

305
306
307



308
309
310
311
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
312
  Configuration config("", "collectagent.conf");
313
314
315
316
317
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
318
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
319
320
321
322
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
323
324
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
325
326
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
327
328
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
329
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
330
  cout << endl;
Michael Ott's avatar
Michael Ott committed
331
  cout << "  -d            Daemonize" << endl;
332
  cout << "  -s            Print message stats" <<endl;
333
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
334
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
335
  cout << "  -h            This help page" << endl;
336
  cout << endl;
337
338
339
}

int main(int argc, char* const argv[]) {
340
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
341
342

  try{
343

344
345
346
347
348
349
350
351
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
352
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

369
370
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
371

Alessio Netti's avatar
Alessio Netti committed
372
373
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
374
          LOG(fatal) << "Failed to read global configuration!";
375
376
377
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
378
379
380
381
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
382
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
383
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
384
      
385
386
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
387
          switch(ret) {
388
              case 'a':
389
                  pluginSettings.autoPublish = true;
390
                  break;
391
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
392
393
394
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
395
                  break;
396
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
397
398
399
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
400
                  break;
Michael Ott's avatar
Michael Ott committed
401
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
402
                  cassandraSettings.username = optarg;
403
                  break;
Michael Ott's avatar
Michael Ott committed
404
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
405
                  cassandraSettings.password = optarg;
406
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
407
408
409
410
411
412
413
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
414
              case 't':
Alessio Netti's avatar
Alessio Netti committed
415
                  cassandraSettings.ttl = stoul(optarg);
416
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
417
              case 'v':
418
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
419
                  break;
420
              case 'd':
421
              case 'D':
422
                  settings.daemonize = 1;
423
                  break;
424
              case 's':
425
                  settings.statistics = 1;
426
                  break;
427
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
428
                  settings.validateConfig = true;
429
                  break;
430
              case 'h':
431
432
433
              default:
                  usage();
                  exit(EXIT_FAILURE);
434
435
436
          }
      }

437
438
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
439
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
440
441
442
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
443
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
444
445
446
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
447

448
      /*
Alessio Netti's avatar
Alessio Netti committed
449
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
450
451
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
452
      signal(SIGTERM, sigHandler);
453
454
455
456
457
458
459
460
461
462

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
463
      
464
      // Setting the size of the sensor cache
465
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
466
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
467

468
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
469
470
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
471
472
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
473
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
474
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
475
      
Axel Auweter's avatar
Axel Auweter committed
476
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
477
          LOG(fatal) << "Cannot connect to Cassandra!";
478
479
480
481
482
483
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
484
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
485
      
486
487
488
      /*
       * Allocate the SensorDataStore.
       */
489
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
490
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
491
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
492
493
494

      /*
       * Set TTL for data store inserts if TTL > 0.
495
       */
Alessio Netti's avatar
Alessio Netti committed
496
497
498
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
499

500
501
502
503
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
      mySensorConfig->getPublicSensorsVerbose(publicSensors);
504
      SensorMetadata sBuf;
505
      for (const auto &s : publicSensors)
506
507
508
509
510
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              boost::algorithm::trim(sBuf.pattern);
              metadataStore->store(sBuf.pattern, sBuf);
          }
511
          
512
513
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
514
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
515
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
516
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
517
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
518
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
519
      queryEngine.setJobQueryCallback(jobQueryCallback);
520
      if(!analyticsController->initialize(settings, argv[argc - 1]))
521
522
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
523
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
524
525
526
527
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
528
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
529
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
530
531
532
533
534
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
535
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
536
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
537
538
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
539

Alessio Netti's avatar
Alessio Netti committed
540
541
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
542
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
543
      
544
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
545
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
546
547
548
549
550
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
551
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
552
553
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
554
555
556
557
#else
      LOG(info) << "    Username and password not printed.";
#endif

558
559
560
561
562
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
563
564
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
565
      }
566
567
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
568
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
569
570
571
572
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
573
      if (settings.validateConfig)
574
575
576
577
          return EXIT_SUCCESS;
      else
          analyticsController->start();

578
579
580
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
581
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
582
      
583
584
585
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
586
      LOG(info) << "MQTT Server running...";
587
      
588
589
590
      /*
       * Start the HTTP Server for the REST API
       */
591
      CARestAPI* httpsServer = nullptr;
592
593
594
595
596
597
      if (restAPISettings.enabled) {
	  httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
	  config.readRestAPIUsers(httpsServer);
	  httpsServer->start();
	  LOG(info) <<  "HTTP Server running...";
      }
598

599
600
601
602
603
604
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
605
606
607
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
608

Alessio Netti's avatar
Alessio Netti committed
609
610
611
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

612
      LOG(info) << "Collect Agent running...";
613
614
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
615
616
617
618
619
620
621
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

622
          sleep(60);
623
624
625
626
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
627
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
628
          if (settings.statistics && keepRunning) {
629
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
630
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
631
          }
632
          msgCtr = 0;
633
          pmsgCtr = 0;
634
	  readingCtr = 0;
635
636
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
637
      LOG(info) << "Stopping...";
638
      analyticsController->stop();
639
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
640
      LOG(info) << "MQTT Server stopped...";
641
642
643
644
645
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
646
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
647
      delete myJobDataStore;
648
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
649
650
      dcdbConn->disconnect();
      delete dcdbConn;
651
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
652
      LOG(info) << "Collect Agent closed. Bye bye...";
653
  }
654
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
655
      LOG(fatal) << "Exception: " << e.what();
656
      abrt(EXIT_FAILURE, INTERR);
657
658
  }

659
  return EXIT_SUCCESS;
660
}
661
662