collectagent.cpp 30 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include <dcdb/sensor.h>
43
#include "version.h"
44

45
#include "configuration.h"
46
#include "simplemqttserver.h"
47
#include "messaging.h"
48
#include "abrt.h"
49
#include "dcdbdaemon.h"
50
#include "sensorcache.h"
51
52
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
53

54
55
56
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

57
using namespace std;
58
59

int keepRunning;
60
bool statistics;
61
62
uint64_t msgCtr;
uint64_t pmsgCtr;
63
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
64
SensorCache mySensorCache;
65
AnalyticsController* analyticsController;
66
DCDB::Connection* dcdbConn;
67
DCDB::SensorDataStore *mySensorDataStore;
68
69
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
70
QueryEngine& queryEngine = QueryEngine::getInstance();
71
logger_t lg;
72

73
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
74
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
75
    // Getting the topic of the queried sensor from the Navigator
76
77
78
79
80
81
82
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
83
    // Creating a SID to perform the query
84
85
86
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
Alessio Netti's avatar
Alessio Netti committed
87
88
        // getView is called with live=false to drop strict staleness checks
        output = entry.getView(startTs, endTs, buffer, rel, false);
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
105
        // Dealing with allocations that may have been performed by the cache search
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
126
127
}

128
/* Normal termination (SIGINT, CTRL+C) */
129
130
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
131
132
133
134
135
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
136
137
138
  keepRunning = 0;
}

139
140
141
142
143
144
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

Alessio Netti's avatar
Alessio Netti committed
145
//TODO: trim common code with dcdbpusher
146
147
148
149
150
151
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
Alessio Netti's avatar
Alessio Netti committed
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
    
    boost::network::uri::uri uri("https://" + request.destination);
    httpServer_t::string_type method = request.method;
    httpServer_t::string_type path = uri.path();
    httpServer_t::string_type query = uri.query();
    std::string response = "";
    std::ostringstream data;
    std::string auth_value = "";
    bool json = false;
    std::vector<std::string> pathStrs;
    std::vector<std::pair<std::string, std::string>> queries;
    
    //first check if request is supported at all
    if (method != "GET" && method != "PUT") {
      LOGH(warning) << "Unsupported " << method << " request was made";
      connection->set_status(httpServer_t::connection::not_supported);
      goto error;
    }
    
    LOGH(info) << method << " request of " << request.destination << " was made";
    
    //do some string processing
    //split path into its hierarchical parts
    if (path.size() >= 2) {
      if (path[0] == '/')
          path.erase(0,1);
      if (path[path.size() -1] == '/')
          path.erase(path.size() -1);
      boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
    }
    
    //split query part into the individual queries (key-value pairs)
    {	//do not remove the enclosing brackets
      //need to encapsulate this code block to keep queryStrs local
      std::vector<std::string> queryStrs;
      boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
      for(auto& key : queryStrs) {
          size_t pos = key.find("=");
          if (pos != std::string::npos) {
              std::string value;
              value = key.substr(pos+1);
              key.erase(pos);
              queries.push_back(std::make_pair(key, value));
          }
      }
    }
    //finished string processing
    for (auto& p : queries) {
      //authkey is required in every case
      //if (p.first == "authkey") {
      //    auth_value = p.second;
      //} else 
      if (p.first == "json")
          if (stoi(p.second) > 0)
              json = true;
    }
    
    if (pathStrs.size() < 1) {
      LOGH(warning) << "Received malformed request: No first path part";
      connection->set_status(httpServer_t::connection::bad_request);
      goto error;
    }
214

Alessio Netti's avatar
Alessio Netti committed
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
    //select code depending on request
    if (method == "GET") {
        if (pathStrs[0] == "help") {
            response = "collectagent RESTful API cheatsheet:\n"
                       " -GET:  /help     This help message\n"
                       "        /analytics/help\n"
                       "        /[sensor]/avg?interval=[timeInSec]\n"
                       "                  Average of last sensor readings from the last\n"
                       "                  [interval] seconds or of all cached readings\n"
                       "                  if no interval is given\n"
                       "\n";
            //"All resources have to be prepended by host:port and need at\n"
            //"least the query ?authkey=[token] at the end. Multiple queries\n"
            //"need to be separated by semicolons(';')\n";
            response += analyticsController->getManager()->restCheatSheet;
        } else if (pathStrs[0] == "analytics") {
            try {
                restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
                data << reply.data;
                response = reply.response;
            } catch(const std::invalid_argument &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            } catch(const std::domain_error &e) {
                response = e.what();
                connection->set_status(httpServer_t::connection::not_found);
            } catch(const std::exception &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::internal_server_error);
                goto error;
            }
        } else {
248

Alessio Netti's avatar
Alessio Netti committed
249
250
251
252
253
254
255
256
257
258
            if (pathStrs.size() < 2) {
                LOGH(warning) << "Received malformed request: No second path part";
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            }
            if (pathStrs[1] != "avg") {
                LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
                connection->set_status(httpServer_t::connection::not_supported);
                goto error;
            }
259

Alessio Netti's avatar
Alessio Netti committed
260
261
262
263
            uint64_t time = 0;
            for (auto &p : queries)
                if (p.first == "interval")
                    time = std::stoul(p.second);
264

Alessio Netti's avatar
Alessio Netti committed
265
266
267
268
            //try getting the latest value 
            try {
                //TODO: switch from SID input to sensor name input
                int64_t val = mySensorCache.getSensor(pathStrs[0], (uint64_t) time * 1000000000);
269

Alessio Netti's avatar
Alessio Netti committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
                connection->set_status(httpServer_t::connection::ok);
                response = "collectagent::" + pathStrs[0] + " Average of last " +
                           std::to_string(time) + " seconds is " + std::to_string(val);
                //data << val << "\n";
                //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
            }
            catch (const std::invalid_argument &e) {
                connection->set_status(httpServer_t::connection::not_found);
                response = "Error: Sensor id not found.\n";
            } catch (const std::out_of_range &e) {
                connection->set_status(httpServer_t::connection::no_content);
                response = "Error: Sensor unavailable.\n";
            } catch (const std::exception &e) {
                connection->set_status(httpServer_t::connection::internal_server_error);
                LOGH(warning) << "Internal server error.\n";
                goto error;
            }
        }
    } else if(method == "PUT") {
        if( pathStrs.back() == "reload" )
            analyticsController->halt(true);
        try {
            restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
            data << reply.data;
            response = reply.response;
        } catch(const std::invalid_argument &e) {
            LOGH(warning) << e.what();
            connection->set_status(httpServer_t::connection::bad_request);
            goto error;
        } catch(const std::domain_error &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::not_found);
        } catch(const std::exception &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::internal_server_error);
        }
        // Continue MQTTPusher when a reload was performed
        if( pathStrs.back() == "reload" )
            analyticsController->resume();
309
    }
Alessio Netti's avatar
Alessio Netti committed
310
311
312
313
314
315
316
317
    
    LOGH(info) << "Responding: " << response;
    data << response << std::endl;
    
    //Error management section
    error:
    connection->set_headers(boost::make_iterator_range(headers, headers + 2));
    connection->write(data.str());
318
319
320
321
  }
};


322
int mqttCallback(SimpleMQTTMessage *msg)
323
324
325
326
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
327
  msgCtr++;
328
329
330
  if (msg->isPublish())
    pmsgCtr++;

331
  uint64_t len;
332
333
334
  /*
   * Decode the message and put into the database.
   */
335
  if (msg->isPublish()) {
336
337
338
339
      const char *topic = msg->getTopic().c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
340
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
341
          if ((len = msg->getPayloadLength()) == 0) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
342
              LOG(error) << "Empty topic-to-name mapping message received";
343
344
              return 1;
          }
345

346
347
348
349
350
351
          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
352
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
353
354
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
Alessio Netti's avatar
Logging    
Alessio Netti committed
355
                  LOG(error) << "Invalid sensor public name: " << sensorName;
356
357
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
358
                  LOG(error) << "Cannot reach sensor data store.";
359
360
361
362
363
364
365
366
367
368
369
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
370
              payload->value = *((int64_t *) msg->getPayload());
371
372
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
373
          }
374
375
376
377
378
379
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
380
              LOG(error) << "Message malformed";
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
403
#endif
404
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
405
406
407
408
409
410
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
411
	      mySensorDataStore->insertBatch(readings);
412
	      readingCtr+= readings.size();
413

414
              //mySensorCache.dump();
415
          }
416
#if 1
417
418
419
420
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
421
#endif
422
      }
423
  }
424
  return 0;
425
426
}

427
428
429



430
431
432
433
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
434
435
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
436
437
438
439
440
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
441
  cout << "  collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
442
443
444
445
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
446
  cout << "  -a <string>   Auto-publish pattern    [default: none]" << endl;
447
448
449
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
450
451
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
452
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
453
454
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
455
  cout << endl;
Michael Ott's avatar
Michael Ott committed
456
  cout << "  -d            Daemonize" << endl;
457
  cout << "  -s            Print message stats" <<endl;
458
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
459
  cout << "  -h            This help page" << endl;
460
  cout << endl;
461
462
463
}

int main(int argc, char* const argv[]) {
464
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
465
    bool validateConfig = false;
466
467

  try{
468

469
470
471
472
473
474
475
476
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
477
      const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

494
495
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
496

497
498
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
499
          LOG(fatal) << "Failed to read global configuration!";
500
501
502
503
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
504

505
      /* Parse command line */
506
      std::string listenHost, cassandraHost, restApiHost;
507
      std::string listenPort, cassandraPort, restApiPort;
508
509
510

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
511
          switch(ret) {
512
513
514
              case 'a':
                  settings.pluginSettings.sensorPattern = optarg;
                  break;
515
              case 'm':
516
                  settings.mqttListenAddress = optarg;
517
                  break;
518
              case 'r':
519
                  settings.restListenAddress = optarg;
520
                  break;
521
              case 'c':
522
523
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
524
              case 'u':
525
                  settings.cassandraSettings.username = optarg;
526
                  break;
Michael Ott's avatar
Michael Ott committed
527
              case 'p': {
528
529
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
530
531
532
533
534
535
536
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
537
              case 't':
538
                  settings.cassandraSettings.ttl = stoul(optarg);
539
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
540
              case 'v':
541
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
542
                  break;
543
              case 'd':
544
              case 'D':
545
                  settings.daemonize = 1;
546
                  break;
547
              case 's':
548
                  settings.statistics = 1;
549
                  break;
550
551
552
              case 'x':
                  validateConfig = true;
                  break;
553
              case 'h':
554
555
556
              default:
                  usage();
                  exit(EXIT_FAILURE);
557
558
559
          }
      }

560
      auto fileSink = setupFileLogger(settings.pluginSettings.tempdir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
561
562
563
564
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

565
      /*
Alessio Netti's avatar
Alessio Netti committed
566
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
567
568
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
569
      signal(SIGTERM, sigHandler);
570
571
572
573
574
575
576
577
578
579
580

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

581
582
583
      /*
       * Parse hostnames for port specifications
       */
584
      listenHost = string(settings.mqttListenAddress);
585
586
587
588
589
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
590
        listenPort = LISTENPORT;
591
      }
592
593

      cassandraHost = string(settings.cassandraSettings.address);
594
595
596
597
598
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
599
        cassandraPort = CASSANDRAPORT;
600
      }
601
602

      restApiHost = string(settings.restListenAddress);
603
604
605
606
607
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
608
        restApiPort = RESTAPIPORT;
609
610
      }

611
      // Setting the size of the sensor cache
612
      // Conversion from milliseconds to nanoseconds
613
      mySensorCache.setMaxHistory(uint64_t(settings.pluginSettings.cacheInterval) * 1000000);
614

615
616
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
617
618
619
620
621
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

622

Axel Auweter's avatar
Axel Auweter committed
623
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
624
          LOG(fatal) << "Cannot connect to Cassandra!";
625
626
627
628
629
630
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
631
      dcdbConn->initSchema();
632

Alessio Netti's avatar
Alessio Netti committed
633

634
635
636
      /*
       * Allocate the SensorDataStore.
       */
637
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
638
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
639
640
641

      /*
       * Set TTL for data store inserts if TTL > 0.
642
       */
643
644
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
645
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
646

647
648
649
650
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
651
      queryEngine.setQueryCallback(sensorQueryCallback);
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699

      LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenAddress;
      LOG(info) << "    CacheInterval:      " << int(settings.pluginSettings.cacheInterval/1000) << " [s]";
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
      LOG(info) << "    Write-Dir:          " << settings.pluginSettings.tempdir;
      LOG(info) << "    Hierarchy:          " << (settings.hierarchy!="" ? settings.hierarchy : "none");
      LOG(info) << (validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");

      LOG(info) << "Cassandra Driver Settings:";
      LOG(info) << "    Address:            " << settings.cassandraSettings.address;
      LOG(info) << "    TTL:                " << settings.cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << settings.cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << settings.cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << settings.cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << settings.cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << settings.cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Username:           " << settings.cassandraSettings.username;
	  LOG(info) << "    Password:           " << settings.cassandraSettings.password;
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
      LOG(info) << "    REST Server: " << settings.restListenAddress;

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

      if (validateConfig)
          return EXIT_SUCCESS;
      else
          analyticsController->start();

700
701
702
      /*
       * Start the MQTT Message Server.
       */
703
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
704
      
705
706
707
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
708
      LOG(info) << "MQTT Server running...";
709
      
710
711
712
713
714
715
716
717
718
719
720
721
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
722
      LOG(info) <<  "HTTP Server running...";
723

724
725
726
727
728
729
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
730
731
732
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
733

Alessio Netti's avatar
Alessio Netti committed
734
735
736
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

737
      LOG(info) << "Collect Agent running...";
738
739
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
740
741
742
743
744
745
746
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

747
          sleep(60);
748
749
750
751
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
752
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
753
          if (settings.statistics && keepRunning) {
754
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
755
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
756
          }
757
          msgCtr = 0;
758
          pmsgCtr = 0;
759
	  readingCtr = 0;
760
761
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
762
      LOG(info) << "Stopping...";
763
      analyticsController->stop();
764
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
765
      LOG(info) << "MQTT Server stopped...";
766
767
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
768
      LOG(info) << "HTTP Server stopped...";
769
      delete mySensorDataStore;
770
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
771
772
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
773
      LOG(info) << "Collect Agent closed. Bye bye...";
774
  }
775
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
776
      LOG(fatal) << "Exception: " << e.what();
777
      abrt(EXIT_FAILURE, INTERR);
778
779
  }

780
  return EXIT_SUCCESS;
781
}
782
783