Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 37.8 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher instances
 *          via MQTT messages and stores it in the storage backend via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87

88
bool newAutoPub;
89
int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
90
int retCode = EXIT_SUCCESS;
91
uint64_t msgCtr;
92
uint64_t readingCtr;
93
94
uint64_t dbQueryCtr;
uint64_t cachedQueryCtr;
Alessio Netti's avatar
Alessio Netti committed
95
SensorCache mySensorCache;
96
AnalyticsController* analyticsController;
97
DCDB::Connection* dcdbConn;
98
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
99
DCDB::JobDataStore *myJobDataStore;
100
DCDB::SensorConfig *mySensorConfig;
101
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
102
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
103
CARestAPI* httpsServer = nullptr;
104
DCDB::SCError err;
105
QueryEngine& queryEngine = QueryEngine::getInstance();
106
logger_t lg;
107

108
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
109
110
111
112
113
114
115
116
117
118
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
119
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
120
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
121
122
    } else {
        // Getting a single job by id
123
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
124
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
125
126
127
128
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
129
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
130
131
132
133
134
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
135
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
136
    }
137
    return true;
Alessio Netti's avatar
Alessio Netti committed
138
139
}

140
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
141
142
143
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
144
145
146
147
148
149
150
151
152
153
154
155
156
157
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
158
            mySensorCache.wait();
159
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
160
161
162
163
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
164
165
                topics.push_back(sid);
            }
166
            mySensorCache.release();
167
        }
168
    }
169
170
171
172
    if (successCtr) {
	cachedQueryCtr+= buffer.size();
    }
    
173
174
175
176
177
178
179
180
181
182
183
184
185
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
186
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
187
188
189
190
191
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
192
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
193
194
195
196
197
198
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
199
		dbQueryCtr+= results.size();
200
201
202
203
204
205
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
206
        }
207
        catch (const std::exception &e) {}
208
    }
209
    
210
    --queryEngine.access;
211
212
213
    return successCtr>0;
}

214
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
215
216
217
218
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
219
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
220
221
}

222
223
224
225
226
227
228
229
230
231
232
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
233
    bool local = false;
234
235
236
237
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
238
    }
239
240
    metadataStore->release();
        
241
    if(!local) {
242
243
244
245
246
247
248
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
249
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
250
251
252
253
254
255
256
257
258
259
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

260
/* Normal termination (SIGINT, CTRL+C) */
261
262
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
263
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
264
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
265
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
266
267
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
268
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
269
270
271
272
273
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
274
275
276
  keepRunning = 0;
}

277
278
279
280
281
282
/*
 * Handler for crash signals: delegates to abrt() with EXIT_FAILURE and the
 * SIGNAL reason. The signal number itself is not inspected.
 */
void abrtHandler(int /* sig */)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

283
/**
 * Callback invoked by the MQTT server for every received message.
 *
 * PUBLISH messages are dispatched on their topic prefix:
 *  - DCDB_MAP (and its DCDB_MET metadata extension): sensor publishing and
 *    metadata, forwarded to the SensorConfig (and the MetadataStore);
 *  - DCDB_CALIEVT: Caliper event data, stored via the CaliEvtDataStore;
 *  - DCDB_JOBDATA: job information in JSON format, stored via the JobDataStore;
 *  - anything else: sensor readings, stored in the sensor cache and the
 *    SensorDataStore.
 * Every message is counted in msgCtr. With BENCHMARK_MODE defined, messages
 * are only counted and otherwise discarded.
 *
 * @param msg The received message.
 * @return 1 if the message was rejected (malformed payload, invalid metadata,
 *         sensor publishing or job data storage error), 0 otherwise. Some
 *         problems (e.g. a topic that cannot be converted to a SensorId) are
 *         only logged and still return 0.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr for statistics.
   */
  msgCtr++;

#ifndef BENCHMARK_MODE

  uint64_t len;

  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a const reference before taking c_str(): if getTopic()
      // returned by value, a pointer into the temporary would dangle. Binding
      // extends the temporary's lifetime and costs nothing otherwise.
      const std::string &fullTopic = msg->getTopic();
      const char *topic = fullTopic.c_str();

      // A topic starting with the /DCDB_MAP/ keyword carries the sensor's public
      // name in its payload; the keyword is stripped from the topic before the
      // sensor is published. strncmp is used as a cheap prefix check.
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty sensor publish message received!";
              return 1;
          }

          string payload((char *) msg->getPayload(), len);

          // If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
              // Reject invalid metadata explicitly: otherwise the global err would
              // still hold the result of a previous message and the switch below
              // would act on that stale value.
              if (!sm.isValid()) {
                  LOG(error) << "Invalid sensor metadata received!";
                  return 1;
              }
              err = mySensorConfig->publishSensor(sm);
              metadataStore->store(*sm.getPattern(), sm);
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
          }

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << fullTopic;
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name.";
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }

          newAutoPub = true;
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(fullTopic);
          mqttPayload *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }
          const uint64_t numEvents = len / sizeof(mqttPayload);

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
              /**
               * We want an exhaustive list of all events ordered by their
               * time of occurrence. Payload values should always be
               * one. Other values currently indicate a malformed message.
               * The whole message is validated before anything is stored,
               * so a malformed message is rejected as a unit instead of
               * being partially inserted.
               *
               * In the future, the value field could be used to aggregate
               * multiple equal events that occurred in the same plugin
               * read cycle.
               */
              for (uint64_t i = 0; i < numEvents; i++) {
                  if (payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }
              }

              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              // The TTL depends only on the topic: look it up once
              const auto ttl = metadataStore->getTTL(topicStr);
              for (uint64_t i = 0; i < numEvents; i++) {
                  e.timeStamp = payload[i].timestamp;
                  myCaliEvtDataStore->insert(e, ttl);
              }
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
          /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty job data message received!";
              return 1;
          }

          //parse JSON into JobData object
          string        payload((char *)msg->getPayload(), len);
          DCDB::JobData jd;
          try {
              boost::property_tree::iptree config;
              std::istringstream           str(payload);
              boost::property_tree::read_json(str, config);
              BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
                  if (boost::iequals(val.first, "jobid")) {
                      jd.jobId = val.second.data();
                  } else if (boost::iequals(val.first, "domainid")) {
                      jd.domainId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
                      jd.userId = val.second.data();
                  } else if (boost::iequals(val.first, "starttime")) {
                      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "endtime")) {
                      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                  } else if (boost::iequals(val.first, "nodes")) {
                      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
                          jd.nodes.push_back(node.second.data());
                      }
                  }
              }
          } catch (const std::exception &e) {
              LOG(error) << "Invalid job data packet received!";
              return 1;
          }

#ifdef DEBUG
          LOG(debug) << "JobId  = " << jd.jobId;
          LOG(debug) << "UserId = " << jd.userId;
          LOG(debug) << "Start  = " << jd.startTime.getString();
          LOG(debug) << "End    = " << jd.endTime.getString();
          LOG(debug) << "Nodes: ";
          for (const auto &n : jd.nodes) {
              LOG(debug) << "    " << n;
          }
#endif

          //store JobData into Storage Backend
          //dcdbslurmjob start inserts the endTime as startTime + max job length + 1,
          //i.e. the lowest bit of the end time will be 1 for a starting job
          if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
              //starting job data
              if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
                  LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
                  return 1;
              }
          } else {
              //ending job data
              DCDB::JobData tmp;
              if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
                  LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
                  return 1;
              }

              if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
                  LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
                  return 1;
              }
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((int64_t *)msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              // The payload now describes exactly one timestamp/value pair
              len = sizeof(mqttPayload);
          }
          //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *)msg->getPayload();
          }
          //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(fullTopic)) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              const uint64_t numReadings = len / sizeof(mqttPayload);
              std::list<DCDB::SensorDataStoreReading> readings;
              for (uint64_t i = 0; i < numReadings; i++) {
                  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                  readings.push_back(r);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
              mySensorDataStore->insertBatch(readings, metadataStore->getTTL(fullTopic));
              readingCtr += readings.size();
          } else {
              LOG(error) << "Message with empty topic received";
          }
      }
  }
#endif
  return 0;
}

545
546
547



548
549
550
551
/*
 * Print usage information
 */
void usage() {
552
553
554
555
556
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
557
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
558
559
560
561
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
562
563
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
564
565
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
566
567
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
568
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
569
  cout << endl;
Michael Ott's avatar
Michael Ott committed
570
  cout << "  -d            Daemonize" << endl;
571
  cout << "  -s            Print message stats" <<endl;
572
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
573
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
574
  cout << "  -h            This help page" << endl;
575
  cout << endl;
576
577
578
}

int main(int argc, char* const argv[]) {
579
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
580
581

  try{
582

583
584
      // Checking if path to config is supplied
      if (argc <= 1) {
585
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
586
587
588
589
590
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
591
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

608
609
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
610

Alessio Netti's avatar
Alessio Netti committed
611
      Configuration config(argv[argc - 1], "collectagent.conf");
612
      config.readConfig();
613

Alessio Netti's avatar
Alessio Netti committed
614
615
616
617
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
618
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
619
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
620
      
621
622
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
623
          switch(ret) {
624
              case 'a':
625
                  pluginSettings.autoPublish = true;
626
                  break;
627
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
628
629
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
630
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
631
                  break;
632
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
633
634
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
635
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
636
                  break;
Michael Ott's avatar
Michael Ott committed
637
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
638
                  cassandraSettings.username = optarg;
639
                  break;
Michael Ott's avatar
Michael Ott committed
640
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
641
                  cassandraSettings.password = optarg;
642
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
643
644
645
646
647
648
649
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
650
              case 't':
Alessio Netti's avatar
Alessio Netti committed
651
                  cassandraSettings.ttl = stoul(optarg);
652
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
653
              case 'v':
654
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
655
                  break;
656
              case 'd':
657
              case 'D':
658
                  settings.daemonize = 1;
659
                  break;
660
              case 's':
661
                  settings.statisticsInterval = 1;
662
                  break;
663
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
664
                  settings.validateConfig = true;
665
                  break;
666
              case 'h':
667
668
669
              default:
                  usage();
                  exit(EXIT_FAILURE);
670
671
672
          }
      }

673
674
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
675
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
676
677
678
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
679
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
680
681
682
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
683

684
      /*
Alessio Netti's avatar
Alessio Netti committed
685
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
686
687
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
688
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
689
      signal(SIGUSR1, sigHandler);
690
691
692
693
694
695
696
697
698
699

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
700
      
701
      // Setting the size of the sensor cache
702
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
703
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
704

705
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
706
707
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
708
709
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
710
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
711
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
712
      
Axel Auweter's avatar
Axel Auweter committed
713
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
714
          LOG(fatal) << "Cannot connect to Cassandra!";
715
716
717
718
719
720
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
721
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
722
      
723
724
725
      /*
       * Allocate the SensorDataStore.
       */
726
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
727
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
728
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
729
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
730
731
732

      /*
       * Set TTL for data store inserts if TTL > 0.
733
       */
734
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
735
        mySensorDataStore->setTTL(cassandraSettings.ttl);
736
737
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
738
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
739
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
740

741
742
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
743
744
745
746
747
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK) {
	  LOG(error) << "Failed to retrieve public sensors!";
	  exit(EXIT_FAILURE);
      }
      
748
      metadataStore = new MetadataStore();
749
      SensorMetadata sBuf;
750
      for (const auto &s : publicSensors)
751
          if (!s.is_virtual) {
752
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
753
754
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
755
          }
756
      publicSensors.clear();
757
          
758
759
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
760
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
761
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
762
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
763
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
764
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
765
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
766
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
767
      queryEngine.setQueryCallback(sensorQueryCallback);
768
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
769
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
770
      queryEngine.setJobQueryCallback(jobQueryCallback);
771
      if(!analyticsController->initialize(settings))
772
773
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
774
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
775
776
777
778
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
779
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
780
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
781
782
783
784
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
785
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
Alessio Netti's avatar
Alessio Netti committed
786
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
787
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
788
789
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
790

Alessio Netti's avatar
Alessio Netti committed
791
792
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
793
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
794
795
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
796
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
797
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
798
      
799
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
800
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
801
802
803
804
805
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
806
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
807
808
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
809
810
811
812
#else
      LOG(info) << "    Username and password not printed.";
#endif

813
814
815
816
817
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
818
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
Michael Ott's avatar
Michael Ott committed
819
	  
820
	  if (config.influxSettings.measurements.size() > 0) {
Michael Ott's avatar
Michael Ott committed
821
	      LOG(info) << "InfluxDB Settings:";
822
	      LOG(info) << "    MQTT-Prefix:  " << config.influxSettings.mqttPrefix;
Michael Ott's avatar
Michael Ott committed
823
824
	      LOG(info) << "    Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");

825
	      for (auto &m: config.influxSettings.measurements) {
Michael Ott's avatar
Michael Ott committed
826
		  LOG(info) << "    Measurement: " << m.first;
827
		  LOG(info) << "        MQTT-Part:   " << m.second.mqttPart;
Michael Ott's avatar
Michael Ott committed
828
		  LOG(info) << "        Tag:         " << m.second.tag;
Michael Ott's avatar
Michael Ott committed
829
830
831
832
833
834
		  if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
		      if (m.second.tagSubstitution != "&") {
			  LOG(info) << "        TagFilter:   s/" << m.second.tagRegex.str() << "/" <<  m.second.tagSubstitution << "/";
		      } else {
			  LOG(info) << "    TagFilter:   " << m.second.tagRegex.str();
		      }
Michael Ott's avatar
Michael Ott committed
835
836
837
838
839
840
		  }
		  if (m.second.fields.size() > 0) {
		      stringstream ss;
		      copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
		      string fields = ss.str();
		      fields.pop_back();
Michael Ott's avatar
Michael Ott committed
841
		      LOG(info) << "        Fields:      " << fields;
Michael Ott's avatar
Michael Ott committed
842
843
844
		  }
	      }
	  }
845
      }
846
847
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
848
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
849
850
851
852
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
853
      if (settings.validateConfig)
854
855
856
857
          return EXIT_SUCCESS;
      else
          analyticsController->start();

858
859
860
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
861
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
862
      
863
864
865
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
866
      LOG(info) << "MQTT Server running...";
867
      
868
869
870
      /*
       * Start the HTTP Server for the REST API
       */
871
      if (restAPISettings.enabled) {
Michael Ott's avatar
Michael Ott committed
872
          httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms);
873
874
875
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
876
      }
877

878
879
880
      /*
       * Run (hopefully) forever...
       */
881
      newAutoPub = false;
882
      keepRunning = 1;
883
      uint64_t start, end;
884
      float elapsed;
885
886
      msgCtr = 0;
      readingCtr = 0;
887
888
      dbQueryCtr = 0;
      cachedQueryCtr = 0;
889

890
891
      start = getTimestamp();
      uint64_t lastCleanup = start;
892
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
893

894
      LOG(info) << "Collect Agent running...";
895
      while(keepRunning) {
896
          start = getTimestamp();
897
          if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
898
899
              uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
              lastCleanup = start;
Alessio Netti's avatar
Alessio Netti committed
900
901
902
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }
903
904
905
906
          if(newAutoPub) {
              newAutoPub = false;
              mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
          }
Alessio Netti's avatar
Alessio Netti committed
907

908
909
910
911
	  sleep(sleepInterval);

	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
912
	      end = getTimestamp();
913
914
	      elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
	      float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
915
916
	      float cacheReq = ceil(((float)cachedQueryCtr) / elapsed);
	      float dbReq = ceil(((float)dbQueryCtr) / elapsed);
917
918
919
	      float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
	      float mIns = ceil(((float)readingCtr) / elapsed);
	      float mMsg = ceil(((float) msgCtr) / elapsed);
920
	      LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s]   REST [" << rIns << " ins/s]   Analytics [" << aIns << " ins/s]   Cache [" << cacheReq << " req/s]   DB [" << dbReq << " req/s]";
921
922
923
924
925
926
927
928
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
929
	      msgCtr = 0;
930
931
	      cachedQueryCtr = 0;
	      dbQueryCtr = 0;
932
933
	      readingCtr = 0;
	  }
934
935
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
936
      LOG(info) << "Stopping...";
937
      analyticsController->stop();
938
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
939
      LOG(info) << "MQTT Server stopped...";
940
941
942
943
944
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }