2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

collectagent.cpp 39.7 KB
Newer Older
1
2
//================================================================================
// Name        : collectagent.cpp
3
// Author      : Axel Auweter
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
9
10
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
 * @defgroup ca Collect Agent
 *
 * @brief MQTT message broker in between pusher and storage backend.
 *
 * @details Collect Agent is a intermediary between one or multiple Pusher
 *          instances and one storage backend. It runs a reduced custom MQTT
 *          message server. Collect Agent receives data from Pusher
 *          via MQTT messages and stores them in the storage via libdcdb.
 */

/**
 * @file collectagent.cpp
 *
 * @brief Main function for the DCDB Collect Agent.
 *
 * @ingroup ca
 */

47
#include <cstdlib>
48
#include <signal.h>
49
50
#include <unistd.h>
#include <string>
51

52
#include <boost/date_time/posix_time/posix_time.hpp>
Micha Müller's avatar
Micha Müller committed
53
54
55
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
56

57
#include <dcdb/libconfig.h>
58
#include <dcdb/connection.h>
59
#include <dcdb/sensordatastore.h>
Alessio Netti's avatar
Alessio Netti committed
60
#include <dcdb/jobdatastore.h>
61
#include <dcdb/calievtdatastore.h>
62
#include <dcdb/sensorconfig.h>
63
#include <dcdb/version.h>
64
#include <dcdb/sensor.h>
65
#include "version.h"
66

67
#include "CARestAPI.h"
68
#include "configuration.h"
69
#include "simplemqttserver.h"
70
#include "messaging.h"
71
#include "abrt.h"
72
#include "dcdbdaemon.h"
73
#include "sensorcache.h"
74
75
#include "analyticscontroller.h"
#include "../analytics/includes/QueryEngine.h"
76

77
78
79
#define __STDC_FORMAT_MACROS
#include <inttypes.h>

80
81
82
83
84
85
86
/**
 * Uncomment and recompile to activate CollectAgent's special benchmark mode.
 * In this mode, all received messages will be discarded and no data is stored
 * in the storage backend.
 */
//#define BENCHMARK_MODE

87
using namespace std;
88

89
bool newAutoPub;
90
int keepRunning;
Alessio Netti's avatar
Alessio Netti committed
91
int retCode = EXIT_SUCCESS;
92
uint64_t msgCtr;
93
uint64_t readingCtr;
94
95
uint64_t dbQueryCtr;
uint64_t cachedQueryCtr;
Alessio Netti's avatar
Alessio Netti committed
96
SensorCache mySensorCache;
97
AnalyticsController* analyticsController;
98
DCDB::Connection* dcdbConn;
99
DCDB::SensorDataStore *mySensorDataStore;
Alessio Netti's avatar
Alessio Netti committed
100
DCDB::JobDataStore *myJobDataStore;
101
DCDB::SensorConfig *mySensorConfig;
102
DCDB::CaliEvtDataStore *myCaliEvtDataStore;
103
MetadataStore *metadataStore;
Alessio Netti's avatar
Alessio Netti committed
104
CARestAPI* httpsServer = nullptr;
105
DCDB::SCError err;
106
QueryEngine& queryEngine = QueryEngine::getInstance();
107
boost::shared_ptr<boost::asio::io_context::work> keepAliveWork;
108
logger_t lg;
109

110
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
Alessio Netti's avatar
Alessio Netti committed
111
112
113
114
115
116
117
118
119
120
    std::list<JobData> tempList;
    JobData   tempData;
    qeJobData tempQeData;
    JDError err;
    if(range) {
        // Getting a list of jobs in the given time range
        uint64_t now = getTimestamp();
        uint64_t startTsInt = rel ? now - startTs : startTs;
        uint64_t endTsInt = rel ? now - endTs : endTs;
        DCDB::TimeStamp start(startTsInt), end(endTsInt);
121
        err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
122
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
123
124
    } else {
        // Getting a single job by id
125
        err = myJobDataStore->getJobById(tempData, jobId, domainId);
126
        if(err != JD_OK) return false;
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
        tempList.push_back(tempData);
    }
    
    for(auto& jd : tempList) {
131
        tempQeData.domainId = jd.domainId;
Alessio Netti's avatar
Alessio Netti committed
132
133
134
135
136
        tempQeData.jobId = jd.jobId;
        tempQeData.userId = jd.userId;
        tempQeData.startTime = jd.startTime.getRaw();
        tempQeData.endTime = jd.endTime.getRaw();
        tempQeData.nodes = jd.nodes;
137
        buffer.push_back(tempQeData);
Alessio Netti's avatar
Alessio Netti committed
138
    }
139
    return true;
Alessio Netti's avatar
Alessio Netti committed
140
141
}

142
bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
143
144
145
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
146
147
148
149
150
151
152
153
154
155
156
157
158
159
    
    std::list<DCDB::SensorId> topics;
    std::string topic;
    sensorCache_t& sensorMap = mySensorCache.getSensorMap();
    size_t successCtr = 0;
    for(const auto& name : names) {
        // Getting the topic of the queried sensor from the Navigator
        // If not found, we try to use the input name as topic
        try {
            topic = queryEngine.getNavigator()->getNodeTopic(name);
        } catch (const std::domain_error &e) { topic = name; }
        DCDB::SensorId sid;
        // Creating a SID to perform the query
        if (sid.mqttTopicConvert(topic)) {
160
            mySensorCache.wait();
161
            if (sensorMap.count(sid) > 0 && sensorMap[sid].getView(startTs, endTs, buffer, rel, tol)) {
162
163
164
165
                // Data was found, can continue to next SID
                successCtr++;
            } else {
                // This happens only if no data was found in the local cache
166
167
                topics.push_back(sid);
            }
168
            mySensorCache.release();
169
        }
170
    }
171
172
173
174
    if (successCtr) {
	cachedQueryCtr+= buffer.size();
    }
    
175
176
177
178
179
180
181
182
183
184
185
186
187
    // If we are here then some sensors were not found in the cache - we need to fetch data from Cassandra
    if(!topics.empty()) {
        try {
            std::list <DCDB::SensorDataStoreReading> results;
            uint64_t now = getTimestamp();
            //Converting relative timestamps to absolute
            uint64_t startTsInt = rel ? now - startTs : startTs;
            uint64_t endTsInt = rel ? now - endTs : endTs;
            DCDB::TimeStamp start(startTsInt), end(endTsInt);
            uint16_t startWs=start.getWeekstamp(), endWs=end.getWeekstamp();
            // If timestamps are equal we perform a fuzzy query
            if(startTsInt == endTsInt) {
                topics.front().setRsvd(startWs);
188
                mySensorDataStore->fuzzyQuery(results, topics, start, tol, false);
189
190
191
192
193
            }
            // Else, we iterate over the weekstamps (if more than one) and perform range queries
            else {
                for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
                    topics.front().setRsvd(currWs);
194
                    mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
195
196
197
198
199
200
                }
            }

            if (!results.empty()) {
                successCtr++;
                reading_t reading;
201
		dbQueryCtr+= results.size();
202
203
204
205
206
207
                for (const auto &r : results) {
                    reading.value = r.value;
                    reading.timestamp = r.timeStamp.getRaw();
                    buffer.push_back(reading);
                }
            }
208
        }
209
        catch (const std::exception &e) {}
210
    }
211
    
212
    --queryEngine.access;
213
214
215
    return successCtr>0;
}

216
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel, const uint64_t tol) {
217
218
219
220
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    std::vector<std::string> nameWrapper;
    nameWrapper.push_back(name);
221
    return sensorGroupQueryCallback(nameWrapper, startTs, endTs, buffer, rel, tol);
222
223
}

224
225
226
227
228
229
230
231
232
233
234
bool metadataQueryCallback(const string& name, SensorMetadata& buffer) {
    // Returning NULL if the query engine is being updated
    if(queryEngine.updating.load()) return false;
    ++queryEngine.access;
    std::string topic=name;
    // Getting the topic of the queried sensor from the Navigator
    // If not found, we try to use the input name as topic
    try {
        topic = queryEngine.getNavigator()->getNodeTopic(name);
    } catch(const std::domain_error& e) {}
    
235
    bool local = false;
236
237
238
239
    metadataStore->wait();
    if(metadataStore->getMap().count(topic)) {
        buffer = metadataStore->get(topic);
        local = true;
240
    }
241
242
    metadataStore->release();
        
243
    if(!local) {
244
245
246
247
248
249
250
        // If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
        try {
            DCDB::PublicSensor publicSensor;
            if (mySensorConfig->getPublicSensorByName(publicSensor, topic.c_str()) != SC_OK) {
                --queryEngine.access;
                return false;
            }
251
            buffer = DCDB::PublicSensor::publicSensorToMetadata(publicSensor);
252
253
254
255
256
257
258
259
260
261
        }
        catch (const std::exception &e) {
            --queryEngine.access;
            return false;
        }
    }
    --queryEngine.access;
    return true;
}

262
/* Normal termination (SIGINT, CTRL+C) */
263
264
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
265
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
Alessio Netti's avatar
Alessio Netti committed
266
  if( sig == SIGINT ) {
Alessio Netti's avatar
Alessio Netti committed
267
      LOG(fatal) << "Received SIGINT";
Alessio Netti's avatar
Alessio Netti committed
268
269
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGTERM ) {
Alessio Netti's avatar
Alessio Netti committed
270
      LOG(fatal) << "Received SIGTERM";
Alessio Netti's avatar
Alessio Netti committed
271
272
273
274
275
      retCode = EXIT_SUCCESS;
  } else if( sig == SIGUSR1 ) {
      LOG(fatal) << "Received SIGUSR1 via REST API";
      retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
  }
276
277

  keepAliveWork.reset();
278
279
280
  keepRunning = 0;
}

281
282
283
284
285
286
/* Crash */
void abrtHandler(int sig)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

287
int mqttCallback(SimpleMQTTMessage *msg)
288
289
{
  /*
Michael Ott's avatar
Michael Ott committed
290
   * Increment the msgCtr for statistics.
291
   */
292
  msgCtr++;
293

294
295
#ifndef BENCHMARK_MODE

296
  uint64_t len;
297
298
299
  /*
   * Decode the message and put into the database.
   */
300
  if (msg->isPublish()) {
301
      const char *topic = msg->getTopic().c_str();
302
      // We check whether the topic includes the /DCDB_MAP/ keyword, indicating that the payload will contain the
303
304
      // sensor's name. In that case, we set the mappingMessage flag to true, and filter the keyword out of the prefix
      // We use strncmp as it is the most efficient way to do it
Alessio Netti's avatar
Alessio Netti committed
305
      if (strncmp(topic, DCDB_MAP, DCDB_MAP_LEN) == 0) {
306
          if ((len = msg->getPayloadLength()) == 0) {
307
              LOG(error) << "Empty sensor publish message received!";
308
309
              return 1;
          }
310

311
          string payload((char *) msg->getPayload(), len);
312
313
314
315
316
317
318
319
320
          //If the topic includes the extended /DCDB_MAP/METADATA/ keyword, we assume a JSON metadata packet is encoded
          if(strncmp(topic, DCDB_MET, DCDB_MET_LEN) == 0) {
              SensorMetadata sm;
              try {
                  sm.parseJSON(payload);
              } catch (const std::exception &e) {
                  LOG(error) << "Invalid metadata packed received!";
                  return 1;
              }
321
              if(sm.isValid()) {
322
                  err = mySensorConfig->publishSensor(sm);
323
324
                  metadataStore->store(*sm.getPattern(), sm);
              }
325
326
          } else {
              err = mySensorConfig->publishSensor(payload.c_str(), topic + DCDB_MAP_LEN);
327
          }
328
329
330
331

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
Alessio Netti's avatar
Logging    
Alessio Netti committed
332
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
333
334
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
335
                  LOG(error) << "Invalid sensor public name.";
336
337
                  return 1;
              case DCDB::SC_INVALIDSESSION:
Alessio Netti's avatar
Logging    
Alessio Netti committed
338
                  LOG(error) << "Cannot reach sensor data store.";
339
340
341
342
                  return 1;
              default:
                  break;
          }
343
344
          
          newAutoPub = true;
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
      } else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
          /*
           * Special message case. This message contains a Caliper Event data
           * string that is encoded in the MQTT topic. Its payload consists of
           * usual timestamp-value pairs.
           * Data from this messages will be stored in a separate table managed
           * by CaliEvtDataStore class.
           */
          std::string topicStr(msg->getTopic());
          mqttPayload buf, *payload;

          len = msg->getPayloadLength();
          //TODO support case that no timestamp is given?
          //retrieve timestamp and value from the payload
          if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              payload = (mqttPayload *) msg->getPayload();
          } else {
              //this message is malformed -> ignore
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Decode message topic in actual sensor topic and string data.
           * "/:/" is used as delimiter between topic and data.
           */
          topicStr.erase(0, DCDB_CALIEVT_LEN);
          size_t pos = topicStr.find("/:/");
          if (pos == std::string::npos) {
              // topic is malformed -> ignore
              LOG(error) << "CaliEvt topic malformed";
              return 1;
          }
          const std::string data(topicStr, pos+3);
          topicStr.erase(pos);

          /*
           * We use the same MQTT-topic/SensorId infrastructure as actual sensor
           * data readings to sort related events.
           * Check if we can decode the event topic into a valid SensorId. If
           * successful, store the record in the database.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(topicStr)) {
389
              //std::list<DCDB::CaliEvtData> events;
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
              DCDB::CaliEvtData e;
              e.eventId   = sid;
              e.event     = data;
              for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
                  e.timeStamp = payload[i].timestamp;

                  /**
                   * We want an exhaustive list of all events ordered by their
                   * time of occurrence. Payload values should always be
                   * one. Other values currently indicate a malformed message.
                   *
                   * In the future, the value field could be used to aggregate
                   * multiple equal events that occurred in the same plugin
                   * read cycle.
                   */
                  if(payload[i].value != 1) {
                      LOG(error) << "CaliEvt message malformed. Value != 1";
                      return 1;
                  }

410
411
                  myCaliEvtDataStore->insert(e, metadataStore->getTTL(topicStr));
                  //events.push_back(e);
412
              }
413
              //myCaliEvtDataStore->insertBatch(events, metadataStore->getTTL(topicStr));
414
415
416
          } else {
              LOG(error) << "Topic could not be converted to SID";
          }
Micha Müller's avatar
Micha Müller committed
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
      } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
	      /**
           * This message contains Slurm job data in JSON format. We need to
           * parse the payload and store it within the JobDataStore.
           */
	      if ((len = msg->getPayloadLength()) == 0) {
		      LOG(error) << "Empty job data message received!";
		      return 1;
	      }

	      //parse JSON into JobData object
	      string        payload((char *)msg->getPayload(), len);
	      DCDB::JobData jd;
	      try {
		      boost::property_tree::iptree config;
		      std::istringstream           str(payload);
		      boost::property_tree::read_json(str, config);
		      BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
			      if (boost::iequals(val.first, "jobid")) {
				      jd.jobId = val.second.data();
437
438
439
			      } else if (boost::iequals(val.first, "domainid")) {
                      jd.domainId = val.second.data();
                  } else if (boost::iequals(val.first, "userid")) {
Micha Müller's avatar
Micha Müller committed
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
				      jd.userId = val.second.data();
			      } else if (boost::iequals(val.first, "starttime")) {
				      jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "endtime")) {
				      jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
			      } else if (boost::iequals(val.first, "nodes")) {
				      BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
					      jd.nodes.push_back(node.second.data());
				      }
			      }
		      }
	      } catch (const std::exception &e) {
		      LOG(error) << "Invalid job data packet received!";
		      return 1;
	      }

#ifdef DEBUG
	      LOG(debug) << "JobId  = " << jd.jobId;
	      LOG(debug) << "UserId = " << jd.userId;
	      LOG(debug) << "Start  = " << jd.startTime.getString();
	      LOG(debug) << "End    = " << jd.endTime.getString();
	      LOG(debug) << "Nodes: ";
	      for (const auto &n : jd.nodes) {
		      LOG(debug) << "    " << n;
	      }
#endif

	      //store JobData into Storage Backend
468
469
	      //dcdbslurmjob start inserts the endTime as startTime + max job length + 1, i.e. the last digit will be 1 
	      if ((jd.endTime == DCDB::TimeStamp((uint64_t)0)) || ((jd.endTime.getRaw() & 0x1) == 1)) {
Micha Müller's avatar
Micha Müller committed
470
471
		      //starting job data
		      if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
472
			      LOG(error) << "Job data insert for job " << jd.jobId << " failed!";
Micha Müller's avatar
Micha Müller committed
473
474
475
476
477
			      return 1;
		      }
	      } else {
		      //ending job data
		      DCDB::JobData tmp;
478
		      if (myJobDataStore->getJobById(tmp, jd.jobId, jd.domainId) != DCDB::JD_OK) {
479
			      LOG(error) << "Could not retrieve job " << jd.jobId << " to be updated!";
Micha Müller's avatar
Micha Müller committed
480
481
482
			      return 1;
		      }

483
		      if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
484
			      LOG(error) << "Could not update end time of job " << tmp.jobId << "!";
Micha Müller's avatar
Micha Müller committed
485
486
487
			      return 1;
		      }
	      }
488
      } else {
Micha Müller's avatar
Micha Müller committed
489
	      mqttPayload buf, *payload;
490

Micha Müller's avatar
Micha Müller committed
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
	      len = msg->getPayloadLength();
	      //In the 64 bit message case, the collect agent provides a timestamp
	      if (len == sizeof(uint64_t)) {
		      payload = &buf;
		      payload->value = *((int64_t *)msg->getPayload());
		      payload->timestamp = Messaging::calculateTimestamp();
		      len = sizeof(uint64_t) * 2;
	      }
	      //...otherwise it just retrieves it from the MQTT message payload.
	      else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
		      payload = (mqttPayload *)msg->getPayload();
	      }
	      //...otherwise this message is malformed -> ignore...
	      else {
		      LOG(error) << "Message malformed";
		      return 1;
	      }
508

Micha Müller's avatar
Micha Müller committed
509
	      /*
510
511
512
513
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the record in the database.
           */
Micha Müller's avatar
Micha Müller committed
514
515
	      DCDB::SensorId sid;
	      if (sid.mqttTopicConvert(msg->getTopic())) {
516
517
518
519
520
521
522
523
524
525
526
527
#if 0
              cout << "Topic decode successful:" << endl
                  << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
                  << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
                  << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
                  << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

              cout << "Payload ("  << len/sizeof(mqttPayload) << " messages):"<< endl;
              for (uint64_t i=0; i<len/sizeof(mqttPayload); i++) {
                cout << "  " << i << ": ts=" << payload[i].timestamp << " val=" << payload[i].value << endl;
              }
              cout << endl;
528
#endif
529
	      std::list<DCDB::SensorDataStoreReading> readings;
Alessio Netti's avatar
Alessio Netti committed
530
531
532
533
534
535
          for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
              DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
              readings.push_back(r);
              mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
          }
          mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
536
	      mySensorDataStore->insertBatch(readings, metadataStore->getTTL(msg->getTopic()));
537
	      readingCtr+= readings.size();
538

539
              //mySensorCache.dump();
540
541
          } else {
              LOG(error) << "Message with empty topic received";
542
          }
543
      }
544
  }
545
#endif
546
  return 0;
547
548
}

549
550
551



552
553
554
555
/*
 * Print usage information
 */
void usage() {
556
557
558
559
560
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
561
  cout << "  collectagent [-d] [-s] [-x] [-a] [-m<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <config>" << endl;
562
563
564
565
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
566
567
  cout << "  -m<host>      MQTT listen address     [default: " << DEFAULT_LISTENHOST << ":" << DEFAULT_LISTENPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << DEFAULT_CASSANDRAHOST << ":" << DEFAULT_CASSANDRAPORT << "]" << endl;
Michael Ott's avatar
Michael Ott committed
568
569
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
570
571
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << DEFAULT_CASSANDRATTL << "]" << endl;
  cout << "  -v<level>     Set verbosity of output [default: " << DEFAULT_LOGLEVEL << "]" << endl
Alessio Netti's avatar
Logging    
Alessio Netti committed
572
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
573
  cout << endl;
Michael Ott's avatar
Michael Ott committed
574
  cout << "  -d            Daemonize" << endl;
575
  cout << "  -s            Print message stats" <<endl;
576
  cout << "  -x            Parse and print the config but do not actually start collectagent" << endl;
577
  cout << "  -a			   Enable sensor auto-publish" << endl;
Michael Ott's avatar
Michael Ott committed
578
  cout << "  -h            This help page" << endl;
579
  cout << endl;
580
581
582
}

int main(int argc, char* const argv[]) {
583
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
584
585

  try{
586

587
588
      // Checking if path to config is supplied
      if (argc <= 1) {
589
          cout << "Please specify a path to the config-directory or a config-file" << endl << endl;
590
591
592
593
594
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
595
      const char* opts = "m:r:c:C:u:p:t:v:dDsaxh";
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

612
613
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
614

Alessio Netti's avatar
Alessio Netti committed
615
      Configuration config(argv[argc - 1], "collectagent.conf");
616
      config.readConfig();
617

Alessio Netti's avatar
Alessio Netti committed
618
619
620
621
      // References to shorten access to config parameters
      Configuration& settings = config;
      cassandra_t& cassandraSettings = config.cassandraSettings;
      pluginSettings_t& pluginSettings = config.pluginSettings;
Micha Mueller's avatar
Micha Mueller committed
622
      serverSettings_t& restAPISettings = config.restAPISettings;
Alessio Netti's avatar
Alessio Netti committed
623
      analyticsSettings_t& analyticsSettings = config.analyticsSettings;
Alessio Netti's avatar
Alessio Netti committed
624
      
625
626
      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
627
          switch(ret) {
628
              case 'a':
629
                  pluginSettings.autoPublish = true;
630
                  break;
631
              case 'm':
Alessio Netti's avatar
Alessio Netti committed
632
633
                  settings.mqttListenHost = parseNetworkHost(optarg);
                  settings.mqttListenPort = parseNetworkPort(optarg);
634
                  if(settings.mqttListenPort=="") settings.mqttListenPort = string(DEFAULT_LISTENPORT);
635
                  break;
636
              case 'c':
Alessio Netti's avatar
Alessio Netti committed
637
638
                  cassandraSettings.host = parseNetworkHost(optarg);
                  cassandraSettings.port = parseNetworkPort(optarg);
639
                  if(cassandraSettings.port=="") cassandraSettings.port = string(DEFAULT_CASSANDRAPORT);
640
                  break;
Michael Ott's avatar
Michael Ott committed
641
              case 'u':
Alessio Netti's avatar
Alessio Netti committed
642
                  cassandraSettings.username = optarg;
643
                  break;
Michael Ott's avatar
Michael Ott committed
644
              case 'p': {
Alessio Netti's avatar
Alessio Netti committed
645
                  cassandraSettings.password = optarg;
646
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
647
648
649
650
651
652
653
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
654
              case 't':
Alessio Netti's avatar
Alessio Netti committed
655
                  cassandraSettings.ttl = stoul(optarg);
656
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
657
              case 'v':
658
                  settings.logLevelCmd = stoi(optarg);
Alessio Netti's avatar
Logging    
Alessio Netti committed
659
                  break;
660
              case 'd':
661
              case 'D':
662
                  settings.daemonize = 1;
663
                  break;
664
              case 's':
665
                  settings.statisticsInterval = 1;
666
                  break;
667
              case 'x':
Alessio Netti's avatar
Alessio Netti committed
668
                  settings.validateConfig = true;
669
                  break;
670
              case 'h':
671
672
673
              default:
                  usage();
                  exit(EXIT_FAILURE);
674
675
          }
      }
676
677
678
      
      libConfig.init();
      libConfig.setTempDir(pluginSettings.tempdir);
679

680
681
      //set up logger to file
      if (settings.logLevelFile >= 0) {
Alessio Netti's avatar
Alessio Netti committed
682
	  auto fileSink = setupFileLogger(pluginSettings.tempdir, std::string("collectagent"));
683
684
685
	  fileSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelFile));
      }
      
Alessio Netti's avatar
Logging    
Alessio Netti committed
686
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
687
688
689
      if (settings.logLevelCmd >= 0) {
	  cmdSink->set_filter(boost::log::trivial::severity >= translateLogLevel(settings.logLevelCmd));
      }
Alessio Netti's avatar
Logging    
Alessio Netti committed
690

691
      /*
Alessio Netti's avatar
Alessio Netti committed
692
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
693
694
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
695
      signal(SIGTERM, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
696
      signal(SIGUSR1, sigHandler);
697
698
699
700
701
702
703
704
705
706

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();
Alessio Netti's avatar
Alessio Netti committed
707
      
708
      // Setting the size of the sensor cache
709
      // Conversion from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
710
      mySensorCache.setMaxHistory(uint64_t(pluginSettings.cacheInterval) * 1000000);
711

712
      //Allocate and initialize connection to Cassandra.
Alessio Netti's avatar
Alessio Netti committed
713
714
      dcdbConn = new DCDB::Connection(cassandraSettings.host, atoi(cassandraSettings.port.c_str()), 
                                      cassandraSettings.username, cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
715
716
      dcdbConn->setNumThreadsIo(cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(cassandraSettings.queueSizeIo);
717
      uint32_t params[1] = {cassandraSettings.coreConnPerHost};
Alessio Netti's avatar
Alessio Netti committed
718
      dcdbConn->setBackendParams(params);
Alessio Netti's avatar
Alessio Netti committed
719
      
Axel Auweter's avatar
Axel Auweter committed
720
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
721
          LOG(fatal) << "Cannot connect to Cassandra!";
722
723
724
725
726
727
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
728
      dcdbConn->initSchema();
Alessio Netti's avatar
Alessio Netti committed
729
      
730
731
732
      /*
       * Allocate the SensorDataStore.
       */
733
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
734
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
Alessio Netti's avatar
Alessio Netti committed
735
      myJobDataStore = new DCDB::JobDataStore(dcdbConn);
736
      myCaliEvtDataStore = new DCDB::CaliEvtDataStore(dcdbConn);
737
738
739

      /*
       * Set TTL for data store inserts if TTL > 0.
740
       */
741
      if (cassandraSettings.ttl > 0) {
Alessio Netti's avatar
Alessio Netti committed
742
        mySensorDataStore->setTTL(cassandraSettings.ttl);
743
744
        myCaliEvtDataStore->setTTL(cassandraSettings.ttl);
      }
Alessio Netti's avatar
Alessio Netti committed
745
      mySensorDataStore->setDebugLog(cassandraSettings.debugLog);
746
      myCaliEvtDataStore->setDebugLog(cassandraSettings.debugLog);
747

748
749
      // Fetching public sensor information from the Cassandra datastore
      list<DCDB::PublicSensor> publicSensors;
750
751
752
753
754
      if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK) {
	  LOG(error) << "Failed to retrieve public sensors!";
	  exit(EXIT_FAILURE);
      }
      
755
      metadataStore = new MetadataStore();
756
      SensorMetadata sBuf;
757
      for (const auto &s : publicSensors)
758
          if (!s.is_virtual) {
759
              sBuf = DCDB::PublicSensor::publicSensorToMetadata(s);
760
761
              if(sBuf.isValid())
                  metadataStore->store(*sBuf.getPattern(), sBuf);
762
          }
763
      publicSensors.clear();
764
765
766
767
768

      boost::asio::io_context io;
      boost::thread_group     threads;

      analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore, io);
769
      analyticsController->setCache(&mySensorCache);
770
      analyticsController->setMetadataStore(metadataStore);
Alessio Netti's avatar
Alessio Netti committed
771
      queryEngine.setFilter(analyticsSettings.filter);
Alessio Netti's avatar
Alessio Netti committed
772
      queryEngine.setJobFilter(analyticsSettings.jobFilter);
773
      queryEngine.setJobMatch(analyticsSettings.jobMatch);
774
      queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
775
      queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
Alessio Netti's avatar
Alessio Netti committed
776
      queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
777
      queryEngine.setQueryCallback(sensorQueryCallback);
778
      queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
779
      queryEngine.setMetadataQueryCallback(metadataQueryCallback);
Alessio Netti's avatar
Alessio Netti committed
780
      queryEngine.setJobQueryCallback(jobQueryCallback);
781
      if(!analyticsController->initialize(settings))
782
783
          return EXIT_FAILURE;
      
Alessio Netti's avatar
Alessio Netti committed
784
      LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
785
786
787
788
      LOG_VAR(vLogLevel) << "-----  Configuration  -----";

      //print global settings in either case
      LOG(info) << "Global Settings:";
Alessio Netti's avatar
Alessio Netti committed
789
      LOG(info) << "    MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
Alessio Netti's avatar
Alessio Netti committed
790
      LOG(info) << "    CacheInterval:      " << int(pluginSettings.cacheInterval/1000) << " [s]";
791
      LOG(info) << "    CleaningInterval:   " << settings.cleaningInterval << " [s]";
792
      LOG(info) << "    Threads:            " << settings.threads;
793
794
795
      LOG(info) << "    MessageThreads:     " << settings.messageThreads;
      LOG(info) << "    MessageSlots:       " << settings.messageSlots;
      LOG(info) << "    Daemonize:          " << (settings.daemonize ? "Enabled" : "Disabled");
796
      LOG(info) << "    StatisticsInterval: " << settings.statisticsInterval << " [s]";
797
      LOG(info) << "    StatisticsMqttPart: " << settings.statisticsMqttPart;
Alessio Netti's avatar
Alessio Netti committed
798
      LOG(info) << "    MQTT-prefix:        " << pluginSettings.mqttPrefix;
799
      LOG(info) << "    Auto-publish:       " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
Alessio Netti's avatar
Alessio Netti committed
800
801
      LOG(info) << "    Write-Dir:          " << pluginSettings.tempdir;
      LOG(info) << (settings.validateConfig ? "    Only validating config files." : "    ValidateConfig:     Disabled");
802

Alessio Netti's avatar
Alessio Netti committed
803
804
      LOG(info) << "Analytics Settings:";
      LOG(info) << "    Hierarchy:          " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
805
      LOG(info) << "    Filter:             " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
806
807
      LOG(info) << "    Job Filter:         " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
      LOG(info) << "    Job Match:          " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
808
      LOG(info) << "    Job ID Filter:      " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
809
      LOG(info) << "    Job Domain ID:      " << analyticsSettings.jobDomainId;
Alessio Netti's avatar
Alessio Netti committed
810
      
811
      LOG(info) << "Cassandra Driver Settings:";
Alessio Netti's avatar
Alessio Netti committed
812
      LOG(info) << "    Address:            " << cassandraSettings.host << ":" << cassandraSettings.port;
Alessio Netti's avatar
Alessio Netti committed
813
814
815
816
817
      LOG(info) << "    TTL:                " << cassandraSettings.ttl;
      LOG(info) << "    NumThreadsIO:       " << cassandraSettings.numThreadsIo;
      LOG(info) << "    QueueSizeIO:        " << cassandraSettings.queueSizeIo;
      LOG(info) << "    CoreConnPerHost:    " << cassandraSettings.coreConnPerHost;
      LOG(info) << "    DebugLog:           " << (cassandraSettings.debugLog ? "Enabled" : "Disabled");
818
#ifdef SimpleMQTTVerbose
Alessio Netti's avatar
Alessio Netti committed
819
820
      LOG(info) << "    Username:           " << cassandraSettings.username;
	  LOG(info) << "    Password:           " << cassandraSettings.password;
821
822
823
824
#else
      LOG(info) << "    Username and password not printed.";
#endif

825
826
827
828
829
      if (restAPISettings.enabled) {
	  
	  LOG(info) << "RestAPI Settings:";
	  LOG(info) << "    REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
	  LOG(info) << "    Certificate: " << restAPISettings.certificate;
Alessio Netti's avatar
Alessio Netti committed
830
	  LOG(info) << "    Private key file: " << restAPISettings.privateKey;
Michael Ott's avatar
Michael Ott committed
831
	  
832
	  if (config.influxSettings.measurements.size() > 0) {
Michael Ott's avatar
Michael Ott committed
833
	      LOG(info) << "InfluxDB Settings:";
834
	      LOG(info) << "    MQTT-Prefix:  " << config.influxSettings.mqttPrefix;
Michael Ott's avatar
Michael Ott committed
835
836
	      LOG(info) << "    Auto-Publish: " << (config.influxSettings.publish ? "Enabled" : "Disabled");

837
	      for (auto &m: config.influxSettings.measurements) {
Michael Ott's avatar
Michael Ott committed
838
		  LOG(info) << "    Measurement: " << m.first;
839
		  LOG(info) << "        MQTT-Part:   " << m.second.mqttPart;
Michael Ott's avatar
Michael Ott committed
840
		  LOG(info) << "        Tag:         " << m.second.tag;
Michael Ott's avatar
Michael Ott committed
841
842
843
844
845
846
		  if ((m.second.tagRegex.size() > 0) && (m.second.tagSubstitution.size() > 0)) {
		      if (m.second.tagSubstitution != "&") {
			  LOG(info) << "        TagFilter:   s/" << m.second.tagRegex.str() << "/" <<  m.second.tagSubstitution << "/";
		      } else {
			  LOG(info) << "    TagFilter:   " << m.second.tagRegex.str();
		      }
Michael Ott's avatar
Michael Ott committed
847
848
849
850
851
852
		  }
		  if (m.second.fields.size() > 0) {
		      stringstream ss;
		      copy(m.second.fields.begin(), m.second.fields.end(), ostream_iterator<std::string>(ss, ","));
		      string fields = ss.str();
		      fields.pop_back();
Michael Ott's avatar
Michael Ott committed
853
		      LOG(info) << "        Fields:      " << fields;
Michael Ott's avatar
Michael Ott committed
854
855
856
		  }
	      }
	  }
857
      }
858
859
      LOG_VAR(vLogLevel) << "-----  Analytics Configuration  -----";
      for(auto& p : analyticsController->getManager()->getPlugins()) {
860
          LOG_VAR(vLogLevel) << "Operator Plugin \"" << p.id << "\"";
861
862
863
864
          p.configurator->printConfig(vLogLevel);
      }
      LOG_VAR(vLogLevel) << "-----  End Configuration  -----";

Alessio Netti's avatar
Alessio Netti committed
865
      if (settings.validateConfig)
866
          return EXIT_SUCCESS;
867
868
869
870
871
872
873
874
875
876
877
878
879
880
      
      LOG(info) << "Creating threads...";
      // Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
      // Inherited from DCDB Pusher
      keepAliveWork = boost::make_shared<boost::asio::io_context::work>(io);
      // Create pool of threads which handle the sensors
      for(size_t i = 0; i < settings.threads; i++) {
	  threads.create_thread(bind(static_cast< size_t (boost::asio::io_context::*) () >(&boost::asio::io_context::run), &io));
      }
      LOG(info) << "Threads created!";
      
      analyticsController->start();
      LOG(info) << "AnalyticsController running...";
      
881
882
883
      /*
       * Start the MQTT Message Server.
       */
Alessio Netti's avatar
Alessio Netti committed
884
      SimpleMQTTServer ms(settings.mqttListenHost, settings.mqttListenPort, settings.messageThreads, settings.messageSlots);
885
      
886
887
888
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
889
      LOG(info) << "MQTT Server running...";
890
      
891
892
893
      /*
       * Start the HTTP Server for the REST API
       */
894
      if (restAPISettings.enabled) {
895
          httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, io);
896
897
898
          config.readRestAPIUsers(httpsServer);
          httpsServer->start();
          LOG(info) <<  "HTTP Server running...";
899
      }
900

901
902
903
      /*
       * Run (hopefully) forever...
       */
904
      newAutoPub = false;
905
      keepRunning = 1;
906
      uint64_t start, end;
907
      float elapsed;
908
909
      msgCtr = 0;
      readingCtr = 0;
910
911
      dbQueryCtr = 0;
      cachedQueryCtr = 0;
912

913
914
      start = getTimestamp();
      uint64_t lastCleanup = start;
915
      uint64_t sleepInterval = (settings.statisticsInterval > 0) ? settings.statisticsInterval : 60;
Alessio Netti's avatar
Alessio Netti committed
916

917
      LOG(info) << "Collect Agent running...";
918
      while(keepRunning) {
919
920
921
922
923
924
925
926
927
928
929
930
	  start = getTimestamp();
	  if(NS_TO_S(start) - NS_TO_S(lastCleanup) > settings.cleaningInterval) {
	      uint64_t purged = mySensorCache.clean(S_TO_NS(settings.cleaningInterval));
	      lastCleanup = start;
	      if(purged > 0)
		  LOG(info) << "Cache: purged " << purged << " obsolete entries";
	  }
	  if(newAutoPub) {
	      newAutoPub = false;
	      mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
	  }
	  
931
	  sleep(sleepInterval);
932
	  
933
934
	  if((settings.statisticsInterval > 0) && keepRunning) {
	      /* not really thread safe but will do the job */
935
	      end = getTimestamp();
936
937
	      elapsed = (float)(NS_TO_S(end) - NS_TO_S(start));
	      float aIns = ceil(((float)analyticsController->getReadingCtr()) / elapsed);
938
939
	      float cacheReq = ceil(((float)cachedQueryCtr) / elapsed);
	      float dbReq = ceil(((float)dbQueryCtr) / elapsed);
940
941
942
	      float rIns = restAPISettings.enabled ? ceil(((float)httpsServer->getInfluxCounter()) / elapsed) : 0.0f;
	      float mIns = ceil(((float)readingCtr) / elapsed);
	      float mMsg = ceil(((float) msgCtr) / elapsed);
943
	      LOG(info) << "Performance: MQTT [" << std::fixed << std::setprecision(0) << mIns << " ins/s|" << mMsg << " msg/s]   REST [" << rIns << " ins/s]   Analytics [" << aIns << " ins/s]   Cache [" << cacheReq << " req/s]   DB [" << dbReq << " req/s]";
944
945
946
947
948
949
950
951
	      std::map<std::string, hostInfo_t> lastSeen = ms.collectLastSeen();
	      uint64_t connectedHosts = 0;
	      for (auto h: lastSeen) {
		  if (h.second.lastSeen >= end - S_TO_NS(settings.statisticsInterval)) {
		      connectedHosts++;
		  }
	      }
	      LOG(info) << "Connected hosts: " << connectedHosts;
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
	      
	      if (settings.statisticsMqttPart.size() > 0) {
		  std::string statisticsMqttTopic = pluginSettings.mqttPrefix + settings.statisticsMqttPart;
		  std::list<SensorDataStoreReading> stats;
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/msgs"), end, msgCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/cachedQueries"), end, cachedQueryCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/dbQueries"), end, dbQueryCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/readings"), end, readingCtr));
		  stats.push_back(SensorDataStoreReading(SensorId(statisticsMqttTopic+"/hosts"), end, connectedHosts));
		  for (auto s: stats) {
		      mySensorDataStore->insert(s);
		      mySensorCache.storeSensor(s);
		  }
	      }
	      
967
	      msgCtr = 0;
968
969
	      cachedQueryCtr = 0;
	      dbQueryCtr = 0;
970
971
	      readingCtr = 0;
	  }
972
973
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
974
      LOG(info) << "Stopping...";
975
      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
976
      LOG(info) << "MQTT Server stopped...";
Alessio Netti's avatar
Alessio Netti committed
977
978
979
      if (restAPISettings.enabled) { 
          httpsServer->stop();
          LOG(info) << "HTTP Server stopped...";
980
      }
Alessio Netti's avatar
Alessio Netti committed
981
      analyticsController->stop();