Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 37.6 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
89
int retCode = EXIT_SUCCESS;
90
uint64_t msgCtr;
91
uint64_t readingCtr;
92
93
uint64_t dbQueryCtr;
uint64_t cachedQueryCtr;
Alessio Netti's avatar
Alessio Netti committed
94
SensorCache mySensorCache;
95
AnalyticsController* analyticsController;
96
DCDB::Connection* dcdbConn;
97
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
98
DCDB::JobDataStore *myJobDataStore;
99
DCDB::SensorConfig *mySensorConfig;
100
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
101
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
102
CARestAPI* httpsServer = nullptr;
103
DCDB::SCError err;
104
QueryEngine& queryEngine = QueryEngine::getInstance();
105
logger_t lg;
106

107
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
108
109
110
111
112
113
114
115
116
117
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
118
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
119
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
120
121
    } else {
        // Getting a single job by id
122
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
123
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
124
125
126
127
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
128
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
129
130
131
132
133
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
134
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
135
    }
136
    return true;
Alessio Netti's avatar
Alessio Netti committed
137
138
}

139
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
140
141
142
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
157
            mySensorCache.wait();
158
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
159
160
161
162
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
163
164
                topics.push_back(sid);
            }
165
            mySensorCache.release();
166
        }
167
    }
168
169
170
171
    if (successCtr) {
	cachedQueryCtr+= buffer.size();
    }
    
172
173
174
175
176
177
178
179
180
181
182
183
184
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
185
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
186
187
188
189
190
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
191
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
192
193
194
195
196
197
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
198
		dbQueryCtr+= results.size();
199
200
201
202
203
204
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
205
        }
206
        catch (const std::exception &e) {}
207
    }
208
    
209
    --queryEngine.access;
210
211
212
    return successCtr>0;
}

213
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
214
215
216
217
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
218
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
219
220
}

221
222
223
224
225
226
227
228
229
230
231
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
232
    bool local = false;
233
234
235
236
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
237
    }
238
239
    metadataStore->release();
        
240
    if(!local) {
241
242
243
244
245
246
247
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
248
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
249
250
251
252
253
254
255
256
257
258
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

259
/* Normal termination (SIGINT, CTRL+C) */
260
261
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
262
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
263
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
264
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
265
266
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
267
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
268
269
270
271
272
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
273
274
275
  keepRunning = 0;
}

276
277
278
279
280
281
/*
 * Handler for crash signals: delegates to abrt() with EXIT_FAILURE and the
 * SIGNAL reason. The signal number itself is not used.
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

282
int mqttCallback(SimpleMQTTMessage *msg)
283
284
{
  /*
Michael Ott's avatar
Michael Ott committed
285
   * Increment the msgCtr for statistics.
286
   */
287
  msgCtr++;
288

289
290
#ifndef BENCHMARK_MODE

291
  uint64_t len;
292
293
294
  /*
   * Decode the message and put into the database.
   */
295
  if (msg->isPublish()) {
296
      const char *topic = msg->getTopic().c_str();
297
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
298
299
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
300
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
301
          if ((len = msg->getPayloadLength()) == 0) {
302
              LOG(error) << "Empty sensor publish message received!";
303
304
              return 1;
          }
305

306
          string payload((char *) msg->getPayload(), len);
307
308
309
310
311
312
313
314
315
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
316
              if(sm.isValid()) {
317
                  err = mySensorConfig->publishSensor(sm);
318
319
                  metadataStore->store(*sm.getPattern(), sm);
              }
320
321
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
322
          }
323
324
325
326

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
327
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
328
329
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
330
                  LOG(error) << "Invalid sensor public name.";
331
332
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
333
                  LOG(error) << "Cannot reach sensor data store.";
334
335
336
337
                  return 1;
              default:
                  break;
          }
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
382
              //std::list<DCDB::CaliEvtData> events;
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

403
404
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
405
              }
406
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
407
408
409
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
430
431
432
			      } else if (boost::iequals(val.first, "domainid")) {
                      jd.domainId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
Micha Müller's avatar
Micha Müller committed
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
461
462
	      //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1 
	      if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
Micha Müller's avatar
Micha Müller committed
463
464
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
465
			      LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
Micha Müller's avatar
Micha Müller committed
466
467
468
469
470
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
471
		      if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
472
			      LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
Micha Müller's avatar
Micha Müller committed
473
474
475
			      return 1;
		      }

476
		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
477
			      LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
Micha Müller's avatar
Micha Müller committed
478
479
480
			      return 1;
		      }
	      }
481
      } else {
Micha Müller's avatar
Micha Müller committed
482
	      mqttPayload buf, *payload;
483

Micha Müller's avatar
Micha Müller committed
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
501

Micha Müller's avatar
Micha Müller committed
502
	      /*
503
504
505
506
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
507
508
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
509
510
511
512
513
514
515
516
517
518
519
520
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
521
#endif
522
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
523
524
525
526
527
528
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
529
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
530
	      readingCtr+= readings.size();
531

532
              //mySensorCache.dump();
533
534
          } else {
              LOG(error) << "Message with empty topic received";
535
          }
536
      }
537
  }
538
#endif
539
  return 0;
540
541
}

542
543
544



545
546
547
548
/*
 * Print usage information
 */
void usage() {
549
550
551
552
553
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
554
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
555
556
557
558
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
559
560
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
561
562
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
563
564
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
565
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
566
  cout << endl;
Michael Ott's avatar
Michael Ott committed
567
  cout << "  -d            Daemonize" << endl;
568
  cout << "  -s            Print message stats" <<endl;
569
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
570
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
571
  cout << "  -h            This help page" << endl;
572
  cout << endl;
573
574
575
}

int main(int argc, char* const argv[]) {
576
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
577
578

  try{
579

580
581
      // Checking if path to config is supplied
      if (argc <= 1) {
582
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
583
584
585
586
587
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
588
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

605
606
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
607

Alessio Netti's avatar
Alessio Netti committed
608
      Configuration config(argv[argc - 1], "collectagent.conf");
609
      config.readConfig();
610

Alessio Netti's avatar
Alessio Netti committed
611
612
613
614
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
615
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
616
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
617
      
618
619
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
620
          switch(ret) {
621
              case 'a':
622
                  pluginSettings.autoPublish = true;
623
                  break;
624
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
625
626
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
627
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
628
                  break;
629
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
630
631
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
632
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
633
                  break;
Michael Ott's avatar
Michael Ott committed
634
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
635
                  cassandraSettings.username = optarg;
636
                  break;
Michael Ott's avatar
Michael Ott committed
637
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
638
                  cassandraSettings.password = optarg;
639
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
640
641
642
643
644
645
646
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
647
              case 't':
Alessio Netti's avatar
Alessio Netti committed
648
                  cassandraSettings.ttl = stoul(optarg);
649
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
650
              case 'v':
651
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
652
                  break;
653
              case 'd':
654
              case 'D':
655
                  settings.daemonize = 1;
656
                  break;
657
              case 's':
658
                  settings.statisticsInterval = 1;
659
                  break;
660
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
661
                  settings.validateConfig = true;
662
                  break;
663
              case 'h':
664
665
666
              default:
                  usage();
                  exit(EXIT_FAILURE);
667
668
669
          }
      }

670
671
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
672
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
673
674
675
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
676
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
677
678
679
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
680

681
      /*
Alessio Netti's avatar
Alessio Netti committed
682
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
683
684
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
685
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
686
      signal(SIGUSR1, sigHandler);
687
688
689
690
691
692
693
694
695
696

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
697
      
698
      // Setting the size of the sensor cache
699
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
700
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
701

702
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
703
704
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
705
706
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
707
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
708
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
709
      
Axel Auweter's avatar
Axel Auweter committed
710
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
711
          LOG(fatal) << "Cannot connect to Cassandra!";
712
713
714
715
716
717
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
718
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
719
      
720
721
722
      /*
       * Allocate the SensorDataStore.
       */
723
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
724
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
725
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
726
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
727
728
729

      /*
       * Set TTL for data store inserts if TTL > 0.
730
       */
731
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
732
        mySensorDataStore->setTTL(cassandraSettings.ttl);
733
734
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
735
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
736
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
737

738
739
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
740
741
742
743
744
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK) {
	  LOG(error) << "Failed to retrieve public sensors!";
	  exit(EXIT_FAILURE);
      }
      
745
      metadataStore = new MetadataStore();
746
      SensorMetadata sBuf;
747
      for (const auto &s : publicSensors)
748
          if (!s.is_virtual) {
749
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
750
751
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
752
          }
753
      publicSensors.clear();
754
          
755
756
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
757
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
758
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
759
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
760
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
761
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
762
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
763
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
764
      queryEngine.setQueryCallback(sensorQueryCallback);
765
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
766
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
767
      queryEngine.setJobQueryCallback(jobQueryCallback);
768
      if(!analyticsController->initialize(settings))
769
770
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
771
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
772
773
774
775
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
776
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
777
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
778
779
780
781
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
782
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
Alessio Netti's avatar
Alessio Netti committed
783
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
784
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
785
786
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
787

Alessio Netti's avatar
Alessio Netti committed
788
789
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
790
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
791
792
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
793
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
794
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
795
      
796
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
797
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
798
799
800
801
802
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
803
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
804
805
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
806
807
808
809
#else
      LOG(info) << "    Username and password not printed.";
#endif

810
811
812
813
814
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
815
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
Michael Ott's avatar
Michael Ott committed
816
	  
817
	  if (config.influxSettings.measurements.size() > 0) {
Michael Ott's avatar
Michael Ott committed
818
	      LOG(info) << "InfluxDB Settings:";
819
	      LOG(info) << "    MQTT-Prefix:  " << config.influxSettings.mqttPrefix;
Michael Ott's avatar
Michael Ott committed
820
821
	      LOG(info) << "    Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");

822
	      for (auto &m: config.influxSettings.measurements) {
Michael Ott's avatar
Michael Ott committed
823
		  LOG(info) << "    Measurement: " << m.first;
824
		  LOG(info) << "        MQTT-Part:   " << m.second.mqttPart;
Michael Ott's avatar
Michael Ott committed
825
		  LOG(info) << "        Tag:         " << m.second.tag;
Michael Ott's avatar
Michael Ott committed
826
827
828
829
830
831
		  if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
		      if (m.second.tagSubstitution != "&") {
			  LOG(info) << "        TagFilter:   s/" << m.second.tagRegex.str() << "/" <<  m.second.tagSubstitution << "/";
		      } else {
			  LOG(info) << "    TagFilter:   " << m.second.tagRegex.str();
		      }
Michael Ott's avatar
Michael Ott committed
832
833
834
835
836
837
		  }
		  if (m.second.fields.size() > 0) {
		      stringstream ss;
		      copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
		      string fields = ss.str();
		      fields.pop_back();
Michael Ott's avatar
Michael Ott committed
838
		      LOG(info) << "        Fields:      " << fields;
Michael Ott's avatar
Michael Ott committed
839
840
841
		  }
	      }
	  }
842
      }
843
844
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
845
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
846
847
848
849
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
850
      if (settings.validateConfig)
851
852
853
854
          return EXIT_SUCCESS;
      else
          analyticsController->start();

855
856
857
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
858
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
859
      
860
861
862
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
863
      LOG(info) << "MQTT Server running...";
864
      
865
866
867
      /*
       * Start the HTTP Server for the REST API
       */
868
      if (restAPISettings.enabled) {
Michael Ott's avatar
Michael Ott committed
869
          httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms);
870
871
872
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
873
      }
874

875
876
877
878
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
879
      uint64_t start, end;
880
      float elapsed;
881
882
      msgCtr = 0;
      readingCtr = 0;
883
884
      dbQueryCtr = 0;
      cachedQueryCtr = 0;
885

886
887
      start = getTimestamp();
      uint64_t lastCleanup = start;
888
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
889

890
      LOG(info) << "Collect Agent running...";
891
      while(keepRunning) {
892
          start = getTimestamp();
893
          if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
894
895
              uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
              lastCleanup = start;
Alessio Netti's avatar
Alessio Netti committed
896
897
898
899
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

900
901
902
903
	  sleep(sleepInterval);

	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
904
	      end = getTimestamp();
905
906
	      elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
	      float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
907
908
	      float cacheReq = ceil(((float)cachedQueryCtr) / elapsed);
	      float dbReq = ceil(((float)dbQueryCtr) / elapsed);
909
910
911
	      float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
	      float mIns = ceil(((float)readingCtr) / elapsed);
	      float mMsg = ceil(((float) msgCtr) / elapsed);
912
	      LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s]   REST [" << rIns << " ins/s]   Analytics [" << aIns << " ins/s]   Cache [" << cacheReq << " req/s]   DB [" << dbReq << " req/s]";
913
914
915
916
917
918
919
920
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
921
	      msgCtr = 0;
922
923
	      cachedQueryCtr = 0;
	      dbQueryCtr = 0;
924
925
	      readingCtr = 0;
	  }
926
927
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
928
      LOG(info) << "Stopping...";
929
      analyticsController->stop();
930
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
931
      LOG(info) << "MQTT Server stopped...";
932
933
934
935
936
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
937
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
938
      delete myJobDataStore;
939
      delete mySensorConfig;
940
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
941
942
      dcdbConn->disconnect();
      delete dcdbConn;
943
      delete metadataStore;
944
      delete analyticsController;
Alessio Netti's avatar
Logging    
Alessio Netti committed