//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include "version.h"
43

44
#include "configuration.h"
45
#include "simplemqttserver.h"
46
#include "messaging.h"
47
#include "abrt.h"
48
#include "dcdbdaemon.h"
49
50
#include "sensorcache.h"

51
52
53
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

54
using namespace std;
55
56

int keepRunning;
57
bool statistics;
58
59
uint64_t msgCtr;
uint64_t pmsgCtr;
60
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
61
SensorCache mySensorCache;
62
DCDB::Connection* dcdbConn;
63
DCDB::SensorDataStore *mySensorDataStore;
64
65
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
66
logger_t lg;
67

68
/* Normal termination (SIGINT, CTRL+C) */
69
70
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
71
72
73
74
75
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
76
77
78
  keepRunning = 0;
}

79
80
81
82
83
84
/*
 * Crash handler; main() registers it for SIGABRT and SIGSEGV.
 * The received signal number is not used: the handler only hands over to
 * abrt() with EXIT_FAILURE and the SIGNAL constant.
 */
void abrtHandler(int sig)
{
  // NOTE(review): abrt() is not defined in this file; presumably it prints a backtrace and terminates -- confirm.
  abrt(EXIT_FAILURE, SIGNAL);
}

85
86
87
88
89
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
90
    std::ostringstream data;
91
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
92

93
94
    //try getting the latest value 
    try {
95
96
97
98
99
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

Alessio Netti's avatar
Alessio Netti committed
100
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

125
126
127
128
  }
};


129
/*
 * Message callback registered with the SimpleMQTTServer in main().
 *
 * Every message bumps msgCtr; PUBLISH messages additionally bump pmsgCtr and
 * are processed as one of two kinds:
 *
 *  - Topic starts with DCDB_MAP: the payload is a sensor's public name, which
 *    is published for the remainder of the topic via mySensorConfig.
 *  - Anything else: the payload holds sensor readings. A payload of exactly
 *    sizeof(uint64_t) bytes is a bare value that gets a timestamp assigned
 *    here; otherwise the payload must be a non-empty, whole number of
 *    mqttPayload records. Readings are stored in mySensorCache and inserted
 *    into the database in one batch.
 *
 * Returns 0 on success (and for non-PUBLISH messages), 1 if the message was
 * rejected.
 *
 * NOTE(review): the counters and `err` are plain globals; if the MQTT server
 * runs this callback on several threads they are updated without
 * synchronization -- confirm whether that matters for the statistics.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
      return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */

  // Bind the topic to a reference before taking c_str(): should getTopic()
  // return by value, the temporary stays alive for as long as the reference,
  // so `topic` never dangles.
  const std::string& topicStr = msg->getTopic();
  const char *topic = topicStr.c_str();

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is filtered out of the prefix before publishing.
  // We use strncmp as it is the most efficient way to do it
  if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
      const uint64_t nameLen = msg->getPayloadLength();
      if (nameLen == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), nameLen);
      err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

      // PublishSensor does most of the error checking for us
      switch (err) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topicStr;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  mqttPayload buf, *payload;
  uint64_t len = msg->getPayloadLength();

  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
      // Copy the value out with memcpy: the network buffer carries no
      // alignment guarantee for a direct int64_t load.
      int64_t rawValue;
      memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));

      payload = &buf;
      payload->value = rawValue;
      payload->timestamp = Messaging::calculateTimestamp();
      // Exactly one record lives in `buf`.
      len = sizeof(mqttPayload);
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
      payload = (mqttPayload *) msg->getPayload();
  }
  //...otherwise this message is malformed -> ignore...
  else {
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(msg->getTopic())) {
      // Reported through the logger like every other error in this callback.
      LOG(error) << "Wrong topic format: " << topicStr;
      return 1;
  }

#if 0
  cout << "Topic decode successful:" << endl
      << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
      << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
      << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
      << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

  cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
  for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
    cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
  }
  cout << endl;
#endif

  const uint64_t numReadings = len / sizeof(mqttPayload);
  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      readings.emplace_back(sid, payload[i].timestamp, payload[i].value);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  //mySensorCache.dump();

  return 0;
}

233
234
235



236
237
238
239
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
240
241
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
242
243
244
245
246
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
247
  cout << "  collectagent [-d] [-s] [-x] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
248
249
250
251
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
252
253
254
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
255
256
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
257
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
258
259
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
260
  cout << endl;
Michael Ott's avatar
Michael Ott committed
261
  cout << "  -d            Daemonize" << endl;
262
  cout << "  -s            Print message stats" <<endl;
263
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
264
  cout << "  -h            This help page" << endl;
265
  cout << endl;
266
267
268
}

int main(int argc, char* const argv[]) {
269
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
270
    bool validateConfig = false;
271
272

  try{
273

274
275
276
277
278
279
280
281
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
282
      const char* opts = "m:r:c:C:u:p:t:v:dDsxh";
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

299
300
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
301

302
303
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
304
          LOG(fatal) << "Failed to read global configuration!";
305
306
307
308
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
309

310
      /* Parse command line */
311
      std::string listenHost, cassandraHost, restApiHost;
312
      std::string listenPort, cassandraPort, restApiPort;
313
314
315

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
316
          switch(ret) {
317
              case 'm':
318
                  settings.mqttListenAddress = optarg;
319
                  break;
320
              case 'r':
321
                  settings.restListenAddress = optarg;
322
                  break;
323
              case 'c':
324
325
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
326
              case 'u':
327
                  settings.cassandraSettings.username = optarg;
328
                  break;
Michael Ott's avatar
Michael Ott committed
329
              case 'p': {
330
331
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
332
333
334
335
336
337
338
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
339
              case 't':
340
                  settings.cassandraSettings.ttl = stoul(optarg);
341
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
342
              case 'v':
343
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
344
                  break;
345
              case 'd':
346
              case 'D':
347
                  settings.daemonize = 1;
348
                  break;
349
              case 's':
350
                  settings.statistics = 1;
351
                  break;
352
353
354
              case 'x':
                  validateConfig = true;
                  break;
355
              case 'h':
356
357
358
              default:
                  usage();
                  exit(EXIT_FAILURE);
359
360
361
          }
      }

362
      auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
363
364
365
366
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

367
      /*
Alessio Netti's avatar
Alessio Netti committed
368
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
369
370
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
371
      signal(SIGTERM, sigHandler);
372
373
374
375
376
377
378

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
      LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenAddress;
      LOG(info) << "    CacheInterval:      " << settings.cacheInterval << " [s]";
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
      LOG(info) << "    Write-Dir:          " << settings.tempDir;
      LOG(info) << "    Hierarchy:          " << (settings.hierarchy!="" ? settings.hierarchy : "none");
      LOG(info) << (validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");

      LOG(info) << "Cassandra Driver Settings:";
      LOG(info) << "    Address:            " << settings.cassandraSettings.address;
      LOG(info) << "    TTL:                " << settings.cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << settings.cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << settings.cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << settings.cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << settings.cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << settings.cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Username:           " << settings.cassandraSettings.username;
	  LOG(info) << "    Password:           " << settings.cassandraSettings.password;
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
      LOG(info) << "    REST Server: " << settings.restListenAddress;

      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

      if (validateConfig)
          return 0;

419
420
421
422
      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

423
424
425
      /*
       * Parse hostnames for port specifications
       */
426
      listenHost = string(settings.mqttListenAddress);
427
428
429
430
431
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
432
        listenPort = LISTENPORT;
433
      }
434
435

      cassandraHost = string(settings.cassandraSettings.address);
436
437
438
439
440
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
441
        cassandraPort = CASSANDRAPORT;
442
      }
443
444

      restApiHost = string(settings.restListenAddress);
445
446
447
448
449
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
450
        restApiPort = RESTAPIPORT;
451
452
      }

453
454
      // Setting the size of the sensor cache
      mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
455

456
457
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
458
459
460
461
462
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

463

Axel Auweter's avatar
Axel Auweter committed
464
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
465
          LOG(fatal) << "Cannot connect to Cassandra!";
466
467
468
469
470
471
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
472
      dcdbConn->initSchema();
473

Alessio Netti's avatar
Alessio Netti committed
474

475
476
477
      /*
       * Allocate the SensorDataStore.
       */
478
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
479
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
480
481
482

      /*
       * Set TTL for data store inserts if TTL > 0.
483
       */
484
485
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
486
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
487

488
489
490
      /*
       * Start the MQTT Message Server.
       */
491
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
492
      
493
494
495
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
496
      LOG(info) << "MQTT Server running...";
497
      
498
499
500
501
502
503
504
505
506
507
508
509
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
510
      LOG(info) <<  "HTTP Server running...";
511

512
513
514
515
516
517
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
518
519
520
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
521

Alessio Netti's avatar
Alessio Netti committed
522
523
524
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

525
      LOG(info) << "Collect Agent running...";
526
527
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
528
529
530
531
532
533
534
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

535
          sleep(60);
536
537
538
539
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
540
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
541
          if (settings.statistics && keepRunning) {
542
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
543
          }
544
          msgCtr = 0;
545
          pmsgCtr = 0;
546
	  readingCtr = 0;
547
548
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
549
      LOG(info) << "Stopping...";
550
551

      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
552
      LOG(info) << "MQTT Server stopped...";
553
554
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
555
      LOG(info) << "HTTP Server stopped...";
556
      delete mySensorDataStore;
557
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
558
559
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
560
      LOG(info) << "Collect Agent closed. Bye bye...";
561
  }
562
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
563
      LOG(fatal) << "Exception: " << e.what();
564
      abrt(EXIT_FAILURE, INTERR);
565
566
  }

567
  return EXIT_SUCCESS;
568
}
569
570