Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 30 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/calievtdatastore.h>
58
#include <dcdb/sensorconfig.h>
59
#include <dcdb/version.h>
60
#include <dcdb/sensor.h>
61
#include "version.h"
62

63
#include "CARestAPI.h"
64
#include "configuration.h"
65
#include "simplemqttserver.h"
66
#include "messaging.h"
67
#include "abrt.h"
68
#include "dcdbdaemon.h"
69
#include "sensorcache.h"
70
71
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
72

73
74
75
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

76
using namespace std;
77
78

int keepRunning;
79
bool statistics;
80
81
uint64_t msgCtr;
uint64_t pmsgCtr;
82
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
83
SensorCache mySensorCache;
84
AnalyticsController* analyticsController;
85
DCDB::Connection* dcdbConn;
86
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
87
DCDB::JobDataStore *myJobDataStore;
88
DCDB::SensorConfig *mySensorConfig;
89
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
90
MetadataStore *metadataStore;
91
DCDB::SCError err;
92
QueryEngine& queryEngine = QueryEngine::getInstance();
93
logger_t lg;
94

95
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
96
97
98
99
100
101
102
103
104
105
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
106
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
107
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
108
109
110
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
111
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
112
113
114
115
116
117
118
119
120
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
121
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
122
    }
123
    return true;
Alessio Netti's avatar
Alessio Netti committed
124
125
}

126
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
127
128
129
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
130
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
131
    // Getting the topic of the queried sensor from the Navigator
132
    // If not found, we try to use the input name as topic
133
134
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
135
    } catch(const std::domain_error& e) {}
136
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
137
    // Creating a SID to perform the query
138
139
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
140
        return false;
141
    }
142
143
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
144
145
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
146
            return true;
147
        }
148
149
150
151
152
153
154
155
156
157
158
159
160
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
161
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
162
163
        if(results.empty()) {
            --queryEngine.access;
164
            return false;
165
        }
166
167
168
169
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
170
            buffer.push_back(reading);
171
172
173
        }
    }
    catch(const std::exception& e) {
174
        --queryEngine.access;
175
        return false;
176
    }
177
    --queryEngine.access;
178
    return true;
179
180
}

181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/**
 * @brief Metadata query callback registered with the Query Engine.
 *
 * @details The local metadata store is tried first; if the sensor is not
 *          known there, its public sensor entry is fetched through
 *          SensorConfig and converted to metadata.
 *
 * @param name    Name of the queried sensor; resolved to a topic through the
 *                Sensor Navigator, or used as topic itself if not found there.
 * @param buffer  Metadata object that is overwritten with the result.
 * @return        True if metadata was found, false otherwise (including the
 *                case in which the Query Engine is being updated).
 */
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning false if the query engine is being updated
    if(queryEngine.updating.load()) return false;

    // Tracks this call in the Query Engine access counter. The guard ensures
    // the counter is decremented on every exit path, including exceptions
    // thrown by the metadata store lookup below.
    struct AccessGuard {
        AccessGuard()  { ++queryEngine.access; }
        ~AccessGuard() { --queryEngine.access; }
        AccessGuard(const AccessGuard&) = delete;
        AccessGuard& operator=(const AccessGuard&) = delete;
    } accessGuard;

    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error&) {}
    
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
    } else {
        // If we are here then the sensor was not found in the metadata store - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK)
                return false;
            buffer = Configuration::publicSensorToMetadata(publicSensor);
        }
        catch (const std::exception&) {
            return false;
        }
    }
    return true;
}

213
/* Normal termination (SIGINT, CTRL+C) */
214
215
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
216
217
218
219
220
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
221
222
223
  keepRunning = 0;
}

224
225
226
227
228
229
/*
 * Crash handler. main() installs it for SIGABRT and SIGSEGV "to allow for
 * backtraces". The received signal number is not used; the handler only
 * forwards to abrt() with EXIT_FAILURE and SIGNAL.
 * NOTE(review): abrt() is declared in abrt.h - presumably it prints a
 * backtrace and terminates the process; confirm there.
 */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

230
/**
 * @brief Callback invoked by the MQTT server for each received message.
 *
 * @details Only PUBLISH messages are processed. Depending on the topic prefix
 *          a message is handled as:
 *          - sensor mapping/metadata message (DCDB_MAP / DCDB_MET prefix):
 *            the sensor is published through SensorConfig;
 *          - Caliper event message (DCDB_CALIEVT prefix): the event string
 *            encoded in the topic is stored through CaliEvtDataStore;
 *          - regular sensor data message: readings are stored in the sensor
 *            cache and in the SensorDataStore.
 *
 * @param msg  The received MQTT message.
 * @return     0 if the message was accepted (or ignored because it is not a
 *             PUBLISH message), 1 if it was malformed or could not be handled.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  // Binding the topic to a const reference keeps the C string below valid
  // for the whole function, even if getTopic() returns by value.
  const std::string &fullTopic = msg->getTopic();
  const char *topic = fullTopic.c_str();
  uint64_t len = msg->getPayloadLength();

  // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
  // sensor's name. In that case, we filter the keyword out of the prefix.
  // We use strncmp as it is the most efficient way to do it
  if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
      if (len == 0) {
          LOG(error) << "Empty sensor publish message received!";
          return 1;
      }

      string payload((char *) msg->getPayload(), len);
      // Local status code: the outcome of this publish must not be shared
      // with other invocations of the callback.
      DCDB::SCError rc;
      //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
      if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
          SensorMetadata sm;
          try {
              sm.parseJSON(payload);
          } catch (const std::exception &e) {
              LOG(error) << "Invalid metadata packed received!";
              return 1;
          }
          DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
          rc = mySensorConfig->publishSensor(ps);
          metadataStore->store(sm.pattern, sm);
      } else {
          rc = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
      }

      // PublishSensor does most of the error checking for us
      switch (rc) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << msg->getTopic();
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name.";
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
  } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
      /*
       * Special message case. This message contains a Caliper Event data
       * string that is encoded in the MQTT topic. Its payload consists of
       * usual timestamp-value pairs.
       * Data from this messages will be stored in a separate table managed
       * by CaliEvtDataStore class.
       */
      std::string topicStr(fullTopic);
      mqttPayload *payload;

      //TODO support case that no timestamp is given?
      //retrieve timestamp and value from the payload
      if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
          payload = (mqttPayload *) msg->getPayload();
      } else {
          //this message is malformed -> ignore
          LOG(error) << "Message malformed";
          return 1;
      }

      /*
       * Decode message topic in actual sensor topic and string data.
       * "/:/" is used as delimiter between topic and data.
       */
      topicStr.erase(0, DCDB_CALIEVT_LEN);
      size_t pos = topicStr.find("/:/");
      if (pos == std::string::npos) {
          // topic is malformed -> ignore
          LOG(error) << "CaliEvt topic malformed";
          return 1;
      }
      const std::string data(topicStr, pos+3);
      topicStr.erase(pos);

      /*
       * We use the same MQTT-topic/SensorId infrastructure as actual sensor
       * data readings to sort related events.
       * Check if we can decode the event topic into a valid SensorId. If
       * successful, store the record in the database.
       */
      DCDB::SensorId sid;
      if (sid.mqttTopicConvert(topicStr)) {
          std::list<DCDB::CaliEvtData> events;
          DCDB::CaliEvtData e;
          e.eventId   = sid;
          e.event     = data;
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              e.timeStamp = payload[i].timestamp;

              /**
               * We want an exhaustive list of all events ordered by their
               * time of occurrence. Payload values should always be
               * one. Other values currently indicate a malformed message.
               *
               * In the future, the value field could be used to aggregate
               * multiple equal events that occurred in the same plugin
               * read cycle.
               */
              if(payload[i].value != 1) {
                  LOG(error) << "CaliEvt message malformed. Value != 1";
                  return 1;
              }

              events.push_back(e);
          }
          myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
      } else {
          LOG(error) << "Topic could not be converted to SID";
      }
  } else {
      mqttPayload buf, *payload;

      //In the 64 bit message case, the collect agent provides a timestamp
      if (len == sizeof(uint64_t)) {
          // Copying the value out of the message buffer instead of
          // dereferencing a cast pointer: the buffer is not guaranteed to be
          // suitably aligned for a 64-bit load.
          int64_t rawValue;
          memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));
          buf.value = rawValue;
          buf.timestamp = Messaging::calculateTimestamp();
          payload = &buf;
          len = sizeof(uint64_t) * 2;
      }
          //...otherwise it just retrieves it from the MQTT message payload.
      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
          payload = (mqttPayload *) msg->getPayload();
      }
          //...otherwise this message is malformed -> ignore...
      else {
          LOG(error) << "Message malformed";
          return 1;
      }

      /*
       * Check if we can decode the message topic
       * into a valid SensorId. If successful, store
       * the record in the database.
       */
      DCDB::SensorId sid;
      if (sid.mqttTopicConvert(fullTopic)) {
#if 0
          cout << "Topic decode successful:" << endl
              << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
              << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
              << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
              << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

          cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
          for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
            cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
          }
          cout << endl;
#endif
          std::list<DCDB::SensorDataStoreReading> readings;
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
          mySensorDataStore->insertBatch(readings, metadataStore->getTTL(fullTopic));
          readingCtr+= readings.size();

          //mySensorCache.dump();
      } else {
          // Reported through the logger like all other errors in this function
          LOG(error) << "Wrong topic format: " << msg->getTopic();
          return 1;
      }
  }
  return 0;
}

420
421
422



423
424
425
426
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
427
  Configuration config("", "collectagent.conf");
428
429
430
431
432
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
433
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
434
435
436
437
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
438
439
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
440
441
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
442
443
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
444
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
445
  cout << endl;
Michael Ott's avatar
Michael Ott committed
446
  cout << "  -d            Daemonize" << endl;
447
  cout << "  -s            Print message stats" <<endl;
448
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
449
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
450
  cout << "  -h            This help page" << endl;
451
  cout << endl;
452
453
454
}

int main(int argc, char* const argv[]) {
455
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
456
457

  try{
458

459
460
461
462
463
464
465
466
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
467
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

484
485
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
486

Alessio Netti's avatar
Alessio Netti committed
487
488
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
489
          LOG(fatal) << "Failed to read global configuration!";
490
491
492
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
493
494
495
496
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
497
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
498
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
499
      
500
501
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
502
          switch(ret) {
503
              case 'a':
504
                  pluginSettings.autoPublish = true;
505
                  break;
506
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
507
508
509
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
510
                  break;
511
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
512
513
514
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
515
                  break;
Michael Ott's avatar
Michael Ott committed
516
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
517
                  cassandraSettings.username = optarg;
518
                  break;
Michael Ott's avatar
Michael Ott committed
519
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
520
                  cassandraSettings.password = optarg;
521
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
522
523
524
525
526
527
528
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
529
              case 't':
Alessio Netti's avatar
Alessio Netti committed
530
                  cassandraSettings.ttl = stoul(optarg);
531
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
532
              case 'v':
533
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
534
                  break;
535
              case 'd':
536
              case 'D':
537
                  settings.daemonize = 1;
538
                  break;
539
              case 's':
540
                  settings.statistics = 1;
541
                  break;
542
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
543
                  settings.validateConfig = true;
544
                  break;
545
              case 'h':
546
547
548
              default:
                  usage();
                  exit(EXIT_FAILURE);
549
550
551
          }
      }

552
553
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
554
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
555
556
557
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
558
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
559
560
561
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
562

563
      /*
Alessio Netti's avatar
Alessio Netti committed
564
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
565
566
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
567
      signal(SIGTERM, sigHandler);
568
569
570
571
572
573
574
575
576
577

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
578
      
579
      // Setting the size of the sensor cache
580
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
581
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
582

583
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
584
585
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
586
587
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
588
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
589
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
590
      
Axel Auweter's avatar
Axel Auweter committed
591
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
592
          LOG(fatal) << "Cannot connect to Cassandra!";
593
594
595
596
597
598
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
599
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
600
      
601
602
603
      /*
       * Allocate the SensorDataStore.
       */
604
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
605
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
606
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
607
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
608
609
610

      /*
       * Set TTL for data store inserts if TTL > 0.
611
       */
612
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
613
        mySensorDataStore->setTTL(cassandraSettings.ttl);
614
615
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
616
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
617
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
618

619
620
621
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
622
623
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
624
      SensorMetadata sBuf;
625
      for (const auto &s : publicSensors)
626
627
628
629
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              metadataStore->store(sBuf.pattern, sBuf);
          }
630
          
631
632
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
633
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
634
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
635
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
636
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
637
      queryEngine.setQueryCallback(sensorQueryCallback);
638
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
639
      queryEngine.setJobQueryCallback(jobQueryCallback);
640
      if(!analyticsController->initialize(settings, argv[argc - 1]))
641
642
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
643
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
644
645
646
647
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
648
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
649
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
650
651
652
653
654
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
655
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
656
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
657
658
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
659

Alessio Netti's avatar
Alessio Netti committed
660
661
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
662
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
663
      
664
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
665
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
666
667
668
669
670
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
671
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
672
673
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
674
675
676
677
#else
      LOG(info) << "    Username and password not printed.";
#endif

678
679
680
681
682
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
683
684
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
685
      }
686
687
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
688
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
689
690
691
692
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
693
      if (settings.validateConfig)
694
695
696
697
          return EXIT_SUCCESS;
      else
          analyticsController->start();

698
699
700
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
701
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
702
      
703
704
705
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
706
      LOG(info) << "MQTT Server running...";
707
      
708
709
710
      /*
       * Start the HTTP Server for the REST API
       */
711
      CARestAPI* httpsServer = nullptr;
712
      if (restAPISettings.enabled) {
713
714
715
716
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
717
      }
718

719
720
721
722
723
724
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
725
726
727
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
728

Alessio Netti's avatar
Alessio Netti committed
729
730
731
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

732
      LOG(info) << "Collect Agent running...";
733
734
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
735
736
737
738
739
740
741
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

742
          sleep(60);
743
744
745
746
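          /* Not really thread safe: the msgCtr/pmsgCtr/readingCtr counters are
           * read and then reset below with no locking in this loop, so the
           * reported statistics are only approximate. Good enough for the job. */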
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
747
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
748
          if (settings.statistics && keepRunning) {
749
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
750
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
751
          }
752
          msgCtr = 0;
753
          pmsgCtr = 0;
754
	  readingCtr = 0;
755
756
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
757
      LOG(info) << "Stopping...";
758
      analyticsController->stop();
759
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
760
      LOG(info) << "MQTT Server stopped...";
761
762
763
764
765
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
766
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
767
      delete myJobDataStore;
768
      delete mySensorConfig;
769
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
770
771
      dcdbConn->disconnect();
      delete dcdbConn;
772
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
773
      LOG(info) << "Collect Agent closed. Bye bye...";
774
  }
775
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
776
      LOG(fatal) << "Exception: " << e.what();
777
      abrt(EXIT_FAILURE, INTERR);
778
779
  }

780
  return EXIT_SUCCESS;
781
}
782
783