Commit bec12ea1 authored by Alessio Netti's avatar Alessio Netti

Analytics: various fixes

- Integrated the "disabled" parameter for operators
- "operatorPlugin" can be left empty if path and config are default
- "operatorPlugin" config field is relative to _cfgPath if not absolute
- Relaxed the operator name uniqueness constraint, duplicated operators
now have (again) all the same name
- Minor bugfixes
parent 6071b439
......@@ -90,7 +90,7 @@ bool OperatorManager::load(const string& path, const string& globalFile, const p
//Reading plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("operatorPlugins")) {
if (boost::iequals(plugin.first, "operatorPlugin")) {
if (!plugin.second.empty()) {
if (!plugin.second.data().empty()) {
string pluginConfig="", pluginPath="";
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
......@@ -132,6 +132,8 @@ bool OperatorManager::loadPlugin(const string& name, const string& pluginPath, c
// If config-path not specified we will look for pluginName.conf in the global conf directory
if (pluginConfig == "")
pluginConfig = _configPath + name + ".conf";
else if(pluginConfig[0] != '/')
pluginConfig = _configPath + pluginConfig;
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
......@@ -281,7 +283,7 @@ bool OperatorManager::checkTopics(op_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& op : p.configurator->getOperators()) {
if (!mqttCheck.checkGroup(op->getName()))
if (!mqttCheck.checkGroup(op->getName()) && !(op->getStreaming() && op->getDuplicate()))
validTopics = false;
if (op->getStreaming()) {
for (const auto &u : op->getUnits())
......@@ -598,11 +600,12 @@ void OperatorManager::PUT_analytics_compute(endpointArgs) {
std::ostringstream data;
bool unitFound=false;
bool unitFound=false, opFound=false;
for (const auto &p : _plugins) {
if (p.id == plugin) {
for (const auto &op : p.configurator->getOperators()) {
if( op->getName() == oper ) {
opFound = true;
std::map<std::string, reading_t> outMap;
try {
outMap = op->computeOnDemand(unit);
......@@ -647,8 +650,8 @@ void OperatorManager::PUT_analytics_compute(endpointArgs) {
}
}
// This if branch is accessed only if the target operator is streaming and duplicated
if(!unitFound) {
res.body() = "Node " + unit + " does not belong to the domain of " + oper + "\n!";
if(opFound && !unitFound) {
res.body() = "Node " + unit + " does not belong to the domain of " + oper + "!\n";
res.result(http::status::not_found);
}
......
......@@ -173,6 +173,7 @@ file. The following is instead a list of configuration parameters that are avail
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for operators in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this operator. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
| sync | If set to _true_, computation will be performed at time intervals synchronized with sensor readings.
| disabled | If set to _true_, the operator will be instantiated but will not be started and will not be available for queries.
| duplicate | If set to _false_, only one operator object will be instantiated. Such operator will perform computation over all units that are instantiated, at every interval, sequentially. If set to _true_, the operator object will be duplicated such that each copy will have one unit associated to it. This allows to exploit parallelism between units, but results in separate models to avoid race conditions.
| streaming | If set to _true_, the operator will operate in _streaming_ mode, pushing output sensors regularly. If set to _false_, the operator will instead operate in _on-demand_ mode.
| input | Block of input sensors that must be used to instantiate the units of this operator. These can both be a list of strings, or fully-qualified _Sensor_ blocks containing specific attributes (see DCDBPusher Readme).
......
......@@ -206,7 +206,7 @@ public:
for(unsigned int i=0; i < numUnits; i++) {
O_Ptr opCopy = std::make_shared<Operator>(*op);
opCopy->setUnitID(i);
opCopy->setName(opCopy->getName() + "@" + op->getUnits()[i]->getName());
//opCopy->setName(opCopy->getName() + "@" + op->getUnits()[i]->getName());
opCopy->collapseUnits();
storeOperator(opCopy);
}
......@@ -385,6 +385,8 @@ protected:
op.setMqttPart(val.second.data());
} else if (boost::iequals(val.first, "sync")) {
op.setSync(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "disabled")) {
op.setDisabled(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "delay")) {
op.setDelayInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "duplicate")) {
......
......@@ -84,6 +84,7 @@ public:
_streaming(true),
_sync(true),
_dynamic(false),
_disabled(false),
_flatten(false),
_unitID(-1),
_keepRunning(0),
......@@ -109,6 +110,7 @@ public:
_streaming(other._streaming),
_sync(other._sync),
_dynamic(other._dynamic),
_disabled(other._disabled),
_flatten(other._flatten),
_unitID(other._unitID),
_keepRunning(other._keepRunning),
......@@ -140,6 +142,7 @@ public:
_streaming = other._streaming;
_sync = other._sync;
_dynamic = other._dynamic;
_disabled = other._disabled;
_flatten = other._flatten;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
......@@ -258,6 +261,7 @@ public:
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
bool getDynamic() const { return _dynamic; }
bool getDisabled() const { return _disabled; }
bool getFlatten() const { return _flatten; }
// Setter methods
......@@ -269,6 +273,7 @@ public:
void setUnitID(int u) { _unitID = u; }
void setStreaming(bool streaming) { _streaming = streaming; }
void setDuplicate(bool duplicate) { _duplicate = duplicate; }
void setDisabled(bool disabled) { _disabled = disabled; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setUnitCacheLimit(unsigned uc) { _unitCacheLimit = uc+1; }
......@@ -332,6 +337,8 @@ protected:
bool _sync;
// Indicates whether the operator generates units dynamically at runtime, or only at initialization
bool _dynamic;
// If true, the operator is initialized but "disabled" and cannot be used
bool _disabled;
// Indicates whether the operator generates hierarchical units that must be flattened (their sub-units exposed)
bool _flatten;
// ID of the units this operator works on
......
......@@ -137,6 +137,7 @@ public:
virtual void printConfig(LOG_LEVEL ll) override {
if(_mqttPart!="")
LOG_VAR(ll) << " MQTT prefix: " << _mqttPart;
LOG_VAR(ll) << " Disabled: " << (_disabled ? "true" : "false");
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
......@@ -244,7 +245,8 @@ public:
} else if(!_streaming) {
LOG(error) << "On-demand operator " << _name << " cannot be started.";
return;
}
} else if(_disabled)
return;
if (!this->execOnStart()) {
LOG(error) << "Operator " << _name << ": startup failed.";
......@@ -299,7 +301,7 @@ public:
*/
virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
map<string, reading_t> outMap;
if( !_streaming ) {
if( !_streaming && !_disabled ) {
shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// We check whether the input node belongs to this operator's unit domain
......@@ -340,7 +342,7 @@ public:
}
_onDemandLock.store(false);
} else if( _keepRunning ) {
} else if( _keepRunning && !_disabled ) {
bool found = false;
// We iterate over _baseUnits and not _units because it only contains the units this operator is working on
for(const auto& u : _baseUnits)
......@@ -448,7 +450,7 @@ protected:
LOG(error) << e.what();
}
if (_timer && _keepRunning) {
if (_timer && _keepRunning && !_disabled) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(bind(&OperatorTemplate::computeAsync, this));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment