collectagent.cpp 36 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
89
int retCode = EXIT_SUCCESS;
90
91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
101
CARestAPI* httpsServer = nullptr;
102
DCDB::SCError err;
103
QueryEngine& queryEngine = QueryEngine::getInstance();
104
logger_t lg;
105

106
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
107
108
109
110
111
112
113
114
115
116
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
117
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
118
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
119
120
    } else {
        // Getting a single job by id
121
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
122
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
123
124
125
126
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
127
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
128
129
130
131
132
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
133
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
134
    }
135
    return true;
Alessio Netti's avatar
Alessio Netti committed
136
137
}

138
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
139
140
141
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
142
143
144
145
146
147
148
149
150
151
152
153
154
155
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
156
            mySensorCache.wait();
157
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
158
159
160
161
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
162
163
                topics.push_back(sid);
            }
164
            mySensorCache.release();
165
        }
166
    }
167
168
169
170
171
172
173
174
175
176
177
178
179
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
180
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
181
182
183
184
185
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
186
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
187
188
189
190
191
192
193
194
195
196
197
198
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
199
        }
200
        catch (const std::exception &e) {}
201
    }
202
    
203
    --queryEngine.access;
204
205
206
    return successCtr>0;
}

207
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
208
209
210
211
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
212
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
213
214
}

215
216
217
218
219
220
221
222
223
224
225
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
226
    bool local = false;
227
228
229
230
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
231
    }
232
233
    metadataStore->release();
        
234
    if(!local) {
235
236
237
238
239
240
241
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
242
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
243
244
245
246
247
248
249
250
251
252
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

253
/* Normal termination (SIGINT, CTRL+C) */
254
255
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
256
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
257
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
258
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
259
260
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
261
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
262
263
264
265
266
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
267
268
269
  keepRunning = 0;
}

270
271
272
273
274
275
/* Crash */
void abrtHandler(int /* sig */)
{
  // Crash handler: forwards to abrt() (abrt.h) with EXIT_FAILURE and SIGNAL.
  // The signal number itself is not used.
  abrt(EXIT_FAILURE, SIGNAL);
}

276
int mqttCallback(SimpleMQTTMessage *msg)
277
278
279
280
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
281
  msgCtr++;
282
283
284
  if (msg->isPublish())
    pmsgCtr++;

285
286
#ifndef BENCHMARK_MODE

287
  uint64_t len;
288
289
290
  /*
   * Decode the message and put into the database.
   */
291
  if (msg->isPublish()) {
292
      const char *topic = msg->getTopic().c_str();
293
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
294
295
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
296
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
297
          if ((len = msg->getPayloadLength()) == 0) {
298
              LOG(error) << "Empty sensor publish message received!";
299
300
              return 1;
          }
301

302
          string payload((char *) msg->getPayload(), len);
303
304
305
306
307
308
309
310
311
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
312
              if(sm.isValid()) {
313
                  err = mySensorConfig->publishSensor(sm);
314
315
                  metadataStore->store(*sm.getPattern(), sm);
              }
316
317
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
318
          }
319
320
321
322

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
323
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
324
325
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
326
                  LOG(error) << "Invalid sensor public name.";
327
328
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
329
                  LOG(error) << "Cannot reach sensor data store.";
330
331
332
333
                  return 1;
              default:
                  break;
          }
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
378
              //std::list<DCDB::CaliEvtData> events;
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

399
400
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
401
              }
402
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
403
404
405
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
426
427
428
			      } else if (boost::iequals(val.first, "domainid")) {
                      jd.domainId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
Micha Müller's avatar
Micha Müller committed
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
457
458
	      //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1 
	      if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
Micha Müller's avatar
Micha Müller committed
459
460
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
461
			      LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
Micha Müller's avatar
Micha Müller committed
462
463
464
465
466
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
467
		      if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
468
			      LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
Micha Müller's avatar
Micha Müller committed
469
470
471
			      return 1;
		      }

472
		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
473
			      LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
Micha Müller's avatar
Micha Müller committed
474
475
476
			      return 1;
		      }
	      }
477
      } else {
Micha Müller's avatar
Micha Müller committed
478
	      mqttPayload buf, *payload;
479

Micha Müller's avatar
Micha Müller committed
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
497

Micha Müller's avatar
Micha Müller committed
498
	      /*
499
500
501
502
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
503
504
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
505
506
507
508
509
510
511
512
513
514
515
516
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
517
#endif
518
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
519
520
521
522
523
524
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
525
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
526
	      readingCtr+= readings.size();
527

528
              //mySensorCache.dump();
529
530
          } else {
              LOG(error) << "Message with empty topic received";
531
          }
532
      }
533
  }
534
#endif
535
  return 0;
536
537
}

538
539
540



541
542
543
544
/*
 * Print usage information
 */
void usage() {
545
546
547
548
549
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
550
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
551
552
553
554
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
555
556
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
557
558
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
559
560
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
561
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
562
  cout << endl;
Michael Ott's avatar
Michael Ott committed
563
  cout << "  -d            Daemonize" << endl;
564
  cout << "  -s            Print message stats" <<endl;
565
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
566
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
567
  cout << "  -h            This help page" << endl;
568
  cout << endl;
569
570
571
}

int main(int argc, char* const argv[]) {
572
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
573
574

  try{
575

576
577
      // Checking if path to config is supplied
      if (argc <= 1) {
578
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
579
580
581
582
583
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
584
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

601
602
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
603

Alessio Netti's avatar
Alessio Netti committed
604
      Configuration config(argv[argc - 1], "collectagent.conf");
605
      config.readConfig();
606

Alessio Netti's avatar
Alessio Netti committed
607
608
609
610
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
611
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
612
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
613
      
614
615
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
616
          switch(ret) {
617
              case 'a':
618
                  pluginSettings.autoPublish = true;
619
                  break;
620
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
621
622
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
623
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
624
                  break;
625
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
626
627
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
628
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
629
                  break;
Michael Ott's avatar
Michael Ott committed
630
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
631
                  cassandraSettings.username = optarg;
632
                  break;
Michael Ott's avatar
Michael Ott committed
633
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
634
                  cassandraSettings.password = optarg;
635
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
636
637
638
639
640
641
642
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
643
              case 't':
Alessio Netti's avatar
Alessio Netti committed
644
                  cassandraSettings.ttl = stoul(optarg);
645
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
646
              case 'v':
647
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
648
                  break;
649
              case 'd':
650
              case 'D':
651
                  settings.daemonize = 1;
652
                  break;
653
              case 's':
654
                  settings.statisticsInterval = 1;
655
                  break;
656
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
657
                  settings.validateConfig = true;
658
                  break;
659
              case 'h':
660
661
662
              default:
                  usage();
                  exit(EXIT_FAILURE);
663
664
665
          }
      }

666
667
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
668
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
669
670
671
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
672
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
673
674
675
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
676

677
      /*
Alessio Netti's avatar
Alessio Netti committed
678
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
679
680
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
681
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
682
      signal(SIGUSR1, sigHandler);
683
684
685
686
687
688
689
690
691
692

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
693
      
694
      // Setting the size of the sensor cache
695
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
696
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
697

698
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
699
700
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
701
702
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
703
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
704
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
705
      
Axel Auweter's avatar
Axel Auweter committed
706
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
707
          LOG(fatal) << "Cannot connect to Cassandra!";
708
709
710
711
712
713
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
714
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
715
      
716
717
718
      /*
       * Allocate the SensorDataStore.
       */
719
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
720
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
721
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
722
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
723
724
725

      /*
       * Set TTL for data store inserts if TTL > 0.
726
       */
727
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
728
        mySensorDataStore->setTTL(cassandraSettings.ttl);
729
730
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
731
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
732
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
733

734
735
736
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
737
738
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
739
      SensorMetadata sBuf;
740
      for (const auto &s : publicSensors)
741
          if (!s.is_virtual) {
742
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
743
744
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
745
          }
746
      publicSensors.clear();
747
          
748
749
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
750
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
751
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
752
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
753
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
754
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
755
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
756
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
757
      queryEngine.setQueryCallback(sensorQueryCallback);
758
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
759
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
760
      queryEngine.setJobQueryCallback(jobQueryCallback);
761
      if(!analyticsController->initialize(settings))
762
763
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
764
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
765
766
767
768
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
769
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
770
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
771
772
773
774
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
775
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
Alessio Netti's avatar
Alessio Netti committed
776
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
777
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
778
779
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
780

Alessio Netti's avatar
Alessio Netti committed
781
782
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
783
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
784
785
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
786
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
787
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
788
      
789
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
790
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
791
792
793
794
795
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
796
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
797
798
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
799
800
801
802
#else
      LOG(info) << "    Username and password not printed.";
#endif

803
804
805
806
807
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
808
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
809
      }
810
811
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
812
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
813
814
815
816
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
817
      if (settings.validateConfig)
818
819
820
821
          return EXIT_SUCCESS;
      else
          analyticsController->start();

822
823
824
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
825
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
826
      
827
828
829
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
830
      LOG(info) << "MQTT Server running...";
831
      
832
833
834
      /*
       * Start the HTTP Server for the REST API
       */
835
      if (restAPISettings.enabled) {
836
          httpsServer = new CARestAPI(restAPISettings, config.influxMap, &mySensorCache, mySensorDataStore, analyticsController, &ms);
837
838
839
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
840
      }
841

842
843
844
845
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
846
      uint64_t start, end;
847
      double elapsed;
848
849
850
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
851

852
853
      start = getTimestamp();
      uint64_t lastCleanup = start;
854
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
855

856
      LOG(info) << "Collect Agent running...";
857
      while(keepRunning) {
858
          start = getTimestamp();
859
          if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
860
861
              uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
              lastCleanup = start;
Alessio Netti's avatar
Alessio Netti committed
862
863
864
865
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

866
867
868
869
	  sleep(sleepInterval);

	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
870
871
	      end = getTimestamp();
	      elapsed = NS_TO_S(((double) end - (double) start));
872
	      float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
873
874
	      LOG(info) << "Performance: " << (readingCtr/elapsed) << " inserts/s, " << (msgCtr/elapsed) << " messages/s (" << publish << "% PUBLISH)";
	      LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed) << " inserts/s ";
875
876
877
878
879
880
881
882
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
883
884
885
886
	      msgCtr = 0;
	      pmsgCtr = 0;
	      readingCtr = 0;
	  }
887
888
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
889
      LOG(info) << "Stopping...";
890
      analyticsController->stop();
891
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
892
      LOG(info) << "MQTT Server stopped...";
893
894
895
896
897
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
898
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
899
      delete myJobDataStore;
900
      delete mySensorConfig;
901
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
902
903
      dcdbConn->disconnect();
      delete dcdbConn;
904
      delete metadataStore;
905
      delete analyticsController;
Alessio Netti's avatar
Logging    
Alessio Netti committed
906
      LOG(info) << "Collect Agent closed. Bye bye...";
907
  }
908
909
910
911
  catch (const std::runtime_error& e) {
      LOG(fatal) <<  e.what();
      return EXIT_FAILURE;
  }
912
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
913
      LOG(fatal) << "Exception: " << e.what();
914
      abrt(EXIT_FAILURE, INTERR);
915
916
  }

Alessio Netti's avatar
Alessio Netti committed
917
  return retCode;
918
}
919
920