collectagent.cpp 35.6 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
89
int retCode = EXIT_SUCCESS;
90
bool statistics;
91
92
uint64_t msgCtr;
uint64_t pmsgCtr;
93
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
94
SensorCache mySensorCache;
95
AnalyticsController* analyticsController;
96
DCDB::Connection* dcdbConn;
97
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
98
DCDB::JobDataStore *myJobDataStore;
99
DCDB::SensorConfig *mySensorConfig;
100
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
101
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
102
CARestAPI* httpsServer = nullptr;
103
DCDB::SCError err;
104
QueryEngine& queryEngine = QueryEngine::getInstance();
105
logger_t lg;
106

107
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
108
109
110
111
112
113
114
115
116
117
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
118
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
119
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
120
121
    } else {
        // Getting a single job by id
122
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
123
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
124
125
126
127
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
128
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
129
130
131
132
133
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
134
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
135
    }
136
    return true;
Alessio Netti's avatar
Alessio Netti committed
137
138
}

139
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
140
141
142
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
157
            mySensorCache.wait();
158
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
159
160
161
162
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
163
164
                topics.push_back(sid);
            }
165
            mySensorCache.release();
166
        }
167
    }
168
169
170
171
172
173
174
175
176
177
178
179
180
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
181
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
182
183
184
185
186
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
187
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
188
189
190
191
192
193
194
195
196
197
198
199
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
200
        }
201
        catch (const std::exception &e) {}
202
    }
203
    
204
    --queryEngine.access;
205
206
207
    return successCtr>0;
}

208
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
209
210
211
212
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
213
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
214
215
}

216
217
218
219
220
221
222
223
224
225
226
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
227
    bool local = false;
228
229
230
231
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
232
    }
233
234
    metadataStore->release();
        
235
    if(!local) {
236
237
238
239
240
241
242
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
243
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
244
245
246
247
248
249
250
251
252
253
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

254
/* Normal termination (SIGINT, CTRL+C) */
255
256
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
257
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
258
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
259
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
260
261
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
262
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
263
264
265
266
267
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
268
269
270
  keepRunning = 0;
}

271
272
273
274
275
276
/**
 * Signal handler for crashes: forwards to abrt() with EXIT_FAILURE and SIGNAL.
 * The signal number itself is not evaluated.
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

277
int mqttCallback(SimpleMQTTMessage *msg)
278
279
280
281
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
282
  msgCtr++;
283
284
285
  if (msg->isPublish())
    pmsgCtr++;

286
287
#ifndef BENCHMARK_MODE

288
  uint64_t len;
289
290
291
  /*
   * Decode the message and put into the database.
   */
292
  if (msg->isPublish()) {
293
      const char *topic = msg->getTopic().c_str();
294
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
295
296
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
297
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
298
          if ((len = msg->getPayloadLength()) == 0) {
299
              LOG(error) << "Empty sensor publish message received!";
300
301
              return 1;
          }
302

303
          string payload((char *) msg->getPayload(), len);
304
305
306
307
308
309
310
311
312
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
313
              if(sm.isValid()) {
314
                  err = mySensorConfig->publishSensor(sm);
315
316
                  metadataStore->store(*sm.getPattern(), sm);
              }
317
318
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
319
          }
320
321
322
323

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
324
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
325
326
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
327
                  LOG(error) << "Invalid sensor public name.";
328
329
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
330
                  LOG(error) << "Cannot reach sensor data store.";
331
332
333
334
                  return 1;
              default:
                  break;
          }
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
379
              //std::list<DCDB::CaliEvtData> events;
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

400
401
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
402
              }
403
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
404
405
406
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
427
428
429
			      } else if (boost::iequals(val.first, "domainid")) {
                      jd.domainId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
Micha Müller's avatar
Micha Müller committed
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
458
459
	      //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1 
	      if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
Micha Müller's avatar
Micha Müller committed
460
461
462
463
464
465
466
467
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
			      LOG(error) << "Job data insert failed!";
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
468
		      if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
Micha Müller's avatar
Micha Müller committed
469
470
471
472
			      LOG(error) << "Could not retrieve job to be updated!";
			      return 1;
		      }

473
		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
Micha Müller's avatar
Micha Müller committed
474
475
476
477
			      LOG(error) << "Could not update end time of job!";
			      return 1;
		      }
	      }
478
      } else {
Micha Müller's avatar
Micha Müller committed
479
	      mqttPayload buf, *payload;
480

Micha Müller's avatar
Micha Müller committed
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
498

Micha Müller's avatar
Micha Müller committed
499
	      /*
500
501
502
503
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
504
505
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
506
507
508
509
510
511
512
513
514
515
516
517
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
518
#endif
519
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
520
521
522
523
524
525
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
526
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
527
	      readingCtr+= readings.size();
528

529
              //mySensorCache.dump();
530
531
          } else {
              LOG(error) << "Message with empty topic received";
532
          }
533
      }
534
  }
535
#endif
536
  return 0;
537
538
}

539
540
541



542
543
544
545
/*
 * Print usage information
 */
void usage() {
546
547
548
549
550
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
551
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
552
553
554
555
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
556
557
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
558
559
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
560
561
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
562
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
563
  cout << endl;
Michael Ott's avatar
Michael Ott committed
564
  cout << "  -d            Daemonize" << endl;
565
  cout << "  -s            Print message stats" <<endl;
566
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
567
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
568
  cout << "  -h            This help page" << endl;
569
  cout << endl;
570
571
572
}

int main(int argc, char* const argv[]) {
573
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
574
575

  try{
576

577
578
      // Checking if path to config is supplied
      if (argc <= 1) {
579
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
580
581
582
583
584
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
585
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

602
603
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
604

Alessio Netti's avatar
Alessio Netti committed
605
      Configuration config(argv[argc - 1], "collectagent.conf");
606
      config.readConfig();
607

Alessio Netti's avatar
Alessio Netti committed
608
609
610
611
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
612
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
613
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
614
      
615
616
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
617
          switch(ret) {
618
              case 'a':
619
                  pluginSettings.autoPublish = true;
620
                  break;
621
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
622
623
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
624
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
625
                  break;
626
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
627
628
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
629
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
630
                  break;
Michael Ott's avatar
Michael Ott committed
631
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
632
                  cassandraSettings.username = optarg;
633
                  break;
Michael Ott's avatar
Michael Ott committed
634
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
635
                  cassandraSettings.password = optarg;
636
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
637
638
639
640
641
642
643
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
644
              case 't':
Alessio Netti's avatar
Alessio Netti committed
645
                  cassandraSettings.ttl = stoul(optarg);
646
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
647
              case 'v':
648
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
649
                  break;
650
              case 'd':
651
              case 'D':
652
                  settings.daemonize = 1;
653
                  break;
654
              case 's':
655
                  settings.statistics = 1;
656
                  break;
657
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
658
                  settings.validateConfig = true;
659
                  break;
660
              case 'h':
661
662
663
              default:
                  usage();
                  exit(EXIT_FAILURE);
664
665
666
          }
      }

667
668
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
669
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
670
671
672
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
673
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
674
675
676
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
677

678
      /*
Alessio Netti's avatar
Alessio Netti committed
679
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
680
681
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
682
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
683
      signal(SIGUSR1, sigHandler);
684
685
686
687
688
689
690
691
692
693

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
694
      
695
      // Setting the size of the sensor cache
696
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
697
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
698

699
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
700
701
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
702
703
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
704
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
705
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
706
      
Axel Auweter's avatar
Axel Auweter committed
707
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
708
          LOG(fatal) << "Cannot connect to Cassandra!";
709
710
711
712
713
714
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
715
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
716
      
717
718
719
      /*
       * Allocate the SensorDataStore.
       */
720
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
721
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
722
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
723
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
724
725
726

      /*
       * Set TTL for data store inserts if TTL > 0.
727
       */
728
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
729
        mySensorDataStore->setTTL(cassandraSettings.ttl);
730
731
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
732
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
733
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
734

735
736
737
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
738
739
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
740
      SensorMetadata sBuf;
741
      for (const auto &s : publicSensors)
742
          if (!s.is_virtual) {
743
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
744
745
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
746
          }
747
      publicSensors.clear();
748
          
749
750
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
751
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
752
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
753
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
754
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
755
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
756
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
757
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
758
      queryEngine.setQueryCallback(sensorQueryCallback);
759
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
760
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
761
      queryEngine.setJobQueryCallback(jobQueryCallback);
762
      if(!analyticsController->initialize(settings))
763
764
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
765
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
766
767
768
769
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
770
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
771
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
772
773
774
775
776
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
777
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
778
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
779
780
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
781

Alessio Netti's avatar
Alessio Netti committed
782
783
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
784
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
785
786
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
787
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
788
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
789
      
790
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
791
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
792
793
794
795
796
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
797
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
798
799
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
800
801
802
803
#else
      LOG(info) << "    Username and password not printed.";
#endif

804
805
806
807
808
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
809
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
810
      }
811
812
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
813
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
814
815
816
817
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
818
      if (settings.validateConfig)
819
820
821
822
          return EXIT_SUCCESS;
      else
          analyticsController->start();

823
824
825
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
826
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
827
      
828
829
830
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
831
      LOG(info) << "MQTT Server running...";
832
      
833
834
835
      /*
       * Start the HTTP Server for the REST API
       */
836
      if (restAPISettings.enabled) {
837
838
839
840
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
841
      }
842

843
844
845
846
847
848
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
849
850
851
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
852

Alessio Netti's avatar
Alessio Netti committed
853
854
855
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

856
      LOG(info) << "Collect Agent running...";
857
858
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
859
860
861
862
863
864
865
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

866
          sleep(60);
867
868
869
870
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
871
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
872
          if (settings.statistics && keepRunning) {
873
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
874
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
875
          }
876
          msgCtr = 0;
877
          pmsgCtr = 0;
878
	  readingCtr = 0;
879
880
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
881
      LOG(info) << "Stopping...";
882
      analyticsController->stop();
883
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
884
      LOG(info) << "MQTT Server stopped...";
885
886
887
888
889
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
890
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
891
      delete myJobDataStore;
892
      delete mySensorConfig;
893
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
894
895
      dcdbConn->disconnect();
      delete dcdbConn;
896
      delete metadataStore;
897
      delete analyticsController;
Alessio Netti's avatar
Logging    
Alessio Netti committed
898
      LOG(info) << "Collect Agent closed. Bye bye...";
899
  }
900
901
902
903
  catch (const std::runtime_error& e) {
      LOG(fatal) <<  e.what();
      return EXIT_FAILURE;
  }
904
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
905
      LOG(fatal) << "Exception: " << e.what();
906
      abrt(EXIT_FAILURE, INTERR);
907
908
  }

Alessio Netti's avatar
Alessio Netti committed
909
  return retCode;
910
}
911
912