Commit d1d3b520 authored by Alessio Netti's avatar Alessio Netti
Browse files

Re-worked sensor group stop mechanism

- Decoupled the stop() and wait() methods
- A stop command is issued to all sensor groups, then a wait is performed
- Wait() now does busy waiting at a 10ms frequency and times out based
on the interval of the sensor group
parent 412ac0dd
......@@ -145,9 +145,10 @@ bool PluginManager::loadPlugin(const string& name,
void PluginManager::unloadPlugin(const string& id) {
for (auto it = _plugins.begin(); it != _plugins.end(); ++it) {
if (it->id == id || id == "") {
for (const auto& g : it->configurator->getSensorGroups()) {
for (const auto& g : it->configurator->getSensorGroups())
g->stop();
}
for (const auto& g : it->configurator->getSensorGroups())
g->wait();
removeTopics(*it);
......@@ -217,10 +218,14 @@ bool PluginManager::stopPlugin(const string& id) {
for (const auto& p : _plugins) {
if (p.id == id || id == "") {
// Issuing stop command
for (const auto& g : p.configurator->getSensorGroups()) {
found = true;
g->stop();
}
// Waiting for sensor group termination
for (const auto& g : p.configurator->getSensorGroups())
g->wait();
}
}
......
......@@ -286,10 +286,12 @@ public:
*/
void clearConfig() final {
ConfiguratorInterface::clearConfig();
//bring everything to a halt
for(auto g : _sensorGroups) {
// Bring everything to a halt
for(auto g : _sensorGroups)
g->stop();
}
// Wait for stop
for(auto g : _sensorGroups)
g->wait();
//clean up sensors/groups/entitys and templates
for(auto e : _sensorEntitys) {
......@@ -874,10 +876,12 @@ public:
*/
void clearConfig() final {
ConfiguratorInterface::clearConfig();
//bring everything to a halt
for(auto g : _sensorGroups) {
// Bring everything to a halt
for(auto g : _sensorGroups)
g->stop();
}
// Wait for stop
for(auto g : _sensorGroups)
g->wait();
//clean up sensors/groups and templates
for (auto tb : _templateSensorBases) {
......
......@@ -39,6 +39,9 @@
#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <thread>
#include <chrono>
#include "logging.h"
#include "sensorbase.h"
......@@ -131,7 +134,12 @@ public:
virtual void init(boost::asio::io_service& io) {
_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
}
/**
* @brief Waits for the termination of the sensor group.
*/
virtual void wait() = 0;
/**
* @brief Start the sensor group (i.e. start collecting data).
*/
......@@ -139,9 +147,11 @@ public:
/**
* @brief Stop the sensor group (i.e. stop collecting data).
*
* @details Must be followed by a call to the wait() method.
*/
virtual void stop() = 0;
/**
* @brief Add a sensor to this group.
*
......@@ -258,30 +268,8 @@ protected:
}
///@}
///@name Utility methods
///@name Utility methods
///@{
/**
* @brief Does a busy wait until all dispatched handlers are finished
* (_pendingTasks == 0).
*
* @details If the wait takes longer than a reasonable amount of time we
* return anyway, to not block termination of dcdbpusher.
*/
void wait() {
unsigned short retries = 3;
for (unsigned short i = 1; i <= retries; i++) {
if (_pendingTasks) {
LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
sleep((_interval/1000) + 1);
} else {
return;
}
}
LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
}
/**
* @brief Calculate timestamp for the next reading.
*
......
......@@ -131,6 +131,30 @@ public:
this->execOnInit();
}
/**
* @brief Does a busy wait until all dispatched handlers are finished
* (_pendingTasks == 0).
*
* @details If the wait takes longer than a reasonable amount of time we
* return anyway, to not block termination of dcdbpusher.
*/
virtual void wait() final override {
uint64_t sleepms=10, i=0;
uint64_t timeout = _interval<10000 ? 10000 : _interval;
while(sleepms*i++ < timeout) {
if (_pendingTasks)
std::this_thread::sleep_for(std::chrono::milliseconds(sleepms));
else {
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
return;
}
}
LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
}
/**
* @brief Start the sensor group (i.e. start collecting data).
*
......@@ -167,17 +191,13 @@ public:
*/
virtual void stop() final override {
if (!_keepRunning) {
LOG(info) << "Sensorgroup " << _groupName << " already stopped.";
LOG(debug) << "Sensorgroup " << _groupName << " already stopped.";
return;
}
_keepRunning = false;
//cancel any outstanding readAsync()
_timer->cancel();
wait();
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
/**
......@@ -335,6 +355,30 @@ public:
this->execOnInit();
}
/**
* @brief Does a busy wait until all dispatched handlers are finished
* (_pendingTasks == 0).
*
* @details If the wait takes longer than a reasonable amount of time we
* return anyway, to not block termination of dcdbpusher.
*/
virtual void wait() final override {
uint64_t sleepms=10, i=0;
uint64_t timeout = _interval<10000 ? 10000 : _interval;
while(sleepms*i++ < timeout) {
if (_pendingTasks)
std::this_thread::sleep_for(std::chrono::milliseconds(sleepms));
else {
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
return;
}
}
LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
}
/**
* @brief Start the sensor group (i.e. start collecting data).
*
......@@ -367,17 +411,13 @@ public:
*/
virtual void stop() final override {
if (!_keepRunning) {
LOG(info) << "Sensorgroup " << _groupName << " already stopped.";
LOG(debug) << "Sensorgroup " << _groupName << " already stopped.";
return;
}
_keepRunning = false;
//cancel any outstanding readAsync()
_timer->cancel();
wait();
this->execOnStop();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment