//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker between Pusher and the storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
53

54
#include <dcdb/connection.h>
55
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
56
#include <dcdb/jobdatastore.h>
57
#include <dcdb/calievtdatastore.h>
58
#include <dcdb/sensorconfig.h>
59
#include <dcdb/version.h>
60
#include <dcdb/sensor.h>
61
#include "version.h"
62

63
#include "CARestAPI.h"
64
#include "configuration.h"
65
#include "simplemqttserver.h"
66
#include "messaging.h"
67
#include "abrt.h"
68
#include "dcdbdaemon.h"
69
#include "sensorcache.h"
70
71
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
72

73
74
75
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

76
using namespace std;
77
78

int keepRunning;
79
bool statistics;
80
81
uint64_t msgCtr;
uint64_t pmsgCtr;
82
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
83
SensorCache mySensorCache;
84
AnalyticsController* analyticsController;
85
DCDB::Connection* dcdbConn;
86
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
87
DCDB::JobDataStore *myJobDataStore;
88
DCDB::SensorConfig *mySensorConfig;
89
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
90
MetadataStore *metadataStore;
91
DCDB::SCError err;
92
QueryEngine& queryEngine = QueryEngine::getInstance();
93
logger_t lg;
94

95
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
96
97
98
99
100
101
102
103
104
105
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
106
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
107
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
108
109
110
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
111
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
112
113
114
115
116
117
118
119
120
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
121
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
122
    }
123
    return true;
Alessio Netti's avatar
Alessio Netti committed
124
125
}

126
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
127
128
129
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
130
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
131
    // Getting the topic of the queried sensor from the Navigator
132
    // If not found, we try to use the input name as topic
133
134
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
135
    } catch(const std::domain_error& e) {}
136
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
137
    // Creating a SID to perform the query
138
139
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
140
        return false;
141
    }
142
143
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
144
145
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
146
            return true;
147
        }
148
149
150
151
152
153
154
155
156
157
158
159
160
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
161
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
162
163
        if(results.empty()) {
            --queryEngine.access;
164
            return false;
165
        }
166
167
168
169
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
170
            buffer.push_back(reading);
171
172
173
        }
    }
    catch(const std::exception& e) {
174
        --queryEngine.access;
175
        return false;
176
    }
177
    --queryEngine.access;
178
    return true;
179
180
}

181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
/**
 * Sensor metadata query callback registered with the QueryEngine.
 *
 * The sensor name is resolved to an MQTT topic through the sensor navigator;
 * if the navigator throws std::domain_error, the name itself is used as topic.
 * Metadata is taken from the local MetadataStore if present. Otherwise the
 * public sensor is looked up in the SensorConfig backend.
 *
 * @param name   Name (or topic) of the queried sensor.
 * @param buffer Output; receives the sensor's metadata on success.
 * @return false if the query engine is updating, the backend lookup does not
 *         return SC_OK, or it throws a std::exception; true otherwise.
 */
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Holds queryEngine.access incremented for the duration of this call. Every
    // return path, and any propagating exception, releases it exactly once.
    struct AccessGuard {
        AccessGuard() { ++queryEngine.access; }
        ~AccessGuard() { --queryEngine.access; }
        AccessGuard(const AccessGuard&) = delete;
        AccessGuard& operator=(const AccessGuard&) = delete;
    };

    // Refuse queries while the query engine is being updated.
    if (queryEngine.updating.load())
        return false;
    const AccessGuard accessGuard;

    // Resolve the topic via the navigator; fall back to the raw name.
    std::string topic = name;
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch (const std::domain_error&) {}

    // Fast path: metadata already held in the local store.
    if (metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        return true;
    }

    // Slow path: fetch the public sensor definition from the backend.
    try {
        DCDB::PublicSensor publicSensor;
        if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK)
            return false;
        buffer = Configuration::publicSensorToMetadata(publicSensor);
    } catch (const std::exception&) {
        return false;
    }
    return true;
}

/* Normal termination (SIGINT, CTRL+C) */
214
215
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
216
217
218
219
220
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
221
222
223
  keepRunning = 0;
}

224
225
226
227
228
229
/**
 * Crash handler, installed for critical signals such as SIGABRT and SIGSEGV.
 *
 * Delegates to abrt() with EXIT_FAILURE and the SIGNAL reason. Presumably abrt()
 * emits a backtrace and terminates; see abrt.h.
 *
 * @param sig Signal number (unused; not forwarded to abrt()).
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

int mqttCallback(SimpleMQTTMessage *msg)
231
232
233
234
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
235
  msgCtr++;
236
237
238
  if (msg->isPublish())
    pmsgCtr++;

239
  uint64_t len;
240
241
242
  /*
   * Decode the message and put into the database.
   */
243
  if (msg->isPublish()) {
244
      const char *topic = msg->getTopic().c_str();
245
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
246
247
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
248
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
249
          if ((len = msg->getPayloadLength()) == 0) {
250
              LOG(error) << "Empty sensor publish message received!";
251
252
              return 1;
          }
253

254
          string payload((char *) msg->getPayload(), len);
255
256
257
258
259
260
261
262
263
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
264
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
Alessio Netti's avatar
Alessio Netti committed
265
              err = mySensorConfig->publishSensor(ps);
266
              metadataStore->store(sm.pattern, sm);
267
268
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
269
          }
270
271
272
273

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
274
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
275
276
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
277
                  LOG(error) << "Invalid sensor public name.";
278
279
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
280
                  LOG(error) << "Cannot reach sensor data store.";
281
282
283
284
                  return 1;
              default:
                  break;
          }
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
329
              //std::list<DCDB::CaliEvtData> events;
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

350
351
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
352
              }
353
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
354
355
356
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
357
358
359
360
361
362
363
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
364
              payload->value = *((int64_t *) msg->getPayload());
365
366
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
367
          }
368
369
370
371
372
373
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
374
              LOG(error) << "Message malformed";
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
397
#endif
398
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
399
400
401
402
403
404
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
405
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
406
	      readingCtr+= readings.size();
407

408
              //mySensorCache.dump();
409
          }
410
#if 1
411
412
413
414
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
415
#endif
416
      }
417
  }
418
  return 0;
419
420
}

421
422
423



424
425
426
427
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
428
  Configuration config("", "collectagent.conf");
429
430
431
432
433
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
434
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
435
436
437
438
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
439
440
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
441
442
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
443
444
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
445
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
446
  cout << endl;
Michael Ott's avatar
Michael Ott committed
447
  cout << "  -d            Daemonize" << endl;
448
  cout << "  -s            Print message stats" <<endl;
449
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
450
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
451
  cout << "  -h            This help page" << endl;
452
  cout << endl;
453
454
455
}

int main(int argc, char* const argv[]) {
456
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
457
458

  try{
459

460
461
462
463
464
465
466
467
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
468
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

485
486
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
487

Alessio Netti's avatar
Alessio Netti committed
488
489
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
490
          LOG(fatal) << "Failed to read global configuration!";
491
492
493
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
494
495
496
497
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
498
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
499
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
500
      
501
502
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
503
          switch(ret) {
504
              case 'a':
505
                  pluginSettings.autoPublish = true;
506
                  break;
507
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
508
509
510
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
511
                  break;
512
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
513
514
515
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
516
                  break;
Michael Ott's avatar
Michael Ott committed
517
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
518
                  cassandraSettings.username = optarg;
519
                  break;
Michael Ott's avatar
Michael Ott committed
520
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
521
                  cassandraSettings.password = optarg;
522
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
523
524
525
526
527
528
529
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
530
              case 't':
Alessio Netti's avatar
Alessio Netti committed
531
                  cassandraSettings.ttl = stoul(optarg);
532
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
533
              case 'v':
534
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
535
                  break;
536
              case 'd':
537
              case 'D':
538
                  settings.daemonize = 1;
539
                  break;
540
              case 's':
541
                  settings.statistics = 1;
542
                  break;
543
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
544
                  settings.validateConfig = true;
545
                  break;
546
              case 'h':
547
548
549
              default:
                  usage();
                  exit(EXIT_FAILURE);
550
551
552
          }
      }

553
554
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
555
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
556
557
558
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
559
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
560
561
562
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
563

564
      /*
Alessio Netti's avatar
Alessio Netti committed
565
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
566
567
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
568
      signal(SIGTERM, sigHandler);
569
570
571
572
573
574
575
576
577
578

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
579
      
580
      // Setting the size of the sensor cache
581
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
582
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
583

584
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
585
586
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
587
588
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
589
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
590
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
591
      
Axel Auweter's avatar
Axel Auweter committed
592
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
593
          LOG(fatal) << "Cannot connect to Cassandra!";
594
595
596
597
598
599
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
600
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
601
      
602
603
604
      /*
       * Allocate the SensorDataStore.
       */
605
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
606
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
607
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
608
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
609
610
611

      /*
       * Set TTL for data store inserts if TTL > 0.
612
       */
613
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
614
        mySensorDataStore->setTTL(cassandraSettings.ttl);
615
616
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
617
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
618
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
619

620
621
622
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
623
624
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
625
      SensorMetadata sBuf;
626
      for (const auto &s : publicSensors)
627
628
629
630
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              metadataStore->store(sBuf.pattern, sBuf);
          }
631
          
632
633
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
634
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
635
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
636
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
637
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
638
      queryEngine.setQueryCallback(sensorQueryCallback);
639
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
640
      queryEngine.setJobQueryCallback(jobQueryCallback);
641
      if(!analyticsController->initialize(settings, argv[argc - 1]))
642
643
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
644
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
645
646
647
648
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
649
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
650
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
651
652
653
654
655
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
656
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
657
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
658
659
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
660

Alessio Netti's avatar
Alessio Netti committed
661
662
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
663
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
664
      
665
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
666
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
667
668
669
670
671
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
672
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
673
674
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
675
676
677
678
#else
      LOG(info) << "    Username and password not printed.";
#endif

679
680
681
682
683
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
684
685
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
686
      }
687
688
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
689
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
690
691
692
693
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
694
      if (settings.validateConfig)
695
696
697
698
          return EXIT_SUCCESS;
      else
          analyticsController->start();

699
700
701
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
702
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
703
      
704
705
706
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
707
      LOG(info) << "MQTT Server running...";
708
      
709
710
711
      /*
       * Start the HTTP Server for the REST API
       */
712
      CARestAPI* httpsServer = nullptr;
713
      if (restAPISettings.enabled) {
714
715
716
717
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
718
      }
719

720
721
722
723
724
725
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
726
727
728
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
729

Alessio Netti's avatar
Alessio Netti committed
730
731
732
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

733
      LOG(info) << "Collect Agent running...";
734
735
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
736
737
738
739
740
741
742
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

743
          sleep(60);
744
745
746
747
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
748
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
749
          if (settings.statistics && keepRunning) {
750
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
751
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
752
          }
753
          msgCtr = 0;
754
          pmsgCtr = 0;
755
	  readingCtr = 0;
756
757
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
758
      LOG(info) << "Stopping...";
759
      analyticsController->stop();
760
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
761
      LOG(info) << "MQTT Server stopped...";
762
763
764
765
766
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
767
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
768
      delete myJobDataStore;
769
      delete mySensorConfig;
770
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
771
772
      dcdbConn->disconnect();
      delete dcdbConn;
773
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
774
      LOG(info) << "Collect Agent closed. Bye bye...";
775
  }
776
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
777
      LOG(fatal) << "Exception: " << e.what();
778
      abrt(EXIT_FAILURE, INTERR);
779
780
  }

781
  return EXIT_SUCCESS;
782
}
783
784