Commit da1dca74 authored by Micha Mueller's avatar Micha Mueller
Browse files

Refactor handling of sensors: instead of all together we now store them plugin-wise

parent 9b61ed64
global {
mqttPrefix /FF112233445566778899FFFF
address_cache ../../test.cache
address_cache ./test.cache
interface eth0
port 22222
timeout 1000
......
......@@ -6,7 +6,6 @@
*/
#include "Configuration.h"
#include <iostream>
#include <string>
#include <unistd.h>
#include <dlfcn.h>
......@@ -144,9 +143,10 @@ bool Configuration::read() {
//open plugin
//dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
ifstream pluginStream(pluginConfig.c_str());
if (pluginStream.good()) {
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
dl_t dynLib;
dynLib.id = plugin.second.data();
dynLib.DL = NULL;
dynLib.configurator = NULL;
......@@ -179,19 +179,21 @@ bool Configuration::read() {
//set prefix to global prefix (may be overwritten)
dynLib.configurator->setGlobalSettings(_global);
//read in config
std::vector<Sensor*>& sensors = dynLib.configurator->readConfig(pluginConfig);
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << plugin.second.data() << "\" could not read configuration!";
return false;
}
// returning an empty vector sensor may indicate problems with the config file
if(sensors.size() == 0) {
if(dynLib.configurator->getSensors().size() == 0) {
LOG(warning) << "Plugin \"" << plugin.second.data() << "\" created no sensors!";
}
//save only pointers to the sensors; copy-constructing an dynamically loaded object may lead to segfaults
//but before storing them, check if an MQTT-suffix was assigned twice
for(auto s : sensors) {
if(checkMqtt(s->getMqtt())) {
_sensors.push_back(s);
} else {
for(auto s : dynLib.configurator->getSensors()) {
bool ok = checkMqtt(s->getMqtt());
if(!ok) {
LOG(error) << "Problematic MQTT-Topics! Please check your config files";
return false;
}
......@@ -236,6 +238,6 @@ global_t Configuration::getGlobal() const {
return _global;
}
sensorVector_t& Configuration::getSensors() {
return _sensors;
pluginVector_t& Configuration::getPlugins() {
return _plugins;
}
......@@ -17,22 +17,23 @@
#include <boost/property_tree/ptree.hpp>
#include <boost/log/trivial.hpp>
typedef std::vector<Sensor*> sensorVector_t;
typedef std::set<std::string> mqttSet_t;
/**
* Class responsible of reading the global configuration as well as loading and invoking required dynamic libraries.
*/
class Configuration {
//struct of values required for a dynamic library.
typedef struct {
std::string id;
void* DL;
Configurator* configurator;
create_t* create;
destroy_t* destroy;
} dl_t;
typedef std::vector<dl_t> pluginVector_t;
typedef std::set<std::string> mqttSet_t;
/**
* Class responsible of reading the global configuration as well as loading and invoking required dynamic libraries.
*/
class Configuration {
public:
/**
* Create new Configuration. Sets global config file to read from to cfgFile.
......@@ -77,17 +78,20 @@ public:
void setGlobal(const global_t& global);
global_t getGlobal() const;
sensorVector_t& getSensors();
/*
* Return all plugins (in form of their dl_t struct)
*/
pluginVector_t& getPlugins();
private:
std::string _cfgFilePath;
global_t _global;
sensorVector_t _sensors;
mqttSet_t _mqttTopics;
//vector which holds data of the opened plugins
std::vector<dl_t> _plugins;
pluginVector_t _plugins;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -35,16 +35,21 @@ public:
/**
* Read in the given configuration
* @param cfgPath Path to the config-file
* @return Reference to the vector of sensors which were created
* @return true on success, false otherwise
*/
virtual std::vector<Sensor*>& readConfig(std::string cfgPath) = 0;
virtual bool readConfig(std::string cfgPath) = 0;
virtual void setGlobalSettings(const global_t& globalSettings) {
_mqttPrefix = globalSettings.mqttPrefix;
}
std::vector<Sensor*>& getSensors() {
return _sensors;
}
protected:
std::string _mqttPrefix;
std::vector<Sensor*> _sensors;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -15,9 +15,9 @@
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, sensorVector_t& sensors) :
const std::string& mqttPrefix, pluginVector_t& plugins) :
_brokerPort(brokerPort), _brokerHost(brokerHost),
_sensors(sensors),_connected(false) {
_plugins(plugins),_connected(false) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -73,45 +73,47 @@ void MQTTPusher::push() {
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
for(auto s : _sensors) {
if (s->getSizeOfReadingQueue() >= s->getMinValues()) {
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
for(auto& p : _plugins) {
for(auto s : p.configurator->getSensors()) {
if (s->getSizeOfReadingQueue() >= s->getMinValues()) {
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
}
if (_connected) {
//get all sensor values out of its queue
std::size_t count = s->popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s->getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
_connected = false;
s->pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
break;
if (_connected) {
//get all sensor values out of its queue
std::size_t count = s->popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s->getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
_connected = false;
s->pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
break;
}
}
}
}
......
......@@ -17,7 +17,7 @@
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& _brokerHost,
const std::string& mqttPrefix, sensorVector_t& sensors);
const std::string& mqttPrefix, pluginVector_t& plugins);
virtual ~MQTTPusher();
void push();
......@@ -25,7 +25,7 @@ public:
private:
int _brokerPort;
std::string _brokerHost;
sensorVector_t& _sensors;
pluginVector_t& _plugins;
struct mosquitto* _mosq;
bool _connected;
......
......@@ -46,15 +46,18 @@
using namespace std;
volatile int keepRunning;
sensorVector_t sensors;
pluginVector_t plugins;
void sigintHandler(int sig) {
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
LOG(fatal) << "Received SIGINT";
//Stop all sensors
LOG(info) << "Stopping sensors...";
for(auto s : sensors) {
s->stopPolling();
for(auto& p : plugins) {
LOG(info) << "Stop \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
s->stopPolling();
}
}
LOG(info) << "Flushing MQTT queues...";
//Stop MQTTPusher
......@@ -259,7 +262,7 @@ int main(int argc, char** argv) {
LOG(fatal) << "Failed to read configuration!";
return 1;
}
sensors = cfg.getSensors();
plugins = cfg.getPlugins();
//give some feedback
LOG(info) << "Global Settings:";
......@@ -275,15 +278,21 @@ int main(int argc, char** argv) {
//Init all sensors
LOG(info) << "Init sensors...";
for(auto s : sensors) {
LOG(debug) << " -" << s->getName();
s->init(io);
for(auto& p : plugins) {
LOG(info) << "Init \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
LOG(debug) << " -" << s->getName();
s->init(io);
}
}
//Start all sensors
LOG(info) << "Starting sensors...";
for(auto s : sensors) {
s->startPolling();
for(auto& p : plugins) {
LOG(info) << "Start \"" << p.id << "\" plugin";
for(auto s : p.configurator->getSensors()) {
s->startPolling();
}
}
LOG(info) << "Sensors started!";
......@@ -311,7 +320,7 @@ int main(int argc, char** argv) {
}
//MQTTPusher gets his own thread
MQTTPusher mqttPusher(globalSettings.brokerPort, globalSettings.brokerHost, globalSettings.mqttPrefix, sensors);
MQTTPusher mqttPusher(globalSettings.brokerPort, globalSettings.brokerHost, globalSettings.mqttPrefix, plugins);
boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
LOG(info) << "Threads created!";
......
......@@ -24,7 +24,7 @@ BACnetConfigurator::~BACnetConfigurator() {
delete _bacClient;
}
std::vector<Sensor*>& BACnetConfigurator::readConfig(std::string cfgPath) {
bool BACnetConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
......@@ -66,7 +66,7 @@ std::vector<Sensor*>& BACnetConfigurator::readConfig(std::string cfgPath) {
_bacClient->init(interface, address_cache, port, timeout, apdu_timeout, apdu_retries);
} catch (const std::exception& e) {
LOG(error) << "Could not initialize BACnetClient: " << e.what();
return _sensors;
return false;
}
//read template sensors
......@@ -143,7 +143,7 @@ std::vector<Sensor*>& BACnetConfigurator::readConfig(std::string cfgPath) {
}
}
}
return _sensors;
return true;
}
bool BACnetConfigurator::readSensor(BACnetSensor& sensor, boost::property_tree::iptree& config) {
......
......@@ -30,8 +30,9 @@ public:
/**
* Read in the configuration for BACnet-sensors.
* @param cfgPath Path + name of the config-file
* @return true on success, false otherwise
*/
std::vector<Sensor*>& readConfig(std::string cfgPath);
bool readConfig(std::string cfgPath);
private:
/**
......@@ -44,7 +45,6 @@ private:
bool readSensor(BACnetSensor& sensor, boost::property_tree::iptree& config);
BACnetClient* _bacClient;
std::vector<Sensor*> _sensors;
sensorMap_t _templateSensors;
};
......
......@@ -28,7 +28,7 @@ IPMIConfigurator::~IPMIConfigurator() {
}
}
std::vector<Sensor*>& IPMIConfigurator::readConfig(std::string cfgPath) {
bool IPMIConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
......@@ -116,7 +116,7 @@ std::vector<Sensor*>& IPMIConfigurator::readConfig(std::string cfgPath) {
}
}
}
return _sensors;
return true;
}
bool IPMIConfigurator::readSensor(DCDB::IPMISensor& sensor, boost::property_tree::iptree& config) {
......
......@@ -33,7 +33,7 @@ namespace DCDB {
IPMIConfigurator();
virtual ~IPMIConfigurator();
std::vector<Sensor*>& readConfig(std::string cfgPath);
bool readConfig(std::string cfgPath);
//Overwrite from Configurator
virtual void setGlobalSettings(const global_t& globalSettings) {
......@@ -52,7 +52,6 @@ namespace DCDB {
bool readSensor(IPMISensor& sensor, boost::property_tree::iptree& config);
std::string _tempdir;
std::vector<Sensor*> _sensors;
sensorMap_t _templateSensors;
hostList_t _hosts;
globalHost_t _globalHost;
......
......@@ -27,7 +27,7 @@ PDUConfigurator::~PDUConfigurator() {
}
}
std::vector<Sensor*>& PDUConfigurator::readConfig(std::string cfgPath) {
bool PDUConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
......@@ -123,7 +123,7 @@ std::vector<Sensor*>& PDUConfigurator::readConfig(std::string cfgPath) {
}
}
}
return _sensors;
return true;
}
bool PDUConfigurator::readSensor(PDUSensor& sensor, boost::property_tree::iptree& config) {
......
......@@ -30,9 +30,9 @@ public:
/**
* Read in the configuration for PDU-sensors.
* @param cfgPath Path + name of the config-file
* @return Vector with pointers to every created sensors
* @return true on success, false otherwise
*/
std::vector<Sensor*>& readConfig(std::string cfgPath);
bool readConfig(std::string cfgPath);
private:
/**
......@@ -50,7 +50,6 @@ private:
*/
void parsePathString(PDUSensor& sensor, const std::string& pathString);
std::vector<Sensor*> _sensors;
sensorMap_t _templateSensors;
pduList_t _pdus;
};
......
......@@ -61,7 +61,7 @@ PerfeventConfigurator::~PerfeventConfigurator() {
}
}
std::vector<Sensor*>& PerfeventConfigurator::readConfig(std::string cfgPath) {
bool PerfeventConfigurator::readConfig(std::string cfgPath) {
//read template counters
boost::property_tree::iptree cfg;
......@@ -165,7 +165,7 @@ std::vector<Sensor*>& PerfeventConfigurator::readConfig(std::string cfgPath) {
}
}
}
return _sensors;
return true;
}
bool PerfeventConfigurator::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
......
......@@ -30,8 +30,9 @@ public:
/**
* Read the configuration for perfpusher.
* @param cfgPath Path + name of the configuration file
* @return true on success, false otherwise
*/
std::vector<Sensor*>& readConfig(std::string cfgPath);
bool readConfig(std::string cfgPath);
private:
/**
......@@ -63,7 +64,6 @@ private:
*/
std::set<int> parseCpuString(const std::string& cpuString);
std::vector<Sensor*> _sensors;
counterMap_t _templateCounters;
templateCpuMap_t _templateCpus;
......
......@@ -24,7 +24,7 @@ SysfsConfigurator::~SysfsConfigurator() {
}
}
std::vector<Sensor*>& SysfsConfigurator::readConfig(std::string cfgPath) {
bool SysfsConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
......@@ -82,7 +82,7 @@ std::vector<Sensor*>& SysfsConfigurator::readConfig(std::string cfgPath) {
}
}
}
return _sensors;
return true;
}
void SysfsConfigurator::readSensor(SysfsSensor& sensor, boost::property_tree::iptree& config) {
......
......@@ -27,8 +27,9 @@ public:
/**
* Read in the configuration for sysfs-sensors.
* @param cfgPath Path + name of the config-file
* @return true on success, false otherwise
*/
std::vector<Sensor*>& readConfig(std::string cfgPath);
bool readConfig(std::string cfgPath);
private:
/**
......@@ -38,7 +39,6 @@ private:
*/
void readSensor(SysfsSensor& sensor, boost::property_tree::iptree& config);
std::vector<Sensor*> _sensors;
sensorMap_t _templateSensors;
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment