Commit 5dac2f55 authored by Alessio Netti's avatar Alessio Netti
Browse files

Fixed behavior of duplicated analyzers

- REST API now correctly performs data analytics computations on
duplicated streaming analyzers
- In duplicated streaming analyzers, the unit that is exposed through
getUnits() is only the one on which that specific analyzer is working.
The units of the other sibling analyzers are still accessible through
the internal _units vector
parent 4361e22d
......@@ -120,6 +120,7 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
return true;
}
//TODO: when textual MQTT topics are in place, implement check method that ensures all sensors, groups, analyzers and entities have distinct names
bool AnalyticsManager::mqttCheck(pluginVector_t& pushers) {
if(_status != LOADED) {
LOG(error) << "Cannot perform MQTT check, AnalyticsManager is not loaded!";
......@@ -408,13 +409,25 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
if (p.first == "unit")
unit = p.second;
bool found = false;
bool found=false, unitFound=false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if( a->getName() == analyzer ) {
found = true;
map<string, reading_t> outMap = a->computeOnDemand(unit);
map <string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate())
continue;
else
throw;
}
if (json) {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
......@@ -435,6 +448,9 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
}
if(!found)
throw domain_error("Plugin or analyzer not found!");
// This if branch is accessed only if the target analyzer is streaming and duplicated
else if(!unitFound)
throw domain_error("Node " + unit + " does not belong to the domain of " + analyzer + "!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
......
......@@ -157,6 +157,7 @@ public:
for(unsigned int i=0; i < numUnits; i++) {
A_Ptr anCopy = std::make_shared<Analyzer>(*an);
anCopy->setUnitID(i);
anCopy->collapseUnits();
storeAnalyzer(anCopy);
}
} else
......
......@@ -264,10 +264,11 @@ public:
_onDemandLock.store(false);
} else if( _keepRunning ) {
bool found = false;
for(const auto& u : _units)
// We iterate over _baseUnits and not _units because it only contains the units this analyzer is working on
for(const auto& u : _baseUnits)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getOutputs())
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
......@@ -304,6 +305,26 @@ public:
_insertionLUT->insert(make_pair(getTimestamp(), unit));
}
/**
* @brief Clears the internal baseUnits vector and only preserves the currently active unit
*
* This method is used for duplicated analyzers that share the same unit vector, with different
* unit IDs. In order to expose distinct units to the outside, this method allows to keep
* in the internal baseUnits vector (which is exposed through getUnits) only the unit that is
* assigned to this specific analyzer, among those of the duplicated group. Access to the
* other units is preserved through the _units vector, that can be accessed from within this
* analyzer object.
*
*/
virtual void collapseUnits() {
if(_unitID < 0 || _units.empty()) {
LOG(error) << "Analyzer " << _name << ": Cannot collapse units!";
return;
}
_baseUnits.clear();
_baseUnits.push_back(_units[_unitID]);
}
protected:
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment