collectagent.cpp 4.91 KB
Newer Older
1
2
3
4
5
//============================================================================
// Name        : CollectAgent.cpp
// Author      : Axel Auweter
// Version     :
// Copyright   : Leibniz Supercomputing Centre
6
// Description : Main code of the CollectAgent
7
8
//============================================================================

9
#include <cstdlib>
10
#include <signal.h>
11
12
#include <unistd.h>
#include <string>
13

14
#include <boost/date_time/posix_time/posix_time.hpp>
15

16
17
#include <dcdb/sensordatastore.h>

18
#include "simplemqttserver.h"
19

20
using namespace std;
21
22

int keepRunning;
23
24
uint64_t msgCtr;
uint64_t pmsgCtr;
25
SensorDataStore *mySensorDataStore;
26
std::string host;
27

28

29
30
void sigHandler(int sig)
{
31
32
33
  keepRunning = 0;
}

34
35
void mqttCallback(SimpleMQTTMessage *msg)
{
36

37
38
39
  /*
   * Increment the msgCtr/vmsgCtr for statistics.
   */
40
  msgCtr++;
41
42
43
44
45
46
  if (msg->isPublish())
    pmsgCtr++;

  /*
   * Decode the message and put into the database.
   */
47
  if (msg->isPublish()) {
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
	  
	  uint64_t val;
	  uint64_t ts;
	  
	  //In the 64 bit message case, the collect agent provides a timestamp
	  if(msg->getPayloadLength()==sizeof(uint64_t)) {
		  
		  val = *((uint64_t*)msg->getPayload());
		  
		  /*
		   * Calculate Time Stamp.
		   */
           boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
		   boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
		   boost::posix_time::time_duration diff = now - epoch;
           
           ts = diff.total_nanoseconds();
	   }
	   
	   //...otherwise it just retrieves it from the MQTT message payload. 
	   else {
		   
		   uint64_t *payload;
		   payload = (uint64_t*)msg->getPayload();
		   
	       val = payload[0];
		   ts = payload[1];
	   }
76
77
78
79
80
81
82
83

      /*
       * Check if we can decode the message topic
       * into a valid SensorId. If successful, store
       * the record in the database.
       */
      SensorId sid;
      if (mySensorDataStore->topicToSid(&sid,msg->getTopic())) {
84
#if 0
85
86
87
88
89
90
91
92
93
94
95
96
97
          cout << "Topic decode successful:"
               << "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
               << "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
               << "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
               << "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
               << "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
               << "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
               << "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
               << "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
               << "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
               << "\ndevice_id: " << hex << sid.dsid.device_id
               << "\nsensor_id: " << hex << sid.dsid.sensor_id
               << "\n";
98
99
#endif

100
101
          //mySensorDataStore->insert(&sid, ts, *((uint64_t*)msg->getPayload()));
          mySensorDataStore->insert(&sid, ts, val);
102
      }
103
  }
104
  delete msg;
105
106
}

107
108
109
110
111
112
113
114
115
116
117
/*
 * Print usage information
 */
void usage() {
  printf("Usage: collectagent [-h <host>]\n");
  printf("Collectagent will accept remote connections by listening to the\n");
  printf("specified listen address <host> at port 1883 (default MQTT port).\n");
  printf("The default <host> is localhost.\n\n");
}

int main(int argc, char* const argv[]) {
118
119

  try{
120
      /*
121
       * Catch SIGINT signals to allow for proper server shutdowns.
122
       */
123
      signal(SIGINT, sigHandler);
124

125
126
      /* Parse command line */
      char ret;
127
128
      host="localhost";
      while ((ret=getopt(argc, argv, "h:?"))!=EOF) {
129
          switch(ret) {
130
131
132
133
134
135
136
              case 'h':
                  host = optarg;
                  break;
              case '?':
              default:
                  usage();
                  exit(EXIT_FAILURE);
137
138
139
          }
      }

140
141
142
143
144
      /*
       * Allocate and initialize sensor data store.
       */
      mySensorDataStore = new SensorDataStore();

145
146
147
      /*
       * Start the MQTT Message Server.
       */
148
149
      SimpleMQTTServer ms(host,"1883");
      
150
151
152
153
154
      ms.setMessageCallback(mqttCallback);
      ms.start();

      cout << "Server running...\n";

155
156
157
158
159
160
161
      /*
       * Run (hopefully) forever...
       */
      keepRunning = 1;
      timeval start, end;
      double elapsed;

162
163
      while(keepRunning) {
          gettimeofday(&start, NULL);
164
          sleep(60);
165
166
167
168
          /* not really thread safe but will do the job */
          gettimeofday(&end, NULL);
          elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
          elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
169
170
          float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
          cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
171
          msgCtr = 0;
172
          pmsgCtr = 0;
173
174
      }

175
      cout << "Stopping...\n";
176
177

      ms.stop();
178
      delete mySensorDataStore;
179
  }
180
181
  catch (const exception& e) {
      cout << "Exception: " << e.what() << "\n";
182
      exit(EXIT_FAILURE);
183
184
  }

185
  return EXIT_SUCCESS;
186
}
187
188