Commit 1a754858 authored by Alessio Netti's avatar Alessio Netti

DA: delayed start feature

- Using the "delay" parameter, analyzers can be started in a delayed way
- Useful to let sensor caches fill and prevent errors being thrown
parent 90c38552
......@@ -158,6 +158,7 @@ file. The following is instead a list of configuration parameters that are avail
|:----- |:----------- |
| default | Name of the template that must be used to configure this analyzer.
| interval | Specifies how often the analyzer will be invoked to perform computations, and thus the sampling interval of its output sensors. Only used for analyzers in _streaming_ mode.
| delay | Delay in milliseconds to be applied to the start of the analyzer. This parameter only applies to streaming analyzers. It can be used to allow for input sensor caches to be populated before the analyzer is started.
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for analyzers in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this analyzer. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
| sync | If set to _true_, computation will be performed at time intervals synchronized with sensor readings.
......
......@@ -308,6 +308,8 @@ protected:
an.setMqttPart(val.second.data());
} else if (boost::iequals(val.first, "sync")) {
an.setSync(val.second.data() == "true");
} else if (boost::iequals(val.first, "delay")) {
an.setDelayInterval(stoull(val.second.data()) / 1000);
} else if (boost::iequals(val.first, "duplicate")) {
an.setDuplicate(val.second.data() == "true");
} else if (boost::iequals(val.first, "streaming")) {
......
......@@ -53,6 +53,7 @@ public:
_interval(1000),
_cacheInterval(900000),
_cacheSize(1),
_delayInterval(0),
_pendingTasks(0),
_onDemandLock(false),
_timer(nullptr) {}
......@@ -73,6 +74,7 @@ public:
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_delayInterval(other._delayInterval),
_pendingTasks(0),
_onDemandLock(false),
_timer(nullptr) {}
......@@ -98,6 +100,7 @@ public:
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_delayInterval = other._delayInterval;
_pendingTasks.store(0);
_onDemandLock.store(false);
_timer = nullptr;
......@@ -200,6 +203,7 @@ public:
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
// Setter methods
......@@ -213,6 +217,7 @@ public:
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setDelayInterval(unsigned delayInterval) { _delayInterval = delayInterval; }
virtual vector<UnitPtr>& getUnits() = 0;
protected:
......@@ -263,6 +268,8 @@ protected:
unsigned int _cacheInterval;
// Real size of the cache, as determined from cacheInterval
unsigned int _cacheSize;
// Time in seconds to wait for before starting computation
unsigned int _delayInterval;
// Number of pending ASIO tasks
atomic_uint _pendingTasks;
// Lock used to serialize access to the ondemand functionality
......
......@@ -166,7 +166,10 @@ public:
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(bind(&AnalyzerTemplate<S>::computeAsync, this));
LOG(info) << "Analyzer " << _name << " started.";
if(_delayInterval == 0)
LOG(info) << "Analyzer " << _name << " started.";
else
LOG(info) << "Analyzer " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
}
/**
......@@ -360,6 +363,13 @@ protected:
*
*/
virtual void computeAsync() override {
// Sleeping until we are allowed to start
if(_delayInterval > 0) {
sleep(_delayInterval);
_delayInterval = 0;
LOG(info) << "Analyzer " + _name + ": starting computation after delayed start!";
}
try {
if (_duplicate && _unitID >= 0)
compute(_unitID);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment