Commit ebbc2e42 authored by Micha Mueller's avatar Micha Mueller
Browse files

Merge branch 'master' of gitlab.lrz.de:dcdb/dcdbpusher

parents 3f801a2e 5dac2f55
......@@ -120,6 +120,7 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
return true;
}
//TODO: when textual MQTT topics are in place, implement check method that ensures all sensors, groups, analyzers and entities have distinct names
bool AnalyticsManager::mqttCheck(pluginVector_t& pushers) {
if(_status != LOADED) {
LOG(error) << "Cannot perform MQTT check, AnalyticsManager is not loaded!";
......@@ -408,13 +409,25 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
if (p.first == "unit")
unit = p.second;
bool found = false;
bool found=false, unitFound=false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if( a->getName() == analyzer ) {
found = true;
map<string, reading_t> outMap = a->computeOnDemand(unit);
map <string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate())
continue;
else
throw;
}
if (json) {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
......@@ -435,6 +448,9 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
}
if(!found)
throw domain_error("Plugin or analyzer not found!");
// This if branch is accessed only if the target analyzer is streaming and duplicated
else if(!unitFound)
throw domain_error("Node " + unit + " does not belong to the domain of " + analyzer + "!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
......
......@@ -157,6 +157,7 @@ public:
for(unsigned int i=0; i < numUnits; i++) {
A_Ptr anCopy = std::make_shared<Analyzer>(*an);
anCopy->setUnitID(i);
anCopy->collapseUnits();
storeAnalyzer(anCopy);
}
} else
......
......@@ -264,10 +264,11 @@ public:
_onDemandLock.store(false);
} else if( _keepRunning ) {
bool found = false;
for(const auto& u : _units)
// We iterate over _baseUnits and not _units because it only contains the units this analyzer is working on
for(const auto& u : _baseUnits)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getOutputs())
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
......@@ -304,6 +305,26 @@ public:
_insertionLUT->insert(make_pair(getTimestamp(), unit));
}
/**
* @brief Clears the internal baseUnits vector and only preserves the currently active unit
*
* This method is used for duplicated analyzers that share the same unit vector, with different
* unit IDs. In order to expose distinct units to the outside, this method allows to keep
* in the internal baseUnits vector (which is exposed through getUnits) only the unit that is
* assigned to this specific analyzer, among those of the duplicated group. Access to the
* other units is preserved through the _units vector, that can be accessed from within this
* analyzer object.
*
*/
virtual void collapseUnits() {
if(_unitID < 0 || _units.empty()) {
LOG(error) << "Analyzer " << _name << ": Cannot collapse units!";
return;
}
_baseUnits.clear();
_baseUnits.push_back(_units[_unitID]);
}
protected:
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment