The expiration time for new job artifacts in CI/CD pipelines is now 30 days (GitLab default). Previously generated artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 49c79894 authored by Micha Mueller's avatar Micha Mueller
Browse files

Rework sysfs-files to enable outsourcing into an dynamic library

Removed no longer required files from SYSFSSensor
Renamed SYSFS to Sysfs
Introduce Configurator-base class, derive SysfsConfigurator from base-class
parent f8705d95
global {
mqttBroker localhost:1883
mqttprefix /00112233445566778899AABB0000 //new prefix for every sensor?
mqttprefix /00112233445566778899AABB0000
threads 4
}
Sensors {
perfcounter /*optional path + filename ?*/
sysfspusher
}
......@@ -9,6 +9,7 @@
#include <iostream>
#include <string>
#include <unistd.h>
#include <dlfcn.h>
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
......@@ -31,7 +32,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
}
Configuration::~Configuration() {
// TODO Auto-generated destructor stub
//TODO destroy dl's
}
bool Configuration::read() {
......@@ -68,12 +69,45 @@ bool Configuration::read() {
}
}
//dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
std::string sysfsConfig = _cfgFilePath;
sysfsConfig.append("sysfs.conf");
ifstream sysfs(sysfsConfig.c_str());
if (sysfs.good()) {
cout << "sysfs.conf found" << endl;
//sysfs.conf exists. Open sysfs.so
void* sysfs = dlopen("./sysfs.so", RTLD_NOW);
if(!sysfs) {
cerr << "Cannot load sysfs-library: " << dlerror() << endl;
return false;
}
//reset errors
dlerror();
create_t* sysfsCreator = (create_t*) dlsym(sysfs, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
cerr << "Cannot load symbol create for sysfs: " << dlsym_error << endl;
return false;
}
destroy_t* sysfsDestructor = (destroy_t*) dlsym(sysfs, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
cerr << "Cannot load symbol destroy for sysfs: " << dlsym_error << endl;
return false;
}
Configurator* sysfsConfigurator = sysfsCreator();
std::vector<Sensor>& sysfsSensors = sysfsConfigurator->readConfig(sysfsConfig);
for(auto& s : sysfsSensors) {
if(checkMqtt(s.getMqtt())) {
_sensors.push_back(&s);
} else {
cerr << "MQTT-Suffix used twice! Aborting" << endl;
}
}
}
std::string perfeventConfig = _cfgFilePath;
......@@ -82,13 +116,14 @@ bool Configuration::read() {
if (perfevent.good()) {
cout << "perfevent.conf found" << endl;
//perfevent.conf exists. Open perfevent.so
//TODO check if mqtt used twice!
}
//TODO check for more config-files
return true;
}
/*
bool Configuration::readSensorVals(Sensor& sensor, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
......@@ -103,7 +138,7 @@ bool Configuration::readSensorVals(Sensor& sensor, boost::property_tree::iptree&
cout << " Interval : " << sensor.getInterval() << endl;
cout << " minValues: " << sensor.getMinValues() << endl;
return true;
}
}*/
bool Configuration::checkMqtt(const std::string& mqtt) {
auto returnIt = _mqttSuffixes.insert(mqtt);
......
......@@ -51,7 +51,8 @@ public:
* @param config A property(sub)tree containing the values
* @return True on success, false otherwise
*/
bool readSensorVals(Sensor& sensor, boost::property_tree::iptree& config);
//CURRENTLY NOT USED. EVERY SENSOR READS ALL VALUES ON ITS OWN
//bool readSensorVals(Sensor& sensor, boost::property_tree::iptree& config);
/**
* Check if the mqtt-suffix is already in use.
......
/*
* Configurator.h
*
* Created on: 13.01.2018
* Author: Micha Mueller
*/
#ifndef SRC_CONFIGURATOR_H_
#define SRC_CONFIGURATOR_H_
#include "Configuration.h"
class Configurator {
public:
Configurator() {}
virtual ~Configurator() {}
virtual void readConfig(std::string cfgPath) = 0;
};
typedef Configurator* create_t();
typedef void destroy_t(Configurator*);
#endif /* SRC_CONFIGURATOR_H_ */
TARGET = sysfspusher
DCDBBASEPATH ?= $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/..)
DCDBCOREPATH ?= $(DCDBBASEPATH)/dcdb
include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system -lboost_thread
OBJS = sysfspusher.o SYSFSConfiguration.o SYSFSSensor.o MQTTPusher.o
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
clean:
rm -f $(OBJS) $(TARGET)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
install -m 644 $(TARGET).conf $(DCDBDEPLOYPATH)/etc
/*
* SYSFSConfiguration.cpp
* SysfsConfigurator.cpp
*
* Created on: 18.11.2017
* Author: Micha Mueller
*/
#include "SYSFSConfiguration.h"
#include "SysfsConfigurator.h"
#include <iostream>
#include "SYSFSSensor.h"
#include "SysfsSensor.h"
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
......@@ -15,43 +15,24 @@
using namespace std;
SYSFSConfiguration::SYSFSConfiguration(string cfgFile) {
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
SysfsConfigurator::SysfsConfigurator() {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
}
//read global variables
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
cout << global.first << " " << global.second.data() << endl;
if(boost::iequals(global.first, "mqttBroker")) {
_global.brokerHost = global.second.data();
size_t pos = _global.brokerHost.find(":");
if (pos != string::npos) {
_global.brokerPort = stoi(_global.brokerHost.substr(pos+1));
_global.brokerHost.erase(pos);
}
} else if(boost::iequals(global.first, "mqttprefix")) {
_global.mqttPrefix = global.second.data();
if (_global.mqttPrefix[_global.mqttPrefix.length()-1] != '/') {
_global.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
SysfsConfigurator::~SysfsConfigurator() {
// TODO Auto-generated destructor stub
}
std::vector<SysfsSensor>& SysfsConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
//read template sensors
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("SensorTemplate")) {
if (boost::iequals(sensor.first, "sensor")) {
cout << "Template Sensor \"" << sensor.second.data() << "\"" << endl;
if (!sensor.second.empty()) {
SYSFSSensor sysfsSensor(sensor.second.data());
SysfsSensor sysfsSensor(sensor.second.data());
readSensor(sysfsSensor, sensor.second);
_templateSensors.insert(sensorMap_t::value_type(sysfsSensor.getName(), sysfsSensor));
}
......@@ -63,7 +44,7 @@ SYSFSConfiguration::SYSFSConfiguration(string cfgFile) {
if (boost::iequals(sensor.first, "sensor")) {
cout << "Sensor \"" << sensor.second.data() << "\"" << endl;
if (!sensor.second.empty()) {
SYSFSSensor sysfsSensor(sensor.second.data());
SysfsSensor sysfsSensor(sensor.second.data());
//first check if default sensor is given
boost::optional<boost::property_tree::iptree&> defaultS = sensor.second.get_child_optional("default");
......@@ -83,13 +64,10 @@ SYSFSConfiguration::SYSFSConfiguration(string cfgFile) {
}
}
}
return _sensors;
}
SYSFSConfiguration::~SYSFSConfiguration() {
// TODO Auto-generated destructor stub
}
void SYSFSConfiguration::readSensor(SYSFSSensor& sensor, boost::property_tree::iptree& config) {
void SysfsConfigurator::readSensor(SysfsSensor& sensor, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, config) {
if (boost::iequals(s.first, ("path"))) {
sensor.setPath(s.second.data());
......
......@@ -8,43 +8,48 @@
#ifndef SYSFSCONFIGURATION_H_
#define SYSFSCONFIGURATION_H_
#include "SysfsSensor.h"
#include "../../Configurator.h"
#include "../../Configuration.h"
#include <vector>
#include <string>
#include <map>
#include <boost/property_tree/ptree.hpp>
class SYSFSSensor;
class SysfsConfigurator : public Configurator{
class SYSFSConfiguration {
typedef std::vector<SYSFSSensor> sensorVector_t;
typedef std::map<std::string, SYSFSSensor> sensorMap_t;
typedef struct {
int brokerPort;
std::string brokerHost;
std::string mqttPrefix;
uint32_t threads;
} global_t;
typedef std::map<std::string, SysfsSensor> sysfsSensorMap_t;
public:
SysfsConfigurator();
virtual ~SysfsConfigurator();
/**
* Read the configuration for sysfsPusher.
* @param cfgFile Name of the configuration file.
* Read in the configuration for sysfs-sensors.
* @param cfgPath Path + name of the config-file
* @param config Configuration where to store sensor-pointers and check for double used MQTT-suffixes
*/
SYSFSConfiguration(std::string cfgFile);
virtual ~SYSFSConfiguration();
sensorVector_t _sensors;
global_t _global;
std::vector<SysfsSensor>& readConfig(std::string cfgPath);
private:
/**
* Set the variables of sensor according to the values specified in config.
* @param sensor The sensor to be configured
* @param config A property(sub)tree containing the values
* @return True on success, false otherwise
*/
void readSensor(SYSFSSensor& sensor, boost::property_tree::iptree& config);
void readSensor(SysfsSensor& sensor, boost::property_tree::iptree& config);
sensorMap_t _templateSensors;
std::vector<SysfsSensor> _sensors;
sysfsSensorMap_t _templateSensors;
};
extern "C" Configurator* create() {
return new SysfsConfigurator;
}
extern "C" void destroy(Configurator* c) {
delete c;
}
#endif /* SYSFSCONFIGURATION_H_ */
......@@ -5,7 +5,7 @@
* Author: Micha Mueller
*/
#include "SYSFSSensor.h"
#include "SysfsSensor.h"
#include "timestamp.h"
#include <functional>
......@@ -13,34 +13,23 @@ extern volatile int keepRunning;
using namespace std;
SYSFSSensor::SYSFSSensor(const std::string& name) {
_name = name;
SysfsSensor::SysfsSensor(const std::string& name) :
Sensor(name) {
_path = "";
_mqtt = "";
_minValues = 1;
_interval = 1000;
_file = NULL;
_filter = false;
//_regx = "";
_substitution = "";
_timer = NULL;
_latestReading = 0;
_readingQueue = NULL;
}
SYSFSSensor::~SYSFSSensor() {
if (_readingQueue != NULL) {
delete _readingQueue;
_readingQueue = NULL;
}
SysfsSensor::~SysfsSensor() {
if (_file != NULL) {
fclose(_file);
_file = NULL;
}
}
void SYSFSSensor::read() {
void SysfsSensor::read() {
reading_t reading;
char buf[1024];
......@@ -63,7 +52,7 @@ void SYSFSSensor::read() {
}
} catch (const std::exception& e) {
#ifdef DEBUG
cout << "Sensor " << _name << ": Could not parse value!" << endl;
cout << "SysfsSensor " << _name << ": Could not parse value!" << endl;
#endif
return;
}
......@@ -75,21 +64,17 @@ void SYSFSSensor::read() {
_readingQueue->push(reading);
}
void SYSFSSensor::readAsync() {
void SysfsSensor::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
_timer->async_wait(std::bind(&SysfsSensor::readAsync, this));
}
}
void SYSFSSensor::startPolling(boost::asio::io_service& io) {
if(_readingQueue == NULL) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
void SysfsSensor::startPolling(boost::asio::io_service& io) {
if(_file == NULL) {
_file = fopen(_path.c_str(), "r");
if (_file == NULL) {
......@@ -98,5 +83,5 @@ void SYSFSSensor::startPolling(boost::asio::io_service& io) {
}
}
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(std::bind(&SYSFSSensor::readAsync, this));
_timer->async_wait(std::bind(&SysfsSensor::readAsync, this));
}
......@@ -8,31 +8,14 @@
#ifndef SYSFSSENSOR_H_
#define SYSFSSENSOR_H_
//#define DEBUG
#include <string>
#include "../../Sensor.h"
#include <regex>
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
class SYSFSSensor {
class SysfsSensor : public Sensor {
public:
SYSFSSensor(const std::string& name);
virtual ~SYSFSSensor();
const std::string& getName() const {
return _name;
}
void setName(const std::string& name) {
_name = name;
}
SysfsSensor(const std::string& name);
virtual ~SysfsSensor();
const std::string& getPath() const {
return _path;
......@@ -42,22 +25,6 @@ public:
_path = path;
}
const std::string& getMqtt() const {
return _mqtt;
}
void setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
uint64_t getMinValues() const {
return _minValues;
}
void setMinValues(uint64_t minValues) {
_minValues = minValues;
}
FILE* getFile() const {
return _file;
}
......@@ -66,14 +33,6 @@ public:
_file = file;
}
int getInterval() const {
return _interval;
}
void setInterval(int interval) {
_interval = interval;
}
bool hasFilter() const {
return _filter;
}
......@@ -98,18 +57,6 @@ public:
_substitution = substitution;
}
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
void read();
void readAsync();
......@@ -117,20 +64,12 @@ public:
void startPolling(boost::asio::io_service& io);
private:
std::string _name;
std::string _path;
std::string _mqtt;
uint64_t _minValues;
FILE* _file;
int _interval;
bool _filter;
std::regex _regx;
std::string _substitution;
boost::asio::deadline_timer* _timer;
uint64_t _latestReading;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* SYSFSSENSOR_H_ */
/*
* MQTTPusher.cpp
*
* Created on: 27 Jan 2017
* Author: ottmi
*/
#include "MQTTPusher.h"
#include "SYSFSSensor.h"
#include <boost/foreach.hpp>
#include <iostream>
#include <string>
#include <unistd.h>
#include <mosquitto.h>
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(SYSFSConfiguration& cfg) :
_cfg(cfg), _connected(false) {
//print some info
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
std::cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << std::endl;
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
std::cout << "Hostname: " << hostname << std::endl;
mosquitto_lib_init();
std::string clientID(_cfg._global.mqttPrefix);
_mosq = mosquitto_new(_cfg._global.mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
}
MQTTPusher::~MQTTPusher() {
if(_connected) {
mosquitto_disconnect(_mosq);
}
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _cfg._global.brokerHost.c_str(), _cfg._global.brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "Mosquitto: could not connect to MQTT broker "
<< _cfg._global.brokerHost << ":"
<< _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(SYSFSSensor& sensor, _cfg._sensors) {
if (sensor.getSizeOfReadingQueue() >= sensor.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = sensor.popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << sensor.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+sensor.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
sensor.pushReadingQueue(reads, count);
sleep(5);
break;
}
}