collectagent.cpp 39.5 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87

88
bool newAutoPub;
89
int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
90
int retCode = EXIT_SUCCESS;
91
uint64_t msgCtr;
92
uint64_t readingCtr;
93
94
uint64_t dbQueryCtr;
uint64_t cachedQueryCtr;
Alessio Netti's avatar
Alessio Netti committed
95
SensorCache mySensorCache;
96
AnalyticsController* analyticsController;
97
DCDB::Connection* dcdbConn;
98
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
99
DCDB::JobDataStore *myJobDataStore;
100
DCDB::SensorConfig *mySensorConfig;
101
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
102
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
103
CARestAPI* httpsServer = nullptr;
104
DCDB::SCError err;
105
QueryEngine& queryEngine = QueryEngine::getInstance();
106
boost::shared_ptr<boost::asio::io_context::work> keepAliveWork;
107
logger_t lg;
108

109
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
114
115
116
117
118
119
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
120
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122
123
    } else {
        // Getting a single job by id
124
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
125
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
126
127
128
129
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
130
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
131
132
133
134
135
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
136
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
137
    }
138
    return true;
Alessio Netti's avatar
Alessio Netti committed
139
140
}

141
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
142
143
144
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
145
146
147
148
149
150
151
152
153
154
155
156
157
158
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
159
            mySensorCache.wait();
160
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
161
162
163
164
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
165
166
                topics.push_back(sid);
            }
167
            mySensorCache.release();
168
        }
169
    }
170
171
172
173
    if (successCtr) {
	cachedQueryCtr+= buffer.size();
    }
    
174
175
176
177
178
179
180
181
182
183
184
185
186
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
187
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
188
189
190
191
192
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
193
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
194
195
196
197
198
199
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
200
		dbQueryCtr+= results.size();
201
202
203
204
205
206
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
207
        }
208
        catch (const std::exception &e) {}
209
    }
210
    
211
    --queryEngine.access;
212
213
214
    return successCtr>0;
}

215
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
216
217
218
219
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
220
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
221
222
}

223
224
225
226
227
228
229
230
231
232
233
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
234
    bool local = false;
235
236
237
238
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
239
    }
240
241
    metadataStore->release();
        
242
    if(!local) {
243
244
245
246
247
248
249
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
250
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
251
252
253
254
255
256
257
258
259
260
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

261
/* Normal termination (SIGINT, CTRL+C) */
262
263
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
264
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
265
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
266
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
267
268
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
269
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
270
271
272
273
274
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
275
276

  keepAliveWork.reset();
277
278
279
  keepRunning = 0;
}

280
281
282
283
284
285
/**
 * @brief Signal handler for crashes; delegates to abrt().
 *
 * @param sig  Number of the received signal (not evaluated).
 */
void abrtHandler(int sig)
{
  (void)sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

286
/**
 * @brief Callback that processes one received MQTT message.
 *
 * @details Every message increments the message counter. Unless the agent is
 *          built with BENCHMARK_MODE, publish messages are dispatched on
 *          their topic prefix:
 *          - DCDB_MAP:     sensor publish message; the payload carries either
 *                          the sensor's public name or, below DCDB_MET, a
 *                          JSON metadata packet.
 *          - DCDB_CALIEVT: Caliper event; the event string is encoded in the
 *                          topic, the payload holds timestamp-value pairs.
 *          - DCDB_JOBDATA: job data in JSON format.
 *          - otherwise:    regular sensor readings, which are stored in the
 *                          sensor cache and in the sensor data store.
 *
 * @param msg  The received message.
 *
 * @return 1 if the message was rejected (empty or malformed payload, invalid
 *         topic, failed job data operation), 0 otherwise.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
    /*
     * Increment the msgCtr for statistics.
     */
    msgCtr++;

#ifndef BENCHMARK_MODE

    uint64_t len;
    /*
     * Decode the message and put into the database.
     */
    if (msg->isPublish()) {
        // Binding the topic to a const reference first keeps the character
        // data valid for the whole scope, no matter whether getTopic()
        // returns a reference or a temporary string
        const std::string &topicRef = msg->getTopic();
        const char *topic = topicRef.c_str();
        // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
        // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
        // We use strncmp as it is the most efficient way to do it
        if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
            if ((len = msg->getPayloadLength()) == 0) {
                LOG(error) << "Empty sensor publish message received!";
                return 1;
            }

            string payload((char *) msg->getPayload(), len);
            //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
            if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
                SensorMetadata sm;
                try {
                    sm.parseJSON(payload);
                } catch (const std::exception &e) {
                    LOG(error) << "Invalid metadata packed received!";
                    return 1;
                }
                if (sm.isValid()) {
                    err = mySensorConfig->publishSensor(sm);
                    metadataStore->store(*sm.getPattern(), sm);
                } else {
                    // Without this assignment the switch below would act on
                    // the stale result of an earlier message
                    err = DCDB::SC_INVALIDPATTERN;
                }
            } else {
                err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
            }

            // PublishSensor does most of the error checking for us
            switch (err) {
                case DCDB::SC_INVALIDPATTERN:
                    LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                    return 1;
                case DCDB::SC_INVALIDPUBLICNAME:
                    LOG(error) << "Invalid sensor public name.";
                    return 1;
                case DCDB::SC_INVALIDSESSION:
                    LOG(error) << "Cannot reach sensor data store.";
                    return 1;
                default:
                    break;
            }

            newAutoPub = true;
        } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
            /*
             * Special message case. This message contains a Caliper Event data
             * string that is encoded in the MQTT topic. Its payload consists of
             * usual timestamp-value pairs.
             * Data from this messages will be stored in a separate table managed
             * by CaliEvtDataStore class.
             */
            std::string topicStr(msg->getTopic());
            mqttPayload *payload;

            len = msg->getPayloadLength();
            //TODO support case that no timestamp is given?
            //retrieve timestamp and value from the payload
            if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
                payload = (mqttPayload *) msg->getPayload();
            } else {
                //this message is malformed -> ignore
                LOG(error) << "Message malformed";
                return 1;
            }

            /*
             * Decode message topic in actual sensor topic and string data.
             * "/:/" is used as delimiter between topic and data.
             */
            topicStr.erase(0, DCDB_CALIEVT_LEN);
            size_t pos = topicStr.find("/:/");
            if (pos == std::string::npos) {
                // topic is malformed -> ignore
                LOG(error) << "CaliEvt topic malformed";
                return 1;
            }
            const std::string data(topicStr, pos+3);
            topicStr.erase(pos);

            /*
             * We use the same MQTT-topic/SensorId infrastructure as actual sensor
             * data readings to sort related events.
             * Check if we can decode the event topic into a valid SensorId. If
             * successful, store the record in the database.
             */
            DCDB::SensorId sid;
            if (sid.mqttTopicConvert(topicStr)) {
                //std::list<DCDB::CaliEvtData> events;
                DCDB::CaliEvtData e;
                e.eventId   = sid;
                e.event     = data;
                for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                    e.timeStamp = payload[i].timestamp;

                    /**
                     * We want an exhaustive list of all events ordered by their
                     * time of occurrence. Payload values should always be
                     * one. Other values currently indicate a malformed message.
                     *
                     * In the future, the value field could be used to aggregate
                     * multiple equal events that occurred in the same plugin
                     * read cycle.
                     */
                    if (payload[i].value != 1) {
                        LOG(error) << "CaliEvt message malformed. Value != 1";
                        return 1;
                    }

                    myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                    //events.push_back(e);
                }
                //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
            } else {
                LOG(error) << "Topic could not be converted to SID";
            }
        } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
            /**
             * This message contains Slurm job data in JSON format. We need to
             * parse the payload and store it within the JobDataStore.
             */
            if ((len = msg->getPayloadLength()) == 0) {
                LOG(error) << "Empty job data message received!";
                return 1;
            }

            //parse JSON into JobData object
            string        payload((char *)msg->getPayload(), len);
            DCDB::JobData jd;
            try {
                boost::property_tree::iptree config;
                std::istringstream           str(payload);
                boost::property_tree::read_json(str, config);
                BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
                    if (boost::iequals(val.first, "jobid")) {
                        jd.jobId = val.second.data();
                    } else if (boost::iequals(val.first, "domainid")) {
                        jd.domainId = val.second.data();
                    } else if (boost::iequals(val.first, "userid")) {
                        jd.userId = val.second.data();
                    } else if (boost::iequals(val.first, "starttime")) {
                        jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                    } else if (boost::iequals(val.first, "endtime")) {
                        jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
                    } else if (boost::iequals(val.first, "nodes")) {
                        BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
                            jd.nodes.push_back(node.second.data());
                        }
                    }
                }
            } catch (const std::exception &e) {
                LOG(error) << "Invalid job data packet received!";
                return 1;
            }

#ifdef DEBUG
            LOG(debug) << "JobId  = " << jd.jobId;
            LOG(debug) << "UserId = " << jd.userId;
            LOG(debug) << "Start  = " << jd.startTime.getString();
            LOG(debug) << "End    = " << jd.endTime.getString();
            LOG(debug) << "Nodes: ";
            for (const auto &n : jd.nodes) {
                LOG(debug) << "    " << n;
            }
#endif

            //store JobData into Storage Backend
            //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1
            if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
                //starting job data
                if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
                    LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
                    return 1;
                }
            } else {
                //ending job data
                DCDB::JobData tmp;
                if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
                    LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
                    return 1;
                }

                if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
                    LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
                    return 1;
                }
            }
        } else {
            mqttPayload buf, *payload;

            len = msg->getPayloadLength();
            //In the 64 bit message case, the collect agent provides a timestamp
            if (len == sizeof(uint64_t)) {
                payload = &buf;
                payload->value = *((int64_t *)msg->getPayload());
                payload->timestamp = Messaging::calculateTimestamp();
                len = sizeof(uint64_t) * 2;
            }
            //...otherwise it just retrieves it from the MQTT message payload.
            else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
                payload = (mqttPayload *)msg->getPayload();
            }
            //...otherwise this message is malformed -> ignore...
            else {
                LOG(error) << "Message malformed";
                return 1;
            }

            /*
             * Check if we can decode the message topic
             * into a valid SensorId. If successful, store
             * the record in the database.
             */
            DCDB::SensorId sid;
            if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
                std::list<DCDB::SensorDataStoreReading> readings;
                for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                    DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                    readings.push_back(r);
                    mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
                }
                mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
                mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
                readingCtr += readings.size();

                //mySensorCache.dump();
            } else {
                LOG(error) << "Message with empty topic received";
            }
        }
    }
#endif
    return 0;
}

548
549
550



551
552
553
554
/*
 * Print usage information
 */
void usage() {
555
556
557
558
559
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
560
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
561
562
563
564
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
565
566
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
567
568
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
569
570
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
571
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
572
  cout << endl;
Michael Ott's avatar
Michael Ott committed
573
  cout << "  -d            Daemonize" << endl;
574
  cout << "  -s            Print message stats" <<endl;
575
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
576
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
577
  cout << "  -h            This help page" << endl;
578
  cout << endl;
579
580
581
}

int main(int argc, char* const argv[]) {
582
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
583
584

  try{
585

586
587
      // Checking if path to config is supplied
      if (argc <= 1) {
588
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
589
590
591
592
593
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
594
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

611
612
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
613

Alessio Netti's avatar
Alessio Netti committed
614
      Configuration config(argv[argc - 1], "collectagent.conf");
615
      config.readConfig();
616

Alessio Netti's avatar
Alessio Netti committed
617
618
619
620
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
621
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
622
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
623
      
624
625
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
626
          switch(ret) {
627
              case 'a':
628
                  pluginSettings.autoPublish = true;
629
                  break;
630
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
631
632
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
633
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
634
                  break;
635
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
636
637
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
638
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
639
                  break;
Michael Ott's avatar
Michael Ott committed
640
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
641
                  cassandraSettings.username = optarg;
642
                  break;
Michael Ott's avatar
Michael Ott committed
643
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
644
                  cassandraSettings.password = optarg;
645
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
646
647
648
649
650
651
652
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
653
              case 't':
Alessio Netti's avatar
Alessio Netti committed
654
                  cassandraSettings.ttl = stoul(optarg);
655
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
656
              case 'v':
657
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
658
                  break;
659
              case 'd':
660
              case 'D':
661
                  settings.daemonize = 1;
662
                  break;
663
              case 's':
664
                  settings.statisticsInterval = 1;
665
                  break;
666
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
667
                  settings.validateConfig = true;
668
                  break;
669
              case 'h':
670
671
672
              default:
                  usage();
                  exit(EXIT_FAILURE);
673
674
675
          }
      }

676
677
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
678
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
679
680
681
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
682
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
683
684
685
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
686

687
      /*
Alessio Netti's avatar
Alessio Netti committed
688
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
689
690
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
691
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
692
      signal(SIGUSR1, sigHandler);
693
694
695
696
697
698
699
700
701
702

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
703
      
704
      // Setting the size of the sensor cache
705
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
706
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
707

708
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
709
710
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
711
712
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
713
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
714
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
715
      
Axel Auweter's avatar
Axel Auweter committed
716
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
717
          LOG(fatal) << "Cannot connect to Cassandra!";
718
719
720
721
722
723
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
724
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
725
      
726
727
728
      /*
       * Allocate the SensorDataStore.
       */
729
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
730
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
731
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
732
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
733
734
735

      /*
       * Set TTL for data store inserts if TTL > 0.
736
       */
737
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
738
        mySensorDataStore->setTTL(cassandraSettings.ttl);
739
740
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
741
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
742
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
743

744
745
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
746
747
748
749
750
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK) {
	  LOG(error) << "Failed to retrieve public sensors!";
	  exit(EXIT_FAILURE);
      }
      
751
      metadataStore = new MetadataStore();
752
      SensorMetadata sBuf;
753
      for (const auto &s : publicSensors)
754
          if (!s.is_virtual) {
755
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
756
757
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
758
          }
759
      publicSensors.clear();
760
761
762
763
764

      boost::asio::io_context io;
      boost::thread_group     threads;

      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore, io);
765
      analyticsController->setCache(&mySensorCache);
766
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
767
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
768
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
769
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
770
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
771
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
772
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
773
      queryEngine.setQueryCallback(sensorQueryCallback);
774
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
775
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
776
      queryEngine.setJobQueryCallback(jobQueryCallback);
777
      if(!analyticsController->initialize(settings))
778
779
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
780
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
781
782
783
784
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
785
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
786
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
787
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
788
      LOG(info) << "    Threads:            " << settings.threads;
789
790
791
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
792
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
793
      LOG(info) << "    StatisticsMqttPart: " << settings.statisticsMqttPart;
Alessio Netti's avatar
Alessio Netti committed
794
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
795
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
796
797
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
798

Alessio Netti's avatar
Alessio Netti committed
799
800
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
801
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
802
803
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
804
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
805
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
806
      
807
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
808
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
809
810
811
812
813
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
814
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
815
816
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
817
818
819
820
#else
      LOG(info) << "    Username and password not printed.";
#endif

821
822
823
824
825
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
826
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
Michael Ott's avatar
Michael Ott committed
827
	  
828
	  if (config.influxSettings.measurements.size() > 0) {
Michael Ott's avatar
Michael Ott committed
829
	      LOG(info) << "InfluxDB Settings:";
830
	      LOG(info) << "    MQTT-Prefix:  " << config.influxSettings.mqttPrefix;
Michael Ott's avatar
Michael Ott committed
831
832
	      LOG(info) << "    Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");

833
	      for (auto &m: config.influxSettings.measurements) {
Michael Ott's avatar
Michael Ott committed
834
		  LOG(info) << "    Measurement: " << m.first;
835
		  LOG(info) << "        MQTT-Part:   " << m.second.mqttPart;
Michael Ott's avatar
Michael Ott committed
836
		  LOG(info) << "        Tag:         " << m.second.tag;
Michael Ott's avatar
Michael Ott committed
837
838
839
840
841
842
		  if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
		      if (m.second.tagSubstitution != "&") {
			  LOG(info) << "        TagFilter:   s/" << m.second.tagRegex.str() << "/" <<  m.second.tagSubstitution << "/";
		      } else {
			  LOG(info) << "    TagFilter:   " << m.second.tagRegex.str();
		      }
Michael Ott's avatar
Michael Ott committed
843
844
845
846
847
848
		  }
		  if (m.second.fields.size() > 0) {
		      stringstream ss;
		      copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
		      string fields = ss.str();
		      fields.pop_back();
Michael Ott's avatar
Michael Ott committed
849
		      LOG(info) << "        Fields:      " << fields;
Michael Ott's avatar
Michael Ott committed
850
851
852
		  }
	      }
	  }
853
      }
854
855
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
856
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
857
858
859
860
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
861
      if (settings.validateConfig)
862
          return EXIT_SUCCESS;
863
864
865
866
867
868
869
870
871
872
873
874
875
876
      
      LOG(info) << "Creating threads...";
      // Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
      // Inherited from DCDB Pusher
      keepAliveWork = boost::make_shared<boost::asio::io_context::work>(io);
      // Create pool of threads which handle the sensors
      for(size_t i = 0; i < settings.threads; i++) {
	  threads.create_thread(bind(static_cast< size_t (boost::asio::io_context::*) () >(&boost::asio::io_context::run), &io));
      }
      LOG(info) << "Threads created!";
      
      analyticsController->start();
      LOG(info) << "AnalyticsController running...";
      
877
878
879
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
880
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
881
      
882
883
884
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
885
      LOG(info) << "MQTT Server running...";
886
      
887
888
889
      /*
       * Start the HTTP Server for the REST API
       */
890
      if (restAPISettings.enabled) {
891
          httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, io);
892
893
894
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
895
      }
896

897
898
899
      /*
       * Run (hopefully) forever...
       */
900
      newAutoPub = false;
901
      keepRunning = 1;
902
      uint64_t start, end;
903
      float elapsed;
904
905
      msgCtr = 0;
      readingCtr = 0;
906
907
      dbQueryCtr = 0;
      cachedQueryCtr = 0;
908

909
910
      start = getTimestamp();
      uint64_t lastCleanup = start;
911
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
912

913
      LOG(info) << "Collect Agent running...";
914
      while(keepRunning) {
915
916
917
918
919
920
921
922
923
924
925
926
	  start = getTimestamp();
	  if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
	      uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
	      lastCleanup = start;
	      if(purged > 0)
		  LOG(info) << "Cache: purged " << purged << " obsolete entries";
	  }
	  if(newAutoPub) {
	      newAutoPub = false;
	      mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
	  }
	  
927
	  sleep(sleepInterval);
928
	  
929
930
	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
931
	      end = getTimestamp();
932
933
	      elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
	      float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
934
935
	      float cacheReq = ceil(((float)cachedQueryCtr) / elapsed);
	      float dbReq = ceil(((float)dbQueryCtr) / elapsed);
936
937
938
	      float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
	      float mIns = ceil(((float)readingCtr) / elapsed);
	      float mMsg = ceil(((float) msgCtr) / elapsed);
939
	      LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s]   REST [" << rIns << " ins/s]   Analytics [" << aIns << " ins/s]   Cache [" << cacheReq << " req/s]   DB [" << dbReq << " req/s]";
940
941
942
943
944
945
946
947
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
	      
	      if (settings.statisticsMqttPart.size() > 0) {
		  std::string statisticsMqttTopic = pluginSettings.mqttPrefix + settings.statisticsMqttPart;
		  std::list<SensorDataStoreReading> stats;
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/msgs"), end, msgCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/cachedQueries"), end, cachedQueryCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/dbQueries"), end, dbQueryCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/readings"), end, readingCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/hosts"), end, connectedHosts));
		  for (auto s: