collectagent.cpp 37.4 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
89
int retCode = EXIT_SUCCESS;
90
uint64_t msgCtr;
91
uint64_t readingCtr;
Michael Ott's avatar
Michael Ott committed
92
uint64_t queryCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
101
CARestAPI* httpsServer = nullptr;
102
DCDB::SCError err;
103
QueryEngine& queryEngine = QueryEngine::getInstance();
104
logger_t lg;
105

106
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
107
108
109
110
111
112
113
114
115
116
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
117
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
118
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
119
120
    } else {
        // Getting a single job by id
121
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
122
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
123
124
125
126
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
127
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
128
129
130
131
132
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
133
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
134
    }
135
    return true;
Alessio Netti's avatar
Alessio Netti committed
136
137
}

138
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
139
140
141
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
142
143
144
145
146
147
148
149
150
151
152
153
154
155
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
156
            mySensorCache.wait();
157
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
158
159
160
161
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
162
163
                topics.push_back(sid);
            }
164
            mySensorCache.release();
165
        }
166
    }
167
168
169
170
171
172
173
174
175
176
177
178
179
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
180
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
181
182
183
184
185
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
186
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
187
188
189
190
191
192
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
Michael Ott's avatar
Michael Ott committed
193
		queryCtr+= results.size();
194
195
196
197
198
199
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
200
        }
201
        catch (const std::exception &e) {}
202
    }
203
    
204
    --queryEngine.access;
205
206
207
    return successCtr>0;
}

208
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
209
210
211
212
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
213
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
214
215
}

216
217
218
219
220
221
222
223
224
225
226
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
227
    bool local = false;
228
229
230
231
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
232
    }
233
234
    metadataStore->release();
        
235
    if(!local) {
236
237
238
239
240
241
242
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
243
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
244
245
246
247
248
249
250
251
252
253
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

254
/* Normal termination (SIGINT, CTRL+C) */
255
256
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
257
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
258
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
259
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
260
261
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
262
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
263
264
265
266
267
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
268
269
270
  keepRunning = 0;
}

271
272
273
274
275
276
/**
 * Crash handler.
 *
 * Delegates to abrt() with EXIT_FAILURE and the SIGNAL source tag; the
 * signal number itself is not used.
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

277
/**
 * @brief Callback invoked for every MQTT message received by the server.
 *
 * Counts the message and, unless BENCHMARK_MODE is defined, dispatches
 * PUBLISH messages by topic prefix:
 *  - DCDB_MAP:     sensor publish / mapping message (payload is the sensor
 *                  name), or, with the extended DCDB_MET prefix, a JSON
 *                  metadata packet.
 *  - DCDB_CALIEVT: Caliper event data encoded in the topic.
 *  - DCDB_JOBDATA: job data in JSON format.
 *  - otherwise:    regular sensor readings.
 *
 * @param msg  The received message.
 * @return 0 if the message was processed or ignored, 1 if it was malformed
 *         or could not be stored.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
    /*
     * Increment the msgCtr for statistics.
     */
    msgCtr++;

#ifndef BENCHMARK_MODE

    // Only PUBLISH messages carry data for us
    if (!msg->isPublish()) {
        return 0;
    }

    // Bind the topic to a const reference first: this keeps the character
    // pointer below valid no matter whether getTopic() returns a reference
    // or a temporary.
    const std::string &fullTopic = msg->getTopic();
    const char *topic = fullTopic.c_str();
    uint64_t len;

    /*
     * Decode the message and put into the database.
     */
    // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
    // sensor's name. In that case, the keyword is filtered out of the prefix.
    // We use strncmp as it is the most efficient way to do it
    if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
        if ((len = msg->getPayloadLength()) == 0) {
            LOG(error) << "Empty sensor publish message received!";
            return 1;
        }

        string payload((char *) msg->getPayload(), len);

        // Result of this message's publish attempt. Kept local so that a
        // packet which is parsed but not valid does not get judged by the
        // outcome of some earlier message.
        DCDB::SCError publishStatus = DCDB::SC_OK;

        //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
        if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
            SensorMetadata sm;
            try {
                sm.parseJSON(payload);
            } catch (const std::exception &e) {
                LOG(error) << "Invalid metadata packed received!";
                return 1;
            }
            if (sm.isValid()) {
                publishStatus = mySensorConfig->publishSensor(sm);
                err = publishStatus; // keep the file-scope status in sync
                metadataStore->store(*sm.getPattern(), sm);
            }
        } else {
            publishStatus = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
            err = publishStatus; // keep the file-scope status in sync
        }

        // PublishSensor does most of the error checking for us
        switch (publishStatus) {
            case DCDB::SC_INVALIDPATTERN:
                LOG(error) << "Invalid sensor topic : " << fullTopic;
                return 1;
            case DCDB::SC_INVALIDPUBLICNAME:
                LOG(error) << "Invalid sensor public name.";
                return 1;
            case DCDB::SC_INVALIDSESSION:
                LOG(error) << "Cannot reach sensor data store.";
                return 1;
            default:
                break;
        }
    } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
        /*
         * Special message case. This message contains a Caliper Event data
         * string that is encoded in the MQTT topic. Its payload consists of
         * usual timestamp-value pairs.
         * Data from this messages will be stored in a separate table managed
         * by CaliEvtDataStore class.
         */
        std::string topicStr(fullTopic);
        mqttPayload *payload;

        len = msg->getPayloadLength();
        //TODO support case that no timestamp is given?
        //retrieve timestamp and value from the payload
        if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
            payload = (mqttPayload *) msg->getPayload();
        } else {
            //this message is malformed -> ignore
            LOG(error) << "Message malformed";
            return 1;
        }

        /*
         * Decode message topic in actual sensor topic and string data.
         * "/:/" is used as delimiter between topic and data.
         */
        topicStr.erase(0, DCDB_CALIEVT_LEN);
        size_t pos = topicStr.find("/:/");
        if (pos == std::string::npos) {
            // topic is malformed -> ignore
            LOG(error) << "CaliEvt topic malformed";
            return 1;
        }
        const std::string data(topicStr, pos + 3);
        topicStr.erase(pos);

        /*
         * We use the same MQTT-topic/SensorId infrastructure as actual sensor
         * data readings to sort related events.
         * Check if we can decode the event topic into a valid SensorId. If
         * successful, store the record in the database.
         */
        DCDB::SensorId sid;
        if (sid.mqttTopicConvert(topicStr)) {
            //std::list<DCDB::CaliEvtData> events;
            DCDB::CaliEvtData e;
            e.eventId = sid;
            e.event   = data;
            for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                e.timeStamp = payload[i].timestamp;

                /**
                 * We want an exhaustive list of all events ordered by their
                 * time of occurrence. Payload values should always be
                 * one. Other values currently indicate a malformed message.
                 *
                 * In the future, the value field could be used to aggregate
                 * multiple equal events that occurred in the same plugin
                 * read cycle.
                 */
                if (payload[i].value != 1) {
                    LOG(error) << "CaliEvt message malformed. Value != 1";
                    return 1;
                }

                myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                //events.push_back(e);
            }
            //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
        } else {
            LOG(error) << "Topic could not be converted to SID";
        }
    } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
        /**
         * This message contains Slurm job data in JSON format. We need to
         * parse the payload and store it within the JobDataStore.
         */
        if ((len = msg->getPayloadLength()) == 0) {
            LOG(error) << "Empty job data message received!";
            return 1;
        }

        //parse JSON into JobData object
        string        payload((char *) msg->getPayload(), len);
        DCDB::JobData jd;
        try {
            boost::property_tree::iptree config;
            std::istringstream           str(payload);
            boost::property_tree::read_json(str, config);
            BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
                if (boost::iequals(val.first, "jobid")) {
                    jd.jobId = val.second.data();
                } else if (boost::iequals(val.first, "domainid")) {
                    jd.domainId = val.second.data();
                } else if (boost::iequals(val.first, "userid")) {
                    jd.userId = val.second.data();
                } else if (boost::iequals(val.first, "starttime")) {
                    jd.startTime = DCDB::TimeStamp((uint64_t) stoull(val.second.data()));
                } else if (boost::iequals(val.first, "endtime")) {
                    jd.endTime = DCDB::TimeStamp((uint64_t) stoull(val.second.data()));
                } else if (boost::iequals(val.first, "nodes")) {
                    BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
                        jd.nodes.push_back(node.second.data());
                    }
                }
            }
        } catch (const std::exception &e) {
            // Covers JSON syntax errors as well as non-numeric timestamps
            LOG(error) << "Invalid job data packet received!";
            return 1;
        }

#ifdef DEBUG
        LOG(debug) << "JobId  = " << jd.jobId;
        LOG(debug) << "UserId = " << jd.userId;
        LOG(debug) << "Start  = " << jd.startTime.getString();
        LOG(debug) << "End    = " << jd.endTime.getString();
        LOG(debug) << "Nodes: ";
        for (const auto &n : jd.nodes) {
            LOG(debug) << "    " << n;
        }
#endif

        //store JobData into Storage Backend
        //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1
        if ((jd.endTime == DCDB::TimeStamp((uint64_t) 0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
            //starting job data
            if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
                LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
                return 1;
            }
        } else {
            //ending job data
            DCDB::JobData tmp;
            if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
                LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
                return 1;
            }

            if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
                LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
                return 1;
            }
        }
    } else {
        mqttPayload buf, *payload;

        len = msg->getPayloadLength();
        //In the 64 bit message case, the collect agent provides a timestamp
        if (len == sizeof(uint64_t)) {
            payload = &buf;
            payload->value = *((int64_t *) msg->getPayload());
            payload->timestamp = Messaging::calculateTimestamp();
            // buf holds exactly one reading
            len = sizeof(mqttPayload);
        }
        //...otherwise it just retrieves it from the MQTT message payload.
        else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
            payload = (mqttPayload *) msg->getPayload();
        }
        //...otherwise this message is malformed -> ignore...
        else {
            LOG(error) << "Message malformed";
            return 1;
        }

        /*
         * Check if we can decode the message topic
         * into a valid SensorId. If successful, store
         * the record in the database.
         */
        DCDB::SensorId sid;
        if (sid.mqttTopicConvert(fullTopic)) {
#if 0
            cout << "Topic decode successful:" << endl
                << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

            cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
            for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
              cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
            }
            cout << endl;
#endif
            std::list<DCDB::SensorDataStoreReading> readings;
            for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                readings.push_back(r);
                mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
            }
            mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
            mySensorDataStore->insertBatch(readings, metadataStore->getTTL(fullTopic));
            readingCtr += readings.size();

            //mySensorCache.dump();
        } else {
            LOG(error) << "Message with empty topic received";
        }
    }
#endif
    return 0;
}

537
538
539



540
541
542
543
/*
 * Print usage information
 */
void usage() {
544
545
546
547
548
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
549
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
550
551
552
553
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
554
555
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
556
557
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
558
559
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
560
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
561
  cout << endl;
Michael Ott's avatar
Michael Ott committed
562
  cout << "  -d            Daemonize" << endl;
563
  cout << "  -s            Print message stats" <<endl;
564
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
565
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
566
  cout << "  -h            This help page" << endl;
567
  cout << endl;
568
569
570
}

int main(int argc, char* const argv[]) {
571
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
572
573

  try{
574

575
576
      // Checking if path to config is supplied
      if (argc <= 1) {
577
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
578
579
580
581
582
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
583
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

600
601
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
602

Alessio Netti's avatar
Alessio Netti committed
603
      Configuration config(argv[argc - 1], "collectagent.conf");
604
      config.readConfig();
605

Alessio Netti's avatar
Alessio Netti committed
606
607
608
609
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
610
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
611
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
612
      
613
614
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
615
          switch(ret) {
616
              case 'a':
617
                  pluginSettings.autoPublish = true;
618
                  break;
619
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
620
621
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
622
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
623
                  break;
624
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
625
626
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
627
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
628
                  break;
Michael Ott's avatar
Michael Ott committed
629
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
630
                  cassandraSettings.username = optarg;
631
                  break;
Michael Ott's avatar
Michael Ott committed
632
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
633
                  cassandraSettings.password = optarg;
634
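                  // Overwrite the password in the argument string after copying it: the first (up to) three
                  // characters become 'x' and the remainder is zeroed, presumably so the clear-text
                  // password does not stay visible in the process arguments.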
Michael Ott's avatar
Michael Ott committed
635
636
637
638
639
640
641
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
642
              case 't':
Alessio Netti's avatar
Alessio Netti committed
643
                  cassandraSettings.ttl = stoul(optarg);
644
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
645
              case 'v':
646
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
647
                  break;
648
              case 'd':
649
              case 'D':
650
                  settings.daemonize = 1;
651
                  break;
652
              case 's':
653
                  settings.statisticsInterval = 1;
654
                  break;
655
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
656
                  settings.validateConfig = true;
657
                  break;
658
              case 'h':
659
660
661
              default:
                  usage();
                  exit(EXIT_FAILURE);
662
663
664
          }
      }

665
666
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
667
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
668
669
670
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
671
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
672
673
674
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
675

676
      /*
Alessio Netti's avatar
Alessio Netti committed
677
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
678
679
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
680
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
681
      signal(SIGUSR1, sigHandler);
682
683
684
685
686
687
688
689
690
691

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
692
      
693
      // Setting the size of the sensor cache
694
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
695
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
696

697
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
698
699
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
700
701
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
702
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
703
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
704
      
Axel Auweter's avatar
Axel Auweter committed
705
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
706
          LOG(fatal) << "Cannot connect to Cassandra!";
707
708
709
710
711
712
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
713
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
714
      
715
716
717
      /*
       * Allocate the SensorDataStore.
       */
718
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
719
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
720
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
721
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
722
723
724

      /*
       * Set TTL for data store inserts if TTL > 0.
725
       */
726
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
727
        mySensorDataStore->setTTL(cassandraSettings.ttl);
728
729
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
730
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
731
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
732

733
734
735
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
736
737
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
738
      SensorMetadata sBuf;
739
      for (const auto &s : publicSensors)
740
          if (!s.is_virtual) {
741
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
742
743
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
744
          }
745
      publicSensors.clear();
746
          
747
748
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
749
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
750
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
751
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
752
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
753
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
754
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
755
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
756
      queryEngine.setQueryCallback(sensorQueryCallback);
757
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
758
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
759
      queryEngine.setJobQueryCallback(jobQueryCallback);
760
      if(!analyticsController->initialize(settings))
761
762
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
763
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
764
765
766
767
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
768
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
769
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
770
771
772
773
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
774
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
Alessio Netti's avatar
Alessio Netti committed
775
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
776
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
777
778
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
779

Alessio Netti's avatar
Alessio Netti committed
780
781
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
782
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
783
784
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
785
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
786
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
787
      
788
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
789
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
790
791
792
793
794
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
795
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
796
797
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
798
799
800
801
#else
      LOG(info) << "    Username and password not printed.";
#endif

802
803
804
805
806
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
807
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
Michael Ott's avatar
Michael Ott committed
808
	  
809
	  if (config.influxSettings.measurements.size() > 0) {
Michael Ott's avatar
Michael Ott committed
810
	      LOG(info) << "InfluxDB Settings:";
811
	      LOG(info) << "    MQTT-Prefix:  " << config.influxSettings.mqttPrefix;
Michael Ott's avatar
Michael Ott committed
812
813
	      LOG(info) << "    Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");

814
	      for (auto &m: config.influxSettings.measurements) {
Michael Ott's avatar
Michael Ott committed
815
		  LOG(info) << "    Measurement: " << m.first;
816
		  LOG(info) << "        MQTT-Part:   " << m.second.mqttPart;
Michael Ott's avatar
Michael Ott committed
817
		  LOG(info) << "        Tag:         " << m.second.tag;
Michael Ott's avatar
Michael Ott committed
818
819
820
821
822
823
		  if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
		      if (m.second.tagSubstitution != "&") {
			  LOG(info) << "        TagFilter:   s/" << m.second.tagRegex.str() << "/" <<  m.second.tagSubstitution << "/";
		      } else {
			  LOG(info) << "    TagFilter:   " << m.second.tagRegex.str();
		      }
Michael Ott's avatar
Michael Ott committed
824
825
826
827
828
829
		  }
		  if (m.second.fields.size() > 0) {
		      stringstream ss;
		      copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
		      string fields = ss.str();
		      fields.pop_back();
Michael Ott's avatar
Michael Ott committed
830
		      LOG(info) << "        Fields:      " << fields;
Michael Ott's avatar
Michael Ott committed
831
832
833
		  }
	      }
	  }
834
      }
835
836
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
837
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
838
839
840
841
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
842
      if (settings.validateConfig)
843
844
845
846
          return EXIT_SUCCESS;
      else
          analyticsController->start();

847
848
849
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
850
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
851
      
852
853
854
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
855
      LOG(info) << "MQTT Server running...";
856
      
857
858
859
      /*
       * Start the HTTP Server for the REST API
       */
860
      if (restAPISettings.enabled) {
Michael Ott's avatar
Michael Ott committed
861
          httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms);
862
863
864
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
865
      }
866

867
868
869
870
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
871
      uint64_t start, end;
872
      float elapsed;
873
874
      msgCtr = 0;
      readingCtr = 0;
Michael Ott's avatar
Michael Ott committed
875
      queryCtr = 0;
876

877
878
      start = getTimestamp();
      uint64_t lastCleanup = start;
879
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
880

881
      LOG(info) << "Collect Agent running...";
882
      while(keepRunning) {
883
          start = getTimestamp();
884
          if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
885
886
              uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
              lastCleanup = start;
Alessio Netti's avatar
Alessio Netti committed
887
888
889
890
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

891
892
893
894
	  sleep(sleepInterval);

	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
895
	      end = getTimestamp();
896
897
898
899
900
901
	      elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
	      float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
	      float aReq = ceil(((float)queryCtr) / elapsed);
	      float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
	      float mIns = ceil(((float)readingCtr) / elapsed);
	      float mMsg = ceil(((float) msgCtr) / elapsed);
Michael Ott's avatar
Michael Ott committed
902
	      LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s]   REST [" << rIns << " ins/s]   Analytics [" << aIns << " ins/s|" << aReq << " req/s]";
903
904
905
906
907
908
909
910
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
911
	      msgCtr = 0;
Michael Ott's avatar
Michael Ott committed
912
	      queryCtr = 0;
913
914
	      readingCtr = 0;
	  }
915
916
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
917
      LOG(info) << "Stopping...";
918
      analyticsController->stop();
919
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
920
      LOG(info) << "MQTT Server stopped...";
921
922
923
924
925
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
926
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
927
      delete myJobDataStore;
928
      delete mySensorConfig;
929
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
930
931
      dcdbConn->disconnect();
      delete dcdbConn;
932
      delete metadataStore;
933
      delete analyticsController;
Alessio Netti's avatar
Logging    
Alessio Netti committed
934
      LOG(info) << "Collect Agent closed. Bye bye...";
935
  }
936
937
938
939
  catch (const std::runtime_error& e) {
      LOG(fatal) <<  e.what();
      return EXIT_FAILURE;
  }
940
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
941
      LOG(fatal) << "Exception: " << e.what();
942
      abrt(EXIT_FAILURE, INTERR);