In January 2021 we will introduce a 10 GB quota for project repositories. Higher limits for individual projects will be available on request. Please see https://doku.lrz.de/display/PUBLIC/GitLab for more information.

collectagent.cpp 33 KB
Newer Older
1 2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7 8 9 10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is an intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49 50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53 54 55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/connection.h>
58
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
59
#include <dcdb/jobdatastore.h>
60
#include <dcdb/calievtdatastore.h>
61
#include <dcdb/sensorconfig.h>
62
#include <dcdb/version.h>
63
#include <dcdb/sensor.h>
64
#include "version.h"
65

66
#include "CARestAPI.h"
67
#include "configuration.h"
68
#include "simplemqttserver.h"
69
#include "messaging.h"
70
#include "abrt.h"
71
#include "dcdbdaemon.h"
72
#include "sensorcache.h"
73 74
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
75

76 77 78
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

79 80 81 82 83 84 85
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

86
using namespace std;
87 88

int keepRunning;
89
bool statistics;
90 91
uint64_t msgCtr;
uint64_t pmsgCtr;
92
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
93
SensorCache mySensorCache;
94
AnalyticsController* analyticsController;
95
DCDB::Connection* dcdbConn;
96
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
97
DCDB::JobDataStore *myJobDataStore;
98
DCDB::SensorConfig *mySensorConfig;
99
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
100
MetadataStore *metadataStore;
101
DCDB::SCError err;
102
QueryEngine& queryEngine = QueryEngine::getInstance();
103
logger_t lg;
104

105
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
Alessio Netti's avatar
Alessio Netti committed
106 107 108 109 110 111 112 113 114 115
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
116
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
117
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
118 119 120
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
121
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
122 123 124 125 126 127 128 129 130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
131
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
132
    }
133
    return true;
Alessio Netti's avatar
Alessio Netti committed
134 135
}

136
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
137 138 139
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
140
    std::string topic=name;
Alessio Netti's avatar
Alessio Netti committed
141
    // Getting the topic of the queried sensor from the Navigator
142
    // If not found, we try to use the input name as topic
143 144
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
145
    } catch(const std::domain_error& e) {}
146
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
147
    // Creating a SID to perform the query
148 149
    if(!sid.mqttTopicConvert(topic)) {
        --queryEngine.access;
150
        return false;
151
    }
152 153
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
154 155
        if (entry.getView(startTs, endTs, buffer, rel)) {
            --queryEngine.access;
156
            return true;
157
        }
158 159 160 161 162 163 164 165 166 167 168 169 170
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
Alessio Netti's avatar
Alessio Netti committed
171
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
172 173
        if(results.empty()) {
            --queryEngine.access;
174
            return false;
175
        }
176 177 178 179
        reading_t reading;
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
180
            buffer.push_back(reading);
181 182 183
        }
    }
    catch(const std::exception& e) {
184
        --queryEngine.access;
185
        return false;
186
    }
187
    --queryEngine.access;
188
    return true;
189 190
}

191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
    } else {
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
            buffer = Configuration::publicSensorToMetadata(publicSensor);
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

223
/* Normal termination (SIGINT, CTRL+C) */
224 225
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
226 227 228 229 230
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
231 232 233
  keepRunning = 0;
}

234 235 236 237 238 239
/*
 * Crash handler: forwards to abrt() with EXIT_FAILURE and the SIGNAL source.
 * The number of the received signal is not evaluated.
 */
void abrtHandler(int sig)
{
  (void)sig; // intentionally unused
  abrt(EXIT_FAILURE, SIGNAL);
}

240
#ifndef BENCHMARK_MODE

/*
 * Handles a message whose topic starts with the DCDB_MAP keyword: the payload
 * carries either the public name of a sensor or, if the topic starts with the
 * extended DCDB_MET keyword, a JSON metadata packet.
 * Returns 0 on success and 1 if the message was rejected.
 */
static int mqttHandleSensorPublish(SimpleMQTTMessage *msg)
{
    // Bind the topic before taking its C string, so the pointer stays valid
    // even if getTopic() returns a temporary.
    const std::string &topicRef = msg->getTopic();
    const char *topic = topicRef.c_str();

    const uint64_t len = msg->getPayloadLength();
    if (len == 0) {
        LOG(error) << "Empty sensor publish message received!";
        return 1;
    }

    std::string payload((char *) msg->getPayload(), len);
    // Local status code: starts out as "OK", so that a skipped publish
    // (invalid metadata) is never judged by the result of an earlier message.
    DCDB::SCError status = DCDB::SC_OK;

    //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
    if (strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
        SensorMetadata sm;
        try {
            sm.parseJSON(payload);
        } catch (const std::exception &) {
            LOG(error) << "Invalid metadata packed received!";
            return 1;
        }
        if (sm.isValid()) {
            DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
            status = mySensorConfig->publishSensor(ps);
            metadataStore->store(*sm.getPattern(), sm);
        }
    } else {
        // The sensor topic is what follows the DCDB_MAP keyword
        status = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
    }

    // PublishSensor does most of the error checking for us
    switch (status) {
        case DCDB::SC_INVALIDPATTERN:
            LOG(error) << "Invalid sensor topic : " << msg->getTopic();
            return 1;
        case DCDB::SC_INVALIDPUBLICNAME:
            LOG(error) << "Invalid sensor public name.";
            return 1;
        case DCDB::SC_INVALIDSESSION:
            LOG(error) << "Cannot reach sensor data store.";
            return 1;
        default:
            break;
    }
    return 0;
}

/*
 * Handles a Caliper Event message. The event data string is encoded in the
 * MQTT topic, separated from the actual sensor topic by "/:/"; the payload
 * consists of the usual timestamp-value pairs. The events are stored in the
 * separate table managed by the CaliEvtDataStore class.
 * Returns 0 on success and 1 if the message was rejected.
 */
static int mqttHandleCaliEvt(SimpleMQTTMessage *msg)
{
    //TODO support case that no timestamp is given?
    //retrieve timestamp and value from the payload
    const uint64_t len = msg->getPayloadLength();
    if ((len == 0) || (len % sizeof(mqttPayload) != 0)) {
        //this message is malformed -> ignore
        LOG(error) << "Message malformed";
        return 1;
    }
    mqttPayload *payload = (mqttPayload *) msg->getPayload();
    const uint64_t count = len / sizeof(mqttPayload);

    /*
     * Decode message topic in actual sensor topic and string data.
     * "/:/" is used as delimiter between topic and data.
     */
    std::string topicStr(msg->getTopic());
    topicStr.erase(0, DCDB_CALIEVT_LEN);
    const size_t pos = topicStr.find("/:/");
    if (pos == std::string::npos) {
        // topic is malformed -> ignore
        LOG(error) << "CaliEvt topic malformed";
        return 1;
    }
    const std::string data(topicStr, pos + 3);
    topicStr.erase(pos);

    /*
     * We use the same MQTT-topic/SensorId infrastructure as actual sensor
     * data readings to sort related events.
     * Check if we can decode the event topic into a valid SensorId. If
     * successful, store the record in the database.
     */
    DCDB::SensorId sid;
    if (!sid.mqttTopicConvert(topicStr)) {
        LOG(error) << "Topic could not be converted to SID";
        return 0;
    }

    DCDB::CaliEvtData e;
    e.eventId = sid;
    e.event   = data;
    // NOTE(review): events are inserted one by one; CaliEvtDataStore::insertBatch()
    // was considered in an earlier revision and could be used here instead.
    for (uint64_t i = 0; i < count; i++) {
        e.timeStamp = payload[i].timestamp;

        /**
         * We want an exhaustive list of all events ordered by their
         * time of occurrence. Payload values should always be
         * one. Other values currently indicate a malformed message.
         *
         * In the future, the value field could be used to aggregate
         * multiple equal events that occurred in the same plugin
         * read cycle.
         */
        if (payload[i].value != 1) {
            LOG(error) << "CaliEvt message malformed. Value != 1";
            return 1;
        }

        myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
    }
    return 0;
}

/*
 * Handles a message containing Slurm job data in JSON format. The payload is
 * parsed into a JobData object and stored within the JobDataStore: a job
 * without end time is inserted, a job with end time updates the end time of
 * the already stored job with the same id.
 * Returns 0 on success and 1 if the message was rejected.
 */
static int mqttHandleJobData(SimpleMQTTMessage *msg)
{
    const uint64_t len = msg->getPayloadLength();
    if (len == 0) {
        LOG(error) << "Empty job data message received!";
        return 1;
    }

    //parse JSON into JobData object
    std::string   payload((char *) msg->getPayload(), len);
    DCDB::JobData jd;
    try {
        boost::property_tree::iptree config;
        std::istringstream           str(payload);
        boost::property_tree::read_json(str, config);
        for (const auto &val : config) {
            if (boost::iequals(val.first, "jobid")) {
                jd.jobId = val.second.data();
            } else if (boost::iequals(val.first, "userid")) {
                jd.userId = val.second.data();
            } else if (boost::iequals(val.first, "starttime")) {
                jd.startTime = DCDB::TimeStamp((uint64_t) std::stoull(val.second.data()));
            } else if (boost::iequals(val.first, "endtime")) {
                jd.endTime = DCDB::TimeStamp((uint64_t) std::stoull(val.second.data()));
            } else if (boost::iequals(val.first, "nodes")) {
                for (const auto &node : val.second) {
                    jd.nodes.push_back(node.second.data());
                }
            }
        }
    } catch (const std::exception &) {
        LOG(error) << "Invalid job data packet received!";
        return 1;
    }

#ifdef DEBUG
    LOG(debug) << "JobId  = " << jd.jobId;
    LOG(debug) << "UserId = " << jd.userId;
    LOG(debug) << "Start  = " << jd.startTime.getString();
    LOG(debug) << "End    = " << jd.endTime.getString();
    LOG(debug) << "Nodes: ";
    for (const auto &n : jd.nodes) {
        LOG(debug) << "    " << n;
    }
#endif

    //store JobData into Storage Backend
    if (jd.endTime == DCDB::TimeStamp((uint64_t) 0)) {
        //starting job data
        if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
            LOG(error) << "Job data insert failed!";
            return 1;
        }
    } else {
        //ending job data
        DCDB::JobData tmp;
        if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
            LOG(error) << "Could not retrieve job to be updated!";
            return 1;
        }

        if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
            LOG(error) << "Could not update end time of job!";
            return 1;
        }
    }
    return 0;
}

/*
 * Handles a regular sensor data message: the readings are put into the sensor
 * cache and stored in the sensor data store.
 * Returns 0 on success and 1 if the message was rejected.
 */
static int mqttHandleSensorReadings(SimpleMQTTMessage *msg)
{
    mqttPayload  single;
    mqttPayload *payload;
    uint64_t     count;

    const uint64_t len = msg->getPayloadLength();
    //In the 64 bit message case, the collect agent provides a timestamp
    if (len == sizeof(uint64_t)) {
        single.value = *((int64_t *) msg->getPayload());
        single.timestamp = Messaging::calculateTimestamp();
        payload = &single;
        count = 1;
    }
    //...otherwise it just retrieves it from the MQTT message payload.
    else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
        payload = (mqttPayload *) msg->getPayload();
        count = len / sizeof(mqttPayload);
    }
    //...otherwise this message is malformed -> ignore...
    else {
        LOG(error) << "Message malformed";
        return 1;
    }

    /*
     * Check if we can decode the message topic
     * into a valid SensorId. If successful, store
     * the record in the database.
     */
    DCDB::SensorId sid;
    if (!sid.mqttTopicConvert(msg->getTopic())) {
        LOG(error) << "Message with empty topic received";
        return 0;
    }

    std::list<DCDB::SensorDataStoreReading> readings;
    for (uint64_t i = 0; i < count; i++) {
        DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
        readings.push_back(r);
        mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
    }
    mySensorCache.getSensorMap()[sid].updateBatchSize(count);
    mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
    readingCtr += readings.size();
    return 0;
}

#endif /* !BENCHMARK_MODE */

/*
 * Callback invoked for MQTT messages.
 *
 * Updates the message counters and, unless BENCHMARK_MODE is defined, decodes
 * PUBLISH messages and puts their content into the database. The topic prefix
 * selects the kind of message: sensor publish/metadata (DCDB_MAP), Caliper
 * event (DCDB_CALIEVT), job data (DCDB_JOBDATA), or regular sensor readings.
 *
 * Returns 0 if the message was processed or ignored, 1 if it was rejected
 * as empty, malformed or not storable.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

#ifndef BENCHMARK_MODE

  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic before taking its C string, so the pointer stays valid
      // even if getTopic() returns a temporary.
      const std::string &topicRef = msg->getTopic();
      const char *topic = topicRef.c_str();

      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
      // sensor's name (or metadata). We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0)
          return mqttHandleSensorPublish(msg);
      if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0)
          return mqttHandleCaliEvt(msg);
      if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0)
          return mqttHandleJobData(msg);
      return mqttHandleSensorReadings(msg);
  }
#endif
  return 0;
}

500 501 502



503 504 505 506
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
507
  Configuration config("", "collectagent.conf");
508 509 510 511 512
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
513
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
514 515 516 517
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
518 519
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
520 521
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
522 523
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging  
Alessio Netti committed
524
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
525
  cout << endl;
Michael Ott's avatar
Michael Ott committed
526
  cout << "  -d            Daemonize" << endl;
527
  cout << "  -s            Print message stats" <<endl;
528
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
529
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
530
  cout << "  -h            This help page" << endl;
531
  cout << endl;
532 533 534
}

int main(int argc, char* const argv[]) {
535
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
536 537

  try{
538

539 540 541 542 543 544 545 546
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
547
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

564 565
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging  
Alessio Netti committed
566

Alessio Netti's avatar
Alessio Netti committed
567 568
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging  
Alessio Netti committed
569
          LOG(fatal) << "Failed to read global configuration!";
570 571 572
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
573 574 575 576
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Müller's avatar
Micha Müller committed
577
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
578
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
579
      
580 581
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
582
          switch(ret) {
583
              case 'a':
584
                  pluginSettings.autoPublish = true;
585
                  break;
586
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
587 588 589
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
590
                  break;
591
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
592 593 594
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
595
                  break;
Michael Ott's avatar
Michael Ott committed
596
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
597
                  cassandraSettings.username = optarg;
598
                  break;
Michael Ott's avatar
Michael Ott committed
599
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
600
                  cassandraSettings.password = optarg;
601
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
602 603 604 605 606 607 608
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
609
              case 't':
Alessio Netti's avatar
Alessio Netti committed
610
                  cassandraSettings.ttl = stoul(optarg);
611
                  break;
Alessio Netti's avatar
Logging  
Alessio Netti committed
612
              case 'v':
613
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging  
Alessio Netti committed
614
                  break;
615
              case 'd':
616
              case 'D':
617
                  settings.daemonize = 1;
618
                  break;
619
              case 's':
620
                  settings.statistics = 1;
621
                  break;
622
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
623
                  settings.validateConfig = true;
624
                  break;
625
              case 'h':
626 627 628
              default:
                  usage();
                  exit(EXIT_FAILURE);
629 630 631
          }
      }

632 633
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
634
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
635 636 637
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging  
Alessio Netti committed
638
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
639 640 641
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging  
Alessio Netti committed
642

643
      /*
Alessio Netti's avatar
Alessio Netti committed
644
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
645 646
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
647
      signal(SIGTERM, sigHandler);
648 649 650 651 652 653 654 655 656 657

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
658
      
659
      // Setting the size of the sensor cache
660
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
661
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
662

663
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
664 665
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
666 667
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
668
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
669
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
670
      
Axel Auweter's avatar
Axel Auweter committed
671
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging  
Alessio Netti committed
672
          LOG(fatal) << "Cannot connect to Cassandra!";
673 674 675 676 677 678
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
679
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
680
      
681 682 683
      /*
       * Allocate the SensorDataStore.
       */
684
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
685
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
686
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
687
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
688 689 690

      /*
       * Set TTL for data store inserts if TTL > 0.
691
       */
692
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
693
        mySensorDataStore->setTTL(cassandraSettings.ttl);
694 695
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
696
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
697
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
698

699 700 701
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
      metadataStore = new MetadataStore();
702 703
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
          LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
704
      SensorMetadata sBuf;
705
      for (const auto &s : publicSensors)
706 707
          if (!s.is_virtual) {
              sBuf = Configuration::publicSensorToMetadata(s);
708 709
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
710
          }
711
          
712 713
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
714
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
715
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
716
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
Alessio Netti's avatar
Alessio Netti committed
717
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
718
      queryEngine.setQueryCallback(sensorQueryCallback);
719
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
720
      queryEngine.setJobQueryCallback(jobQueryCallback);
721
      if(!analyticsController->initialize(settings, argv[argc - 1]))
722 723
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
724
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
725 726 727 728
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
729
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
730
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
731 732 733 734 735
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
736
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
737
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
738 739
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
740

Alessio Netti's avatar
Alessio Netti committed
741 742
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
743
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
744
      
745
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
746
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
747 748 749 750 751
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
752
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
753 754
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
755 756 757 758
#else
      LOG(info) << "    Username and password not printed.";
#endif

759 760 761 762 763
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
764 765
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
766
      }
767 768
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
769
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
770 771 772 773
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
774
      if (settings.validateConfig)
775 776 777 778
          return EXIT_SUCCESS;
      else
          analyticsController->start();

779 780 781
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
782
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
783
      
784 785 786
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging  
Alessio Netti committed
787
      LOG(info) << "MQTT Server running...";
788
      
789 790 791
      /*
       * Start the HTTP Server for the REST API
       */
792
      CARestAPI* httpsServer = nullptr;
793
      if (restAPISettings.enabled) {
794 795 796 797
          httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
798
      }
799

800 801 802 803 804 805
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
806 807 808
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
809

Alessio Netti's avatar
Alessio Netti committed
810 811 812
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

813
      LOG(info) << "Collect Agent running...";
814 815
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
816 817 818 819 820 821 822
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

823
          sleep(60);
824 825 826 827
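          /* Not strictly thread-safe: the counters are read here and reset below
           * with no visible locking, so updates arriving in between may be lost.
           * Presumably acceptable because the figures are only used for statistics. */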
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
828
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
829
          if (settings.statistics && keepRunning) {
830
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
831
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
832
          }
833
          msgCtr = 0;
834
          pmsgCtr = 0;
835
	  readingCtr = 0;
836 837
      }

Alessio Netti's avatar
Logging  
Alessio Netti committed
838
      LOG(info) << "Stopping...";
839
      analyticsController->stop();
840
      ms.stop();
Alessio Netti's avatar
Logging  
Alessio Netti committed
841
      LOG(info) << "MQTT Server stopped...";
842 843 844 845 846
      if (restAPISettings.enabled) {
	  httpsServer->stop();
	  delete httpsServer;
	  LOG(info) << "HTTP Server stopped...";
      }
847
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
848
      delete myJobDataStore;
849
      delete mySensorConfig;
850
      delete myCaliEvtDataStore;
Axel Auweter's avatar
Axel Auweter committed
851 852
      dcdbConn->disconnect();
      delete dcdbConn;
853
      delete metadataStore;
Alessio Netti's avatar
Logging  
Alessio Netti committed
854
      LOG(info) << "Collect Agent closed. Bye bye...";
855
  }
856
  catch (const exception& e) {
Alessio Netti's avatar
Logging  
Alessio Netti committed
857
      LOG(fatal) << "Exception: " << e.what();
858
      abrt(EXIT_FAILURE, INTERR);
859 860
  }

861
  return EXIT_SUCCESS;
862
}
863 864