collectagent.cpp 31.6 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
40
#include <dcdb/jobdatastore.h>
41
#include <dcdb/sensorconfig.h>
42
#include <dcdb/version.h>
43
#include <dcdb/sensor.h>
44
#include "version.h"
45

46
#include "configuration.h"
47
#include "simplemqttserver.h"
48
#include "messaging.h"
49
#include "abrt.h"
50
#include "dcdbdaemon.h"
51
#include "sensorcache.h"
52
53
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
54

55
56
57
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

58
using namespace std;
59
60

int keepRunning;
61
bool statistics;
62
63
uint64_t msgCtr;
uint64_t pmsgCtr;
64
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
65
SensorCache mySensorCache;
66
AnalyticsController* analyticsController;
67
DCDB::Connection* dcdbConn;
68
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
69
DCDB::JobDataStore *myJobDataStore;
70
71
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
72
QueryEngine& queryEngine = QueryEngine::getInstance();
73
logger_t lg;
74

Alessio Netti's avatar
Alessio Netti committed
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        err = myJobDataStore->getJobsInIntervalExcl(tempList, start, end);
        if(err != JD_OK) return NULL;
    } else {
        // Getting a single job by id
        err = myJobDataStore->getJobById(tempData, jobId);
        if(err != JD_OK) return NULL;
        tempList.push_back(tempData);
    }
    
    if(!buffer)
        buffer = new std::vector<qeJobData>();
    buffer->clear();
    for(auto& jd : tempList) {
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
        buffer->push_back(tempQeData);
    }
    return buffer;
}

109
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
110
    std::string topic;
Alessio Netti's avatar
Alessio Netti committed
111
    // Getting the topic of the queried sensor from the Navigator
112
113
114
115
116
117
118
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {
        return NULL;
    }
    std::vector <reading_t> *output = NULL;
    DCDB::SensorId sid;
Alessio Netti's avatar
Alessio Netti committed
119
    // Creating a SID to perform the query
120
121
122
    sid.mqttTopicConvert(topic);
    if(mySensorCache.getSensorMap().count(sid) > 0) {
        CacheEntry &entry = mySensorCache.getSensorMap()[sid];
123
        output = entry.getView(startTs, endTs, buffer, rel);
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
        if (output->size() > 0)
            return output;
    }
    // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
    try {
        DCDB::PublicSensor publicSensor;
        publicSensor.name = name;
        publicSensor.pattern = topic;
        std::list <DCDB::SensorDataStoreReading> results;
        DCDB::Sensor sensor(dcdbConn, publicSensor);
        uint64_t now = getTimestamp();
        //Converting relative timestamps to absolute
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
        sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
Alessio Netti's avatar
Alessio Netti committed
140
        // Dealing with allocations that may have been performed by the cache search
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
        if(output)
            output->clear();
        else if(buffer) {
            buffer->clear();
            output = buffer;
        } else
            output = new std::vector<reading_t>();
        reading_t reading;
        //TODO: fix when result contains only partial time range of the query
        for (const auto &r : results) {
            reading.value = r.value;
            reading.timestamp = r.timeStamp.getRaw();
            output->push_back(reading);
        }
    }
    catch(const std::exception& e) {
        if(!buffer && output) delete output;
        return NULL;
    }
    return output;
161
162
}

163
/* Normal termination (SIGINT, CTRL+C) */
164
165
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
166
167
168
169
170
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
171
172
173
  keepRunning = 0;
}

174
175
176
177
178
179
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

Alessio Netti's avatar
Alessio Netti committed
180
//TODO: trim common code with dcdbpusher
181
182
183
184
185
186
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
Alessio Netti's avatar
Alessio Netti committed
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
    
    boost::network::uri::uri uri("https://" + request.destination);
    httpServer_t::string_type method = request.method;
    httpServer_t::string_type path = uri.path();
    httpServer_t::string_type query = uri.query();
    std::string response = "";
    std::ostringstream data;
    std::string auth_value = "";
    std::vector<std::string> pathStrs;
    std::vector<std::pair<std::string, std::string>> queries;
    
    //first check if request is supported at all
    if (method != "GET" && method != "PUT") {
      LOGH(warning) << "Unsupported " << method << " request was made";
      connection->set_status(httpServer_t::connection::not_supported);
      goto error;
    }
    
    LOGH(info) << method << " request of " << request.destination << " was made";
    
    //do some string processing
    //split path into its hierarchical parts
    if (path.size() >= 2) {
      if (path[0] == '/')
          path.erase(0,1);
      if (path[path.size() -1] == '/')
          path.erase(path.size() -1);
      boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
    }
    
    //split query part into the individual queries (key-value pairs)
    {	//do not remove the enclosing brackets
      //need to encapsulate this code block to keep queryStrs local
      std::vector<std::string> queryStrs;
      boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
      for(auto& key : queryStrs) {
          size_t pos = key.find("=");
          if (pos != std::string::npos) {
              std::string value;
              value = key.substr(pos+1);
              key.erase(pos);
              queries.push_back(std::make_pair(key, value));
          }
      }
    }
    //finished string processing
Alessio Netti's avatar
Alessio Netti committed
233
234
235
236
237
238
239
240
241
242
243

    //bool json = false;
    //for (auto& p : queries) {
    //      authkey is required in every case
    //if (p.first == "authkey") {
    //      auth_value = p.second;
    //} else 
    //if (p.first == "json")
    //    if (stoi(p.second) > 0)
    //        json = true;
    //}
Alessio Netti's avatar
Alessio Netti committed
244
245
246
247
248
249
    
    if (pathStrs.size() < 1) {
      LOGH(warning) << "Received malformed request: No first path part";
      connection->set_status(httpServer_t::connection::bad_request);
      goto error;
    }
250

Alessio Netti's avatar
Alessio Netti committed
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
    //select code depending on request
    if (method == "GET") {
        if (pathStrs[0] == "help") {
            response = "collectagent RESTful API cheatsheet:\n"
                       " -GET:  /help     This help message\n"
                       "        /analytics/help\n"
                       "        /[sensor]/avg?interval=[timeInSec]\n"
                       "                  Average of last sensor readings from the last\n"
                       "                  [interval] seconds or of all cached readings\n"
                       "                  if no interval is given\n"
                       "\n";
            //"All resources have to be prepended by host:port and need at\n"
            //"least the query ?authkey=[token] at the end. Multiple queries\n"
            //"need to be separated by semicolons(';')\n";
            response += analyticsController->getManager()->restCheatSheet;
        } else if (pathStrs[0] == "analytics") {
            try {
                restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
                data << reply.data;
                response = reply.response;
            } catch(const std::invalid_argument &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            } catch(const std::domain_error &e) {
                response = e.what();
                connection->set_status(httpServer_t::connection::not_found);
            } catch(const std::exception &e) {
                LOGH(warning) << e.what();
                connection->set_status(httpServer_t::connection::internal_server_error);
                goto error;
            }
        } else {
284

Alessio Netti's avatar
Alessio Netti committed
285
286
287
288
289
290
291
292
293
294
            if (pathStrs.size() < 2) {
                LOGH(warning) << "Received malformed request: No second path part";
                connection->set_status(httpServer_t::connection::bad_request);
                goto error;
            }
            if (pathStrs[1] != "avg") {
                LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
                connection->set_status(httpServer_t::connection::not_supported);
                goto error;
            }
295

Alessio Netti's avatar
Alessio Netti committed
296
297
298
299
            uint64_t time = 0;
            for (auto &p : queries)
                if (p.first == "interval")
                    time = std::stoul(p.second);
300

Alessio Netti's avatar
Alessio Netti committed
301
302
            //try getting the latest value 
            try {
303
                std::string sensor = MQTTChecker::nameToTopic(pathStrs[0]);
Alessio Netti's avatar
Alessio Netti committed
304
                int64_t val = mySensorCache.getSensor(sensor, (uint64_t) time * 1000000000);
Alessio Netti's avatar
Alessio Netti committed
305
                connection->set_status(httpServer_t::connection::ok);
Alessio Netti's avatar
Alessio Netti committed
306
                response = "collectagent::" + sensor + " Average of last " +
Alessio Netti's avatar
Alessio Netti committed
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
                           std::to_string(time) + " seconds is " + std::to_string(val);
                //data << val << "\n";
                //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
            }
            catch (const std::invalid_argument &e) {
                connection->set_status(httpServer_t::connection::not_found);
                response = "Error: Sensor id not found.\n";
            } catch (const std::out_of_range &e) {
                connection->set_status(httpServer_t::connection::no_content);
                response = "Error: Sensor unavailable.\n";
            } catch (const std::exception &e) {
                connection->set_status(httpServer_t::connection::internal_server_error);
                LOGH(warning) << "Internal server error.\n";
                goto error;
            }
        }
    } else if(method == "PUT") {
        if( pathStrs.back() == "reload" )
            analyticsController->halt(true);
        try {
            restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
            data << reply.data;
            response = reply.response;
        } catch(const std::invalid_argument &e) {
            LOGH(warning) << e.what();
            connection->set_status(httpServer_t::connection::bad_request);
            goto error;
        } catch(const std::domain_error &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::not_found);
        } catch(const std::exception &e) {
            response = e.what();
            connection->set_status(httpServer_t::connection::internal_server_error);
        }
        // Continue MQTTPusher when a reload was performed
        if( pathStrs.back() == "reload" )
            analyticsController->resume();
344
    }
Alessio Netti's avatar
Alessio Netti committed
345
346
347
348
349
350
351
352
    
    LOGH(info) << "Responding: " << response;
    data << response << std::endl;
    
    //Error management section
    error:
    connection->set_headers(boost::make_iterator_range(headers, headers + 2));
    connection->write(data.str());
353
354
355
356
  }
};


357
int mqttCallback(SimpleMQTTMessage *msg)
358
359
360
361
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
362
  msgCtr++;
363
364
365
  if (msg->isPublish())
    pmsgCtr++;

366
  uint64_t len;
367
368
369
  /*
   * Decode the message and put into the database.
   */
370
  if (msg->isPublish()) {
371
372
373
374
      const char *topic = msg->getTopic().c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
375
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
376
          if ((len = msg->getPayloadLength()) == 0) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
377
              LOG(error) << "Empty topic-to-name mapping message received";
378
379
              return 1;
          }
380

381
382
383
384
385
386
          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
387
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
388
389
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
Alessio Netti's avatar
Logging    
Alessio Netti committed
390
                  LOG(error) << "Invalid sensor public name: " << sensorName;
391
392
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
393
                  LOG(error) << "Cannot reach sensor data store.";
394
395
396
397
398
399
400
401
402
403
404
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
405
              payload->value = *((int64_t *) msg->getPayload());
406
407
              payload->timestamp = Messaging::calculateTimestamp();
              len = sizeof(uint64_t) * 2;
408
          }
409
410
411
412
413
414
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
Alessio Netti's avatar
Logging    
Alessio Netti committed
415
              LOG(error) << "Message malformed";
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
438
#endif
439
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
440
441
442
443
444
445
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
446
	      mySensorDataStore->insertBatch(readings);
447
	      readingCtr+= readings.size();
448

449
              //mySensorCache.dump();
450
          }
451
#if 1
452
453
454
455
          else {
              cout << "Wrong topic format: " << msg->getTopic() << "\n";
              return 1;
          }
456
#endif
457
      }
458
  }
459
  return 0;
460
461
}

462
463
464



465
466
467
468
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
469
  Configuration config("", "collectagent.conf");
470
471
472
473
474
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
475
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
476
477
478
479
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Alessio Netti's avatar
Alessio Netti committed
480
481
  cout << "  -m<host>      MQTT listen address     [default: " << config.mqttListenHost << ":" << config.mqttListenPort << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << config.cassandraSettings.host << ":" << config.cassandraSettings.port << "]" << endl;
Michael Ott's avatar
Michael Ott committed
482
483
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
Alessio Netti's avatar
Alessio Netti committed
484
485
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << config.cassandraSettings.ttl << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << config.logLevelCmd << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
486
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
487
  cout << endl;
Michael Ott's avatar
Michael Ott committed
488
  cout << "  -d            Daemonize" << endl;
489
  cout << "  -s            Print message stats" <<endl;
490
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
491
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
492
  cout << "  -h            This help page" << endl;
493
  cout << endl;
494
495
496
}

int main(int argc, char* const argv[]) {
497
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
498
499

  try{
500

501
502
503
504
505
506
507
508
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
509
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

526
527
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
528

Alessio Netti's avatar
Alessio Netti committed
529
530
      Configuration config(argv[argc - 1], "collectagent.conf");
      if( !config.readConfig() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
531
          LOG(fatal) << "Failed to read global configuration!";
532
533
534
          exit(EXIT_FAILURE);
      }

Alessio Netti's avatar
Alessio Netti committed
535
536
537
538
539
540
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
      restAPISettings_t& restAPISettings = config.restAPISettings;
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
541
      
542
543
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
544
          switch(ret) {
545
              case 'a':
546
                  pluginSettings.autoPublish = true;
547
                  break;
548
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
549
550
551
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(LISTENPORT);
552
                  break;
553
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
554
555
556
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
                  if(cassandraSettings.port=="") cassandraSettings.port = string(CASSANDRAPORT);
557
                  break;
Michael Ott's avatar
Michael Ott committed
558
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
559
                  cassandraSettings.username = optarg;
560
                  break;
Michael Ott's avatar
Michael Ott committed
561
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
562
                  cassandraSettings.password = optarg;
563
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
564
565
566
567
568
569
570
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
571
              case 't':
Alessio Netti's avatar
Alessio Netti committed
572
                  cassandraSettings.ttl = stoul(optarg);
573
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
574
              case 'v':
575
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
576
                  break;
577
              case 'd':
578
              case 'D':
579
                  settings.daemonize = 1;
580
                  break;
581
              case 's':
582
                  settings.statistics = 1;
583
                  break;
584
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
585
                  settings.validateConfig = true;
586
                  break;
587
              case 'h':
588
589
590
              default:
                  usage();
                  exit(EXIT_FAILURE);
591
592
593
          }
      }

594
595
596
597
598
599
      //set up logger to file
      if (settings.logLevelFile >= 0) {
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("dcdbpusher"));
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
600
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
601
602
603
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
604

605
      /*
Alessio Netti's avatar
Alessio Netti committed
606
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
607
608
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
609
      signal(SIGTERM, sigHandler);
610
611
612
613
614
615
616
617
618
619

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
620
      
621
      // Setting the size of the sensor cache
622
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
623
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
624

625
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
626
627
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
628
629
630
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
      uint32_t params[3] = {cassandraSettings.coreConnPerHost, cassandraSettings.maxConnPerHost, cassandraSettings.maxConcRequests};
Alessio Netti's avatar
Alessio Netti committed
631
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
632
      
Axel Auweter's avatar
Axel Auweter committed
633
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
634
          LOG(fatal) << "Cannot connect to Cassandra!";
635
636
637
638
639
640
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
641
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
642
      
643
644
645
      /*
       * Allocate the SensorDataStore.
       */
646
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
647
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
648
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
649
650
651

      /*
       * Set TTL for data store inserts if TTL > 0.
652
       */
Alessio Netti's avatar
Alessio Netti committed
653
654
655
      if (cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(cassandraSettings.ttl);
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
656

657
658
659
660
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
661
      queryEngine.setQueryCallback(sensorQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
662
      queryEngine.setJobQueryCallback(jobQueryCallback);
663

Alessio Netti's avatar
Alessio Netti committed
664
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
665
666
667
668
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
669
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
670
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
671
672
673
674
675
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
676
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
677
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
678
679
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
680

Alessio Netti's avatar
Alessio Netti committed
681
682
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
683
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
Alessio Netti's avatar
Alessio Netti committed
684
      
685
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
686
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
687
688
689
690
691
692
693
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
694
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
695
696
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
697
698
699
700
701
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
Alessio Netti's avatar
Alessio Netti committed
702
703
704
705
706
707
708
709
      LOG(info) << "    REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Certificate: " << restAPISettings.certificate;
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
	  LOG(info) << "    DH params from: " << restAPISettings.dhFile;
#else
      LOG(info) << "    Certificate, private key and DH-param file not printed.";
#endif
710
711
712
713
714
715
716
717

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
718
      if (settings.validateConfig)
719
720
721
722
          return EXIT_SUCCESS;
      else
          analyticsController->start();

723
724
725
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
726
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
727
      
728
729
730
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
731
      LOG(info) << "MQTT Server running...";
732
      
733
734
735
736
737
738
739
740
741
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
Alessio Netti's avatar
Alessio Netti committed
742
      httpServer_t httpServer(httpOptions.address(restAPISettings.restHost).port(restAPISettings.restPort));
743
744
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
745
      LOG(info) <<  "HTTP Server running...";
746

747
748
749
750
751
752
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
753
754
755
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
756

Alessio Netti's avatar
Alessio Netti committed
757
758
759
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

760
      LOG(info) << "Collect Agent running...";
761
762
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
763
764
765
766
767
768
769
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

770
          sleep(60);
771
772
773
774
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
775
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
776
          if (settings.statistics && keepRunning) {
777
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
778
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
779
          }
780
          msgCtr = 0;
781
          pmsgCtr = 0;
782
	  readingCtr = 0;
783
784
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
785
      LOG(info) << "Stopping...";
786
      analyticsController->stop();
787
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
788
      LOG(info) << "MQTT Server stopped...";
789
790
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
791
      LOG(info) << "HTTP Server stopped...";
792
      delete mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
793
      delete myJobDataStore;
794
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
795
796
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
797
      LOG(info) << "Collect Agent closed. Bye bye...";
798
  }
799
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
800
      LOG(fatal) << "Exception: " << e.what();
801
      abrt(EXIT_FAILURE, INTERR);
802
803
  }

804
  return EXIT_SUCCESS;
805
}
806
807