collectagent.cpp 34.5 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
#include <cstring>
#include <string>
#include <signal.h>
#include <unistd.h>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73
74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76
77
78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79
80
81
82
83
84
85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87
88

int keepRunning;
89
bool statistics;
90
91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
101
DCDB::SCError err;
102
QueryEngine& queryEngine = QueryEngine::getInstance();
103
logger_t lg;
104

105
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
106
107
108
109
110
111
112
113
114
115
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
116
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
117
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
118
119
120
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122
123
124
125
126
127
128
129
130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
131
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
132
    }
133
    return true;
Alessio Netti's avatar
Alessio Netti committed
134
135
}

136
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
137
138
139
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
140
141
142
143
144
145
146
147
148
149
150
151
152
153
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
154
155
156
157
158
159
            mySensorCache.wait();
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel)) {
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
160
161
                topics.push_back(sid);
            }
162
            mySensorCache.release();
163
        }
164
    }
165
166
167
168
169
170
171
172
173
174
175
176
177
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
178
                mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000, false);
179
180
181
182
183
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
184
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
185
186
187
188
189
190
191
192
193
194
195
196
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
197
        }
198
        catch (const std::exception &e) {}
199
    }
200
    
201
    --queryEngine.access;
202
203
204
205
206
207
208
209
210
    return successCtr>0;
}

bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel);
211
212
}

213
214
215
216
217
218
219
220
221
222
223
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
224
    bool local = false;
225
226
227
228
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
229
    }
230
231
    metadataStore->release();
        
232
    if(!local) {
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
            buffer = Configuration::publicSensorToMetadata(publicSensor);
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

251
/* Normal termination (SIGINT/CTRL+C or SIGTERM) */
252
253
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
254
255
256
257
258
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
259
260
261
  keepRunning = 0;
}

262
263
264
265
266
267
/* Crash */
void abrtHandler(int /* sig */)
{
  // Delegate to abrt() with a failure exit code and the SIGNAL reason;
  // the actual signal number is not needed here.
  abrt(EXIT_FAILURE, SIGNAL);
}

268
int mqttCallback(SimpleMQTTMessage *msg)
269
270
271
272
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
273
  msgCtr++;
274
275
276
  if (msg->isPublish())
    pmsgCtr++;

277
278
#ifndef BENCHMARK_MODE

279
  uint64_t len;
280
281
282
  /*
   * Decode the message and put into the database.
   */
283
  if (msg->isPublish()) {
284
      const char *topic = msg->getTopic().c_str();
285
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
286
287
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
288
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
289
          if ((len = msg->getPayloadLength()) == 0) {
290
              LOG(error) << "Empty sensor publish message received!";
291
292
              return 1;
          }
293

294
          string payload((char *) msg->getPayload(), len);
295
296
297
298
299
300
301
302
303
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
304
              if(sm.isValid()) {
305
                  err = mySensorConfig->publishSensor(sm);
306
307
                  metadataStore->store(*sm.getPattern(), sm);
              }
308
309
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
310
          }
311
312
313
314

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
315
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
316
317
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
318
                  LOG(error) << "Invalid sensor public name.";
319
320
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
321
                  LOG(error) << "Cannot reach sensor data store.";
322
323
324
325
                  return 1;
              default:
                  break;
          }
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
370
              //std::list<DCDB::CaliEvtData> events;
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

391
392
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
393
              }
394
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
395
396
397
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
			      } else if (boost::iequals(val.first, "userid")) {
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
	      if (jd.endTime == DCDB::TimeStamp((uint64_t)0)) {
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
			      LOG(error) << "Job data insert failed!";
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
		      if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
			      LOG(error) << "Could not retrieve job to be updated!";
			      return 1;
		      }

		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
			      LOG(error) << "Could not update end time of job!";
			      return 1;
		      }
	      }
466
      } else {
Micha Müller's avatar
Micha Müller committed
467
	      mqttPayload buf, *payload;
468

Micha Müller's avatar
Micha Müller committed
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
486

Micha Müller's avatar
Micha Müller committed
487
	      /*
488
489
490
491
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
492
493
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
494
495
496
497
498
499
500
501
502
503
504
505
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
506
#endif
507
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
508
509
510
511
512
513
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
514
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
515
	      readingCtr+= readings.size();
516

517
              //mySensorCache.dump();
518
519
          } else {
              LOG(error) << "Message with empty topic received";
520
          }
521
      }
522
  }
523
#endif
524
  return 0;
525
526
}

527
528
529



530
531
532
533
/*
 * Print usage information
 */
void usage() {
534
535
536
537
538
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
539
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
540
541
542
543
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
544
545
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
546
547
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
548
549
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
550
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
551
  cout << endl;
Michael Ott's avatar
Michael Ott committed
552
  cout << "  -d            Daemonize" << endl;
553
  cout << "  -s            Print message stats" <<endl;
554
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
555
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
556
  cout << "  -h            This help page" << endl;
557
  cout << endl;
558
559
560
}

int main(int argc, char* const argv[]) {
561
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
562
563

  try{
564

565
566
      // Checking if path to config is supplied
      if (argc <= 1) {
567
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
568
569
570
571
572
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
573
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

590
591
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
592

Alessio Netti's avatar
Alessio Netti committed
593
      Configuration config(argv[argc - 1], "collectagent.conf");
594
      config.readConfig();
595

Alessio Netti's avatar
Alessio Netti committed
596
597
598
599
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
600
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
601
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
602
      
603
604
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
605
          switch(ret) {
606
              case 'a':
607
                  pluginSettings.autoPublish = true;
608
                  break;
609
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
610
611
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
612
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
613
                  break;
614
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
615
616
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
617
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
618
                  break;
Michael Ott's avatar
Michael Ott committed
619
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
620
                  cassandraSettings.username = optarg;
621
                  break;
Michael Ott's avatar
Michael Ott committed
622
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
623
                  cassandraSettings.password = optarg;
624
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
625
626
627
628
629
630
631
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
632
              case 't':
Alessio Netti's avatar
Alessio Netti committed
633
                  cassandraSettings.ttl = stoul(optarg);
634
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
635
              case 'v':
636
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
637
                  break;
638
              case 'd':
639
              case 'D':
640
                  settings.daemonize = 1;
641
                  break;
642
              case 's':
643
                  settings.statistics = 1;
644
                  break;
645
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
646
                  settings.validateConfig = true;
647
                  break;
648
              case 'h':
649
650
651
              default:
                  usage();
                  exit(EXIT_FAILURE);
652
653
654
          }
      }

655
656
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
657
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
658
659
660
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
661
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
662
663
664
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
665

666
      /*
Alessio Netti's avatar
Alessio Netti committed
667
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
668
669
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
670
      signal(SIGTERM, sigHandler);
671
672
673
674
675
676
677
678
679
680

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
681
      
682
      // Setting the size of the sensor cache
683
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
684
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
685

686
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
687
688
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
689
690
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
691
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
692
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
693
      
Axel Auweter's avatar
Axel Auweter committed
694
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
695
          LOG(fatal) << "Cannot connect to Cassandra!";
696
697
698
699
700
701
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
702
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
703
      
704
705
706
      /*
       * Allocate the SensorDataStore.
       */
707
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
708
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
709
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
710
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
711
712
713

      /*
       * Set TTL for data store inserts if TTL > 0.
714
       */
715
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
716
        mySensorDataStore->setTTL(cassandraSettings.ttl);
717
718
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
719
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
720
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
721

722
723
724
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
725
726
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
727
      SensorMetadata sBuf;
728
      for (const auto &s : publicSensors)
729
730
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
731
732
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
733
          }
734
          
735
736
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
737
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
738
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
739
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
740
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
Alessio Netti's avatar
Alessio Netti committed
741
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
742
      queryEngine.setQueryCallback(sensorQueryCallback);
743
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
744
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
745
      queryEngine.setJobQueryCallback(jobQueryCallback);
746
      if(!analyticsController->initialize(settings))
747
748
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
749
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
750
751
752
753
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
754
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
755
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
756
757
758
759
760
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
761
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
762
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
763
764
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
765

Alessio Netti's avatar
Alessio Netti committed
766
767
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
768
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
769
770
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
Alessio Netti's avatar
Alessio Netti committed
771
      
772
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
773
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
774
775
776
777
778
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
779
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
780
781
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
782
783
784
785
#else
      LOG(info) << "    Username and password not printed.";
#endif

786
787
788
789
790
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
791
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
792
      }
793
794
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
795
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
796
797
798
799
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
800
      if (settings.validateConfig)
801
802
803
804
          return EXIT_SUCCESS;
      else
          analyticsController->start();

805
806
807
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
808
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
809
      
810
811
812
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
813
      LOG(info) << "MQTT Server running...";
814
      
815
816
817
      /*
       * Start the HTTP Server for the REST API
       */
818
      CARestAPI* httpsServer = nullptr;
819
      if (restAPISettings.enabled) {
820
821
822
823
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
824
      }
825

826
827
828
829
830
831
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
832
833
834
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
835

Alessio Netti's avatar
Alessio Netti committed
836
837
838
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

839
      LOG(info) << "Collect Agent running...";
840
841
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
842
843
844
845
846
847
848
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

849
          sleep(60);
850
851
852
853
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
854
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
855
          if (settings.statistics && keepRunning) {
856
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
857
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
858
          }
859
          msgCtr = 0;
860
          pmsgCtr = 0;
861
	  readingCtr = 0;
862
863
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
864
      LOG(info) << "Stopping...";
865
      analyticsController->stop();
866
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
867
      LOG(info) << "MQTT Server stopped...";
868
869
870
871
872
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
873
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
874
      delete myJobDataStore;
875
      delete mySensorConfig;
876
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
877
878
      dcdbConn->disconnect();
      delete dcdbConn;
879
      delete metadataStore;
Alessio Netti's avatar
Logging    
Alessio Netti committed
880
      LOG(info) << "Collect Agent closed. Bye bye...";
881
  }
882
883
884
885
  catch (const std::runtime_error& e) {
      LOG(fatal) <<  e.what();
      return EXIT_FAILURE;
  }
886
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
887
      LOG(fatal) << "Exception: " << e.what();
888
      abrt(EXIT_FAILURE, INTERR);
889
890
  }

891
  return EXIT_SUCCESS;
892
}
893
894