collectagent.cpp 30.3 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is a intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/calievtdatastore.h>
58
#include <dcdb/sensorconfig.h>
59
#include <dcdb/version.h>
60
#include <dcdb/sensor.h>
61
#include "version.h"
62

63
#include "CARestAPI.h"
64
#include "configuration.h"
65
#include "simplemqttserver.h"
66
#include "messaging.h"
67
#include "abrt.h"
68
#include "dcdbdaemon.h"
69
#include "sensorcache.h"
70
71
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
72

73
74
75
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

76
77
78
79
80
81
82
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

83
using namespace std;
84
85

int keepRunning;
86
bool statistics;
87
88
uint64_t msgCtr;
uint64_t pmsgCtr;
89
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
90
SensorCache mySensorCache;
91
AnalyticsController* analyticsController;
92
DCDB::Connection* dcdbConn;
93
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
94
DCDB::JobDataStore *myJobDataStore;
95
DCDB::SensorConfig *mySensorConfig;
96
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
97
MetadataStore *metadataStore;
98
DCDB::SCError err;
99
QueryEngine& queryEngine = QueryEngine::getInstance();
100
logger_t lg;
101

102
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
103
104
105
106
107
108
109
110
111
112
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
113
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
114
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
115
116
117
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
118
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
119
120
121
122
123
124
125
126
127
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
128
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
129
    }
130
    return true;
Alessio Netti's avatar
Alessio Netti committed
131
132
}

133
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
134
135
136
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
137
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
138
    // Getting the topic of the queried sensor from the Navigator
139
    // If not found, we try to use the input name as topic
140
141
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
142
    } catch(const std::domain_error& e) {}
143
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
144
    // Creating a SID to perform the query
145
146
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
147
        return false;
148
    }
149
150
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
151
152
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
153
            return true;
154
        }
155
156
157
158
159
160
161
162
163
164
165
166
167
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
168
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
169
170
        if(results.empty()) {
            --queryEngine.access;
171
            return false;
172
        }
173
174
175
176
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
177
            buffer.push_back(reading);
178
179
180
        }
    }
    catch(const std::exception& e) {
181
        --queryEngine.access;
182
        return false;
183
    }
184
    --queryEngine.access;
185
    return true;
186
187
}

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
    } else {
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
            buffer = Configuration::publicSensorToMetadata(publicSensor);
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

220
/* Normal termination (SIGINT, CTRL+C) */
221
222
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
223
224
225
226
227
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
228
229
230
  keepRunning = 0;
}

231
232
233
234
235
236
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

237
int mqttCallback(SimpleMQTTMessage *msg)
238
239
240
241
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
242
  msgCtr++;
243
244
245
  if (msg->isPublish())
    pmsgCtr++;

246
247
#ifndef BENCHMARK_MODE

248
  uint64_t len;
249
250
251
  /*
   * Decode the message and put into the database.
   */
252
  if (msg->isPublish()) {
253
      const char *topic = msg->getTopic().c_str();
254
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
255
256
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
257
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
258
          if ((len = msg->getPayloadLength()) == 0) {
259
              LOG(error) << "Empty sensor publish message received!";
260
261
              return 1;
          }
262

263
          string payload((char *) msg->getPayload(), len);
264
265
266
267
268
269
270
271
272
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
273
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
Alessio Netti's avatar
Alessio Netti committed
274
              err = mySensorConfig->publishSensor(ps);
275
              metadataStore->store(sm.pattern, sm);
276
277
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
278
          }
279
280
281
282

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
283
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
284
285
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
286
                  LOG(error) << "Invalid sensor public name.";
287
288
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
289
                  LOG(error) << "Cannot reach sensor data store.";
290
291
292
293
                  return 1;
              default:
                  break;
          }
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
338
              //std::list<DCDB::CaliEvtData> events;
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

359
360
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
361
              }
362
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
363
364
365
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
366
367
368
369
370
371
372
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
373
              payload->value = *((int64_t *) msg->getPayload());
374
375
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
376
          }
377
378
379
380
381
382
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
383
              LOG(error) << "Message malformed";
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
406
#endif
407
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
408
409
410
411
412
413
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
414
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
415
	      readingCtr+= readings.size();
416

417
              //mySensorCache.dump();
418
419
          } else {
              LOG(error) << "Message with empty topic received";
420
          }
421
      }
422
  }
423
#endif
424
  return 0;
425
426
}

427
428
429



430
431
432
433
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
434
  Configuration config("", "collectagent.conf");
435
436
437
438
439
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
440
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
441
442
443
444
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
445
446
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
447
448
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
449
450
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
451
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
452
  cout << endl;
Michael Ott's avatar
Michael Ott committed
453
  cout << "  -d            Daemonize" << endl;
454
  cout << "  -s            Print message stats" <<endl;
455
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
456
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
457
  cout << "  -h            This help page" << endl;
458
  cout << endl;
459
460
461
}

int main(int argc, char* const argv[]) {
462
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
463
464

  try{
465

466
467
468
469
470
471
472
473
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
474
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

491
492
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
493

Alessio Netti's avatar
Alessio Netti committed
494
495
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
496
          LOG(fatal) << "Failed to read global configuration!";
497
498
499
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
500
501
502
503
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
504
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
505
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
506
      
507
508
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
509
          switch(ret) {
510
              case 'a':
511
                  pluginSettings.autoPublish = true;
512
                  break;
513
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
514
515
516
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
517
                  break;
518
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
519
520
521
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
522
                  break;
Michael Ott's avatar
Michael Ott committed
523
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
524
                  cassandraSettings.username = optarg;
525
                  break;
Michael Ott's avatar
Michael Ott committed
526
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
527
                  cassandraSettings.password = optarg;
528
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
529
530
531
532
533
534
535
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
536
              case 't':
Alessio Netti's avatar
Alessio Netti committed
537
                  cassandraSettings.ttl = stoul(optarg);
538
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
539
              case 'v':
540
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
541
                  break;
542
              case 'd':
543
              case 'D':
544
                  settings.daemonize = 1;
545
                  break;
546
              case 's':
547
                  settings.statistics = 1;
548
                  break;
549
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
550
                  settings.validateConfig = true;
551
                  break;
552
              case 'h':
553
554
555
              default:
                  usage();
                  exit(EXIT_FAILURE);
556
557
558
          }
      }

559
560
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
561
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
562
563
564
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
565
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
566
567
568
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
569

570
      /*
Alessio Netti's avatar
Alessio Netti committed
571
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
572
573
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
574
      signal(SIGTERM, sigHandler);
575
576
577
578
579
580
581
582
583
584

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
585
      
586
      // Setting the size of the sensor cache
587
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
588
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
589

590
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
591
592
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
593
594
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
595
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
596
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
597
      
Axel Auweter's avatar
Axel Auweter committed
598
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
599
          LOG(fatal) << "Cannot connect to Cassandra!";
600
601
602
603
604
605
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
606
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
607
      
608
609
610
      /*
       * Allocate the SensorDataStore.
       */
611
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
612
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
613
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
614
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
615
616
617

      /*
       * Set TTL for data store inserts if TTL > 0.
618
       */
619
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
620
        mySensorDataStore->setTTL(cassandraSettings.ttl);
621
622
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
623
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
624
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
625

626
627
628
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
629
630
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
631
      SensorMetadata sBuf;
632
      for (const auto &s : publicSensors)
633
634
635
636
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              metadataStore->store(sBuf.pattern, sBuf);
          }
637
          
638
639
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
640
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
641
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
642
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
643
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
644
      queryEngine.setQueryCallback(sensorQueryCallback);
645
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
646
      queryEngine.setJobQueryCallback(jobQueryCallback);
647
      if(!analyticsController->initialize(settings, argv[argc - 1]))
648
649
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
650
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
651
652
653
654
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
655
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
656
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
657
658
659
660
661
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
662
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
663
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
664
665
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
666

Alessio Netti's avatar
Alessio Netti committed
667
668
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
669
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
670
      
671
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
672
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
673
674
675
676
677
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
678
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
679
680
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
681
682
683
684
#else
      LOG(info) << "    Username and password not printed.";
#endif

685
686
687
688
689
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
690
691
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
692
      }
693
694
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
695
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
696
697
698
699
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
700
      if (settings.validateConfig)
701
702
703
704
          return EXIT_SUCCESS;
      else
          analyticsController->start();

705
706
707
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
708
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
709
      
710
711
712
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
713
      LOG(info) << "MQTT Server running...";
714
      
715
716
717
      /*
       * Start the HTTP Server for the REST API
       */
718
      CARestAPI* httpsServer = nullptr;
719
      if (restAPISettings.enabled) {
720
721
722
723
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
724
      }
725

726
727
728
729
730
731
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
732
733
734
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
735

Alessio Netti's avatar
Alessio Netti committed
736
737
738
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

739
      LOG(info) << "Collect Agent running...";
740
741
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
742
743
744
745
746
747
748
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

749
          sleep(60);
750
751
752
753
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
754
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
755
          if (settings.statistics && keepRunning) {
756
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
757
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
758
          }
759
          msgCtr = 0;
760
          pmsgCtr = 0;
761
	  readingCtr = 0;
762
763
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
764
      LOG(info) << "Stopping...";
765
      analyticsController->stop();
766
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
767
      LOG(info) << "MQTT Server stopped...";
768
769
770
771
772
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
773
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
774
      delete myJobDataStore;
775
      delete mySensorConfig;
776
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
777
778
      dcdbConn->disconnect();
      delete dcdbConn;
779
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
780
      LOG(info) << "Collect Agent closed. Bye bye...";
781
  }
782
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
783
      LOG(fatal) << "Exception: " << e.what();
784
      abrt(EXIT_FAILURE, INTERR);
785
786
  }

787
  return EXIT_SUCCESS;
788
}
789
790