Commit de38e004 authored by Alessio Netti's avatar Alessio Netti

Analytics: fixes and small changes

- Fixed a bug that prevented relaxed mode from working properly for
operators deployed on the collectagent
- Changed the function of the "delay" parameter: it now specifies a
fixed offset applied to the computation interval of each operator,
useful to tune the flow of data in operator pipelines
parent 6ec47c85
......@@ -168,7 +168,7 @@ file. The following is instead a list of configuration parameters that are avail
| default | Name of the template that must be used to configure this operator.
| interval | Specifies how often the operator will be invoked to perform computations, and thus the sampling interval of its output sensors. Only used for operators in _streaming_ mode.
| relaxed | If set to _true_, the units of this operator will be instantiated even if some of the respective input sensors do not exist.
| delay | Delay in milliseconds to be applied to the start of the operator. This parameter only applies to streaming operators. It can be used to allow for input sensor caches to be populated before the operator is started.
| delay | Delay in milliseconds to be applied to the interval of the operator. This parameter can be used to tune how operator pipelines work, ensuring that the next computation stage is started only after the previous one has finished.
| unitCacheLimit | Defines the maximum size of the unit cache that is used in the on-demand and job modes. Default is 1000.
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for operators in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this operator. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
......
......@@ -278,12 +278,6 @@ protected:
*
*/
virtual void computeAsync() override {
if(this->_delayInterval > 0) {
sleep(this->_delayInterval);
this->_delayInterval = 0;
LOG(info) << "Operator " + this->_name + ": starting computation after delayed start!";
}
try {
_jobDataVec.clear();
if(this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true)) {
......
......@@ -386,7 +386,7 @@ protected:
} else if (boost::iequals(val.first, "sync")) {
op.setSync(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "delay")) {
op.setDelayInterval(stoull(val.second.data()) / 1000);
op.setDelayInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "duplicate")) {
op.setDuplicate(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "relaxed")) {
......
......@@ -92,7 +92,7 @@ public:
_cacheInterval(900000),
_unitCacheLimit(1000),
_cacheSize(1),
_delayInterval(0),
_delayInterval(10),
_pendingTasks(0),
_onDemandLock(false),
_timer(nullptr) {}
......
......@@ -142,8 +142,8 @@ public:
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Interval Delay: " << _delayInterval;
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
LOG_VAR(ll) << " Start delay: " << _delayInterval;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
for (auto u : _units)
......@@ -254,10 +254,7 @@ public:
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(bind(&OperatorTemplate<S>::computeAsync, this));
if(_delayInterval == 0)
LOG(info) << "Operator " << _name << " started.";
else
LOG(info) << "Operator " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
LOG(info) << "Operator " << _name << " started.";
}
/**
......@@ -426,7 +423,7 @@ protected:
if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
return (now_ms + interval64 + 10)*1000*1000;
}
return (now_ms + waitToStart + 10)*1000*1000;
return (now_ms + waitToStart + _delayInterval)*1000*1000;
} else {
return now + MS_TO_NS(_interval);
}
......@@ -441,13 +438,6 @@ protected:
*
*/
virtual void computeAsync() override {
// Sleeping until we are allowed to start
if(_delayInterval > 0) {
sleep(_delayInterval);
_delayInterval = 0;
LOG(info) << "Operator " + _name + ": starting computation after delayed start!";
}
try {
if (_duplicate && _unitID >= 0)
compute(_units[_unitID]);
......
......@@ -121,16 +121,16 @@ bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64
}
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
std::string topic;
std::string topic=name;
// Getting the topic of the queried sensor from the Navigator
// If not found, we try to use the input name as topic
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {
return false;
}
} catch(const std::domain_error& e) {}
DCDB::SensorId sid;
// Creating a SID to perform the query
sid.mqttTopicConvert(topic);
if(!sid.mqttTopicConvert(topic))
return false;
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
if (entry.getView(startTs, endTs, buffer, rel))
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment