Commit 0525d36d authored by Alessio Netti's avatar Alessio Netti

Analytics: re-worked sensor group stop mechanism

- Integrated changes to the stop mechanism
- Also re-worked the interface to have the same execOn* methods as Sensor Groups
parent d1d3b520
......@@ -196,6 +196,9 @@ void OperatorManager::unloadPlugin(const string& id) {
for (const auto& op : it->configurator->getOperators())
op->stop();
for (const auto& op : it->configurator->getOperators())
op->wait();
removeTopics(*it);
if (it->configurator)
......@@ -325,6 +328,9 @@ bool OperatorManager::stop(const string& plugin, const string& operatorN) {
op->stop();
out=true;
}
for (const auto &op : p.configurator->getOperators())
if(op->getStreaming() && (operatorN=="" || operatorN==op->getName()))
op->wait();
}
return out;
}
......
......@@ -121,13 +121,6 @@ public:
_unitAccess.store(false);
}
/**
* @brief Initializes this operator
*
* @param io Boost ASIO service to be used
*/
virtual void init(boost::asio::io_service& io) override { OperatorInterface::init(io); }
/**
* @brief Performs an on-demand compute task
*
......
......@@ -154,18 +154,7 @@ public:
return *this;
}
/**
* @brief Waits for the operator to complete its tasks
*
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0).
*/
void wait() {
while(_pendingTasks) {
sleep(1);
}
}
/**
* @brief Initializes this operator
*
......@@ -193,6 +182,13 @@ public:
*/
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
/**
* @brief Waits for the operator to complete its tasks
*
* This method must be implemented in derived classes.
*/
virtual void wait() = 0;
/**
* @brief Starts this operator
*
......@@ -283,10 +279,36 @@ public:
protected:
/**
* @brief Implement plugin specific actions to initialize an operator here.
*
* @details If a derived class requires further custom
* actions for initialization, this should be implemented here.
*/
virtual void execOnInit() {}
/**
* @brief Implement plugin-specific actions to start an operator here.
*
* @details If a derived class (i.e. a plugin group) requires further custom
* actions to start analytics, this should be implemented here.
*
* @return True on success, false otherwise.
*/
virtual bool execOnStart() { return true; }
/**
* @brief Implement plugin specific actions to stop a group here.
*
* @details If a derived class requires further custom actions
* to stop operation this should be implemented here.
*/
virtual void execOnStop() {}
/**
* @brief Performs a compute task
*
* This method is tasked with scheduling the next compute task, and invoking the internal
* @details This method is tasked with scheduling the next compute task, and invoking the internal
* compute() method, which encapsulates the real logic of the operator. The compute method
* is automatically called over units as required by the Operator's configuration.
*
......
......@@ -203,17 +203,41 @@ public:
*
* @param io Boost ASIO service to be used
*/
virtual void init(boost::asio::io_service& io) override {
virtual void init(boost::asio::io_service& io) final override {
OperatorInterface::init(io);
for(const auto u : _units)
u->init(_cacheSize);
this->execOnInit();
}
/**
* @brief Waits for the operator to complete its tasks
*
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0).
*/
virtual void wait() final override {
uint64_t sleepms=10, i=0;
uint64_t timeout = _interval<10000 ? 30000 : _interval*3;
while(sleepms*i++ < timeout) {
if (_pendingTasks)
std::this_thread::sleep_for(std::chrono::milliseconds(sleepms));
else {
this->execOnStop();
LOG(info) << "Operator " << _name << " stopped.";
return;
}
}
LOG(warning) << "Operator " << _name << " will not finish! Skipping it";
}
/**
* @brief Starts this operator
*/
virtual void start() override {
virtual void start() final override {
if(_keepRunning) {
LOG(info) << "Operator " << _name << " already running.";
return;
......@@ -222,6 +246,11 @@ public:
return;
}
if (!this->execOnStart()) {
LOG(error) << "Operator " << _name << ": startup failed.";
return;
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(bind(&OperatorTemplate<S>::computeAsync, this));
......@@ -234,18 +263,15 @@ public:
/**
* @brief Stops this operator
*/
virtual void stop() override {
if(_keepRunning == 0) {
LOG(info) << "Operator " << _name << " already stopped.";
return;
} else if(!_streaming) {
LOG(error) << "On-demand operator " << _name << " cannot be stopped.";
virtual void stop() final override {
if(_keepRunning == 0 || !_streaming) {
LOG(debug) << "Operator " << _name << " already stopped.";
return;
}
_keepRunning = 0;
wait();
LOG(info) << "Operator " << _name << " stopped.";
//cancel any outstanding readAsync()
_timer->cancel();
}
/**
......
......@@ -47,11 +47,10 @@ void FilesinkOperator::printConfig(LOG_LEVEL ll) {
OperatorTemplate<FilesinkSensorBase>::printConfig(ll);
}
void FilesinkOperator::stop() {
void FilesinkOperator::execOnStop() {
for(const auto& u : _units)
for(const auto& in : u->getInputs())
in->closeFile();
OperatorTemplate<FilesinkSensorBase>::stop();
}
void FilesinkOperator::compute(U_Ptr unit) {
......
......@@ -53,7 +53,7 @@ public:
bool getAutoName() { return _autoName; }
void printConfig(LOG_LEVEL ll) override;
void stop() override;
void execOnStop() override;
protected:
......
......@@ -83,7 +83,7 @@ restResponse_t RegressorOperator::REST(const string& action, const unordered_map
return resp;
}
void RegressorOperator::init(boost::asio::io_service& io) {
void RegressorOperator::execOnInit() {
bool useDefault=true;
if(_modelIn!="") {
try {
......@@ -101,7 +101,6 @@ void RegressorOperator::init(boost::asio::io_service& io) {
_rForest = cv::ml::RTrees::create();
_rForest->setCalculateVarImportance(_importances);
}
OperatorTemplate<RegressorSensorBase>::init(io);
}
void RegressorOperator::printConfig(LOG_LEVEL ll) {
......
......@@ -55,7 +55,7 @@ public:
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override;
virtual void init(boost::asio::io_service& io) override;
virtual void execOnInit() override;
void setInputPath(std::string in) { _modelIn = in; }
void setOutputPath(std::string out) { _modelOut = out; }
......
......@@ -39,9 +39,6 @@
#include <atomic>
#include <memory>
#include <boost/asio.hpp>
#include <thread>
#include <chrono>
#include "logging.h"
#include "sensorbase.h"
......
......@@ -364,7 +364,7 @@ public:
*/
virtual void wait() final override {
uint64_t sleepms=10, i=0;
uint64_t timeout = _interval<10000 ? 10000 : _interval;
uint64_t timeout = _interval<10000 ? 30000 : _interval*3;
while(sleepms*i++ < timeout) {
if (_pendingTasks)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment