Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 32.9 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
89
bool statistics;
90
91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
101
DCDB::SCError err;
102
QueryEngine& queryEngine = QueryEngine::getInstance();
103
logger_t lg;
104

105
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
106
107
108
109
110
111
112
113
114
115
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
116
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
117
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
118
119
120
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122
123
124
125
126
127
128
129
130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
131
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
132
    }
133
    return true;
Alessio Netti's avatar
Alessio Netti committed
134
135
}

136
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
137
138
139
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
140
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
141
    // Getting the topic of the queried sensor from the Navigator
142
    // If not found, we try to use the input name as topic
143
144
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
145
    } catch(const std::domain_error& e) {}
146
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
147
    // Creating a SID to perform the query
148
149
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
150
        return false;
151
    }
152
153
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
154
155
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
156
            return true;
157
        }
158
159
160
161
162
163
164
165
166
167
168
169
170
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
171
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
172
173
        if(results.empty()) {
            --queryEngine.access;
174
            return false;
175
        }
176
177
178
179
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
180
            buffer.push_back(reading);
181
182
183
        }
    }
    catch(const std::exception& e) {
184
        --queryEngine.access;
185
        return false;
186
    }
187
    --queryEngine.access;
188
    return true;
189
190
}

191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
/**
 * Query callback used by the QueryEngine to retrieve the metadata of a sensor.
 *
 * The in-memory MetadataStore is consulted first. If the sensor is not there, its
 * public sensor entry is looked up through SensorConfig and converted to metadata.
 *
 * @param name   Name of the sensor; resolved to a topic through the sensor navigator
 *               when possible, otherwise used as the topic itself.
 * @param buffer Output: receives the sensor's metadata on success.
 * @return true if metadata was found, false otherwise (also while the query engine
 *         is being updated).
 */
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Refusing the query while the query engine is being updated
    if(queryEngine.updating.load()) return false;

    // Holds queryEngine.access incremented for the duration of the query and guarantees
    // the matching decrement on every exit path, including uncaught exceptions.
    struct AccessGuard {
        AccessGuard()  { ++queryEngine.access; }
        ~AccessGuard() { --queryEngine.access; }
    };
    AccessGuard accessGuard;

    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    std::string topic = name;
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error&) {}

    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        return true;
    }

    // Not in the metadata store: fall back to the public sensor entry in the storage backend
    try {
        DCDB::PublicSensor publicSensor;
        if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK)
            return false;
        buffer = Configuration::publicSensorToMetadata(publicSensor);
    }
    catch (const std::exception&) {
        return false;
    }
    return true;
}

223
/* Normal termination (SIGINT, CTRL+C) */
224
225
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
226
227
228
229
230
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
231
232
233
  keepRunning = 0;
}

234
235
236
237
238
239
/* Crash */
/**
 * Handler installed for critical signals (SIGABRT, SIGSEGV): delegates to
 * abrt() with EXIT_FAILURE and the SIGNAL reason. The signal number itself
 * is not needed.
 */
void abrtHandler(int /* sig */)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

240
/**
 * Handles every message delivered by the MQTT server.
 *
 * Publish messages are dispatched on the prefix of their topic:
 *  - DCDB_MAP:     sensor publication. With the DCDB_MET extension the payload is
 *                  a JSON metadata packet, otherwise it is the sensor's public name.
 *  - DCDB_CALIEVT: Caliper event data, stored through the CaliEvtDataStore.
 *  - DCDB_JOBDATA: job data in JSON format, stored through the JobDataStore.
 *  - otherwise:    regular sensor readings, stored in the sensor cache and in the
 *                  SensorDataStore.
 * With BENCHMARK_MODE defined, only the statistics counters are updated.
 *
 * @param msg The received MQTT message.
 * @return 1 if the message was rejected (malformed payload, invalid sensor
 *         publication, failed job data operation), 0 otherwise. Some failures,
 *         e.g. a topic that cannot be converted into a SensorId, are only logged
 *         and still return 0.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

#ifndef BENCHMARK_MODE

  uint64_t len;

  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a const reference before taking c_str(): this is valid whether
      // getTopic() returns a reference or a temporary (whose lifetime is then extended),
      // whereas calling c_str() directly on a returned temporary would leave it dangling.
      const std::string &topicRef = msg->getTopic();
      const char *topic = topicRef.c_str();

      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
      // sensor's name. In that case, the sensor is published under the topic with the keyword stripped.
      // We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty sensor publish message received!";
              return 1;
          }

          string payload((char *) msg->getPayload(), len);
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packet received!";
                  return 1;
              }
              DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
              err = mySensorConfig->publishSensor(ps);
              metadataStore->store(sm.pattern, sm);
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
          }

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name.";
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(topicRef);
          mqttPayload *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
              //std::list<DCDB::CaliEvtData> events;
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if (payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
              }
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
          /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty job data message received!";
              return 1;
          }

          //parse JSON into JobData object
          string        payload((char *)msg->getPayload(), len);
          DCDB::JobData jd;
          try {
              boost::property_tree::iptree config;
              std::istringstream           str(payload);
              boost::property_tree::read_json(str, config);
              BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
                  if (boost::iequals(val.first, "jobid")) {
                      jd.jobId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
                      jd.userId = val.second.data();
                  } else if (boost::iequals(val.first, "starttime")) {
                      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "endtime")) {
                      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "nodes")) {
                      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
                          jd.nodes.push_back(node.second.data());
                      }
                  }
              }
          } catch (const std::exception &e) {
              LOG(error) << "Invalid job data packet received!";
              return 1;
          }

#ifdef DEBUG
          LOG(debug) << "JobId  = " << jd.jobId;
          LOG(debug) << "UserId = " << jd.userId;
          LOG(debug) << "Start  = " << jd.startTime.getString();
          LOG(debug) << "End    = " << jd.endTime.getString();
          LOG(debug) << "Nodes: ";
          for (const auto &n : jd.nodes) {
              LOG(debug) << "    " << n;
          }
#endif

          //store JobData into Storage Backend
          if (jd.endTime == DCDB::TimeStamp((uint64_t)0)) {
              //starting job data
              if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
                  LOG(error) << "Job data insert failed!";
                  return 1;
              }
          } else {
              //ending job data
              DCDB::JobData tmp;
              if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
                  LOG(error) << "Could not retrieve job to be updated!";
                  return 1;
              }

              if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
                  LOG(error) << "Could not update end time of job!";
                  return 1;
              }
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              // Copy the value out with memcpy: the payload pointer is not known to be
              // suitably aligned for a direct int64_t load, and the pointer cast would
              // also violate strict aliasing.
              int64_t value;
              memcpy(&value, msg->getPayload(), sizeof(value));
              payload = &buf;
              payload->value = value;
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
          }
          //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *)msg->getPayload();
          }
          //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicRef)) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              std::list<DCDB::SensorDataStoreReading> readings;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                  readings.push_back(r);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
              mySensorDataStore->insertBatch(readings, metadataStore->getTTL(topicRef));
              readingCtr += readings.size();

              //mySensorCache.dump();
          } else {
              LOG(error) << "Message with empty topic received";
          }
      }
  }
#endif
  return 0;
}

498
499
500



501
502
503
504
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
505
  Configuration config("", "collectagent.conf");
506
507
508
509
510
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
511
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
512
513
514
515
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
516
517
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
518
519
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
520
521
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
522
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
523
  cout << endl;
Michael Ott's avatar
Michael Ott committed
524
  cout << "  -d            Daemonize" << endl;
525
  cout << "  -s            Print message stats" <<endl;
526
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
527
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
528
  cout << "  -h            This help page" << endl;
529
  cout << endl;
530
531
532
}

int main(int argc, char* const argv[]) {
533
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
534
535

  try{
536

537
538
539
540
541
542
543
544
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
545
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

562
563
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
564

Alessio Netti's avatar
Alessio Netti committed
565
566
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
567
          LOG(fatal) << "Failed to read global configuration!";
568
569
570
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
571
572
573
574
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
575
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
576
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
577
      
578
579
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
580
          switch(ret) {
581
              case 'a':
582
                  pluginSettings.autoPublish = true;
583
                  break;
584
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
585
586
587
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
588
                  break;
589
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
590
591
592
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
593
                  break;
Michael Ott's avatar
Michael Ott committed
594
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
595
                  cassandraSettings.username = optarg;
596
                  break;
Michael Ott's avatar
Michael Ott committed
597
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
598
                  cassandraSettings.password = optarg;
599
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
600
601
602
603
604
605
606
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
607
              case 't':
Alessio Netti's avatar
Alessio Netti committed
608
                  cassandraSettings.ttl = stoul(optarg);
609
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
610
              case 'v':
611
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
612
                  break;
613
              case 'd':
614
              case 'D':
615
                  settings.daemonize = 1;
616
                  break;
617
              case 's':
618
                  settings.statistics = 1;
619
                  break;
620
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
621
                  settings.validateConfig = true;
622
                  break;
623
              case 'h':
624
625
626
              default:
                  usage();
                  exit(EXIT_FAILURE);
627
628
629
          }
      }

630
631
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
632
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
633
634
635
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
636
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
637
638
639
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
640

641
      /*
Alessio Netti's avatar
Alessio Netti committed
642
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
643
644
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
645
      signal(SIGTERM, sigHandler);
646
647
648
649
650
651
652
653
654
655

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
656
      
657
      // Setting the size of the sensor cache
658
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
659
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
660

661
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
662
663
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
664
665
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
666
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
667
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
668
      
Axel Auweter's avatar
Axel Auweter committed
669
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
670
          LOG(fatal) << "Cannot connect to Cassandra!";
671
672
673
674
675
676
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
677
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
678
      
679
680
681
      /*
       * Allocate the SensorDataStore.
       */
682
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
683
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
684
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
685
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
686
687
688

      /*
       * Set TTL for data store inserts if TTL > 0.
689
       */
690
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
691
        mySensorDataStore->setTTL(cassandraSettings.ttl);
692
693
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
694
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
695
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
696

697
698
699
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
700
701
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
702
      SensorMetadata sBuf;
703
      for (const auto &s : publicSensors)
704
705
706
707
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
              metadataStore->store(sBuf.pattern, sBuf);
          }
708
          
709
710
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
711
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
712
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
713
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
714
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
715
      queryEngine.setQueryCallback(sensorQueryCallback);
716
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
717
      queryEngine.setJobQueryCallback(jobQueryCallback);
718
      if(!analyticsController->initialize(settings, argv[argc - 1]))
719
720
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
721
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
722
723
724
725
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
726
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
727
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
728
729
730
731
732
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
733
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
734
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
735
736
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
737

Alessio Netti's avatar
Alessio Netti committed
738
739
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
740
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
741
      
742
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
743
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
744
745
746
747
748
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
749
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
750
751
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
752
753
754
755
#else
      LOG(info) << "    Username and password not printed.";
#endif

756
757
758
759
760
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
761
762
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
763
      }
764
765
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
766
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
767
768
769
770
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
771
      if (settings.validateConfig)
772
773
774
775
          return EXIT_SUCCESS;
      else
          analyticsController->start();

776
777
778
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
779
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
780
      
781
782
783
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
784
      LOG(info) << "MQTT Server running...";
785
      
786
787
788
      /*
       * Start the HTTP Server for the REST API
       */
789
      CARestAPI* httpsServer = nullptr;
790
      if (restAPISettings.enabled) {
791
792
793
794
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
795
      }
796

797
798
799
800
801
802
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
803
804
805
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
806

Alessio Netti's avatar
Alessio Netti committed
807
808
809
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

810
      LOG(info) << "Collect Agent running...";
811
812
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
813
814
815
816
817
818
819
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

820
          sleep(60);
821
822
823
824
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
825
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
826
          if (settings.statistics && keepRunning) {
827
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
828
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
829
          }
830
          msgCtr = 0;
831
          pmsgCtr = 0;
832
	  readingCtr = 0;
833
834
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
835
      LOG(info) << "Stopping...";
836
      analyticsController->stop();
837
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
838
      LOG(info) << "MQTT Server stopped...";
839
840
841
842
843
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
844
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
845
      delete myJobDataStore;
846
      delete mySensorConfig;
847
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
848
849
      dcdbConn->disconnect();
      delete dcdbConn;
850
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
851
      LOG(info) << "Collect Agent closed. Bye bye...";
852
  }
853
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
854
      LOG(fatal) << "Exception: " << e.what();
855
      abrt(EXIT_FAILURE, INTERR);
856
857
  }

858
  return EXIT_SUCCESS;
859
}
860
861