Commit 5da840d6 authored by Micha Müller's avatar Micha Müller
Browse files

Pusher WIP: Refactor SensorGroups

- make use of new EntityInterface
- _keepRunning is now a bool
- extend and improve (doxygen) documentation
parent 1828a11b
......@@ -28,8 +28,8 @@
* @defgroup pusherplugins Pusher Plugins
* @ingroup pusher
*
* @brief Collection of plugin interfaces, plugin templates summarizing common logic
* and the plugins itself.
* @brief Collection of plugin interfaces, plugin templates summarizing common
* logic and the plugins itself.
*/
#ifndef SENSORGROUPINTERFACE_H_
......@@ -45,6 +45,9 @@
/**
* @brief Abstract interface defining sensor group functionality.
*
* @details Sensor groups should not implement this interface themselves but
* inherit the SensorGroupTemplate instead.
*
* @ingroup pusherplugins
*/
class SensorGroupInterface {
......@@ -54,7 +57,7 @@ public:
_groupName(groupName),
_mqttPart(""),
_sync(true),
_keepRunning(0),
_keepRunning(false),
_minValues(1),
_interval(1000),
_pendingTasks(0),
......@@ -86,79 +89,161 @@ public:
return *this;
}
///@name Getters
///@{
const std::string& getGroupName() const { return _groupName; }
const std::string& getMqttPart() const { return _mqttPart; }
bool getSync() const { return _sync; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
///@}
///@name Setters
///@{
void setGroupName(const std::string& groupName) { _groupName = groupName; }
void setMqttPart(const std::string& mqttPart) { _mqttPart = mqttPart; }
void setSync(bool sync) { _sync = sync; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
///@}
///@name Interface methods
///@{
/**
* @brief Initialize the sensor group.
*
* @details Derived classes overwriting this method must ensure to either
* call this method or reproduce its functionality.
*
* @param io IO service to initialize the timer with.
*/
virtual void init(boost::asio::io_service& io) {
_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
}
/**
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0)
* If the wait takes longer than a reasonable amount of time we return anyway,
* to not block termination of dcdbpusher.
* @brief Start the sensor group (i.e. start collecting data).
*/
void wait() {
unsigned short retries = 3;
for (unsigned short i = 1; i <= retries; i++) {
if (_pendingTasks) {
LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
sleep((_interval/1000) + 1);
} else {
return;
}
}
LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
}
//can be overwritten
virtual void init(boost::asio::io_service& io) {
_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
}
virtual void printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Sensor Group: " << _groupName;
if (_mqttPart != "") {
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
}
if (_sync) {
LOG_VAR(ll) << " Synchronized readings enabled";
} else {
LOG_VAR(ll) << " Synchronized readings disabled";
}
LOG_VAR(ll) << " minValues: " << _minValues;
LOG_VAR(ll) << " interval: " << _interval;
}
//have to be overwritten
virtual void start() = 0;
/**
* @brief Stop the sensor group (i.e. stop collecting data).
*/
virtual void stop() = 0;
/**
* @brief Add a sensor to this group.
*
* @param s Shared pointer to the sensor.
*/
virtual void pushBackSensor(SBasePtr s) = 0;
/**
* @brief Retrieve all sensors of this group.
*
* @return A std::vector with shared pointers to all associated sensors.
*/
virtual std::vector<SBasePtr>& getSensors() = 0;
/**
* @brief Print interface configuration.
*
* @details Derived classes overwriting this method must ensure to either
* call this method or reproduce its functionality.
*
* @param ll Log severity level to be used from logger.
*/
virtual void printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Sensor Group: " << _groupName;
if (_mqttPart != "") {
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
}
if (_sync) {
LOG_VAR(ll) << " Synchronized readings enabled";
} else {
LOG_VAR(ll) << " Synchronized readings disabled";
}
LOG_VAR(ll) << " minValues: " << _minValues;
LOG_VAR(ll) << " interval: " << _interval;
}
protected:
/**
* @brief Read data for all sensors once.
*/
virtual void read() = 0;
/**
* @brief Asynchronous callback if _timer expires.
*
* @details Issues a read() and sets the timer again if _keepRunning is true.
*/
virtual void readAsync() = 0;
///@}
std::string _groupName;
std::string _mqttPart;
bool _sync;
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
std::atomic_uint _pendingTasks;
std::unique_ptr<boost::asio::deadline_timer> _timer;
LOGGER lg;
///@name Utility methods
///@{
public:
/**
* @brief Does a busy wait until all dispatched handlers are finished
* (_pendingTasks == 0).
*
* @details If the wait takes longer than a reasonable amount of time we
* return anyway, to not block termination of dcdbpusher.
*/
void wait() {
unsigned short retries = 3;
for (unsigned short i = 1; i <= retries; i++) {
if (_pendingTasks) {
LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
sleep((_interval/1000) + 1);
} else {
return;
}
}
LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
}
protected:
/**
* @brief Calculate timestamp for the next reading.
*
* @return Timestamp in the future to wait for
*/
uint64_t nextReadingTime() {
uint64_t now = getTimestamp();
uint64_t next;
if (_sync) {
uint64_t interval64 = static_cast<uint64_t>(_interval);
uint64_t now_ms = now / 1000 / 1000;
//synchronize all measurements with other sensors
uint64_t waitToStart = interval64 - (now_ms%interval64);
//less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
if(!waitToStart ){
return (now_ms + interval64)*1000*1000;
}
return (now_ms + waitToStart)*1000*1000;
} else {
return now + MS_TO_NS(_interval);
}
}
///@}
std::string _groupName; ///< String name of this group
std::string _mqttPart; ///< MQTT part identifying this group
bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
unsigned int _interval; ///< Reading interval cycle in milliseconds
std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
LOGGER lg; ///< Personal logging instance
};
//for better readability
......
......@@ -28,6 +28,7 @@
#define SENSORGROUPTEMPLATE_H_
#include "SensorGroupInterface.h"
#include "EntityInterface.h"
#include "timestamp.h"
......@@ -35,24 +36,35 @@
#include <memory>
/**
* @brief Interface template for sensor group implementations.
* @brief Interface template for sensor group implementations with entities.
*
* @details There is a partial template specialization for E = nullptr_t, i.e.
* for groups without entities. Common code has to be duplicated and
* maintained twice here and in the template specialization. However,
* having two templates for cases with and without entities greatly
* reduces code duplication in derived plugin sensor groups.
*
* @ingroup pusherplugins
*/
template <typename S>
template <class S, class E = nullptr_t>
class SensorGroupTemplate : public SensorGroupInterface {
//the template shall only be instantiated for classes which derive from SensorBase
//the template shall only be instantiated for classes which derive from SensorBase and EntityInterface respectively
static_assert(std::is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
static_assert(std::is_base_of<EntityInterface, E>::value, "E must derive from EntityInterface!");
protected:
using S_Ptr = std::shared_ptr<S>;
using E_Ptr = std::shared_ptr<E>;
public:
SensorGroupTemplate(const std::string groupName) :
SensorGroupInterface(groupName) {}
SensorGroupInterface(groupName),
_entity(nullptr) {}
SensorGroupTemplate(const SensorGroupTemplate& other) :
SensorGroupInterface(other) {
SensorGroupInterface(other),
_entity(other._entity) {
for(auto s : other._sensors) {
S_Ptr sensor = std::make_shared<S>(*s);
_sensors.push_back(sensor);
......@@ -76,10 +88,85 @@ public:
_baseSensors.push_back(sensor);
}
_entity = other._entity;
return *this;
}
virtual void pushBackSensor(SBasePtr s) override {
/**
* @brief Initialize the sensor group.
*
* @details This method must not be overwritten. See execOnInit() if custom
* actions are required during initialization. Initializes
* associated sensors and entity.
*
* @param io IO service to initialize the timer with.
*/
virtual void init(boost::asio::io_service& io) final override {
if(!_entity) {
LOG(error) << "No entity set for group " << _groupName << "! Cannot initialize group";
return;
}
SensorGroupInterface::init(io);
for(auto s : _sensors) {
s->initSensor(_interval);
}
_entity->initEntity(io);
this->execOnInit();
}
/**
* @brief Start the sensor group (i.e. start collecting data).
*
* @details This method must not be overwritten. See execOnStart() if custom
* actions are required during startup.
*/
virtual void start() final override {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
this->execOnStart();
if (_entity) {
_keepRunning = true;
_pendingTasks++;
_timer->async_wait(_entity->getStrand()->wrap(std::bind(&SensorGroupTemplate::readAsync, this)));
LOG(info) << "Sensorgroup " << _groupName << " started.";
} else {
LOG(error) << "No entity set for group " << _groupName << "! Cannot start polling.";
}
}
/**
* @brief Stop the sensor group (i.e. stop collecting data).
*
* @details This method must not be overwritten. See execOnStop() if custom
* actions are required during shutdown.
*/
virtual void stop() final override {
_keepRunning = false;
//cancel any outstanding readAsync()
_timer->cancel();
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
/**
* @brief Add a sensor to this group.
*
* @details Not intended to be overwritten.
*
* @param s Shared pointer to the sensor.
*/
virtual void pushBackSensor(SBasePtr s) final override {
//check if dynamic cast returns nullptr
if (S_Ptr dSensor = std::dynamic_pointer_cast<S>(s)) {
_sensors.push_back(dSensor);
......@@ -89,46 +176,311 @@ public:
}
}
virtual std::vector<SBasePtr>& getSensors() override { return _baseSensors; }
/**
* @brief Retrieve all sensors of this group.
*
* @details Not intended to be overwritten.
*
* @return A std::vector with shared pointers to all associated sensors.
*/
virtual std::vector<SBasePtr>& getSensors() final override { return _baseSensors; }
virtual void init(boost::asio::io_service& io) override {
SensorGroupInterface::init(io);
//TODO only call this printMethod!
/**
* @brief Print SensorGroup configuration.
*
* @details Always call %SensorGroupTemplate::printConfig() to print
* complete configuration of a sensor group. This method takes
* care of calling SensorGroupInterface::printConfig() and
* derived %printConfig() methods in correct order.
*
* @param ll Log severity level to be used from logger.
*/
virtual void printConfig(LOG_LEVEL ll) override {
//print common base attributes
SensorGroupInterface::printConfig(ll);
for(auto s : _sensors) {
s->initSensor(_interval);
}
}
//print plugin specific group attributes
this->printConfig(ll);
virtual void printConfig(LOG_LEVEL ll) override {
LOG_VAR(ll) << " Sensors:";
for(auto s : _sensors) {
s->SensorBase::printConfig(ll, lg);
s->printConfig(ll, lg);
}
//print associated sensors
LOG_VAR(ll) << " Sensors:";
for(auto s : _sensors) {
s->SensorBase::printConfig(ll, lg);
s->printConfig(ll, lg);
}
}
protected:
/** Calculate timestamp for the next reading
* @return Timestamp in the future to wait for
*/
uint64_t nextReadingTime() {
uint64_t now = getTimestamp();
uint64_t next;
if (_sync) {
uint64_t interval64 = static_cast<uint64_t>(_interval);
uint64_t now_ms = now / 1000 / 1000;
uint64_t waitToStart = interval64 - (now_ms%interval64); //synchronize all measurements with other sensors
if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
return (now_ms + interval64)*1000*1000;
}
return (now_ms + waitToStart)*1000*1000;
} else {
return now + MS_TO_NS(_interval);
}
/**
* @brief Asynchronous callback if _timer expires.
*
* @details Issues a read() and sets the timer again if _keepRunning is true.
* Not intended to be overwritten.
*/
void readAsync() final override {
this->read();
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(_entity->getStrand()->wrap(std::bind(&SensorGroupTemplate::readAsync, this)));
}
_pendingTasks--;
}
std::vector<S_Ptr> _sensors;
std::vector<SBasePtr> _baseSensors;
///@name Can be overwritten
///@{
/**
* @brief Implement plugin specific actions to initialize a group here.
*
* @details If a derived class (i.e. a plugin group) requires further custom
* actions for initialization, this should be implemented here.
* %initGroup() is appropriately called by this template during
* init().
*/
virtual void execOnInit() { /* do nothing if not overwritten */ };
/**
* @brief Implement plugin specific actions to start a group here.
*
* @details If a derived class (i.e. a plugin group) requires further custom
* actions to start polling data (e.g. open a file descriptor),
* this should be implemented here. %startGroup() is appropriately
* called by this template during start().
*/
virtual void execOnStart() { /* do nothing if not overwritten */ };
/**
* @brief Implement plugin specific actions to stop a group here.
*
* @details If a derived class (i.e. a plugin group) requires further custom
* actions to stop polling data (e.g. close a file descriptor),
* this should be implemented here. %stopGroup() is appropriately
* called by this template during stop().
*/
virtual void execOnStop() { /* do nothing if not overwritten */ };
///@}
std::vector<S_Ptr> _sensors; ///< Sensors associated with this group
std::vector<SBasePtr> _baseSensors; ///< Maintain vector with SensorBase pointers for fast getSensors() implementation
E_Ptr _entity; ///< Entity this group is associated to
};
/**
* @brief Interface partial template specialization for sensor group
* implementations without entities.
*
* @details There is a general template for groups with entities. Common code
* has to be duplicated and maintained twice here and in the general
* template. However, having two templates for cases with and without
* entities greatly reduces code duplication in derived plugin sensor
* groups.
*
* @ingroup pusherplugins
*/
template <class S>
class SensorGroupTemplate<S, nullptr_t> : public SensorGroupInterface {
//the template shall only be instantiated for classes which derive from SensorBase
static_assert(std::is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
protected:
using S_Ptr = std::shared_ptr<S>;
public:
SensorGroupTemplate(const std::string groupName) :
SensorGroupInterface(groupName){}
SensorGroupTemplate(const SensorGroupTemplate& other) :
SensorGroupInterface(other){
for(auto s : other._sensors) {
S_Ptr sensor = std::make_shared<S>(*s);
_sensors.push_back(sensor);
_baseSensors.push_back(sensor);
}
}
virtual ~SensorGroupTemplate() {
_sensors.clear();
_baseSensors.clear();
}
SensorGroupTemplate& operator=(const SensorGroupTemplate& other) {
SensorGroupInterface::operator=(other);
_sensors.clear();
_baseSensors.clear();
for(auto s : other._sensors) {
S_Ptr sensor = std::make_shared<S>(*s);
_sensors.push_back(sensor);
_baseSensors.push_back(sensor);
}
return *this;
}
/**
* @brief Initialize the sensor group.
*
* @details This method must not be overwritten. See execOnInit() if custom
* actions are required during initialization. Initializes
* associated sensors and entity.
*
* @param io IO service to initialize the timer with.
*/
virtual void init(boost::asio::io_service& io) final override {
SensorGroupInterface::init(io);
for(auto s : _sensors) {
s->initSensor(_interval);
}
this->execOnInit();
}
/**
* @brief Start the sensor group (i.e. start collecting data).
*
* @details This method must not be overwritten. See execOnStart() if custom
* actions are required during startup.
*/
virtual void start() final override {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
this->execOnStart();
_keepRunning = true;
_pendingTasks++;
_timer->async_wait(std::bind(&SensorGroupTemplate::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
}
/**
* @brief Stop the sensor group (i.e. stop collecting data).
*
* @details This method must not be overwritten. See execOnStop() if custom
* actions are required during shutdown.
*/
virtual void stop() final override {
_keepRunning = false;
//cancel any outstanding readAsync()
_timer->cancel();
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
/**