Commit 852271ce authored by Micha Mueller's avatar Micha Mueller
Browse files

Copy and adapt MQTTPusher from ipmipusher

Minor fixes to SYSFSSensor
parent 774c7e45
/*
* MQTTPusher.cpp
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#include "MQTTPusher.h"
#include "SYSFSSensor.h"
#include <boost/foreach.hpp>
#include <iostream>
#include <unistd.h>
#include <mosquitto.h>
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(SYSFSConfiguration& cfg) :
_cfg(cfg), _connected(false) {
mosquitto_lib_init();
_mosq = mosquitto_new("sysfspusher", true, NULL);
}
MQTTPusher::~MQTTPusher() {
// TODO Auto-generated destructor stub
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _cfg._global.brokerHost.c_str(), _cfg._global.brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not connect to MQTT broker "
<< _cfg._global.brokerHost << ":"
<< _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(SYSFSSensor& sensor, _cfg._sensors) {
if (sensor.getSizeOfReadingQueue()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = sensor.popReadingQueue(reads, 1024);
totalCount+= count;
#if 0
std::cout << sensor.getName() << " " << count << " reads:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
int rc = mosquitto_publish(_mosq, NULL, std::string(_cfg._global.mqttPrefix+sensor.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS;
if (rc != MOSQ_ERR_SUCCESS) {
throw std::runtime_error(std::string("mosquitto: ") + std::string(mosquitto_strerror(errno)));
mosquitto_disconnect(_mosq);
_connected = 0;
break;
}
}
}
}
sleep(1);
}
}
/*
* MQTTPusher.h
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#include "SYSFSConfiguration.h"
#include <mosquitto.h>
class MQTTPusher {
public:
MQTTPusher(SYSFSConfiguration& cfg);
virtual ~MQTTPusher();
void push();
private:
SYSFSConfiguration& _cfg;
struct mosquitto* _mosq;
bool _connected;
};
#endif /* MQTTPUSHER_H_ */
......@@ -7,8 +7,8 @@ include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto
OBJS = sysfspusher.o helper.o SYSFSConfiguration.o SYSFSSensor.o
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system
OBJS = sysfspusher.o helper.o SYSFSConfiguration.o SYSFSSensor.o MQTTPusher.o
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
......
......@@ -7,6 +7,7 @@
#include "SYSFSSensor.h"
#include "timestamp.h"
#include <functional>
extern volatile int keepRunning;
......@@ -72,20 +73,20 @@ void SYSFSSensor::read() {
}
}
if (verbose) {
#ifdef DEBUG
std::cout << "[" << prettyPrintTimestamp(reading.timestamp) << "] " << _name << ": \"" << reading.value << "\"" << endl;
}
#endif
}
_readingQueue->push(reading);
}
void SYSFSSensor::readAsync(const boost::system::error_code& /*e*/) {
void SYSFSSensor::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(this->readAsync);
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
}
}
......@@ -102,5 +103,5 @@ void SYSFSSensor::startPolling(boost::asio::io_service& io) {
}
}
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(this->readAsync);
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
}
......@@ -13,12 +13,12 @@
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
class SYSFSSensor {
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
class SYSFSSensor {
public:
SYSFSSensor(const std::string& name);
......@@ -128,13 +128,17 @@ public:
_substitution = substitution;
}
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void read();
void readAsync(const boost::system::error_code& /*e*/);
void readAsync();
void startPolling(boost::asio::io_service& io);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment