Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 22.2 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
4
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
5
// Description : Main code of the CollectAgent
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
29
#include <boost/network/uri.hpp>
30
#include <iostream>
31
#include <cstdlib>
32
#include <signal.h>
33
34
#include <unistd.h>
#include <string>
35

36
#include <boost/date_time/posix_time/posix_time.hpp>
37

38
#include <dcdb/connection.h>
39
#include <dcdb/sensordatastore.h>
40
#include <dcdb/sensorconfig.h>
41
#include <dcdb/version.h>
42
#include "version.h"
43

44
#include "configuration.h"
45
#include "simplemqttserver.h"
46
#include "messaging.h"
47
#include "abrt.h"
48
#include "dcdbdaemon.h"
49
#include "sensorcache.h"
50
51
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
52

53
54
55
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

56
using namespace std;
57
58

int keepRunning;
59
bool statistics;
60
61
uint64_t msgCtr;
uint64_t pmsgCtr;
62
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
63
SensorCache mySensorCache;
64
AnalyticsController* analyticsController;
65
DCDB::Connection* dcdbConn;
66
DCDB::SensorDataStore *mySensorDataStore;
67
68
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
69
logger_t lg;
70

71
72
73
74
/**
 * Sensor query callback registered with the QueryEngine in main().
 *
 * Placeholder implementation: no lookup is performed and none of the query
 * parameters are evaluated.
 *
 * @param name     Name of the queried sensor (ignored)
 * @param startTs  Start of the queried range (ignored)
 * @param endTs    End of the queried range (ignored)
 * @param buffer   Caller-provided result buffer
 * @param rel      Relative/absolute timestamp flag (ignored)
 * @return         The unmodified buffer pointer that was passed in
 */
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
    (void)name;
    (void)startTs;
    (void)endTs;
    (void)rel;
    std::vector<reading_t>* const untouched = buffer;
    return untouched;
}

75
/* Normal termination (SIGINT, CTRL+C) */
76
77
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
78
79
80
81
82
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
83
84
85
  keepRunning = 0;
}

86
87
88
89
90
91
/*
 * Crash handler, installed by main() for SIGABRT and SIGSEGV:
 * delegates to abrt() with a failure exit code and the SIGNAL source tag.
 */
void abrtHandler(int sig)
{
  (void)sig;  // the signal number itself is not evaluated
  abrt(EXIT_FAILURE, SIGNAL);
}

92
93
94
95
96
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
97
    std::ostringstream data;
98
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
99

100
101
    //try getting the latest value 
    try {
102
103
104
105
106
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

Alessio Netti's avatar
Alessio Netti committed
107
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

132
133
134
135
  }
};


136
/**
 * Callback invoked by the MQTT server for every received message.
 *
 * Updates the message counters and, for PUBLISH messages, handles one of
 * two cases:
 *  - Topics starting with the DCDB_MAP keyword carry a sensor's public name
 *    in the payload; the name is published for the remainder of the topic
 *    via mySensorConfig.
 *  - All other topics carry readings, which are stored in the sensor cache
 *    and inserted into the sensor data store as one batch.
 *
 * @param msg  The received MQTT message
 * @return     0 if the message was processed (or was not a PUBLISH message),
 *             1 if it was rejected (empty mapping payload, publishing error,
 *             malformed payload or undecodable topic)
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

  uint64_t len;
  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic to a const reference before taking c_str(): this keeps the
      // character buffer alive for the whole block even if getTopic() returns by value.
      const std::string& topicStr = msg->getTopic();
      const char *topic = topicStr.c_str();
      // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty topic-to-name mapping message received";
              return 1;
          }

          string sensorName((char *) msg->getPayload(), len);
          // Skip the keyword so that only the actual sensor topic is published
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name: " << sensorName;
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //In the 64 bit message case, the collect agent provides a timestamp
          if (len == sizeof(uint64_t)) {
              payload = &buf;
              payload->value = *((int64_t *) msg->getPayload());
              payload->timestamp = Messaging::calculateTimestamp();
              // From here on the message is treated like a single (timestamp, value) pair
              len = sizeof(uint64_t) * 2;
          }
              //...otherwise it just retrieves it from the MQTT message payload.
          else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          }
              //...otherwise this message is malformed -> ignore...
          else {
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
#endif
              // Every reading goes into the cache right away and is collected for one batch insert
              std::list<DCDB::SensorDataStoreReading> readings;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
                  readings.push_back(r);
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorDataStore->insertBatch(readings);
              readingCtr+= readings.size();

              //mySensorCache.dump();
          }
          else {
              // Reported through the logger like all other errors in this function
              LOG(error) << "Wrong topic format: " << msg->getTopic();
              return 1;
          }
      }
  }
  return 0;
}

240
241
242



243
244
245
246
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
247
248
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
249
250
251
252
253
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
254
  cout << "  collectagent [-d] [-s] [-x] [-a<string>] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
255
256
257
258
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
259
  cout << "  -a <string>   Auto-publish pattern    [default: none]" << endl;
260
261
262
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
263
264
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
265
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
266
267
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
268
  cout << endl;
Michael Ott's avatar
Michael Ott committed
269
  cout << "  -d            Daemonize" << endl;
270
  cout << "  -s            Print message stats" <<endl;
271
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
Michael Ott's avatar
Michael Ott committed
272
  cout << "  -h            This help page" << endl;
273
  cout << endl;
274
275
276
}

int main(int argc, char* const argv[]) {
277
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
278
    bool validateConfig = false;
279
280

  try{
281

282
283
284
285
286
287
288
289
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
290
      const char* opts = "a:m:r:c:C:u:p:t:v:dDsxh";
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

307
308
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
309

310
311
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
312
          LOG(fatal) << "Failed to read global configuration!";
313
314
315
316
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
317

318
      /* Parse command line */
319
      std::string listenHost, cassandraHost, restApiHost;
320
      std::string listenPort, cassandraPort, restApiPort;
321
322
323

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
324
          switch(ret) {
325
326
327
              case 'a':
                  settings.pluginSettings.sensorPattern = optarg;
                  break;
328
              case 'm':
329
                  settings.mqttListenAddress = optarg;
330
                  break;
331
              case 'r':
332
                  settings.restListenAddress = optarg;
333
                  break;
334
              case 'c':
335
336
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
337
              case 'u':
338
                  settings.cassandraSettings.username = optarg;
339
                  break;
Michael Ott's avatar
Michael Ott committed
340
              case 'p': {
341
342
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
343
344
345
346
347
348
349
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
350
              case 't':
351
                  settings.cassandraSettings.ttl = stoul(optarg);
352
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
353
              case 'v':
354
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
355
                  break;
356
              case 'd':
357
              case 'D':
358
                  settings.daemonize = 1;
359
                  break;
360
              case 's':
361
                  settings.statistics = 1;
362
                  break;
363
364
365
              case 'x':
                  validateConfig = true;
                  break;
366
              case 'h':
367
368
369
              default:
                  usage();
                  exit(EXIT_FAILURE);
370
371
372
          }
      }

373
      auto fileSink = setupFileLogger(settings.pluginSettings.tempdir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
374
375
376
377
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

378
      /*
Alessio Netti's avatar
Alessio Netti committed
379
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
380
381
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
382
      signal(SIGTERM, sigHandler);
383
384
385
386
387
388
389
390
391
392
393

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

394
395
396
      /*
       * Parse hostnames for port specifications
       */
397
      listenHost = string(settings.mqttListenAddress);
398
399
400
401
402
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
403
        listenPort = LISTENPORT;
404
      }
405
406

      cassandraHost = string(settings.cassandraSettings.address);
407
408
409
410
411
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
412
        cassandraPort = CASSANDRAPORT;
413
      }
414
415

      restApiHost = string(settings.restListenAddress);
416
417
418
419
420
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
421
        restApiPort = RESTAPIPORT;
422
423
      }

424
      // Setting the size of the sensor cache
425
426
      // Conversion from milliseconds to nanoseconds
      mySensorCache.setMaxHistory(settings.pluginSettings.cacheInterval * 1000000);
427

428
429
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
430
431
432
433
434
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

435

Axel Auweter's avatar
Axel Auweter committed
436
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
437
          LOG(fatal) << "Cannot connect to Cassandra!";
438
439
440
441
442
443
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
444
      dcdbConn->initSchema();
445

Alessio Netti's avatar
Alessio Netti committed
446

447
448
449
      /*
       * Allocate the SensorDataStore.
       */
450
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
451
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
452
453
454

      /*
       * Set TTL for data store inserts if TTL > 0.
455
       */
456
457
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
458
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
459

460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
      analyticsController->setCache(&mySensorCache);
      if(!analyticsController->initialize(settings, argv[argc - 1]))
          return EXIT_FAILURE;
      QueryEngine::getInstance().setQueryCallback(sensorQueryCallback);

      LOG_LEVEL vLogLevel = validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenAddress;
      LOG(info) << "    CacheInterval:      " << int(settings.pluginSettings.cacheInterval/1000) << " [s]";
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
      LOG(info) << "    Statistics:         " << (settings.statistics ? "Enabled" : "Disabled");
      LOG(info) << "    Write-Dir:          " << settings.pluginSettings.tempdir;
      LOG(info) << "    Hierarchy:          " << (settings.hierarchy!="" ? settings.hierarchy : "none");
      LOG(info) << (validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");

      LOG(info) << "Cassandra Driver Settings:";
      LOG(info) << "    Address:            " << settings.cassandraSettings.address;
      LOG(info) << "    TTL:                " << settings.cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << settings.cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << settings.cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << settings.cassandraSettings.coreConnPerHost;
      LOG(info) << "    MaxConnPerHost:     " << settings.cassandraSettings.maxConnPerHost;
      LOG(info) << "    MaxConcRequests:    " << settings.cassandraSettings.maxConcRequests;
      LOG(info) << "    DebugLog:           " << (settings.cassandraSettings.debugLog ? "Enabled" : "Disabled");
#ifdef SimpleMQTTVerbose
      LOG(info) << "    Username:           " << settings.cassandraSettings.username;
	  LOG(info) << "    Password:           " << settings.cassandraSettings.password;
#else
      LOG(info) << "    Username and password not printed.";
#endif

      LOG(info) << "RestAPI Settings:";
      LOG(info) << "    REST Server: " << settings.restListenAddress;

      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
          LOG_VAR(vLogLevel) << "Analytics Plugin \"" << p.id << "\"";
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

      if (validateConfig)
          return EXIT_SUCCESS;
      else
          analyticsController->start();

513
514
515
      /*
       * Start the MQTT Message Server.
       */
516
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
517
      
518
519
520
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
521
      LOG(info) << "MQTT Server running...";
522
      
523
524
525
526
527
528
529
530
531
532
533
534
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
535
      LOG(info) <<  "HTTP Server running...";
536

537
538
539
540
541
542
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
543
544
545
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
546

Alessio Netti's avatar
Alessio Netti committed
547
548
549
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

550
      LOG(info) << "Collect Agent running...";
551
552
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
553
554
555
556
557
558
559
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

560
          sleep(60);
561
562
563
564
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
565
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
566
          if (settings.statistics && keepRunning) {
567
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
568
              LOG(info) << "Analytics Performance: " << (analyticsController->getReadingCtr()/elapsed)*1000.0 << " inserts/s ";
569
          }
570
          msgCtr = 0;
571
          pmsgCtr = 0;
572
	  readingCtr = 0;
573
574
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
575
      LOG(info) << "Stopping...";
576
      analyticsController->stop();
577
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
578
      LOG(info) << "MQTT Server stopped...";
579
580
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
581
      LOG(info) << "HTTP Server stopped...";
582
      delete mySensorDataStore;
583
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
584
585
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
586
      LOG(info) << "Collect Agent closed. Bye bye...";
587
  }
588
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
589
      LOG(fatal) << "Exception: " << e.what();
590
      abrt(EXIT_FAILURE, INTERR);
591
592
  }

593
  return EXIT_SUCCESS;
594
}
595
596