Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 34.2 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
89
bool statistics;
90
91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
101
DCDB::SCError err;
102
QueryEngine& queryEngine = QueryEngine::getInstance();
103
logger_t lg;
104

105
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
106
107
108
109
110
111
112
113
114
115
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
116
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
117
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
118
119
120
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122
123
124
125
126
127
128
129
130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
131
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
132
    }
133
    return true;
Alessio Netti's avatar
Alessio Netti committed
134
135
}

136
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
137
138
139
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
                topics.push_back(sid);
            }
161
        }
162
    }
163
164
165
166
167
168
169
170
171
172
173
174
175
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
176
                mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000, false);
177
178
179
180
181
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
182
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
183
184
185
186
187
188
189
190
191
192
193
194
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
195
        }
196
        catch (const std::exception &e) {}
197
    }
198
    
199
    --queryEngine.access;
200
201
202
203
204
205
206
207
208
    return successCtr>0;
}

bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel);
209
210
}

211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
/**
 * @brief Query-engine callback that retrieves the metadata of a sensor.
 *
 * @details The name is resolved to a topic through the query engine's
 *          navigator (falling back to the name itself). The local metadata
 *          store is consulted first; if it has no entry for the topic, the
 *          public sensor is fetched through the sensor configuration and
 *          converted into metadata.
 *
 * @param name    Name (or topic) of the sensor.
 * @param buffer  Metadata object that is overwritten with the result.
 * @return        True if metadata was found, false otherwise (also if the
 *                query engine is currently being updated).
 */
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Refusing the query (returning false) if the query engine is being updated
    if(queryEngine.updating.load()) return false;

    // Scope guard for the query engine's access counter: the counter is
    // decremented on every exit path, including exceptions that escape
    // this function (e.g. from the metadata store lookup below).
    struct AccessGuard {
        AccessGuard()  { ++queryEngine.access; }
        ~AccessGuard() { --queryEngine.access; }
    } accessGuard;

    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}

    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        return true;
    }

    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK)
            return false;
        buffer = Configuration::publicSensorToMetadata(publicSensor);
    }
    catch (const std::exception &e) {
        return false;
    }
    return true;
}

243
/* Normal termination (SIGINT, CTRL+C) */
244
245
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
246
247
248
249
250
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
251
252
253
  keepRunning = 0;
}

254
255
256
257
258
259
/* Crash */
/**
 * @brief Signal handler for critical signals.
 *
 * @details Forwards to abrt() with EXIT_FAILURE and SIGNAL as arguments; the
 *          signal number itself is not evaluated.
 *
 * @param sig  Number of the received signal (unused).
 */
void abrtHandler(int sig)
{
  (void)sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

260
/**
 * @brief Callback that processes one MQTT message received by the Collect Agent.
 *
 * @details Every message increments msgCtr, publish messages additionally
 *          increment pmsgCtr. Unless BENCHMARK_MODE is defined, publish
 *          messages are then dispatched according to the prefix of their topic:
 *          - DCDB_MAP:     sensor publication. With the extended DCDB_MET
 *                          prefix the payload is parsed as JSON metadata,
 *                          otherwise the payload is taken as the sensor's name.
 *          - DCDB_CALIEVT: Caliper event data; the event string is encoded in
 *                          the topic, the payload holds timestamp-value pairs.
 *          - DCDB_JOBDATA: job data in JSON format, stored via the job data store.
 *          - anything else: regular sensor readings, stored in the sensor
 *                          cache and the sensor data store.
 *
 * @param msg  The received MQTT message.
 * @return     1 if the message was empty, malformed or could not be stored,
 *             0 otherwise.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

#ifndef BENCHMARK_MODE

  uint64_t len;
  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Binding the topic to a const reference keeps the C string below valid
      // for the whole block, regardless of whether getTopic() returns a
      // reference or a temporary.
      const std::string& msgTopic = msg->getTopic();
      const char *topic = msgTopic.c_str();
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty sensor publish message received!";
              return 1;
          }

          string payload((char *) msg->getPayload(), len);
          // err is a global: reset it so that the error check below never acts
          // on the outcome of a previous message when nothing is published
          // (i.e. the metadata packet parses but is not valid).
          err = DCDB::SC_OK;
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
              if(sm.isValid()) {
                  err = mySensorConfig->publishSensor(sm);
                  metadataStore->store(*sm.getPattern(), sm);
              }
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
          }

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << msgTopic;
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name.";
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msgTopic);
          mqttPayload *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
              //std::list<DCDB::CaliEvtData> events;
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
              }
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
          /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty job data message received!";
              return 1;
          }

          //parse JSON into JobData object
          string        payload((char *)msg->getPayload(), len);
          DCDB::JobData jd;
          try {
              boost::property_tree::iptree config;
              std::istringstream           str(payload);
              boost::property_tree::read_json(str, config);
              BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
                  if (boost::iequals(val.first, "jobid")) {
                      jd.jobId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
                      jd.userId = val.second.data();
                  } else if (boost::iequals(val.first, "starttime")) {
                      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "endtime")) {
                      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "nodes")) {
                      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
                          jd.nodes.push_back(node.second.data());
                      }
                  }
              }
          } catch (const std::exception &e) {
              LOG(error) << "Invalid job data packet received!";
              return 1;
          }

#ifdef DEBUG
          LOG(debug) << "JobId  = " << jd.jobId;
          LOG(debug) << "UserId = " << jd.userId;
          LOG(debug) << "Start  = " << jd.startTime.getString();
          LOG(debug) << "End    = " << jd.endTime.getString();
          LOG(debug) << "Nodes: ";
          for (const auto &n : jd.nodes) {
              LOG(debug) << "    " << n;
          }
#endif

          //store JobData into Storage Backend
          if (jd.endTime == DCDB::TimeStamp((uint64_t)0)) {
              //starting job data
              if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
                  LOG(error) << "Job data insert failed!";
                  return 1;
              }
          } else {
              //ending job data
              DCDB::JobData tmp;
              if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
                  LOG(error) << "Could not retrieve job to be updated!";
                  return 1;
              }

              if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
                  LOG(error) << "Could not update end time of job!";
                  return 1;
              }
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((int64_t *)msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
          }
          //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *)msg->getPayload();
          }
          //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msgTopic)) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              std::list<DCDB::SensorDataStoreReading> readings;
              // Each reading goes both into the local cache and into the
              // batch that is sent to the storage backend afterwards
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                  readings.push_back(r);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
              mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msgTopic));
              readingCtr+= readings.size();

              //mySensorCache.dump();
          } else {
              LOG(error) << "Message with empty topic received";
          }
      }
  }
#endif
  return 0;
}

519
520
521



522
523
524
525
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
526
  Configuration config("", "collectagent.conf");
527
528
529
530
531
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
532
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
533
534
535
536
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
537
538
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
539
540
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
541
542
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
543
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
544
  cout << endl;
Michael Ott's avatar
Michael Ott committed
545
  cout << "  -d            Daemonize" << endl;
546
  cout << "  -s            Print message stats" <<endl;
547
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
548
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
549
  cout << "  -h            This help page" << endl;
550
  cout << endl;
551
552
553
}

int main(int argc, char* const argv[]) {
554
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
555
556

  try{
557

558
559
560
561
562
563
564
565
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
566
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

583
584
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
585

Alessio Netti's avatar
Alessio Netti committed
586
587
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
588
          LOG(fatal) << "Failed to read global configuration!";
589
590
591
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
592
593
594
595
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
596
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
597
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
598
      
599
600
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
601
          switch(ret) {
602
              case 'a':
603
                  pluginSettings.autoPublish = true;
604
                  break;
605
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
606
607
608
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
609
                  break;
610
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
611
612
613
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
614
                  break;
Michael Ott's avatar
Michael Ott committed
615
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
616
                  cassandraSettings.username = optarg;
617
                  break;
Michael Ott's avatar
Michael Ott committed
618
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
619
                  cassandraSettings.password = optarg;
620
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
621
622
623
624
625
626
627
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
628
              case 't':
Alessio Netti's avatar
Alessio Netti committed
629
                  cassandraSettings.ttl = stoul(optarg);
630
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
631
              case 'v':
632
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
633
                  break;
634
              case 'd':
635
              case 'D':
636
                  settings.daemonize = 1;
637
                  break;
638
              case 's':
639
                  settings.statistics = 1;
640
                  break;
641
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
642
                  settings.validateConfig = true;
643
                  break;
644
              case 'h':
645
646
647
              default:
                  usage();
                  exit(EXIT_FAILURE);
648
649
650
          }
      }

651
652
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
653
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
654
655
656
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
657
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
658
659
660
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
661

662
      /*
Alessio Netti's avatar
Alessio Netti committed
663
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
664
665
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
666
      signal(SIGTERM, sigHandler);
667
668
669
670
671
672
673
674
675
676

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
677
      
678
      // Setting the size of the sensor cache
679
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
680
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
681

682
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
683
684
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
685
686
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
687
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
688
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
689
      
Axel Auweter's avatar
Axel Auweter committed
690
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
691
          LOG(fatal) << "Cannot connect to Cassandra!";
692
693
694
695
696
697
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
698
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
699
      
700
701
702
      /*
       * Allocate the data stores (sensor, job, Caliper event) and the sensor configuration, all sharing dcdbConn.
       */
703
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
704
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
705
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
706
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
707
708
709

      /*
       * Set TTL for data store inserts if TTL > 0.
710
       */
711
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
712
        mySensorDataStore->setTTL(cassandraSettings.ttl);
713
714
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
715
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
716
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
717

718
719
720
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
721
722
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
723
      SensorMetadata sBuf;
724
      for (const auto &s : publicSensors)
725
726
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
727
728
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
729
          }
730
          
731
732
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
733
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
734
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
735
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
736
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
737
      queryEngine.setQueryCallback(sensorQueryCallback);
738
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
739
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
740
      queryEngine.setJobQueryCallback(jobQueryCallback);
741
      if(!analyticsController->initialize(settings, argv[argc - 1]))
742
743
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
744
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
745
746
747
748
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
749
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
750
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
751
752
753
754
755
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
756
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
757
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
758
759
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
760

Alessio Netti's avatar
Alessio Netti committed
761
762
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
763
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
764
      
765
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
766
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
767
768
769
770
771
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
772
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
773
774
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
775
776
777
778
#else
      LOG(info) << "    Username and password not printed.";
#endif

779
780
781
782
783
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
784
785
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
786
      }
787
788
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
789
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
790
791
792
793
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
794
      if (settings.validateConfig)
795
796
797
798
          return EXIT_SUCCESS;
      else
          analyticsController->start();

799
800
801
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
802
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
803
      
804
805
806
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
807
      LOG(info) << "MQTT Server running...";
808
      
809
810
811
      /*
       * Start the HTTP Server for the REST API
       */
812
      CARestAPI* httpsServer = nullptr;
813
      if (restAPISettings.enabled) {
814
815
816
817
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
818
      }
819

820
821
822
823
824
825
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
826
827
828
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
829

Alessio Netti's avatar
Alessio Netti committed
830
831
832
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

833
      LOG(info) << "Collect Agent running...";
834
835
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
836
837
838
839
840
841
842
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

843
          sleep(60);
844
845
846
847
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
848
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
849
          if (settings.statistics && keepRunning) {
850
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
851
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
852
          }
853
          msgCtr = 0;
854
          pmsgCtr = 0;
855
	  readingCtr = 0;
856
857
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
858
      LOG(info) << "Stopping...";
859
      analyticsController->stop();
860
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
861
      LOG(info) << "MQTT Server stopped...";
862
863
864
865
866
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
867
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
868
      delete myJobDataStore;
869
      delete mySensorConfig;
870
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
871
872
      dcdbConn->disconnect();
      delete dcdbConn;
873
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
874
      LOG(info) << "Collect Agent closed. Bye bye...";
875
  }
876
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
877
      LOG(fatal) << "Exception: " << e.what();
878
      abrt(EXIT_FAILURE, INTERR);
879
880
  }

881
  return EXIT_SUCCESS;
882
}
883
884