Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 34.7 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage backend via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
89
bool statistics;
90
91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
101
DCDB::SCError err;
102
QueryEngine& queryEngine = QueryEngine::getInstance();
103
logger_t lg;
104

105
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
106
107
108
109
110
111
112
113
114
115
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
116
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
117
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
118
119
120
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122
123
124
125
126
127
128
129
130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
131
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
132
    }
133
    return true;
Alessio Netti's avatar
Alessio Netti committed
134
135
}

136
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
137
138
139
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
154
            try {
155
                mySensorCache.wait();
156
157
158
159
160
161
162
163
164
                if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
                    // Data was found, can continue to next SID
                    successCtr++;
                } else {
                    // This happens only if no data was found in the local cache
                    topics.push_back(sid);
                }
            // To handle nasty (yet rare) race conditions on the sensor cache
            } catch(const std::exception& e) {
165
166
                topics.push_back(sid);
            }
167
        }
168
    }
169
170
171
172
173
174
175
176
177
178
179
180
181
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
182
                mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000, false);
183
184
185
186
187
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
188
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
189
190
191
192
193
194
195
196
197
198
199
200
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
201
        }
202
        catch (const std::exception &e) {}
203
    }
204
    
205
    --queryEngine.access;
206
207
208
209
210
211
212
213
214
    return successCtr>0;
}

bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel);
215
216
}

217
218
219
220
221
222
223
224
225
226
227
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
228
229
230
231
232
233
234
235
236
237
238
    bool local = false;
    try {
        metadataStore->wait();
        if(metadataStore->getMap().count(topic)) {
            buffer = metadataStore->get(topic);
            local = true;
        }
    }
    catch(const std::exception& e) {}
    
    if(!local) {
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
            buffer = Configuration::publicSensorToMetadata(publicSensor);
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

257
/* Normal termination (SIGINT, CTRL+C) */
258
259
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
260
261
262
263
264
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
265
266
267
  keepRunning = 0;
}

268
269
270
271
272
273
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

274
int mqttCallback(SimpleMQTTMessage *msg)
275
276
277
278
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
279
  msgCtr++;
280
281
282
  if (msg->isPublish())
    pmsgCtr++;

283
284
#ifndef BENCHMARK_MODE

285
  uint64_t len;
286
287
288
  /*
   * Decode the message and put into the database.
   */
289
  if (msg->isPublish()) {
290
      const char *topic = msg->getTopic().c_str();
291
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
292
293
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
294
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
295
          if ((len = msg->getPayloadLength()) == 0) {
296
              LOG(error) << "Empty sensor publish message received!";
297
298
              return 1;
          }
299

300
          string payload((char *) msg->getPayload(), len);
301
302
303
304
305
306
307
308
309
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
310
              if(sm.isValid()) {
311
                  err = mySensorConfig->publishSensor(sm);
312
313
                  metadataStore->store(*sm.getPattern(), sm);
              }
314
315
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
316
          }
317
318
319
320

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
321
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
322
323
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
324
                  LOG(error) << "Invalid sensor public name.";
325
326
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
327
                  LOG(error) << "Cannot reach sensor data store.";
328
329
330
331
                  return 1;
              default:
                  break;
          }
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
376
              //std::list<DCDB::CaliEvtData> events;
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

397
398
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
399
              }
400
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
401
402
403
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
			      } else if (boost::iequals(val.first, "userid")) {
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
	      if (jd.endTime == DCDB::TimeStamp((uint64_t)0)) {
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
			      LOG(error) << "Job data insert failed!";
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
		      if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
			      LOG(error) << "Could not retrieve job to be updated!";
			      return 1;
		      }

		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
			      LOG(error) << "Could not update end time of job!";
			      return 1;
		      }
	      }
472
      } else {
Micha Müller's avatar
Micha Müller committed
473
	      mqttPayload buf, *payload;
474

Micha Müller's avatar
Micha Müller committed
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
492

Micha Müller's avatar
Micha Müller committed
493
	      /*
494
495
496
497
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
498
499
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
500
501
502
503
504
505
506
507
508
509
510
511
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
512
#endif
513
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
514
515
516
517
518
519
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
520
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
521
	      readingCtr+= readings.size();
522

523
              //mySensorCache.dump();
524
525
          } else {
              LOG(error) << "Message with empty topic received";
526
          }
527
      }
528
  }
529
#endif
530
  return 0;
531
532
}

533
534
535



536
537
538
539
/*
 * Print usage information
 */
void usage() {
540
541
542
543
544
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
545
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
546
547
548
549
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
550
551
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
552
553
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
554
555
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
556
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
557
  cout << endl;
Michael Ott's avatar
Michael Ott committed
558
  cout << "  -d            Daemonize" << endl;
559
  cout << "  -s            Print message stats" <<endl;
560
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
561
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
562
  cout << "  -h            This help page" << endl;
563
  cout << endl;
564
565
566
}

int main(int argc, char* const argv[]) {
567
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
568
569

  try{
570

571
572
      // Checking if path to config is supplied
      if (argc <= 1) {
573
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
574
575
576
577
578
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
579
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

596
597
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
598

Alessio Netti's avatar
Alessio Netti committed
599
      Configuration config(argv[argc - 1], "collectagent.conf");
600
      config.readConfig();
601

Alessio Netti's avatar
Alessio Netti committed
602
603
604
605
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
606
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
607
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
608
      
609
610
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
611
          switch(ret) {
612
              case 'a':
613
                  pluginSettings.autoPublish = true;
614
                  break;
615
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
616
617
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
618
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
619
                  break;
620
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
621
622
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
623
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
624
                  break;
Michael Ott's avatar
Michael Ott committed
625
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
626
                  cassandraSettings.username = optarg;
627
                  break;
Michael Ott's avatar
Michael Ott committed
628
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
629
                  cassandraSettings.password = optarg;
630
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
631
632
633
634
635
636
637
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
638
              case 't':
Alessio Netti's avatar
Alessio Netti committed
639
                  cassandraSettings.ttl = stoul(optarg);
640
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
641
              case 'v':
642
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
643
                  break;
644
              case 'd':
645
              case 'D':
646
                  settings.daemonize = 1;
647
                  break;
648
              case 's':
649
                  settings.statistics = 1;
650
                  break;
651
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
652
                  settings.validateConfig = true;
653
                  break;
654
              case 'h':
655
656
657
              default:
                  usage();
                  exit(EXIT_FAILURE);
658
659
660
          }
      }

661
662
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
663
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
664
665
666
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
667
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
668
669
670
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
671

672
      /*
Alessio Netti's avatar
Alessio Netti committed
673
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
674
675
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
676
      signal(SIGTERM, sigHandler);
677
678
679
680
681
682
683
684
685
686

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
687
      
688
      // Setting the size of the sensor cache
689
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
690
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
691

692
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
693
694
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
695
696
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
697
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
698
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
699
      
Axel Auweter's avatar
Axel Auweter committed
700
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
701
          LOG(fatal) << "Cannot connect to Cassandra!";
702
703
704
705
706
707
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
708
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
709
      
710
711
712
      /*
       * Allocate the SensorDataStore.
       */
713
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
714
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
715
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
716
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
717
718
719

      /*
       * Set TTL for data store inserts if TTL > 0.
720
       */
721
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
722
        mySensorDataStore->setTTL(cassandraSettings.ttl);
723
724
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
725
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
726
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
727

728
729
730
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
731
732
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
733
      SensorMetadata sBuf;
734
      for (const auto &s : publicSensors)
735
736
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
737
738
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
739
          }
740
          
741
742
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
743
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
744
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
745
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
746
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
Alessio Netti's avatar
Alessio Netti committed
747
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
748
      queryEngine.setQueryCallback(sensorQueryCallback);
749
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
750
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
751
      queryEngine.setJobQueryCallback(jobQueryCallback);
752
      if(!analyticsController->initialize(settings))
753
754
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
755
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
756
757
758
759
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
760
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
761
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
762
763
764
765
766
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
767
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
768
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
769
770
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
771

Alessio Netti's avatar
Alessio Netti committed
772
773
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
774
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
775
776
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
Alessio Netti's avatar
Alessio Netti committed
777
      
778
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
779
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
780
781
782
783
784
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
785
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
786
787
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
788
789
790
791
#else
      LOG(info) << "    Username and password not printed.";
#endif

792
793
794
795
796
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
797
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
798
      }
799
800
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
801
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
802
803
804
805
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
806
      if (settings.validateConfig)
807
808
809
810
          return EXIT_SUCCESS;
      else
          analyticsController->start();

811
812
813
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
814
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
815
      
816
817
818
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
819
      LOG(info) << "MQTT Server running...";
820
      
821
822
823
      /*
       * Start the HTTP Server for the REST API
       */
824
      CARestAPI* httpsServer = nullptr;
825
      if (restAPISettings.enabled) {
826
827
828
829
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
830
      }
831

832
833
834
835
836
837
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
838
839
840
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
841

Alessio Netti's avatar
Alessio Netti committed
842
843
844
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

845
      LOG(info) << "Collect Agent running...";
846
847
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
848
849
850
851
852
853
854
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

855
          sleep(60);
856
857
858
859
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
860
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
861
          if (settings.statistics && keepRunning) {
862
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
863
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
864
          }
865
          msgCtr = 0;
866
          pmsgCtr = 0;
867
	  readingCtr = 0;
868
869
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
870
      LOG(info) << "Stopping...";
871
      analyticsController->stop();
872
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
873
      LOG(info) << "MQTT Server stopped...";
874
875
876
877
878
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
879
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
880
      delete myJobDataStore;
881
      delete mySensorConfig;
882
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
883
884
      dcdbConn->disconnect();
      delete dcdbConn;
885
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
886
      LOG(info) << "Collect Agent closed. Bye bye...";
887
  }
888
889
890
891
  catch (const std::runtime_error& e) {
      LOG(fatal) <<  e.what();
      return EXIT_FAILURE;
  }
892
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
893
      LOG(fatal) << "Exception: " << e.what();
894
      abrt(EXIT_FAILURE, INTERR);
895
896
  }

897
  return EXIT_SUCCESS;
898
}
899
900