Commit d0d9b365 authored by Micha Mueller's avatar Micha Mueller
Browse files

Use boost threads for sysfspusher

Outsourcing of sysfspusher.cpp into multiple classes is now complete
parent 852271ce
......@@ -9,6 +9,7 @@
#include "SYSFSSensor.h"
#include <boost/foreach.hpp>
#include <iostream>
#include <string>
#include <unistd.h>
#include <mosquitto.h>
......@@ -16,18 +17,38 @@ extern volatile int keepRunning;
MQTTPusher::MQTTPusher(SYSFSConfiguration& cfg) :
_cfg(cfg), _connected(false) {
//print some info
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
std::cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << std::endl;
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
std::cout << "Hostname: " << hostname << std::endl;
mosquitto_lib_init();
_mosq = mosquitto_new("sysfspusher", true, NULL);
std::string clientID(_cfg._global.mqttPrefix);
_mosq = mosquitto_new(_cfg._global.mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
}
MQTTPusher::~MQTTPusher() {
// TODO Auto-generated destructor stub
if(_connected) {
mosquitto_disconnect(_mosq);
}
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _cfg._global.brokerHost.c_str(), _cfg._global.brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not connect to MQTT broker "
std::cout << "Mosquitto: could not connect to MQTT broker "
<< _cfg._global.brokerHost << ":"
<< _cfg._global.brokerPort << std::endl;
sleep(1);
......@@ -53,14 +74,13 @@ void MQTTPusher::push() {
if (_connected) {
std::size_t count = sensor.popReadingQueue(reads, 1024);
totalCount+= count;
#if 0
std::cout << sensor.getName() << " " << count << " reads:" << std::endl;
#ifdef DEBUG
std::cout << sensor.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
int rc = mosquitto_publish(_mosq, NULL, std::string(_cfg._global.mqttPrefix+sensor.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS;
if (rc != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+sensor.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
throw std::runtime_error(std::string("mosquitto: ") + std::string(mosquitto_strerror(errno)));
mosquitto_disconnect(_mosq);
_connected = 0;
......
......@@ -7,7 +7,7 @@ include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system -lboost_thread
OBJS = sysfspusher.o helper.o SYSFSConfiguration.o SYSFSSensor.o MQTTPusher.o
$(TARGET): $(OBJS)
......
......@@ -39,6 +39,8 @@ SYSFSConfiguration::SYSFSConfiguration(string cfgFile) {
if (_global.mqttPrefix[_global.mqttPrefix.length()-1] != '/') {
_global.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
......
......@@ -22,6 +22,7 @@ class SYSFSConfiguration {
int brokerPort;
std::string brokerHost;
std::string mqttPrefix;
uint32_t threads;
} global_t;
public:
......
......@@ -23,7 +23,6 @@ SYSFSSensor::SYSFSSensor(const std::string& name) {
_maxI = 0;
_maxV = 0;
_file = NULL;
_nextRead = 0;
_filter = false;
//_regx = "";
_substitution = "";
......
......@@ -96,14 +96,6 @@ public:
_maxV = maxV;
}
uint64_t getNextRead() const {
return _nextRead;
}
void setNextRead(uint64_t nextRead) {
_nextRead = nextRead;
}
bool hasFilter() const {
return _filter;
}
......@@ -152,7 +144,6 @@ private:
int64_t _maxV;
FILE* _file;
int _interval;
uint64_t _nextRead;
bool _filter;
std::regex _regx;
std::string _substitution;
......
global {
mqttBroker localhost:1883
mqttprefix /00112233445566778899AABB0000
//TODO value threads
threads 4
}
defaultSensors {
......
......@@ -25,7 +25,6 @@
//================================================================================
#include <iostream>
#include <vector>
#include <sstream>
#include <cstdio>
#include <cstdlib>
......@@ -33,30 +32,26 @@
#include <csignal>
#include <cerrno>
#include <unistd.h>
#include <mosquitto.h>
#include <dcdbdaemon.h>
#include <string>
#include <regex>
#include "helper.h"
#include <functional>
#include "SYSFSSensor.h"
#include "SYSFSConfiguration.h"
#include "MQTTPusher.h"
#include <boost/foreach.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
using namespace std;
vector<SYSFSSensor> sensorList;
int brokerPort;
string brokerHost;
string mqttprefix;
volatile int keepRunning;
int verbose = 0;
int keepRunning = 1;
int daemonize = 0;
void sigHandler(int sig) {
cout << "Received SIGINT. Stopping sysfs threads and flushing MQTT queues." << endl;
keepRunning = 0;
}
......@@ -81,179 +76,66 @@ void printSyntax()
cout << endl;
}
int readConfiguration(int argc, char **argv) {
if (argc > 1) {
char c;
while ((c = getopt(argc, argv, "b:t:dvh")) != -1) {
switch (c)
{
case 'b':
brokerHost = optarg;
break;
case 't':
mqttprefix = optarg;
break;
case 'd':
daemonize = 1;
break;
case 'v':
verbose = 1;
break;
case 'h':
//printSyntax();
return 1;
break;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
}
//config file values will overwrite command-line options...
SYSFSConfiguration cfg(argv[argc-1]);
brokerHost = cfg._global.brokerHost;
brokerPort = cfg._global.brokerPort;
mqttprefix = cfg._global.mqttPrefix;
sensorList = cfg._sensors;
return 0;
}
uint64_t translate(uint64_t val, uint64_t minI, uint64_t maxI, uint64_t maxV) {
uint64_t ret = 0;
if (val > minI) {
ret = (val-minI) * maxV / (maxI-minI);
}
return ret;
}
int main(int argc, char** argv) {
if (argc > 1) {
readConfiguration(argc, argv);
} else {
printSyntax();
if (argc <= 1) {
std::cout << "Please specify a config file" << std::endl;
return 1;
}
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << endl;
mosquitto_lib_init();
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
cout << "Hostname: " << hostname << endl;
keepRunning = 1;
signal(SIGINT, sigHandler);
string clientId(mqttprefix);
struct mosquitto* mosq;
mosq = mosquitto_new(clientId.c_str(), false, NULL);
if (!mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
boost::asio::io_service io;
boost::thread_group threads;
SYSFSConfiguration cfg(argv[argc-1]);
cout << "Connecting to broker: " << brokerHost << ":" << brokerPort << endl;
if (mosquitto_connect(mosq, brokerHost.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
perror("\nCould not connect to host");
exit(EXIT_FAILURE);
//read in optional arguments
char c;
while ((c = getopt(argc, argv, "b:t:dvh")) != -1) {
switch (c)
{
case 'b':
cfg._global.brokerHost = optarg;
break;
case 't':
cfg._global.mqttPrefix = optarg;
break;
case 'd':
daemonize = 1;
break;
case 'v':
//TODO does nothing after outsourcing functionality
//verbose information is currently toggled at compile time with DEBUG
verbose = 1;
break;
case 'h':
printSyntax();
return 1;
break;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
signal(SIGINT, sigHandler);
if (daemonize) {
dcdbdaemon();
dcdbdaemon();
}
char buf[1024];
uint64_t currentTime = getMsTime();
uint64_t nextWakeUp = currentTime;
for (unsigned int i = 0; i < sensorList.size(); i++) {
sensorList[i].setFile(fopen(sensorList[i].getPath().c_str(), "r"));
if (sensorList[i].getFile() == NULL) {
cerr << "Error opening sensor \"" << sensorList[i].getName() << "\": " << strerror(errno) << endl;
}
sensorList[i].setNextRead(currentTime);
BOOST_FOREACH(SYSFSSensor& sensor, cfg._sensors) {
cout << "Starting sensor " << sensor.getName() << endl;
sensor.startPolling(io);
}
while (keepRunning) {
int64_t sleepTime = nextWakeUp - currentTime;
uint64_t payload;
if (verbose) {
cout << endl << "Next wakeup will be in " << sleepTime << "ms." << endl;
}
if (sleepTime > 0) {
if (usleep(sleepTime * 1000) && (errno == EINTR)) {
break;
}
}
currentTime = getMsTime();
for (unsigned int i = 0; i < sensorList.size(); i++) {
if (sensorList[i].getNextRead() >= nextWakeUp - sensorList[i].getInterval() / 100
&& sensorList[i].getNextRead() <= nextWakeUp + sensorList[i].getInterval() / 100) {
fseek(sensorList[i].getFile(), 0, SEEK_SET);
size_t nelem = fread(buf, 1, 1024, sensorList[i].getFile());
if (nelem) {
buf[strlen(buf)-1] = 0;
//filter the payload if necessary
if(sensorList[i].hasFilter()) {
//substitution must have sed format
//if no substitution is defined the whole regex-match is copied as is.
//parts which do not match are not copied --> filter
payload = stoll(regex_replace(buf, sensorList[i].getRegex(), sensorList[i].getSubstitution(), regex_constants::format_sed | regex_constants::format_no_copy));
} else {
payload = stoll(buf);
}
if (sensorList[i].doConvert()) {
payload = translate(payload, sensorList[i].getMinI(), sensorList[i].getMaxI(), sensorList[i].getMaxV());
}
if (verbose) {
cout << "[" << currentTime << "] " << sensorList[i].getName() << ": \"" << payload << "\""<< endl;
}
if (mosquitto_publish(mosq, NULL, (clientId + sensorList[i].getMqtt()).c_str(), sizeof(payload), &payload, 0, false) != MOSQ_ERR_SUCCESS) {
cerr << "Warning: cannot send message. Disconnecting." << endl;
sleep(10);
mosquitto_disconnect(mosq);
cout << "Connecting to broker: " << brokerHost << ":" << brokerPort << endl;
if (mosquitto_connect(mosq, brokerHost.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
perror("\nCould not connect to host");
}
}
}
sensorList[i].setNextRead(currentTime + sensorList[i].getInterval());
if (sensorList[i].getNextRead() < nextWakeUp) {
nextWakeUp = sensorList[i].getNextRead();
}
}
}
nextWakeUp = sensorList[0].getNextRead();
for (unsigned int i = 0; i < sensorList.size(); i++) {
if (sensorList[i].getNextRead() < nextWakeUp) {
nextWakeUp = sensorList[i].getNextRead();
}
}
for(size_t i = 0; i < cfg._global.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
mosquitto_disconnect(mosq);
for (unsigned int i = 0; i < sensorList.size(); i++) {
fclose(sensorList[i].getFile());
}
MQTTPusher mqttPusher(cfg);
boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
mqttThread.join();
threads.join_all();
return 0;
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment