Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

collectagent.cpp 18.3 KB
Newer Older
//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
#include <boost/network/uri.hpp>
#include <iostream>
#include <cstdlib>
#include <cstring>
#include <signal.h>

#include <unistd.h>
#include <string>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include "version.h"

#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"

#include "sensorcache.h"

#define __STDC_FORMAT_MACROS
#include <inttypes.h>

using namespace std;

int keepRunning;
57
bool statistics;
58
59
uint64_t msgCtr;
uint64_t pmsgCtr;
60
uint64_t readingCtr;
Alessio Netti's avatar
Alessio Netti committed
61
SensorCache mySensorCache;
62
DCDB::Connection* dcdbConn;
63
DCDB::SensorDataStore *mySensorDataStore;
64
65
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
66
logger_t lg;
67

68
/* Normal termination (SIGINT, CTRL+C) */
69
70
void sigHandler(int sig)
{
Alessio Netti's avatar
Alessio Netti committed
71
72
73
74
75
  boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
  if( sig == SIGINT )
      LOG(fatal) << "Received SIGINT";
  else if( sig == SIGTERM )
      LOG(fatal) << "Received SIGTERM";
76
77
78
  keepRunning = 0;
}

79
80
81
82
83
84
/*
 * Crash handler (installed for SIGABRT and SIGSEGV).
 * The signal number itself is not needed: it always delegates to abrt()
 * with EXIT_FAILURE and the SIGNAL reason code.
 */
void abrtHandler(int /* sig */)
{
  abrt(EXIT_FAILURE, SIGNAL);
}

struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
90
    std::ostringstream data;
91
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
92

93
94
    //try getting the latest value 
    try {
95
96
97
98
99
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

Alessio Netti's avatar
Alessio Netti committed
100
      int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

125
126
127
128
  }
};


129
/*
 * Callback invoked by the MQTT server for every received message.
 *
 * Only PUBLISH messages are processed:
 *  - Topics starting with DCDB_MAP carry a sensor public name in the payload.
 *    The topic, with the DCDB_MAP prefix stripped, is published under that
 *    name through mySensorConfig->publishSensor().
 *  - All other topics carry readings. The payload is either one bare 64-bit
 *    value (timestamped here with Messaging::calculateTimestamp()) or a
 *    non-empty array of mqttPayload records. The readings are written to the
 *    sensor data store in one batch and mirrored into the sensor cache.
 *
 * Returns 0 when the message was processed (or is not a PUBLISH), 1 when it
 * was rejected (the reason is logged).
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   * NOTE(review): unsynchronized globals; see their declaration.
   */
  msgCtr++;
  if (msg->isPublish())
    pmsgCtr++;

  uint64_t len;

  /*
   * Decode the message and put into the database.
   */
  if (msg->isPublish()) {
      // Bind the topic by reference rather than keeping a raw c_str()
      // pointer: if getTopic() returns by value, the reference extends the
      // temporary's lifetime, whereas the pointer would dangle.
      const auto &topic = msg->getTopic();
      // A topic starting with the DCDB_MAP keyword means the payload contains
      // the sensor's name; the keyword is stripped from the topic before it is
      // published. strncmp only compares the prefix.
      if (strncmp(topic.c_str(), DCDB_MAP, DCDB_MAP_LEN) == 0) {
          if ((len = msg->getPayloadLength()) == 0) {
              LOG(error) << "Empty topic-to-name mapping message received";
              return 1;
          }

          string sensorName((char *) msg->getPayload(), len);
          err = mySensorConfig->publishSensor(sensorName.c_str(), topic.c_str() + DCDB_MAP_LEN);

          // PublishSensor does most of the error checking for us
          switch (err) {
              case DCDB::SC_INVALIDPATTERN:
                  LOG(error) << "Invalid sensor topic : " << msg->getTopic();
                  return 1;
              case DCDB::SC_INVALIDPUBLICNAME:
                  LOG(error) << "Invalid sensor public name: " << sensorName;
                  return 1;
              case DCDB::SC_INVALIDSESSION:
                  LOG(error) << "Cannot reach sensor data store.";
                  return 1;
              default:
                  break;
          }
      } else {
          mqttPayload buf, *payload;
          uint64_t count;  // number of mqttPayload records to store

          len = msg->getPayloadLength();
          if (len == sizeof(uint64_t)) {
              // A single bare 64-bit value: the collect agent provides the
              // timestamp. memcpy avoids an unaligned / type-punned int64_t
              // load from the raw payload buffer.
              int64_t rawValue;
              memcpy(&rawValue, msg->getPayload(), sizeof(rawValue));
              payload = &buf;
              payload->value = rawValue;
              payload->timestamp = Messaging::calculateTimestamp();
              count = 1;
          } else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
              // ...otherwise the timestamps come with the MQTT message payload.
              payload = (mqttPayload *) msg->getPayload();
              count = len / sizeof(mqttPayload);
          } else {
              // ...otherwise this message is malformed -> ignore.
              LOG(error) << "Message malformed";
              return 1;
          }

          /*
           * Check if we can decode the message topic
           * into a valid SensorId. If successful, store
           * the records in the database and the sensor cache.
           */
          DCDB::SensorId sid;
          if (sid.mqttTopicConvert(msg->getTopic())) {
              std::list<DCDB::SensorDataStoreReading> readings;
              for (uint64_t i = 0; i < count; i++) {
                  readings.push_back(DCDB::SensorDataStoreReading(sid, payload[i].timestamp, payload[i].value));
                  mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
              }
              mySensorDataStore->insertBatch(readings);
              readingCtr += readings.size();
          } else {
              // Reported through the logger like every other rejection path
              // (stdout is not a useful sink once the agent is daemonized).
              LOG(error) << "Wrong topic format: " << msg->getTopic();
              return 1;
          }
      }
  }
  return 0;
}

/*
 * Print usage information
 */
void usage() {
Alessio Netti's avatar
Alessio Netti committed
240
241
  Configuration config("");
  globalCA_t& defaults = config.getGlobal();
242
243
244
245
246
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
Alessio Netti's avatar
Alessio Netti committed
247
  cout << "  collectagent [-d] [-s] [-m<host>] [-r<host>] [-c<host>] [-u<username>] [-p<password>] [-t<ttl>] [-v<verbosity>] <path/to/configfiles/>" << endl;
248
249
250
251
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
252
253
254
  cout << "  -m<host>      MQTT listen address     [default: " << defaults.mqttListenAddress << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << defaults.restListenAddress << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << defaults.cassandraSettings.address << "]" << endl;
Michael Ott's avatar
Michael Ott committed
255
256
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
257
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << defaults.cassandraSettings.ttl << "]" << endl;
Alessio Netti's avatar
Logging    
Alessio Netti committed
258
259
  cout << "  -v<level>     Set verbosity of output [default: " << defaults.logLevelCmd << "]" << endl
       << "                Can be a number between 5 (all) and 0 (fatal)." << endl;
260
  cout << endl;
Michael Ott's avatar
Michael Ott committed
261
  cout << "  -d            Daemonize" << endl;
262
  cout << "  -s            Print message stats" <<endl;
Michael Ott's avatar
Michael Ott committed
263
  cout << "  -h            This help page" << endl;
264
  cout << endl;
265
266
267
}

int main(int argc, char* const argv[]) {
268
    cout << "CollectAgent " << VERSION << " (libdcdb " << DCDB::Version::getVersion() << ")" << endl << endl;
269
270

  try{
271

272
273
274
275
276
277
278
279
      // Checking if path to config is supplied
      if (argc <= 1) {
          cout << "Please specify a path to the config-directory" << endl << endl;
          usage();
          exit(EXIT_FAILURE);
      }

      // Defining options
Alessio Netti's avatar
Logging    
Alessio Netti committed
280
      const char* opts = "m:r:c:C:u:p:t:v:dDsh";
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296

      // Same mechanism as in DCDBPusher - checking if help string is requested before loading config
      char ret;
      while ((ret = getopt(argc, argv, opts)) != -1) {
          switch (ret)
          {
              case 'h':
                  usage();
                  exit(EXIT_FAILURE);
                  break;
              default:
                  //do nothing (other options are read later on)
                  break;
          }
      }

297
298
      initLogging();
      auto cmdSink = setupCmdLogger();
Alessio Netti's avatar
Logging    
Alessio Netti committed
299

300
301
      Configuration config(argv[argc - 1]);
      if( !config.readGlobal() ) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
302
          LOG(fatal) << "Failed to read global configuration!";
303
304
305
306
          exit(EXIT_FAILURE);
      }

      globalCA_t& settings = config.getGlobal();
307

308
      /* Parse command line */
309
      std::string listenHost, cassandraHost, restApiHost;
310
      std::string listenPort, cassandraPort, restApiPort;
311
312
313

      optind = 1;
      while ((ret=getopt(argc, argv, opts))!=-1) {
314
          switch(ret) {
315
              case 'm':
316
                  settings.mqttListenAddress = optarg;
317
                  break;
318
              case 'r':
319
                  settings.restListenAddress = optarg;
320
                  break;
321
              case 'c':
322
323
                  settings.cassandraSettings.address = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
324
              case 'u':
325
                  settings.cassandraSettings.username = optarg;
326
                  break;
Michael Ott's avatar
Michael Ott committed
327
              case 'p': {
328
329
                  settings.cassandraSettings.password = optarg;
                  // What does this do? Mask the password?
Michael Ott's avatar
Michael Ott committed
330
331
332
333
334
335
336
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
337
              case 't':
338
                  settings.cassandraSettings.ttl = stoul(optarg);
339
                  break;
Alessio Netti's avatar
Logging    
Alessio Netti committed
340
              case 'v':
341
                  settings.logLevelCmd = translateLogLevel(stoi(optarg));
Alessio Netti's avatar
Logging    
Alessio Netti committed
342
                  break;
343
              case 'd':
344
              case 'D':
345
                  settings.daemonize = 1;
346
                  break;
347
              case 's':
348
                  settings.statistics = 1;
349
                  break;
350
              case 'h':
351
352
353
              default:
                  usage();
                  exit(EXIT_FAILURE);
354
355
356
          }
      }

357
      auto fileSink = setupFileLogger(settings.tempDir, std::string("collectagent"));
Alessio Netti's avatar
Logging    
Alessio Netti committed
358
359
360
361
      //severity level may be overwritten (per option or config-file) --> set it according to globalSettings
      fileSink->set_filter(boost::log::trivial::severity >= settings.logLevelFile);
      cmdSink->set_filter(boost::log::trivial::severity >= settings.logLevelCmd);

362
      /*
Alessio Netti's avatar
Alessio Netti committed
363
       * Catch SIGINT and SIGTERM signals to allow for proper server shutdowns.
364
365
       */
      signal(SIGINT, sigHandler);
Alessio Netti's avatar
Alessio Netti committed
366
      signal(SIGTERM, sigHandler);
367
368
369
370
371
372
373
374
375
376
377

      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);

      // Daemonizing the collectagent
      if(settings.daemonize)
          dcdbdaemon();

378
379
380
      /*
       * Parse hostnames for port specifications
       */
381
      listenHost = string(settings.mqttListenAddress);
382
383
384
385
386
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
387
        listenPort = LISTENPORT;
388
      }
389
390

      cassandraHost = string(settings.cassandraSettings.address);
391
392
393
394
395
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
396
        cassandraPort = CASSANDRAPORT;
397
      }
398
399

      restApiHost = string(settings.restListenAddress);
400
401
402
403
404
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
405
        restApiPort = RESTAPIPORT;
406
407
      }

408
409
      // Setting the size of the sensor cache
      mySensorCache.setMaxHistory(settings.cacheInterval * 1000000000);
410

411
412
      //Allocate and initialize connection to Cassandra.
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
Alessio Netti's avatar
Alessio Netti committed
413
414
415
416
417
      dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
      dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
      uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
      dcdbConn->setBackendParams(params);

418

Axel Auweter's avatar
Axel Auweter committed
419
      if (!dcdbConn->connect()) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
420
          LOG(fatal) << "Cannot connect to Cassandra!";
421
422
423
424
425
426
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
427
      dcdbConn->initSchema();
428

Alessio Netti's avatar
Alessio Netti committed
429

430
431
432
      /*
       * Allocate the SensorDataStore.
       */
433
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
434
      mySensorConfig = new DCDB::SensorConfig(dcdbConn);
435
436
437

      /*
       * Set TTL for data store inserts if TTL > 0.
438
       */
439
440
      if (settings.cassandraSettings.ttl > 0)
        mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
Alessio Netti's avatar
Alessio Netti committed
441
      mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
442

443
444
445
      /*
       * Start the MQTT Message Server.
       */
446
      SimpleMQTTServer ms(listenHost, listenPort, settings.messageThreads, settings.messageSlots);
447
      
448
449
450
      ms.setMessageCallback(mqttCallback);
      ms.start();

Alessio Netti's avatar
Logging    
Alessio Netti committed
451
      LOG(info) << "MQTT Server running...";
452
      
453
454
455
456
457
458
459
460
461
462
463
464
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

Alessio Netti's avatar
Logging    
Alessio Netti committed
465
      LOG(info) <<  "HTTP Server running...";
466

467
468
469
470
471
472
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
473
474
475
      msgCtr = 0;
      pmsgCtr = 0;
      readingCtr = 0;
476

Alessio Netti's avatar
Alessio Netti committed
477
478
479
      gettimeofday(&start, NULL);
      uint64_t lastCleanup = start.tv_sec;

480
      LOG(info) << "Collect Agent running...";
481
482
      while(keepRunning) {
          gettimeofday(&start, NULL);
Alessio Netti's avatar
Alessio Netti committed
483
484
485
486
487
488
489
          if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
              uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
              lastCleanup = start.tv_sec;
              if(purged > 0)
                  LOG(info) << "Cache: purged " << purged << " obsolete entries";
          }

490
          sleep(60);
491
492
493
494
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
495
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
Alessio Netti's avatar
Alessio Netti committed
496
          if (settings.statistics && keepRunning) {
497
              LOG(info) << "Performance: " << (readingCtr/elapsed)*1000.0 << " inserts/s, " << (msgCtr/elapsed)*1000.0 << " messages/s (" << publish << "% PUBLISH)";
498
          }
499
          msgCtr = 0;
500
          pmsgCtr = 0;
501
	  readingCtr = 0;
502
503
      }

Alessio Netti's avatar
Logging    
Alessio Netti committed
504
      LOG(info) << "Stopping...";
505
506

      ms.stop();
Alessio Netti's avatar
Logging    
Alessio Netti committed
507
      LOG(info) << "MQTT Server stopped...";
508
509
      httpServer.stop();
      httpThread.join();
Alessio Netti's avatar
Logging    
Alessio Netti committed
510
      LOG(info) << "HTTP Server stopped...";
511
      delete mySensorDataStore;
512
      delete mySensorConfig;
Axel Auweter's avatar
Axel Auweter committed
513
514
      dcdbConn->disconnect();
      delete dcdbConn;
Alessio Netti's avatar
Logging    
Alessio Netti committed
515
      LOG(info) << "Collect Agent closed. Bye bye...";
516
  }
517
  catch (const exception& e) {
Alessio Netti's avatar
Logging    
Alessio Netti committed
518
      LOG(fatal) << "Exception: " << e.what();
519
      abrt(EXIT_FAILURE, INTERR);
520
521
  }

522
  return EXIT_SUCCESS;
523
}
524
525