//================================================================================
// Name        : collectagent.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Main code of the CollectAgent
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
#include <boost/network/uri.hpp>

#include <signal.h>
#include <sys/time.h>
#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <iostream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include <boost/date_time/posix_time/posix_time.hpp>

#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>

#include "simplemqttserver.h"
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"

#include "sensorcache.h"

#define __STDC_FORMAT_MACROS
#include <inttypes.h>

51
using namespace std;
52

53
54
55
56
57
58
59
60
/*
 * Built-in defaults for the command line options. All values are string
 * literals; usage() prints them as the "[default: ...]" hints.
 */
/* MQTT listen address (host and port). */
#define LISTENHOST "localhost"
#define LISTENPORT "1883"
/* Cassandra server (host and port). */
#define CASSANDRAHOST "127.0.0.1"
#define CASSANDRAPORT "9042"
/* REST API listen address (host and port). */
#define RESTAPIHOST "0.0.0.0"
#define RESTAPIPORT "8080"
/* Cassandra insert TTL. */
#define TTL "0"

61
int keepRunning;
62
bool statistics;
63
64
uint64_t msgCtr;
uint64_t pmsgCtr;
65
DCDB::SensorDataStore *mySensorDataStore;
66
DCDB::SensorCache mySensorCache;
67

68
/* Normal termination (SIGINT, CTRL+C) */
69
70
void sigHandler(int sig)
{
71
72
73
  keepRunning = 0;
}

74
75
76
77
78
79
/*
 * Crash handler.
 *
 * main() installs it for SIGABRT, SIGSEGV and SIGTERM. It hands over to
 * abrt() with EXIT_FAILURE and SIGNAL as arguments.
 *
 * sig: number of the delivered signal (not used).
 */
void abrtHandler(int sig)
{
  (void)sig;
  abrt(EXIT_FAILURE, SIGNAL);
}

80
81
82
83
84
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
  void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
    httpServer_t::string_type ip = source(request);
85
    std::ostringstream data;
86
    static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
87

88
89
    //try getting the latest value 
    try {
90
91
92
93
94
95
      boost::network::uri::uri uri("http://localhost"+request.destination);
      std::map<std::string, std::string> queries;
      boost::network::uri::query_map(uri, queries);
      int avg = atoi(queries.find("avg")->second.c_str());

      uint64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119

      data << val << "\n";
      //data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;

      connection->set_status(httpServer_t::connection::ok);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write(data.str());

    }

    catch (const std::invalid_argument& e) {
      connection->set_status(httpServer_t::connection::not_found);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor id not found.\n");
    } catch (const std::out_of_range &e) {
      connection->set_status(httpServer_t::connection::no_content);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Error: Sensor unavailable.\n");
    } catch (const std::exception& e) {
      connection->set_status(httpServer_t::connection::internal_server_error);
      connection->set_headers(boost::make_iterator_range(headers, headers + 2));
      connection->write("Server error.\n");
    }

120
121
122
123
  }
};


124
/*
 * Callback invoked by the MQTT server for every received message.
 *
 * Counts the message for the statistics and, for PUBLISH messages, decodes
 * the payload and stores every contained reading in mySensorDataStore and
 * mySensorCache.
 *
 * Accepted payloads:
 *   - exactly sizeof(uint64_t) bytes: a bare value; the timestamp is taken
 *     from Messaging::calculateTimestamp()
 *   - a non-zero multiple of sizeof(mqttPayload) bytes: one or more
 *     mqttPayload records
 *
 * Returns 0 if the message was processed (or is not a PUBLISH message) and
 * 1 if the payload is malformed or the topic cannot be converted into a
 * SensorId.
 */
int mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/pmsgCtr for statistics.
   */
  msgCtr++;
  if (!msg->isPublish()) {
    return 0;
  }
  pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
  const uint64_t len = msg->getPayloadLength();
  mqttPayload stamped;   // holds the record built for a bare 64 bit value
  const char *records;   // start of 'count' consecutive mqttPayload records
  uint64_t count;

  //In the 64 bit message case, the collect agent provides a timestamp
  if (len == sizeof(uint64_t)) {
    // memcpy instead of a casted pointer dereference: the receive buffer
    // is not known to be suitably aligned for uint64_t.
    uint64_t value;
    memcpy(&value, msg->getPayload(), sizeof(value));
    stamped.value = value;
    stamped.timestamp = Messaging::calculateTimestamp();
    records = reinterpret_cast<const char*>(&stamped);
    count = 1;
  }
  //...otherwise it just retrieves it from the MQTT message payload.
  else if ((len > 0) && (len % sizeof(mqttPayload) == 0)) {
    records = reinterpret_cast<const char*>(msg->getPayload());
    count = len / sizeof(mqttPayload);
  }
  //...otherwise this message is malformed -> ignore...
  else {
    cout << "Message malformed\n";
    return 1;
  }

  /*
   * Check if we can decode the message topic
   * into a valid SensorId. If successful, store
   * the record in the database.
   */
  DCDB::SensorId sid;
  if (!sid.mqttTopicConvert(msg->getTopic())) {
    cout << "Wrong topic format: " << msg->getTopic() << "\n";
    return 1;
  }

#if 0
  cout << "Topic decode successful:" << endl
      << "  Raw:            " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
      << "  DeviceLocation: " << hex << setw(16) << setfill('0') << sid.getDeviceLocation() << endl
      << "  device_id:      " << hex << setw(8) << setfill('0') << sid.getDeviceSensorId().device_id << endl
      << "  reserved:       " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().rsvd << endl
      << "  sensor_number:  " << hex << setw(4) << setfill('0') << sid.getDeviceSensorId().sensor_number << endl << dec;

  cout << "Payload ("  << count << " messages):"<< endl;
  for (uint64_t i=0; i<count; i++) {
    mqttPayload dbg;
    memcpy(&dbg, records + i * sizeof(mqttPayload), sizeof(dbg));
    cout << "  " << i << ": ts=" << dbg.timestamp << " val=" << dbg.value << endl;
  }
  cout << endl;
#endif

  for (uint64_t i = 0; i < count; i++) {
    // Copy each record out of the buffer before touching its fields
    // (same alignment concern as above).
    mqttPayload record;
    memcpy(&record, records + i * sizeof(mqttPayload), sizeof(record));
    mySensorDataStore->insert(&sid, record.timestamp, record.value);
    mySensorCache.storeSensor(sid, record.timestamp, record.value);
  }
  //mySensorCache.dump();

  return 0;
}

196
197
198



199
200
201
202
/*
 * Print usage information
 */
void usage() {
203
204
205
206
207
208
209
210
211
212
  /*
             1         2         3         4         5         6         7         8
   012345678901234567890123456789012345678901234567890123456789012345678901234567890
   */
  cout << "Usage:" << endl;
  cout << "  collectagent [-m<host>] [-r<host>] [-c<host>] [-t<ttl>] [-d] [-s]" << endl;
  cout << "  collectagent -h" << endl;
  cout << endl;
  
  cout << "Options:" << endl;
Michael Ott's avatar
Michael Ott committed
213
214
215
216
217
218
  cout << "  -m<host>      MQTT listen address     [default: " << LISTENHOST << ":" << LISTENPORT << "]" << endl;
  cout << "  -r<host>      REST API listen address [default: " << RESTAPIHOST << ":" << RESTAPIPORT << "]" << endl;
  cout << "  -c<host>      Cassandra host          [default: " << CASSANDRAHOST << ":" << CASSANDRAPORT << "]" << endl;
  cout << "  -u<username>  Cassandra username      [default: none]" << endl;
  cout << "  -p<password>  Cassandra password      [default: none]" << endl;
  cout << "  -t<ttl>       Cassandra insert TTL    [default: " << TTL << "]" << endl;
219
  cout << endl;
Michael Ott's avatar
Michael Ott committed
220
221
222
  cout << "  -d            Daemonize" << endl;
  cout << "  -s            Print message statistics" << endl;
  cout << "  -h            This help page" << endl;
223
  cout << endl;
224
225
226
}

int main(int argc, char* const argv[]) {
227
228

  try{
229
      /*
230
       * Catch SIGINT signals to allow for proper server shutdowns.
231
       */
232
      signal(SIGINT, sigHandler);
233

234
235
236
237
238
239
240
      /*
       * Catch critical signals to allow for backtraces
       */
      signal(SIGABRT, abrtHandler);
      signal(SIGSEGV, abrtHandler);
      signal(SIGTERM, abrtHandler);

241
      /* Parse command line */
242
      int ret;
243
      std::string listenHost, cassandraHost, restApiHost, ttl;
Michael Ott's avatar
Michael Ott committed
244
      std::string cassandraUser, cassandraPassword;
245
      std::string listenPort, cassandraPort, restApiPort;
246
247
248
249
250
    
      listenHost = LISTENHOST;
      cassandraHost = CASSANDRAHOST;
      restApiHost = RESTAPIHOST;
      ttl = "0";
251
      statistics = false;
Michael Ott's avatar
Michael Ott committed
252
253
    
    while ((ret=getopt(argc, argv, "l:c:u:p:t:r:dDsh"))!=-1) {
254
          switch(ret) {
255
256
              case 'l':
                  listenHost = optarg;
257
                  break;
258
259
260
              case 'c':
                  cassandraHost = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
261
262
              case 'u':
                  cassandraUser = optarg;
263
                  break;
Michael Ott's avatar
Michael Ott committed
264
265
266
267
268
269
270
271
272
              case 'p': {
                  cassandraPassword = optarg;
                  size_t pwdLen = strlen(optarg);
                  memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                  if (pwdLen > 3) {
                      memset(optarg+3, 0, pwdLen-3);
                  }
                  break;
              }
273
274
275
              case 't':
                  ttl = optarg;
                  break;
Michael Ott's avatar
Michael Ott committed
276
277
278
              case 'r':
                restApiHost = optarg;
                break;
279
              case 'd':
280
              case 'D':
281
                  dcdbdaemon();
282
                  break;
283
284
285
              case 's':
                  statistics = true;
                  break;
286
              case 'h':
287
288
289
              default:
                  usage();
                  exit(EXIT_FAILURE);
290
291
292
          }
      }

293
294
295
296
297
298
299
300
      /*
       * Parse hostnames for port specifications
       */
      size_t pos = listenHost.find(":");
      if (pos != string::npos) {
        listenPort = listenHost.substr(pos+1);
        listenHost.erase(pos);
      } else {
301
        listenPort = LISTENPORT;
302
303
304
305
306
307
      }
      pos = cassandraHost.find(":");
      if (pos != string::npos) {
        cassandraPort = cassandraHost.substr(pos+1);
        cassandraHost.erase(pos);
      } else {
308
        cassandraPort = CASSANDRAPORT;
309
310
311
312
313
314
      }
      pos = restApiHost.find(":");
      if (pos != string::npos) {
        restApiPort = restApiHost.substr(pos+1);
        restApiHost.erase(pos);
      } else {
315
        restApiPort = RESTAPIPORT;
316
317
318
      }


319
      /*
320
321
       * Allocate and initialize connection to Cassandra.
       */
322
      DCDB::Connection* dcdbConn;
Michael Ott's avatar
Michael Ott committed
323
      dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
324

Axel Auweter's avatar
Axel Auweter committed
325
      if (!dcdbConn->connect()) {
326
327
328
329
330
331
332
          std::cout << "Cannot connect to Cassandra!" << std::endl;
          exit(EXIT_FAILURE);
      }

      /*
       * Legacy behavior: Initialize the DCDB schema in Cassandra.
       */
Axel Auweter's avatar
Axel Auweter committed
333
      dcdbConn->initSchema();
334
335
336
337

      /*
       * Allocate the SensorDataStore.
       */
338
      mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
339
340
341

      /*
       * Set TTL for data store inserts if TTL > 0.
342
       */
343
344
345
346
347
348
      uint64_t ttlInt;
      std::istringstream ttlParser(ttl);
      if (!(ttlParser >> ttlInt)) {
          std::cout << "Invalid TTL!" << std::endl;
          exit(EXIT_FAILURE);
      }
349
350
351
      if (ttlInt) {
        mySensorDataStore->setTTL(ttlInt);
      }
352

353
354
355
      /*
       * Start the MQTT Message Server.
       */
356
      SimpleMQTTServer ms(listenHost, listenPort);
357
      
358
359
360
      ms.setMessageCallback(mqttCallback);
      ms.start();

361
362
      cout << "MQTT Server running..." << std::endl;
      
363
364
365
366
367
368
369
370
371
372
373
374
375
376
      /*
       * Start the HTTP Server for the REST API
       */
      std::thread httpThread;
      httpHandler_t httpHandler;
      httpServer_t::options httpOptions(httpHandler);

      httpOptions.reuse_address(true);
      httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
      httpServer_t httpServer(httpOptions.address(restApiHost).port(restApiPort));
      httpThread = std::thread([&httpServer] { httpServer.run(); });

      cout << "HTTP Server running..." << std::endl;

377
378
379
380
381
382
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;
383
384
      
      cout << "Collect Agent running..." << std::endl;
385

386
387
      while(keepRunning) {
          gettimeofday(&start, NULL);
388
          sleep(60);
389
390
391
392
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
393
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
394
395
396
          if (statistics) {
              cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
          }
397
          msgCtr = 0;
398
          pmsgCtr = 0;
399
400
      }

401
      cout << "Stopping...\n";
402
403

      ms.stop();
404
      cout << "MQTT Server stopped..." << std::endl;
405
406
      httpServer.stop();
      httpThread.join();
407
      cout << "HTTP Server stopped..." << std::endl;
408
      delete mySensorDataStore;
Axel Auweter's avatar
Axel Auweter committed
409
410
      dcdbConn->disconnect();
      delete dcdbConn;
411
      cout << "Collect Agent closed. Bye bye..." << std::endl;
412
  }
413
414
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
415
      abrt(EXIT_FAILURE, INTERR);
416
417
  }

418
  return EXIT_SUCCESS;
419
}
420
421