Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 18.5 KB
Newer Older
//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
#include <boost/network/uri.hpp>
#include <iostream>
#include <cstdlib>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <string>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include "version.h"

#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"

#include "sensorcache.h"

#define __STDC_FORMAT_MACROS
#include <inttypes.h>

55
using namespace std;
56
57

int keepRunning;
58
bool statistics;
59
60
uint64_t msgCtr;
uint64_t pmsgCtr;
61
uint64_t readingCtr;
62
DCDB::Connection* dcdbConn;
63
DCDB::SensorDataStore *mySensorDataStore;
64
DCDB::SensorConfig *mySensorConfig;
65
DCDB::SensorCache mySensorCache;
66
DCDB::SCError err;
67
logger_t lg;
68

69
/* Normal termination (SIGINT, CTRL+C) */
70
71
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
72
73
74
75
76
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
77
78
79
  keepRunning = 0;
}

80
81
82
83
84
85
/*
 * Crash handler: installed for SIGABRT and SIGSEGV in main().
 * The signal number is not inspected; every invocation is forwarded to
 * abrt() with the SIGNAL origin and EXIT_FAILURE as exit code.
 */
void abrtHandler(int sig)
{
  (void) sig; // intentionally unused
  abrt(EXIT_FAILURE, SIGNAL);
}

86
87
88
89
90
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
91
    std::ostringstream data;
92
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
93

94
95
    //try getting the latest value 
    try {
96
97
98
99
100
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

101
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

126
127
128
129
  }
};


130
/**
 * Message callback registered with the MQTT server in main().
 *
 * Every message bumps msgCtr; PUBLISH messages additionally bump pmsgCtr and
 * are processed, everything else is ignored.
 *
 * Two kinds of PUBLISH messages are handled:
 *   - Topics starting with DCDB_MAP carry a sensor name as payload. The name
 *     is published for the remainder of the topic via
 *     SensorConfig::publishSensor().
 *   - All other topics carry readings. A payload of exactly 8 bytes is a bare
 *     value that gets a timestamp from Messaging::calculateTimestamp(); a
 *     payload that is a non-zero multiple of sizeof(mqttPayload) is taken as
 *     an array of (timestamp, value) records. Each reading is stored in the
 *     sensor cache and the whole batch is inserted into the data store.
 *
 * @param msg  The received MQTT message.
 * @return 0 if the message was handled (or was not a PUBLISH), 1 if it was
 *         rejected (empty or invalid mapping, malformed payload, topic that
 *         cannot be converted to a SensorId).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish())
    return 0;
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */

  // Bind the topic to a const reference instead of keeping a c_str() pointer
  // of the call result: this stays valid for the whole function no matter
  // whether getTopic() returns a reference or a temporary.
  const std::string &topic = msg->getTopic();
  uint64_t len;

  // We check whether the topic includes the \DCDB_MAP\ keyword, indicating that the payload will contain the
  // sensor's name. In that case the keyword is skipped and the rest of the topic is used for the mapping.
  if (strncmp(topic.c_str(), DCDB_MAP, DCDB_MAP_LEN) == 0) {
      if ((len = msg->getPayloadLength()) == 0) {
          LOG(error) << "Empty topic-to-name mapping message received";
          return 1;
      }

      string sensorName((char *) msg->getPayload(), len);
      err = mySensorConfig->publishSensor(sensorName.c_str(), topic.c_str() + DCDB_MAP_LEN);

      // PublishSensor does most of the error checking for us
      switch (err) {
          case DCDB::SC_INVALIDPATTERN:
              LOG(error) << "Invalid sensor topic : " << topic;
              return 1;
          case DCDB::SC_INVALIDPUBLICNAME:
              LOG(error) << "Invalid sensor public name: " << sensorName;
              return 1;
          case DCDB::SC_INVALIDSESSION:
              LOG(error) << "Cannot reach sensor data store.";
              return 1;
          default:
              break;
      }
      return 0;
  }

  mqttPayload buf, *payload;

  len = msg->getPayloadLength();
  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
      // Copy the raw bytes instead of dereferencing a cast pointer: the
      // payload buffer is not known to be suitably aligned for int64_t.
      int64_t rawValue;
      memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));

      payload = &buf;
      payload->value = rawValue;
      payload->timestamp = Messaging::calculateTimestamp();
      // From here on the message is treated as exactly one full record.
      len = sizeof(mqttPayload);
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
      payload = (mqttPayload *) msg->getPayload();
  }
  //...otherwise this message is malformed -> ignore...
  else {
      LOG(error) << "Message malformed";
      return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(topic)) {
      LOG(error) << "Wrong topic format: " << topic;
      return 1;
  }

  const uint64_t numReadings = len / sizeof(mqttPayload);
  std::list<DCDB::SensorDataStoreReading> readings;
  for (uint64_t i = 0; i < numReadings; i++) {
      DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
      readings.push_back(r);
      mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
  }
  mySensorDataStore->insertBatch(readings);
  readingCtr += readings.size();

  return 0;
}

234
235
236



237
238
239
240
/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
241
242
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
243
244
245
246
247
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
248
  cout << "  collectagent [-m<host>] [-r<host>] [-c<host>] [-C<interval>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] [-d] [-s] <path/to/configfiles/>" << endl;
249
250
251
252
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
253
254
255
256
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
  cout << "  -C<interval>  Cache interval in [s]   [default: " << defaults.cacheInterval << "]" << endl;
Michael Ott's avatar
Michael Ott committed
257
258
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
259
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
260
261
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
262
  cout << endl;
Michael Ott's avatar
Michael Ott committed
263
  cout << "  -d            Daemonize" << endl;
264
  cout << "  -s            Print message stats" <<endl;
Michael Ott's avatar
Michael Ott committed
265
  cout << "  -h            This help page" << endl;
266
  cout << endl;
267
268
269
}

int main(int argc, char* const argv[]) {
270
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
271
272

  try{
273

274
275
276
277
278
279
280
281
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
Alessio Netti's avatar
Logging    
Alessio Netti committed
282
      const char* opts = "m:r:c:C:u:p:t:v:dDsh";
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

299
300
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
301

302
303
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
304
          LOG(fatal) << "Failed to read global configuration!";
305
306
307
308
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
309

310
      /* Parse command line */
311
      std::string listenHost, cassandraHost, restApiHost;
312
      std::string listenPort, cassandraPort, restApiPort;
313
314
315

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
316
          switch(ret) {
317
              case 'm':
318
                  settings.mqttListenAddress = optarg;
319
                  break;
320
              case 'r':
321
                  settings.restListenAddress = optarg;
322
                  break;
323
              case 'c':
324
325
326
327
                  settings.cassandraSettings.address = optarg;
                  break;
              case 'C':
                  settings.cacheInterval = stoul(optarg);
328
                  break;
Michael Ott's avatar
Michael Ott committed
329
              case 'u':
330
                  settings.cassandraSettings.username = optarg;
331
                  break;
Michael Ott's avatar
Michael Ott committed
332
              case 'p': {
333
334
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
335
336
337
338
339
340
341
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
342
              case 't':
343
                  settings.cassandraSettings.ttl = stoul(optarg);
344
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
345
              case 'v':
346
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
347
                  break;
348
              case 'd':
349
              case 'D':
350
                  settings.daemonize = 1;
351
                  break;
352
              case 's':
353
                  settings.statistics = 1;
354
                  break;
355
              case 'h':
356
357
358
              default:
                  usage();
                  exit(EXIT_FAILURE);
359
360
361
          }
      }

362
      auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
363
364
365
366
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

367
      /*
Alessio Netti's avatar
Alessio Netti committed
368
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
369
370
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
371
      signal(SIGTERM, sigHandler);
372
373
374
375
376
377
378
379
380
381
382

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

383
384
385
      /*
       * Parse hostnames for port specifications
       */
386
      listenHost = string(settings.mqttListenAddress);
387
388
389
390
391
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
392
        listenPort = LISTENPORT;
393
      }
394
395

      cassandraHost = string(settings.cassandraSettings.address);
396
397
398
399
400
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
401
        cassandraPort = CASSANDRAPORT;
402
      }
403
404

      restApiHost = string(settings.restListenAddress);
405
406
407
408
409
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
410
        restApiPort = RESTAPIPORT;
411
412
      }

413
414
      // Setting the size of the sensor cache
      mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
415

416
417
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
418
419
420
421
422
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

423

Axel Auweter's avatar
Axel Auweter committed
424
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
425
          LOG(fatal) << "Cannot connect to Cassandra!";
426
427
428
429
430
431
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
432
      dcdbConn->initSchema();
433

Alessio Netti's avatar
Alessio Netti committed
434

435
436
437
      /*
       * Allocate the SensorDataStore.
       */
438
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
439
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
440
441
442

      /*
       * Set TTL for data store inserts if TTL > 0.
443
       */
444
445
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
446
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
447

448
449
450
      /*
       * Start the MQTT Message Server.
       */
451
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
452
      
453
454
455
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
456
      LOG(info) << "MQTT Server running...";
457
      
458
459
460
461
462
463
464
465
466
467
468
469
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
470
      LOG(info) <<  "HTTP Server running...";
471

472
473
474
475
476
477
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
478
479
480
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
481

Alessio Netti's avatar
Alessio Netti committed
482
483
484
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

485
      LOG(info) << "Collect Agent running...";
486
487
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
488
489
490
491
492
493
494
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

495
          sleep(60);
496
497
498
499
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
500
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
501
          if (settings.statistics && keepRunning) {
502
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
503
          }
504
          msgCtr = 0;
505
          pmsgCtr = 0;
506
	  readingCtr = 0;
507
508
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
509
      LOG(info) << "Stopping...";
510
511

      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
512
      LOG(info) << "MQTT Server stopped...";
513
514
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
515
      LOG(info) << "HTTP Server stopped...";
516
      delete mySensorDataStore;
517
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
518
519
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
520
      LOG(info) << "Collect Agent closed. Bye bye...";
521
  }
522
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
523
      LOG(fatal) << "Exception: " << e.what();
524
      abrt(EXIT_FAILURE, INTERR);
525
526
  }

527
  return EXIT_SUCCESS;
528
}
529
530