Commit b9bb118b authored by Micha Mueller's avatar Micha Mueller
Browse files

Delete old sysfspusher-files

parent 848c0e3b
/*
* MQTTPusher.cpp
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#include "MQTTPusher.h"
#include "SYSFSSensor.h"
#include <boost/foreach.hpp>
#include <iostream>
#include <string>
#include <unistd.h>
#include <mosquitto.h>
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(SYSFSConfiguration& cfg) :
_cfg(cfg), _connected(false) {
//print some info
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
std::cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << std::endl;
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
std::cout << "Hostname: " << hostname << std::endl;
mosquitto_lib_init();
std::string clientID(_cfg._global.mqttPrefix);
_mosq = mosquitto_new(_cfg._global.mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
}
MQTTPusher::~MQTTPusher() {
if(_connected) {
mosquitto_disconnect(_mosq);
}
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _cfg._global.brokerHost.c_str(), _cfg._global.brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "Mosquitto: could not connect to MQTT broker "
<< _cfg._global.brokerHost << ":"
<< _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(SYSFSSensor& sensor, _cfg._sensors) {
if (sensor.getSizeOfReadingQueue() >= sensor.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = sensor.popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << sensor.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+sensor.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
sensor.pushReadingQueue(reads, count);
sleep(5);
break;
}
}
}
}
sleep(1);
}
}
/*
* MQTTPusher.h
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#include "SYSFSConfiguration.h"
#include <mosquitto.h>
class MQTTPusher {
public:
MQTTPusher(SYSFSConfiguration& cfg);
virtual ~MQTTPusher();
void push();
private:
SYSFSConfiguration& _cfg;
struct mosquitto* _mosq;
bool _connected;
};
#endif /* MQTTPUSHER_H_ */
TARGET = sysfspusher
DCDBBASEPATH ?= $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/..)
DCDBCOREPATH ?= $(DCDBBASEPATH)/dcdb
include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system -lboost_thread
OBJS = sysfspusher.o SYSFSConfiguration.o SYSFSSensor.o MQTTPusher.o
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
clean:
rm -f $(OBJS) $(TARGET)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
install -m 644 $(TARGET).conf $(DCDBDEPLOYPATH)/etc
/*
* SYSFSConfiguration.cpp
*
* Created on: 18.11.2017
* Author: Micha Mueller
*/
#include "SYSFSConfiguration.h"
#include <iostream>
#include "SYSFSSensor.h"
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
using namespace std;
SYSFSConfiguration::SYSFSConfiguration(string cfgFile) {
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
//read global variables
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
cout << global.first << " " << global.second.data() << endl;
if(boost::iequals(global.first, "mqttBroker")) {
_global.brokerHost = global.second.data();
size_t pos = _global.brokerHost.find(":");
if (pos != string::npos) {
_global.brokerPort = stoi(_global.brokerHost.substr(pos+1));
_global.brokerHost.erase(pos);
}
} else if(boost::iequals(global.first, "mqttprefix")) {
_global.mqttPrefix = global.second.data();
if (_global.mqttPrefix[_global.mqttPrefix.length()-1] != '/') {
_global.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
//read template sensors
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("SensorTemplate")) {
if (boost::iequals(sensor.first, "sensor")) {
cout << "Template Sensor \"" << sensor.second.data() << "\"" << endl;
if (!sensor.second.empty()) {
SYSFSSensor sysfsSensor(sensor.second.data());
readSensor(sysfsSensor, sensor.second);
_templateSensors.insert(sensorMap_t::value_type(sysfsSensor.getName(), sysfsSensor));
}
}
}
//read one sensor at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("sensors")) {
if (boost::iequals(sensor.first, "sensor")) {
cout << "Sensor \"" << sensor.second.data() << "\"" << endl;
if (!sensor.second.empty()) {
SYSFSSensor sysfsSensor(sensor.second.data());
//first check if default sensor is given
boost::optional<boost::property_tree::iptree&> defaultS = sensor.second.get_child_optional("default");
if(defaultS) {
cout << " Using \"" << defaultS.get().data() << "\" as default." << endl;
sensorMap_t::iterator it = _templateSensors.find(defaultS.get().data());
if(it != _templateSensors.end()) {
sysfsSensor = it->second;
sysfsSensor.setName(sensor.second.data());
} else {
cout << "Template sensor \"" << defaultS.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
readSensor(sysfsSensor, sensor.second);
_sensors.push_back(sysfsSensor);
}
}
}
}
SYSFSConfiguration::~SYSFSConfiguration() {
// TODO Auto-generated destructor stub
}
void SYSFSConfiguration::readSensor(SYSFSSensor& sensor, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, config) {
if (boost::iequals(s.first, ("path"))) {
sensor.setPath(s.second.data());
} else if (boost::iequals(s.first, "interval")) {
sensor.setInterval(stoi(s.second.data()));
} else if (boost::iequals(s.first, "mqttsuffix")) {
sensor.setMqtt(s.second.data());
} else if (boost::iequals(s.first, "minValues")) {
sensor.setMinValues(stoull(s.second.data()));
} else if (boost::iequals(s.first, "filter")) {
sensor.setFilter(true);
string input = s.second.data();
// Notes on regexes:
// 1. For substitution sed syntax ("s/.../.../") is used. Therefore extended regular expressions (ERE) are used as regex-syntax.
// ERE is closest to Basic RE (BRE), which is actually used by sed, but requires less escaping.
// 2. If a \ ("backslash") is needed in the regex (for escaping), always use \\ ("double backslash") as
// the regex is read in as string and strings also escape with backslash
// 3. Boost uses whitespaces as separators in his property trees. Either use [[:space:]] in the regex or put it in quotation marks ("")
// 4. To be able to reference parts of the match (for substitution) use groups.
// Groups are created with parentheses.
// 5. If using character classes like [[:digit:]] always make sure to use double brackets ("[[" and "]]") or they are not recognized.
// See "https://www.gnu.org/software/sed/manual/html_node/ERE-syntax.html#ERE-syntax" for ERE-syntax
// See "http://www.boost.org/doc/libs/1_65_1/libs/regex/doc/html/boost_regex/format/sed_format.html" for more info substitution syntax.
//check if input has sed format of "s/.../.../" for substitution
regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
smatch matchResults;
if(regex_match(input, matchResults, checkSubstitute)) {
//input has substitute format
#ifdef DEBUG
cout << " Init Regex with " << matchResults[2].str() << endl;
cout << " Substitution: " << matchResults[3].str() << endl;
#endif
sensor.setRegex(regex(matchResults[2].str(), regex_constants::extended));
sensor.setSubstitution(matchResults[3].str());
} else {
//input is only a regex
#ifdef DEBUG
cout << " Init Regex with " << input << endl;
#endif
sensor.setRegex(regex(input, regex_constants::extended));
sensor.setSubstitution("&");
}
} else if (boost::iequals(s.first, "default")) {
//avoid unnecessary "Value not recognized" message
} else {
cout << " Value \"" << s.first << "\" not recognized. Omitting..." << endl;
}
}
cout << " Path : " << sensor.getPath() << endl;
cout << " MQTT : " << sensor.getMqtt() << endl;
cout << " Interval: " << sensor.getInterval() << endl;
cout << " minValues:" << sensor.getMinValues() << endl;
if (sensor.hasFilter()) {
//regex cannot be converted back to string
cout << " Using regular expression to filter data" << endl;
}
return;
}
/*
* SYSFSConfiguration.h
*
* Created on: 18.11.2017
* Author: Micha Mueller
*/
#ifndef SYSFSCONFIGURATION_H_
#define SYSFSCONFIGURATION_H_
#include <vector>
#include <string>
#include <map>
#include <boost/property_tree/ptree.hpp>
class SYSFSSensor;
class SYSFSConfiguration {
typedef std::vector<SYSFSSensor> sensorVector_t;
typedef std::map<std::string, SYSFSSensor> sensorMap_t;
typedef struct {
int brokerPort;
std::string brokerHost;
std::string mqttPrefix;
uint32_t threads;
} global_t;
public:
/**
* Read the configuration for sysfsPusher.
* @param cfgFile Name of the configuration file.
*/
SYSFSConfiguration(std::string cfgFile);
virtual ~SYSFSConfiguration();
sensorVector_t _sensors;
global_t _global;
private:
/**
* Set the variables of sensor according to the values specified in config.
* @param sensor The sensor to be configured
* @param config A property(sub)tree containing the values
*/
void readSensor(SYSFSSensor& sensor, boost::property_tree::iptree& config);
sensorMap_t _templateSensors;
};
#endif /* SYSFSCONFIGURATION_H_ */
/*
* SYSFSSensor.cpp
*
* Created on: 18.11.2017
* Author: Micha Mueller
*/
#include "SYSFSSensor.h"
#include "timestamp.h"
#include <functional>
extern volatile int keepRunning;
using namespace std;
SYSFSSensor::SYSFSSensor(const std::string& name) {
_name = name;
_path = "";
_mqtt = "";
_minValues = 1;
_interval = 1000;
_file = NULL;
_filter = false;
//_regx = "";
_substitution = "";
_timer = NULL;
_latestReading = 0;
_readingQueue = NULL;
}
SYSFSSensor::~SYSFSSensor() {
if (_readingQueue != NULL) {
delete _readingQueue;
_readingQueue = NULL;
}
if (_file != NULL) {
fclose(_file);
_file = NULL;
}
}
void SYSFSSensor::read() {
reading_t reading;
char buf[1024];
reading.timestamp = getTimestamp();
fseek(_file, 0, SEEK_SET);
size_t nelem = fread(buf, 1, 1024, _file);
if (nelem) {
buf[nelem-1] = 0;
try {
//filter the payload if necessary
if(_filter) {
//substitution must have sed format
//if no substitution is defined the whole regex-match is copied as is.
//parts which do not match are not copied --> filter
reading.value = stoll(regex_replace(buf, _regx, _substitution, regex_constants::format_sed | regex_constants::format_no_copy));
} else {
reading.value = stoll(buf);
}
} catch (const std::exception& e) {
#ifdef DEBUG
cout << "Sensor " << _name << ": Could not parse value!" << endl;
#endif
return;
}
#ifdef DEBUG
cout << "[" << prettyPrintTimestamp(reading.timestamp) << "] " << _name << ": \"" << reading.value << "\"" << endl;
#endif
}
_readingQueue->push(reading);
}
void SYSFSSensor::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
}
}
void SYSFSSensor::startPolling(boost::asio::io_service& io) {
if(_readingQueue == NULL) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
if(_file == NULL) {
_file = fopen(_path.c_str(), "r");
if (_file == NULL) {
cerr << "Error opening sensor \"" << _name << "\": " << strerror(errno) << endl;
return;
}
}
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
}
/*
* SYSFSSensor.h
*
* Created on: 18.11.2017
* Author: Micha Mueller
*/
#ifndef SYSFSSENSOR_H_
#define SYSFSSENSOR_H_
//#define DEBUG
#include <string>
#include <regex>
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
class SYSFSSensor {
public:
SYSFSSensor(const std::string& name);
virtual ~SYSFSSensor();
const std::string& getName() const {
return _name;
}
void setName(const std::string& name) {
_name = name;
}
const std::string& getPath() const {
return _path;
}
void setPath(const std::string& path) {
_path = path;
}
const std::string& getMqtt() const {
return _mqtt;
}
void setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
uint64_t getMinValues() const {
return _minValues;
}
void setMinValues(uint64_t minValues) {
_minValues = minValues;
}
FILE* getFile() const {
return _file;
}
void setFile(FILE* file) {
_file = file;
}
int getInterval() const {
return _interval;
}
void setInterval(int interval) {
_interval = interval;
}
bool hasFilter() const {
return _filter;
}
void setFilter(bool filter) {
_filter = filter;
}
std::regex getRegex() const {
return _regx;
}
void setRegex(std::regex regx) {
_regx = regx;
}
const std::string& getSubstitution() const {
return _substitution;
}
void setSubstitution(const std::string& substitution) {
_substitution = substitution;
}
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
void read();
void readAsync();
void startPolling(boost::asio::io_service& io);
private:
std::string _name;
std::string _path;
std::string _mqtt;
uint64_t _minValues;
FILE* _file;
int _interval;
bool _filter;
std::regex _regx;
std::string _substitution;
boost::asio::deadline_timer* _timer;
uint64_t _latestReading;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* SYSFSSENSOR_H_ */
global {
mqttBroker localhost:1883
mqttprefix /00112233445566778899AABB0000
threads 4
}
SensorTemplate {
sensor def1 {
path /home/micha/LRZ/dcdbOwnFork/sysfspusher/temp
interval 1000
mqttsuffix 0000
filter "s/Temperature ([[:digit:]]+)\\.([[:digit:]]+)°C/\\1\\200/"
minValues 3
}
sensor def2 {
interval 2000
}
}
sensors {
sensor temp0 {
default def1
}
sensor temp1 {
path /sys/devices/virtual/thermal/thermal_zone1/temp