Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 605a3f3c authored by Micha Mueller's avatar Micha Mueller
Browse files

Switch sensor group implementation. Implies huge code restructuring.

This is the first batch of changes, more to come
parent 62aea5ef
......@@ -16,7 +16,7 @@ include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/Sensor.o src/MQTTPusher.o src/HttpsServer.o src/SensorGroup.o
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o
PLUGINS_BASE = libdcdbplugin_pdu libdcdbplugin_sysfs libdcdbplugin_ipmi libdcdbplugin_bacnet libdcdbplugin_snmp
ifeq ($(OS),Darwin)
......@@ -75,7 +75,7 @@ src/sensors/%.o: CXXFLAGS+= $(PLUGINFLAGS) -I$(DCDBDEPSPATH)/bacnet-stack-$(BACN
libdcdbplugin_sysfs.$(LIBEXT): src/Sensor.o src/sensors/sysfs/SysfsSensor.o src/sensors/sysfs/SysfsConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_perfevent.$(LIBEXT): src/Sensor.o src/SensorGroup.o src/sensors/perfevent/PerfCounter.o src/sensors/perfevent/PerfeventConfigurator.o src/sensors/perfevent/PerfCounterGroup.o
libdcdbplugin_perfevent.$(LIBEXT): src/sensors/perfevent/PerfCounter.o src/sensors/perfevent/PerfeventConfigurator.o src/sensors/perfevent/PerfCounterGroup.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_ipmi.$(LIBEXT): src/Sensor.o src/sensors/ipmi/IPMISensor.o src/sensors/ipmi/IPMIHost.o src/sensors/ipmi/IPMIConfigurator.o
......
......@@ -12,7 +12,6 @@
#include "HttpsServer.h"
#include "PluginDefinitions.h"
#include "Sensor.h"
#include <boost/property_tree/ptree.hpp>
#include <boost/log/trivial.hpp>
......
......@@ -8,11 +8,10 @@
#ifndef SRC_CONFIGURATORINTERFACE_H_
#define SRC_CONFIGURATORINTERFACE_H_
#include "Sensor.h"
#include "SensorGroup.h"
#include <string>
#include <vector>
#include "headers/SingleSensor.h"
#include "headers/SensorGroupTemplate.h"
typedef struct {
std::string mqttPrefix;
......@@ -31,8 +30,8 @@ public:
virtual bool readConfig(std::string cfgPath) = 0;
virtual bool reReadConfig() = 0;
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) = 0;
virtual std::vector<Sensor*>& getSensors() = 0;
virtual std::vector<SensorGroup*>& getSensorGroups() = 0;
virtual std::vector<SingleSensor*>& getSensors() = 0;
virtual std::vector<SensorGroupInterface*>& getSensorGroups() = 0;
protected:
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
/*
* Configurator.h
* ConfiguratorTemplate.h
*
* Created on: 13.01.2018
* Author: Micha Mueller
......@@ -10,6 +10,10 @@
#include "ConfiguratorInterface.h"
#include "headers/SensorBase.h"
#include "headers/SingleSensor.h"
#include "headers/SensorGroupTemplate.h"
#include <map>
#include <boost/foreach.hpp>
......@@ -22,10 +26,11 @@
/**
* Non-virtual interface template for the configurators.
*/
template <typename S>
template <typename SBase, typename S>
class ConfiguratorTemplate : public ConfiguratorInterface {
//the template shall only be instantiated for classes which implement the sensor interface
static_assert(std::is_base_of<Sensor, S>::value, "S must derive from Sensor!");
//the template shall only be instantiated for classes which derive from SensorBase/SingleSensor
static_assert(std::is_base_of<SingleSensor, S>::value, "S must derive from Sensor!");
static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
protected:
typedef std::map<std::string, S> sensorMap_t;
......@@ -78,7 +83,7 @@ public:
LOG(debug) << "Template Sensor \"" << sensorVal.second.data() << "\"";
if (!sensorVal.second.empty()) {
S sensor(sensorVal.second.data());
if(readSensor(sensor, sensorVal.second)) {
if(readSingleSensor(sensor, sensorVal.second)) {
_templateSensors.insert(std::pair<std::string, S>(sensor.getName(), sensor));
} else {
LOG(warning) << "Template sensor \"" << sensorVal.second.data() << "\" has bad values! Ignoring...";
......@@ -100,10 +105,10 @@ public:
bool reReadConfig() final {
//bring everything to a halt
for (auto s : _sensors) {
s->stopPolling();
s->stop();
}
for(auto g : _sensorGroups) {
g->stopPolling();
g->stop();
}
//wait until everything is halted
......@@ -152,7 +157,7 @@ public:
*
* @return Vector containing pointers to all sensors of this plugin
*/
std::vector<Sensor*>& getSensors() final {
std::vector<SingleSensor*>& getSensors() final {
return _sensors;
}
......@@ -161,23 +166,54 @@ public:
*
* @return Vector containing pointers to all sensor groups of this plugin
*/
std::vector<SensorGroup*>& getSensorGroups() {
std::vector<SensorGroupInterface*>& getSensorGroups() {
return _sensorGroups;
}
protected:
/**
* Non-virtual interface method for class-internal use only.
* Reads and sets the common base values of a sensor, then calls
* the corresponding derived function.
* Reads and sets the values for a SingleSensor. This is only a
* convenience function and only calls readSensorBase() and
* readSensorInterface() itself.
*
* @param sensor The sensor for which to set the values
* @param config A boost property (sub-)tree containing the sensor values
*
* @return True on success, false otherwise
*/
bool readSingleSensor(S& sensor, boost::property_tree::iptree& config) {
return (readSensorBase(sensor, config) | readSensorInterface(sensor, config));
}
/**
* Non-virtual interface method for class-internal use only.
* Reads and sets the common base values of a sensor (currently none),
* then calls the corresponding derived function to read plugin specific
* values.
*
* @param sensor The sensor for which to set the values
* @param config A boost property (sub-)tree containing the sensor values
*
* @return True on success, false otherwise
*/
bool readSensor(S& sensor, boost::property_tree::iptree& config) {
//read in base values
bool readSensorBase(SBase& sensor, boost::property_tree::iptree& config) {
return derivedReadSensorBase(sensor, config);
}
/**
* Non-virtual interface method for class-internal use only.
* Reads and sets the common base values of a SensorInterface, then calls
* the corresponding derived function to read in plugin specific values
* (currently none).
*
* @param sensor The sensor for which to set the values
* @param config A boost property (sub-)tree containing the sensor values
*
* @return True on success, false otherwise
*/
bool readSensorInterface(SensorInterface& sensor, boost::property_tree::iptree& config) {
//read in values inherited from SensorInterface
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (STRCMP(val, "interval")) {
sensor.setInterval(stoull(val.second.data()));
......@@ -190,7 +226,8 @@ protected:
LOG(debug) << " Interval : " << sensor.getInterval();
LOG(debug) << " minValues: " << sensor.getMinValues();
return derivedReadSensor(sensor, config);
/* return derivedReadSensorInterface(sensor, config); */
return true;
}
/**
......@@ -219,20 +256,20 @@ protected:
/**
* Pure virtual interface method, responsible for reading plugin-specific sensor
* values.
* base values.
*
* @param sensor The sensor for which to set the values
* @param config A boost property (sub-)tree containing the sensor values
*
* @return True on success, false otherwise
*/
virtual bool derivedReadSensor(S& sensor, boost::property_tree::iptree& config) = 0;
virtual bool derivedReadSensorBase(SBase& sensor, boost::property_tree::iptree& config) = 0;
std::string _cfgPath;
std::string _mqttPrefix;
unsigned int _cacheInterval;
std::vector<Sensor*> _sensors;
std::vector<SensorGroup*> _sensorGroups;
std::vector<SingleSensor*> _sensors;
std::vector<SensorGroupInterface*> _sensorGroups;
sensorMap_t _templateSensors;
};
......
......@@ -155,7 +155,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto s : g->getSensors()) {
group.put(s->getName(), "");
}
sensors.add_child(g->getName(), group);
sensors.add_child(g->getGroupName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
......@@ -198,7 +198,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
response = "Sensor not found!";
for(auto s : p.configurator->getSensors()) {
if (s->getName() == sensor) {
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, time);
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, s->getCacheSize(), time);
connection->set_status(server::connection::ok);
break;
}
......@@ -206,7 +206,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
if (s->getName() == sensor) {
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, time);
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(s, g->getCacheSize(), time);
connection->set_status(server::connection::ok);
break;
}
......@@ -242,10 +242,10 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(auto s : p.configurator->getSensors()) {
s->startPolling();
s->start();
}
for(auto g : p.configurator->getSensorGroups()) {
g->startPolling();
g->start();
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
connection->set_status(server::connection::ok);
......@@ -256,10 +256,10 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(auto s : p.configurator->getSensors()) {
s->stopPolling();
s->stop();
}
for(auto g : p.configurator->getSensorGroups()) {
g->stopPolling();
g->stop();
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
connection->set_status(server::connection::ok);
......@@ -287,11 +287,11 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(auto s : p.configurator->getSensors()) {
s->init(_httpsServer._io);
s->startPolling();
s->start();
}
for(auto g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->startPolling();
g->start();
}
//continue MQTTPusher
......@@ -341,13 +341,12 @@ HttpsServer::~HttpsServer() {
delete _server;
}
std::string HttpsServer::calcAvg(Sensor* const s, uint64_t time) {
std::string HttpsServer::calcAvg(SensorBase* const s, unsigned cacheSize, uint64_t time) {
uint64_t avg = 0;
const reading_t * const cache = s->getCache();
unsigned size = s->getCacheSize();
unsigned count = 0;
for(unsigned i = 0; i < size; i++) {
for(unsigned i = 0; i < cacheSize; i++) {
if (cache[i].timestamp > time) {
avg += cache[i].value;
count++;
......
......@@ -94,7 +94,7 @@ private:
*
* @return Response message of the form " Average of last *count* values is *avg*"
*/
std::string calcAvg(Sensor* const s, uint64_t time);
std::string calcAvg(SensorBase* const s, unsigned cacheSize, uint64_t time);
/*
* Check if the authkey is valid and has the permission requiredPerm associated
......
......@@ -106,7 +106,7 @@ void MQTTPusher::push() {
mosquitto_loop_stop(_mosq, false);
}
void MQTTPusher::sendReadings(Sensor* s, reading_t* reads, std::size_t& totalCount) {
void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& totalCount) {
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
......
......@@ -43,7 +43,7 @@ public:
}
private:
void sendReadings(Sensor* s, reading_t* reads, std::size_t& totalCount);
void sendReadings(SensorBase* s, reading_t* reads, std::size_t& totalCount);
int _brokerPort;
std::string _brokerHost;
......
/*
* Sensor.cpp
*
* Created on: 12.01.2018
* Author: Micha Mueller
*/
#include "Sensor.h"
#include "timestamp.h"
#include <unistd.h>
#include <functional>
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _keepRunning(0), _minValues(1), _interval(1000),
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0), _pendingTasks(0) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
_cache = NULL;
_timer = NULL;
_readingQueue = NULL;
}
Sensor::Sensor(const Sensor& other) :
_name(other._name),
_mqtt(other._mqtt),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_cacheIndex(other._cacheIndex),
_cache(other._cache),
_latestValue(other._latestValue),
_timer(other._timer),
_readingQueue(other._readingQueue) {
_pendingTasks.store(other._pendingTasks.load());
}
Sensor& Sensor::operator=(const Sensor& other) {
_name = other._name;
_mqtt = other._mqtt;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_cacheIndex = other._cacheIndex;
_pendingTasks.store(other._pendingTasks.load());
_cache = other._cache;
_latestValue.timestamp = other._latestValue.timestamp;
_latestValue.value = other._latestValue.value;
_timer = other._timer;
_readingQueue = other._readingQueue;
return *this;
}
Sensor::~Sensor() {
if(_cache) {
delete[] _cache;
}
if(_readingQueue) {
delete _readingQueue;
}
if(_timer) {
delete _timer;
}
}
const std::string& Sensor::getName() const {
return _name;
}
void Sensor::setName(const std::string& name) {
_name = name;
}
const std::string& Sensor::getMqtt() const {
return _mqtt;
}
void Sensor::setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
unsigned Sensor::getMinValues() const {
return _minValues;
}
void Sensor::setMinValues(unsigned minValues) {
_minValues = minValues;
}
unsigned Sensor::getInterval() const {
return _interval;
}
void Sensor::setInterval(unsigned interval) {
_interval = interval;
}
unsigned Sensor::getCacheSize() const {
return _cacheSize;
}
void Sensor::setCacheInterval(unsigned cacheInterval) {
_cacheInterval = cacheInterval;
}
const reading_t * const Sensor::getCache() const {
return _cache;
}
const reading_t Sensor::getLatestValue() const {
return _latestValue;
}
const std::size_t Sensor::getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t Sensor::popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void Sensor::pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
void Sensor::init(boost::asio::io_service& io) {
if(!_cache) {
_cacheSize = _cacheInterval / _interval + 1;
_cache = new reading_t[_cacheSize];
for(unsigned i = 0; i < _cacheSize; i++) {
_cache[i] = _latestValue; //_latestValue should equal (0,0) at this point
}
}
if(!_readingQueue) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
if(!_timer) {
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
}
}
void Sensor::storeReading(reading_t reading) {
_readingQueue->push(reading);
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
_latestValue.value = reading.value;
_latestValue.timestamp = reading.timestamp;
}
void Sensor::wait() {
while(_pendingTasks) {
sleep(1);
}
}
/*
* Sensor.h
*
* Created on: 12.01.2018
* Author: Micha Mueller
*/
#ifndef SENSOR_H_
#define SENSOR_H_
#include <string>
#include <atomic>
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
#include "Logging.h"
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
/**
* Abstract base class which defines the interface for concrete sensors.
* A concrete (derived) sensor must at least implement the read()-,
* readAsync()- and startPolling()-methods.
*/
class Sensor {
public:
Sensor(const std::string name);
Sensor(const Sensor& other);
Sensor& operator=(const Sensor& other);
virtual ~Sensor();
//non-overwriteable methods
const std::string& getName() const;
void setName(const std::string& name);
const std::string& getMqtt() const;
void setMqtt(const std::string& mqtt);
unsigned getMinValues() const;
void setMinValues(unsigned minValues);
unsigned getInterval() const;
void setInterval(unsigned interval);
unsigned getCacheSize() const;
void setCacheInterval(unsigned cacheInterval);
const reading_t * const getCache() const;
const reading_t getLatestValue() const;
const std::size_t getSizeOfReadingQueue() const;
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const;
void pushReadingQueue(reading_t *reads, std::size_t count) const;
//can be overwritten methods
virtual void init(boost::asio::io_service& io);
virtual void storeReading(reading_t reading);
//have to be overwritten methods
virtual void read() = 0;
virtual void readAsync() = 0;
virtual void startPolling() = 0;
virtual void stopPolling() = 0;
/**
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0)
*/
void wait();
protected:
std::string _name;
std::string _mqtt;
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
std::atomic_uint _pendingTasks;
reading_t * _cache;
reading_t _latestValue;
boost::asio::deadline_timer* _timer;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif /* SENSOR_H_ */
/*
* SensorGroup.cpp
*
* Created on: 03.08.2018
* Author: Micha Mueller
*/
#include "SensorGroup.h"
SensorGroup::SensorGroup(const std::string name) :
_name(name), _keepRunning(0), _minValues(1), _interval(1000),
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0), _pendingTasks(0) {
_timer = 0;
}
SensorGroup::SensorGroup(const SensorGroup& other) :
_name(other._name),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_cacheIndex(other._cacheIndex),
_sensors(other._sensors),
_timer(other._timer) {
_pendingTasks.store(other._pendingTasks.load());
}
SensorGroup& SensorGroup::operator=(const SensorGroup& other) {
_name = other._name;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_cacheIndex = other._cacheIndex;
_pendingTasks.store(other._pendingTasks.load());
_sensors = other._sensors;
_timer = other._timer;