collectagent.cpp 30.1 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include <dcdb/sensor.h>
43
#include "version.h"
44

45
#include "configuration.h"
46
#include "simplemqttserver.h"
47
#include "messaging.h"
48
#include "abrt.h"
49
#include "dcdbdaemon.h"
50
#include "sensorcache.h"
51
52
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
53

54
55
56
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

57
using namespace std;
58
59

int keepRunning;
60
bool statistics;
61
62
uint64_t msgCtr;
uint64_t pmsgCtr;
63
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
64
SensorCache mySensorCache;
65
AnalyticsController* analyticsController;
66
DCDB::Connection* dcdbConn;
67
DCDB::SensorDataStore *mySensorDataStore;
68
69
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
70
QueryEngine& queryEngine = QueryEngine::getInstance();
71
logger_t lg;
72

73
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
74
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
75
    // Getting the topic of the queried sensor from the Navigator
76
77
78
79
80
81
82
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
83
    // Creating a SID to perform the query
84
85
86
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
87
        output = entry.getView(startTs, endTs, buffer, rel);
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
104
        // Dealing with allocations that may have been performed by the cache search
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
125
126
}

127
/* Normal termination (SIGINT, e.g. CTRL+C, or SIGTERM) */
128
129
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
130
131
132
133
134
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
135
136
137
  keepRunning = 0;
}

138
139
140
141
142
143
/* Crash */
/**
 * Handler for critical signals: forwards to abrt() with EXIT_FAILURE and
 * the SIGNAL source tag.
 *
 * @param sig  Number of the received signal; not evaluated.
 */
void abrtHandler(int sig)
{
  (void) sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

Alessio Netti's avatar
Alessio Netti committed
144
//TODO: trim common code with dcdbpusher
145
146
147
148
149
150
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
Alessio Netti's avatar
Alessio Netti committed
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
    
    boost::network::uri::uri uri("https://" + request.destination);
    httpServer_t::string_type method = request.method;
    httpServer_t::string_type path = uri.path();
    httpServer_t::string_type query = uri.query();
    std::string response = "";
    std::ostringstream data;
    std::string auth_value = "";
    std::vector<std::string> pathStrs;
    std::vector<std::pair<std::string, std::string>> queries;
    
    //first check if request is supported at all
    if (method != "GET" && method != "PUT") {
      LOGH(warning) << "Unsupported " << method << " request was made";
      connection->set_status(httpServer_t::connection::not_supported);
      goto error;
    }
    
    LOGH(info) << method << " request of " << request.destination << " was made";
    
    //do some string processing
    //split path into its hierarchical parts
    if (path.size() >= 2) {
      if (path[0] == '/')
          path.erase(0,1);
      if (path[path.size() -1] == '/')
          path.erase(path.size() -1);
      boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
    }
    
    //split query part into the individual queries (key-value pairs)
    {	//do not remove the enclosing brackets
      //need to encapsulate this code block to keep queryStrs local
      std::vector<std::string> queryStrs;
      boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
      for(auto& key : queryStrs) {
          size_t pos = key.find("=");
          if (pos != std::string::npos) {
              std::string value;
              value = key.substr(pos+1);
              key.erase(pos);
              queries.push_back(std::make_pair(key, value));
          }
      }
    }
    //finished string processing
Alessio Netti's avatar
Alessio Netti committed
197
198
199
200
201
202
203
204
205
206
207

    //bool json = false;
    //for (auto& p : queries) {
    //      authkey is required in every case
    //if (p.first == "authkey") {
    //      auth_value = p.second;
    //} else 
    //if (p.first == "json")
    //    if (stoi(p.second) > 0)
    //        json = true;
    //}
Alessio Netti's avatar
Alessio Netti committed
208
209
210
211
212
213
    
    if (pathStrs.size() < 1) {
      LOGH(warning) << "Received malformed request: No first path part";
      connection->set_status(httpServer_t::connection::bad_request);
      goto error;
    }
214

Alessio Netti's avatar
Alessio Netti committed
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
    //select code depending on request
    if (method == "GET") {
        if (pathStrs[0] == "help") {
            response = "collectagent RESTful API cheatsheet:\n"
                       " -GET:  /help     This help message\n"
                       "        /analytics/help\n"
                       "        /[sensor]/avg?interval=[timeInSec]\n"
                       "                  Average of last sensor readings from the last\n"
                       "                  [interval] seconds or of all cached readings\n"
                       "                  if no interval is given\n"
                       "\n";
            //"All resources have to be prepended by host:port and need at\n"
            //"least the query ?authkey=[token] at the end. Multiple queries\n"
            //"need to be separated by semicolons(';')\n";
            response += analyticsController->getManager()->restCheatSheet;
        } else if (pathStrs[0] == "analytics") {
            try {
                restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
                data << reply.data;
                response = reply.response;
            } catch(const std::invalid_argument &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            } catch(const std::domain_error &e) {
                response = e.what();
                connection->set_status(httpServer_t::connection::not_found);
            } catch(const std::exception &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::internal_server_error);
                goto error;
            }
        } else {
248

Alessio Netti's avatar
Alessio Netti committed
249
250
251
252
253
254
255
256
257
258
            if (pathStrs.size() < 2) {
                LOGH(warning) << "Received malformed request: No second path part";
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            }
            if (pathStrs[1] != "avg") {
                LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
                connection->set_status(httpServer_t::connection::not_supported);
                goto error;
            }
259

Alessio Netti's avatar
Alessio Netti committed
260
261
262
263
            uint64_t time = 0;
            for (auto &p : queries)
                if (p.first == "interval")
                    time = std::stoul(p.second);
264

Alessio Netti's avatar
Alessio Netti committed
265
266
            //try getting the latest value 
            try {
Alessio Netti's avatar
Alessio Netti committed
267
268
                std::string sensor = MQTTChecker::convertTopic(pathStrs[0]);
                int64_t val = mySensorCache.getSensor(sensor, (uint64_t) time * 1000000000);
Alessio Netti's avatar
Alessio Netti committed
269
                connection->set_status(httpServer_t::connection::ok);
Alessio Netti's avatar
Alessio Netti committed
270
                response = "collectagent::" + sensor + " Average of last " +
Alessio Netti's avatar
Alessio Netti committed
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
                           std::to_string(time) + " seconds is " + std::to_string(val);
                //data << val << "\n";
                //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
            }
            catch (const std::invalid_argument &e) {
                connection->set_status(httpServer_t::connection::not_found);
                response = "Error: Sensor id not found.\n";
            } catch (const std::out_of_range &e) {
                connection->set_status(httpServer_t::connection::no_content);
                response = "Error: Sensor unavailable.\n";
            } catch (const std::exception &e) {
                connection->set_status(httpServer_t::connection::internal_server_error);
                LOGH(warning) << "Internal server error.\n";
                goto error;
            }
        }
    } else if(method == "PUT") {
        if( pathStrs.back() == "reload" )
            analyticsController->halt(true);
        try {
            restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
            data << reply.data;
            response = reply.response;
        } catch(const std::invalid_argument &e) {
            LOGH(warning) << e.what();
            connection->set_status(httpServer_t::connection::bad_request);
            goto error;
        } catch(const std::domain_error &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::not_found);
        } catch(const std::exception &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::internal_server_error);
        }
        // Continue MQTTPusher when a reload was performed
        if( pathStrs.back() == "reload" )
            analyticsController->resume();
308
    }
Alessio Netti's avatar
Alessio Netti committed
309
310
311
312
313
314
315
316
    
    LOGH(info) << "Responding: " << response;
    data << response << std::endl;
    
    //Error management section
    error:
    connection->set_headers(boost::make_iterator_range(headers, headers + 2));
    connection->write(data.str());
317
318
319
320
  }
};


321
/**
 * Callback invoked by the MQTT server for every received message.
 *
 * Updates the message counters and processes publish messages:
 *  - If the topic starts with the DCDB_MAP keyword, the payload carries the
 *    public name of a sensor and the remainder of the topic is published
 *    under that name through the SensorConfig.
 *  - Otherwise the payload carries sensor readings, which are stored in the
 *    sensor cache and inserted into the sensor data store.
 *
 * @param msg  The received MQTT message.
 * @return     0 on success or if the message is not a publish message,
 *             1 if the message was rejected (empty or malformed payload,
 *             invalid topic or name, data store unreachable).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  // Bind the topic to a reference instead of keeping the c_str() of the call
  // result: if getTopic() returns by value, such a pointer would dangle.
  const std::string &topic = msg->getTopic();

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix.
  if (strncmp(topic.c_str(), DCDB_MAP, DCDB_MAP_LEN) == 0) {
      const uint64_t nameLen = msg->getPayloadLength();
      if (nameLen == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), nameLen);
      // Evaluate a local copy of the result; the global 'err' is still updated,
      // but may be overwritten if callbacks run concurrently.
      const DCDB::SCError rc = mySensorConfig->publishSensor(sensorName.c_str(), topic.c_str() + DCDB_MAP_LEN);
      err = rc;

      // PublishSensor does most of the error checking for us
      switch (rc) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topic;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  mqttPayload buf, *payload;
  uint64_t len = msg->getPayloadLength();

  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
      payload = &buf;
      payload->value = *((int64_t *) msg->getPayload());
      payload->timestamp = Messaging::calculateTimestamp();
      // From here on the message is treated as exactly one complete reading
      len = sizeof(mqttPayload);
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
      payload = (mqttPayload *) msg->getPayload();
  }
  //...otherwise this message is malformed -> ignore...
  else {
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(msg->getTopic())) {
      LOG(error) << "Wrong topic format: " << topic;
      return 1;
  }

#if 0
  cout << "Topic decode successful:" << endl
      << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
      << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
      << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
      << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

  cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
  for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
    cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
  }
  cout << endl;
#endif

  const uint64_t numReadings = len / sizeof(mqttPayload);
  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
      readings.push_back(r);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorCache.getSensorMap()[sid].updateBatchSize(numReadings);
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  //mySensorCache.dump();
  return 0;
}

426
427
428



429
430
431
432
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
433
  Configuration config("", "collectagent.conf");
434
435
436
437
438
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
439
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
440
441
442
443
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
444
445
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
446
447
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
448
449
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
450
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
451
  cout << endl;
Michael Ott's avatar
Michael Ott committed
452
  cout << "  -d            Daemonize" << endl;
453
  cout << "  -s            Print message stats" <<endl;
454
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
455
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
456
  cout << "  -h            This help page" << endl;
457
  cout << endl;
458
459
460
}

int main(int argc, char* const argv[]) {
461
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
462
463

  try{
464

465
466
467
468
469
470
471
472
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
473
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

490
491
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
492

Alessio Netti's avatar
Alessio Netti committed
493
494
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
495
          LOG(fatal) << "Failed to read global configuration!";
496
497
498
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
499
500
501
502
503
504
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
      restAPISettings_t& restAPISettings = config.restAPISettings;
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
505
      
506
507
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
508
          switch(ret) {
509
              case 'a':
510
                  pluginSettings.autoPublish = true;
511
                  break;
512
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
513
514
515
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
516
                  break;
517
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
518
519
520
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
521
                  break;
Michael Ott's avatar
Michael Ott committed
522
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
523
                  cassandraSettings.username = optarg;
524
                  break;
Michael Ott's avatar
Michael Ott committed
525
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
526
                  cassandraSettings.password = optarg;
527
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
528
529
530
531
532
533
534
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
535
              case 't':
Alessio Netti's avatar
Alessio Netti committed
536
                  cassandraSettings.ttl = stoul(optarg);
537
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
538
              case 'v':
539
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
540
                  break;
541
              case 'd':
542
              case 'D':
543
                  settings.daemonize = 1;
544
                  break;
545
              case 's':
546
                  settings.statistics = 1;
547
                  break;
548
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
549
                  settings.validateConfig = true;
550
                  break;
551
              case 'h':
552
553
554
              default:
                  usage();
                  exit(EXIT_FAILURE);
555
556
557
          }
      }

558
559
560
561
562
563
      //set up logger to file
      if (settings.logLevelFile >= 0) {
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher"));
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
564
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
565
566
567
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
568

569
      /*
Alessio Netti's avatar
Alessio Netti committed
570
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
571
572
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
573
      signal(SIGTERM, sigHandler);
574
575
576
577
578
579
580
581
582
583

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
584
      
585
      // Setting the size of the sensor cache
586
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
587
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
588

589
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
590
591
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
592
593
594
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
      uint32_t params[3] = {cassandraSettings.coreConnPerHost, cassandraSettings.maxConnPerHost, cassandraSettings.maxConcRequests};
Alessio Netti's avatar
Alessio Netti committed
595
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
596
      
Axel Auweter's avatar
Axel Auweter committed
597
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
598
          LOG(fatal) << "Cannot connect to Cassandra!";
599
600
601
602
603
604
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
605
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
606
      
607
608
609
      /*
       * Allocate the SensorDataStore.
       */
610
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
611
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
612
613
614

      /*
       * Set TTL for data store inserts if TTL > 0.
615
       */
Alessio Netti's avatar
Alessio Netti committed
616
617
618
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
619

620
621
622
623
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
624
      queryEngine.setQueryCallback(sensorQueryCallback);
625

Alessio Netti's avatar
Alessio Netti committed
626
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
627
628
629
630
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
631
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
632
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
633
634
635
636
637
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
638
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
639
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
640
641
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
642

Alessio Netti's avatar
Alessio Netti committed
643
644
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
645
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
646
      
647
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
648
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
649
650
651
652
653
654
655
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
656
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
657
658
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
659
660
661
662
663
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Alessio Netti's avatar
Alessio Netti committed
664
665
666
667
668
669
670
671
      LOG(info) << "    REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
672
673
674
675
676
677
678
679

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
680
      if (settings.validateConfig)
681
682
683
684
          return EXIT_SUCCESS;
      else
          analyticsController->start();

685
686
687
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
688
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
689
      
690
691
692
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
693
      LOG(info) << "MQTT Server running...";
694
      
695
696
697
698
699
700
701
702
703
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
Alessio Netti's avatar
Alessio Netti committed
704
      httpServer_t httpServer(httpOptions.address(restAPISettings.restHost).port(restAPISettings.restPort));
705
706
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
707
      LOG(info) <<  "HTTP Server running...";
708

709
710
711
712
713
714
      /*
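       * Main loop: runs for as long as keepRunning is non-zero. Each iteration
       * purges obsolete sensor cache entries once the cleaning interval has
       * elapsed, sleeps for 60 seconds and, if statistics are enabled, logs the
       * message and insert rates measured over that iteration.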
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
715
716
717
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
718

Alessio Netti's avatar
Alessio Netti committed
719
720
721
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

722
      LOG(info) << "Collect Agent running...";
723
724
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
725
726
727
728
729
730
731
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

732
          sleep(60);
733
734
735
736
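          /* msgCtr, pmsgCtr and readingCtr are read and reset below without any locking (not really thread safe), so the reported rates are only approximate; this is accepted for statistics. */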
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
737
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
738
          if (settings.statistics && keepRunning) {
739
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
740
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
741
          }
742
          msgCtr = 0;
743
          pmsgCtr = 0;
744
	  readingCtr = 0;
745
746
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
747
      LOG(info) << "Stopping...";
748
      analyticsController->stop();
749
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
750
      LOG(info) << "MQTT Server stopped...";
751
752
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
753
      LOG(info) << "HTTP Server stopped...";
754
      delete mySensorDataStore;
755
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
756
757
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
758
      LOG(info) << "Collect Agent closed. Bye bye...";
759
  }
760
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
761
      LOG(fatal) << "Exception: " << e.what();
762
      abrt(EXIT_FAILURE, INTERR);
763
764
  }

765
  return EXIT_SUCCESS;
766
}
767
768