Commit 03cfacd2 authored by Michael Ott's avatar Michael Ott
Browse files

Introduce MQTTPusher to push sensor readings from each sensor's queue to a MQTT broker

parent 242ee80d
......@@ -21,6 +21,7 @@ namespace DCDB {
_global.threads = 1;
_global.sessionTimeout = 0;
_global.retransmissionTimeout = 0;
_global.mqttBrokerPort = 1883;
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
......@@ -33,6 +34,13 @@ namespace DCDB {
_global.sessionTimeout = stoi(global.second.data());
} else if(boost::iequals(global.first, "RetransmissionTimeout")) {
_global.retransmissionTimeout = stoi(global.second.data());
} else if(boost::iequals(global.first, "mqttBroker")) {
_global.mqttBrokerHost = global.second.data();
std::size_t pos = _global.mqttBrokerHost.find(":");
if (pos != std::string::npos) {
_global.mqttBrokerPort = stoi(_global.mqttBrokerHost.substr(pos+1));
_global.mqttBrokerHost.erase(pos);
}
}
}
......
......@@ -24,6 +24,8 @@ namespace DCDB {
uint32_t threads;
uint32_t sessionTimeout;
uint32_t retransmissionTimeout;
std::string mqttBrokerHost;
uint16_t mqttBrokerPort;
} global_t;
public:
......
/*
* MQTTPusher.cpp
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#include "MQTTPusher.h"
#include "Configuration.h"
#include "IPMIHost.h"
#include "IPMISensor.h"
#include <boost/foreach.hpp>
#include <iostream>
#include <unistd.h>
#include <mosquitto.h>
namespace DCDB {
MQTTPusher::MQTTPusher(Configuration& cfg) : _cfg(cfg), _connected(false) {
mosquitto_lib_init();
_mosq = mosquitto_new("ipmipusher", true, NULL);
if (mosquitto_connect(_mosq, _cfg._global.mqttBrokerHost.c_str(), _cfg._global.mqttBrokerPort, 1000) != MOSQ_ERR_SUCCESS) {
throw std::runtime_error("mosquitto: could not connect to host");
} else {
_connected = true;
}
}
MQTTPusher::~MQTTPusher() {
// TODO Auto-generated destructor stub
}
void MQTTPusher::push() {
reading_t* reads = new reading_t[1024];
while (1)
BOOST_FOREACH(IPMIHost &host, _cfg._hosts) {
BOOST_FOREACH(IPMISensor& sensor, host.getSensors()) {
std::size_t count = sensor.popReadingQueue(reads, 1024);
if (count) {
if (!_connected) {
mosquitto_reconnect(_mosq);
}
#ifdef DEBUG
std::cout << sensor.getHost()->getHostName() << "::" << sensor.getName() << " " << count << " reads:" << std::endl;
#endif
for (std::size_t i=0; i<count; i++) {
#ifdef DEBUG
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
#endif
}
int rc = mosquitto_publish(_mosq, NULL, std::string(host.getMqttPrefix()+sensor.getMqttSuffix()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS;
if (rc != MOSQ_ERR_SUCCESS) {
throw std::runtime_error(std::string("mosquitto: ") + std::string(mosquitto_strerror(errno)));
mosquitto_disconnect(_mosq);
_connected = 0;
break;
}
}
}
sleep(5);
}
}
} /* namespace DCDB */
/*
* MQTTPusher.h
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#include "Configuration.h"
#include <mosquitto.h>
namespace DCDB {
class MQTTPusher {
public:
MQTTPusher(Configuration& cfg);
virtual ~MQTTPusher();
void push();
private:
Configuration& _cfg;
struct mosquitto* _mosq;
bool _connected;
};
} /* namespace DCDB */
#endif /* MQTTPUSHER_H_ */
......@@ -9,9 +9,9 @@ DISTFILES_HASHES = freeipmi-1.5.5.tar.gz|b8abfefee0b757f351d8fab777e3c1bb
include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -O2 -g -Wall $(CXX11FLAGS) -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include -I../deps/freeipmi-$(FREEIPMI_VERSION)/libfreeipmi/include -I../deps/freeipmi-$(FREEIPMI_VERSION)/libfreeipmi/api
OBJS = ipmipusher.o IPMISensor.o IPMIHost.o Configuration.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lfreeipmi -lboost_thread -lboost_regex -lboost_system -lboost_date_time
CXXFLAGS = -O2 -g -Wall -DDEBUG $(CXX11FLAGS) -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include -I../deps/freeipmi-$(FREEIPMI_VERSION)/libfreeipmi/include -I../deps/freeipmi-$(FREEIPMI_VERSION)/libfreeipmi/api
OBJS = ipmipusher.o IPMISensor.o IPMIHost.o Configuration.o MQTTPusher.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lfreeipmi -lmosquitto -lboost_thread -lboost_regex -lboost_system -lboost_date_time
TARGET = ipmipusher
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.installed: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.patched
......
......@@ -2,12 +2,13 @@ global {
threads 16
sessiontimeout 500
retransmissiontimeout 200
mqttBroker localhost:1883
}
sensors {
sensor energy {
type raw
freq 2000
freq 1000
cmd "0x00 0x2e 0x81 0x4d 0x4f 0x00 0x00 0x01 0x82 0x00 0x08"
start 5
stop 12
......@@ -19,7 +20,7 @@ hosts {
host localhost {
username "USERID"
password "PASSW0RD"
mqttprefix "/00/11/2233445566/778899AA"
mqttprefix "/00/11/2233445566/778899AABB"
sensors {
sensor energy {
MQTTsuffix 11223344
......
......@@ -9,6 +9,7 @@
#include "IPMIHost.h"
#include "IPMISensor.h"
#include "MQTTPusher.h"
#include "Configuration.h"
int main (int argc, char **argv) {
......@@ -31,6 +32,11 @@ int main (int argc, char **argv) {
for (std::size_t i = 0; i < cfg._global.threads; i++) {
threads.create_thread(boost::bind(&boost::asio::io_service::run, &io));
}
DCDB::MQTTPusher mqttPusher(cfg);
boost::thread mqttThread(boost::bind(&DCDB::MQTTPusher::push, &mqttPusher));
mqttThread.join();
threads.join_all();
return 0;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment