collectagent.cpp 3.66 KB
Newer Older
1
2
3
4
5
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
//============================================================================

9
#include <cstdlib>
10
#include <signal.h>
11

12
#include <boost/date_time/posix_time/posix_time.hpp>
13

14
15
#include <dcdb/sensordatastore.h>

16
#include "simplemqttserver.h"
17

18
using namespace std;
19
20

int keepRunning;
21
22
uint64_t msgCtr;
uint64_t pmsgCtr;
23
SensorDataStore *mySensorDataStore;
24

25
26
void sigHandler(int sig)
{
27
28
29
  keepRunning = 0;
}

30
31
32
33
34
void mqttCallback(SimpleMQTTMessage *msg)
{
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
35
  msgCtr++;
36
37
38
39
40
41
  if (msg->isPublish())
    pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
  if (msg->isPublish()) {
      /*
       * Calculate Time Stamp.
       */
      boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
      boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
      boost::posix_time::time_duration diff = now - epoch;

      uint64_t ts = diff.total_nanoseconds();

      /*
       * Check if we can decode the message topic
       * into a valid SensorId. If successful, store
       * the record in the database.
       */
      SensorId sid;
      if (mySensorDataStore->topicToSid(&sid,msg->getTopic())) {
59
#if 0
60
61
62
63
64
65
66
67
68
69
70
71
72
          cout << "Topic decode successful:"
               << "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
               << "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
               << "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
               << "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
               << "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
               << "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
               << "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
               << "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
               << "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
               << "\ndevice_id: " << hex << sid.dsid.device_id
               << "\nsensor_id: " << hex << sid.dsid.sensor_id
               << "\n";
73
74
#endif

75
          mySensorDataStore->insert(&sid, ts, *((uint64_t*)msg->getPayload()));
76
      }
77
  }
78
  delete msg;
79
80
}

81
int main(void) {
82
83

  try{
84
      /*
85
       * Catch SIGINT signals to allow for proper server shutdowns.
86
       */
87
      signal(SIGINT, sigHandler);
88
89

      /*
90
       * Allocate and initialize sensor data store.
91
       */
92
      mySensorDataStore = new SensorDataStore();
93
94
95
96

      /*
       * Start the MQTT Message Server.
       */
97
98
99
100
101
102
      SimpleMQTTServer ms;
      ms.setMessageCallback(mqttCallback);
      ms.start();

      cout << "Server running...\n";

103
104
105
106
107
108
109
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;

110
111
      while(keepRunning) {
          gettimeofday(&start, NULL);
112
          sleep(60);
113
114
115
116
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
117
118
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
119
          msgCtr = 0;
120
          pmsgCtr = 0;
121
122
      }

123
      cout << "Stopping...\n";
124
125

      ms.stop();
126
      delete mySensorDataStore;
127
  }
128
129
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
130
      exit(EXIT_FAILURE);
131
132
  }

133
  return EXIT_SUCCESS;
134
}