Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 24.9 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/sensorconfig.h>
58
#include <dcdb/version.h>
59
#include <dcdb/sensor.h>
60
#include "version.h"
61

62
#include "CARestAPI.h"
63
#include "configuration.h"
64
#include "simplemqttserver.h"
65
#include "messaging.h"
66
#include "abrt.h"
67
#include "dcdbdaemon.h"
68
#include "sensorcache.h"
69
70
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
71

72
73
74
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

75
using namespace std;
76
77

int keepRunning;
78
bool statistics;
79
80
uint64_t msgCtr;
uint64_t pmsgCtr;
81
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
82
SensorCache mySensorCache;
83
AnalyticsController* analyticsController;
84
DCDB::Connection* dcdbConn;
85
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
86
DCDB::JobDataStore *myJobDataStore;
87
DCDB::SensorConfig *mySensorConfig;
88
MetadataStore *metadataStore;
89
DCDB::SCError err;
90
QueryEngine& queryEngine = QueryEngine::getInstance();
91
logger_t lg;
92

93
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
94
95
96
97
98
99
100
101
102
103
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
104
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
105
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
106
107
108
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
109
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
114
115
116
117
118
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
119
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
120
    }
121
    return true;
Alessio Netti's avatar
Alessio Netti committed
122
123
}

124
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
125
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
126
    // Getting the topic of the queried sensor from the Navigator
127
    // If not found, we try to use the input name as topic
128
129
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
130
    } catch(const std::domain_error& e) {}
131
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
132
    // Creating a SID to perform the query
133
134
    if(!sid.mqttTopicConvert(topic))
        return false;
135
136
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
137
138
        if (entry.getView(startTs, endTs, buffer, rel))
            return true;
139
140
141
142
143
144
145
146
147
148
149
150
151
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
152
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
153
154
        if(results.empty())
            return false;
155
156
157
158
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
159
            buffer.push_back(reading);
160
161
162
        }
    }
    catch(const std::exception& e) {
163
        return false;
164
    }
165
    return true;
166
167
}

168
/* Normal termination (SIGINT, CTRL+C) */
169
170
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
171
172
173
174
175
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
176
177
178
  keepRunning = 0;
}

179
180
181
182
183
184
/*
 * Crash handler, installed by main() for SIGABRT and SIGSEGV.
 *
 * Hands over to abrt() with EXIT_FAILURE and the SIGNAL source; the signal
 * number itself is not used.
 */
void abrtHandler(int sig)
{
  static_cast<void>(sig);
  abrt(EXIT_FAILURE, SIGNAL);
}

185
/**
 * @brief Message callback registered with the MQTT server in main().
 *
 * Every message is counted for the statistics output. PUBLISH messages are
 * additionally decoded:
 *  - If the topic starts with DCDB_MAP, the payload is published via the
 *    SensorConfig, either as JSON metadata (topic starts with DCDB_MET) or
 *    as a plain sensor name.
 *  - Otherwise the payload is interpreted as sensor readings, which are
 *    stored in the sensor cache and inserted into the sensor data store.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was processed (or is not a PUBLISH message),
 *             1 if it was rejected.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

  uint64_t len;
  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a reference before taking c_str(): should getTopic()
      // return by value, the temporary stays alive as long as the reference
      // does and the pointer below cannot dangle.
      const std::string &topicStr = msg->getTopic();
      const char *topic = topicStr.c_str();
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty sensor publish message received!";
              return 1;
          }

          string payload((char *) msg->getPayload(), len);
          // Kept local instead of using the global err, so that invocations
          // running on different message threads do not overwrite each
          // other's result.
          DCDB::SCError pubErr;
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
              pubErr = mySensorConfig->publishSensor(ps);
          } else {
              pubErr = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
          }

          // PublishSensor does most of the error checking for us
          switch (pubErr) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << topicStr;
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name.";
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((int64_t *) msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              // From here on the message is treated like a single full reading
              len = sizeof(uint64_t) * 2;
          }
          //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
          //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              // Number of readings contained in this message
              const uint64_t numReadings = len / sizeof(mqttPayload);
              std::list<DCDB::SensorDataStoreReading> readings;
              for (uint64_t i = 0; i < numReadings; i++) {
                  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                  readings.push_back(r);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
              mySensorDataStore->insertBatch(readings);
              readingCtr += readings.size();

              //mySensorCache.dump();
          }
          else {
              // Reported through the logger, like all other errors in this callback
              LOG(error) << "Wrong topic format: " << topicStr;
              return 1;
          }
      }
  }
  return 0;
}

303
304
305



306
307
308
309
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
310
  Configuration config("", "collectagent.conf");
311
312
313
314
315
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
316
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
317
318
319
320
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
321
322
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
323
324
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
325
326
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
327
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
328
  cout << endl;
Michael Ott's avatar
Michael Ott committed
329
  cout << "  -d            Daemonize" << endl;
330
  cout << "  -s            Print message stats" <<endl;
331
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
332
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
333
  cout << "  -h            This help page" << endl;
334
  cout << endl;
335
336
337
}

int main(int argc, char* const argv[]) {
338
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
339
340

  try{
341

342
343
344
345
346
347
348
349
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
350
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

367
368
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
369

Alessio Netti's avatar
Alessio Netti committed
370
371
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
372
          LOG(fatal) << "Failed to read global configuration!";
373
374
375
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
376
377
378
379
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
380
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
381
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
382
      
383
384
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
385
          switch(ret) {
386
              case 'a':
387
                  pluginSettings.autoPublish = true;
388
                  break;
389
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
390
391
392
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
393
                  break;
394
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
395
396
397
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
398
                  break;
Michael Ott's avatar
Michael Ott committed
399
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
400
                  cassandraSettings.username = optarg;
401
                  break;
Michael Ott's avatar
Michael Ott committed
402
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
403
                  cassandraSettings.password = optarg;
404
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
405
406
407
408
409
410
411
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
412
              case 't':
Alessio Netti's avatar
Alessio Netti committed
413
                  cassandraSettings.ttl = stoul(optarg);
414
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
415
              case 'v':
416
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
417
                  break;
418
              case 'd':
419
              case 'D':
420
                  settings.daemonize = 1;
421
                  break;
422
              case 's':
423
                  settings.statistics = 1;
424
                  break;
425
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
426
                  settings.validateConfig = true;
427
                  break;
428
              case 'h':
429
430
431
              default:
                  usage();
                  exit(EXIT_FAILURE);
432
433
434
          }
      }

435
436
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
437
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
438
439
440
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
441
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
442
443
444
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
445

446
      /*
Alessio Netti's avatar
Alessio Netti committed
447
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
448
449
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
450
      signal(SIGTERM, sigHandler);
451
452
453
454
455
456
457
458
459
460

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
461
      
462
      // Setting the size of the sensor cache
463
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
464
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
465

466
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
467
468
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
469
470
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
471
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
472
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
473
      
Axel Auweter's avatar
Axel Auweter committed
474
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
475
          LOG(fatal) << "Cannot connect to Cassandra!";
476
477
478
479
480
481
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
482
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
483
      
484
485
486
      /*
       * Allocate the SensorDataStore.
       */
487
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
488
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
489
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
490
491
492

      /*
       * Set TTL for data store inserts if TTL > 0.
493
       */
Alessio Netti's avatar
Alessio Netti committed
494
495
496
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
497

498
499
500
501
502
503
504
505
506
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
      mySensorConfig->getPublicSensorsVerbose(publicSensors);
      std::string patternBuf;
      for (const auto &s : publicSensors)
          if (!s.is_virtual)
              metadataStore->store(s.pattern, Configuration::publicSensorToMetadata(s));
          
507
508
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
Alessio Netti's avatar
Alessio Netti committed
509
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
510
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
511
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
512
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
513
      queryEngine.setJobQueryCallback(jobQueryCallback);
514
      if(!analyticsController->initialize(settings, argv[argc - 1], *metadataStore))
515
516
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
517
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
518
519
520
521
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
522
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
523
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
524
525
526
527
528
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
529
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
530
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
531
532
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
533

Alessio Netti's avatar
Alessio Netti committed
534
535
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
536
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
537
      
538
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
539
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
540
541
542
543
544
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
545
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
546
547
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
548
549
550
551
#else
      LOG(info) << "    Username and password not printed.";
#endif

552
553
554
555
556
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
557
558
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
559
      }
560
561
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
562
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
563
564
565
566
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
567
      if (settings.validateConfig)
568
569
570
571
          return EXIT_SUCCESS;
      else
          analyticsController->start();

572
573
574
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
575
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
576
      
577
578
579
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
580
      LOG(info) << "MQTT Server running...";
581
      
582
583
584
      /*
       * Start the HTTP Server for the REST API
       */
585
      CARestAPI* httpsServer = nullptr;
586
587
588
589
590
591
      if (restAPISettings.enabled) {
	  httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
	  config.readRestAPIUsers(httpsServer);
	  httpsServer->start();
	  LOG(info) <<  "HTTP Server running...";
      }
592

593
594
595
596
597
598
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
599
600
601
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
602

Alessio Netti's avatar
Alessio Netti committed
603
604
605
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

606
      LOG(info) << "Collect Agent running...";
607
608
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
609
610
611
612
613
614
615
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

616
          sleep(60);
617
618
619
620
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
621
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
622
          if (settings.statistics && keepRunning) {
623
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
624
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
625
          }
626
          msgCtr = 0;
627
          pmsgCtr = 0;
628
	  readingCtr = 0;
629
630
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
631
      LOG(info) << "Stopping...";
632
      analyticsController->stop();
633
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
634
      LOG(info) << "MQTT Server stopped...";
635
636
637
638
639
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
640
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
641
      delete myJobDataStore;
642
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
643
644
      dcdbConn->disconnect();
      delete dcdbConn;
645
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
646
      LOG(info) << "Collect Agent closed. Bye bye...";
647
  }
648
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
649
      LOG(fatal) << "Exception: " << e.what();
650
      abrt(EXIT_FAILURE, INTERR);
651
652
  }

653
  return EXIT_SUCCESS;
654
}
655
656