Commit c65c6d53 authored by Alessio Netti's avatar Alessio Netti
Browse files

Improvement management of on-demand units

- When the analyzer is created, we check whether it only has the root
unit; if so, the unit is immediately resolved and instantiated even if
the analyzer is on-demand
parent 626c6c7c
......@@ -402,7 +402,7 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
if(pathStrs.size() < 4)
throw invalid_argument("Received malformed request, no third path part!");
string unit = "root";
string unit = SensorNavigator::rootKey;
for (auto& p : queries)
if (p.first == "unit")
unit = p.second;
......
......@@ -4,6 +4,8 @@
#include "SensorNavigator.h"
const string SensorNavigator::rootKey = "root";
void SensorNavigator::clearTree() {
if(_sensorTree) {
_sensorTree->clear();
......
......@@ -27,7 +27,7 @@ class SensorNavigator {
public:
//Identifies the root supernode
const string rootKey = "root";
static const string rootKey;
//Internal structure to store nodes
struct Node {
......
......@@ -448,13 +448,13 @@ protected:
return true;
}
//TODO: switch to textual MQTT topics and use only MQTTPrefix
/**
* @brief Adjusts the names of the sensors
*
* Names are modified according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
*/
//TODO: switch to textual MQTT topics and use only MQTTPrefix
void constructSensorNames(UnitTemplate<SBase>& u, Analyzer& an) {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
......@@ -473,7 +473,7 @@ protected:
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u.getName() == "root") {
if( u.getName() == SensorNavigator::rootKey) {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, an.getName());
......
......@@ -199,15 +199,25 @@ public:
* @param node Unit name for which the query must be performed
* @return a map<string, reading_t> containing the output of the query
*/
virtual map<string, reading_t> computeOnDemand(const string& node="") override {
virtual map<string, reading_t> computeOnDemand(const string& node="root") override {
map<string, reading_t> outMap;
if( !_streaming ) {
UnitGenerator<S> unitGen(_queryEngine.getNavigator());
shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// We check whether the input node belongs to this analyzer's unit domain
if(_units.size()==0)
throw std::runtime_error("Initialization issue in analyzer " + _name + "!");
U_Ptr tempUnit = unitGen.generateUnit(node, _units[0]->getInputs(), _units[0]->getOutputs() , _units[0]->getInputMode(), "");
// If the ondemand template unit refers to root it means it has been completely resolved already,
// and therefore we can duplicate such unit without doing any resolution
U_Ptr tempUnit = nullptr;
if(_units[0]->getName() == SensorNavigator::rootKey) {
if (node == SensorNavigator::rootKey)
tempUnit = unitGen.duplicateUnit(_units[0]);
else
throw domain_error("UnitGenerator: Node " + node + " does not belong to this unit domain!");
} else
tempUnit = unitGen.generateUnit(node, _units[0]->getInputs(), _units[0]->getOutputs() , _units[0]->getInputMode(), "");
for(const auto s : tempUnit->getOutputs())
s->initSensor(_cacheSize);
......
......@@ -140,7 +140,6 @@ public:
* @param ondemand If True, no unit resolution is performed and a raw unit template is stored
* @return A vector of shared pointers to the generated unit objects
*/
//TODO: if ondemand and root, resolve unit immediately
vector<shared_ptr<UnitTemplate<SBase>>> *generateUnits(vector<shared_ptr<SBase>>& inputs, vector<shared_ptr<SBase>>& outputs, inputMode_t inputMode, string mqttPrefix="", bool ondemand=false) {
// If no outputs are defined, no units can be instantiated
if((inputs.empty() && inputMode==SELECTIVE) || outputs.empty())
......@@ -155,12 +154,12 @@ public:
int unitLevel = parseNodeLevelString(outputs[0]->getName());
set<string>* units = NULL;
if(unitLevel > -1)
units = resolveNodeLevelString(outputs[0]->getName(), _navi->rootKey, false);
units = resolveNodeLevelString(outputs[0]->getName(), SensorNavigator::rootKey, false);
// If no depth level was found (output sensor names do not contain any <unit-X> keyword) we assume that
// everything relates to root
else if(unitLevel == -1) {
units = new set<string>();
units->insert(_navi->rootKey);
units->insert(SensorNavigator::rootKey);
}
if(!units || units->empty())
......@@ -168,7 +167,9 @@ public:
// We iterate over the units, and resolve their inputs and outputs starting from the prototype definitions
vector<shared_ptr<UnitTemplate<SBase>>> *unitObjects = new vector<shared_ptr<UnitTemplate<SBase>>>();
if(!ondemand)
// If the unit pattern refers to root, we can resolve the unit immediately even if it is on demand
//TODO: remove this distinction after switching to textual MQTT topics, and don't resolve root unit
if(!ondemand || unitLevel==-1)
for(const auto& u : *units) {
try {
unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix));
......@@ -179,7 +180,7 @@ public:
}
}
else {
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>("__template__", inputs, outputs);
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>("template", inputs, outputs);
unPtr->setInputMode(inputMode);
unitObjects->push_back(unPtr);
}
......@@ -220,12 +221,34 @@ public:
return _generateUnit(u, inputs, outputs, inputMode, mqttPrefix);
}
/**
* @brief Duplicates the input unit
*
* Creates a deep copy of the unit given as input. The copy constructor for UnitTemplate,
* in fact, only creates shallow copies that share the same reference to Sensor objects.
*
* @param orig Shared pointer to the original unit object
* @return A duplicate of the input unit
*/
shared_ptr<UnitTemplate<SBase>> duplicateUnit(shared_ptr<UnitTemplate<SBase>> orig) {
vector<shared_ptr<SBase>> unitInputs, unitOutputs;
for(const auto& i : orig->getInputs())
unitInputs.push_back(make_shared<SBase>(*i));
for(const auto& o : orig->getOutputs())
unitOutputs.push_back(make_shared<SBase>(*o));
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>(orig->getName(), unitInputs, unitOutputs);
unPtr->setInputMode(orig->getInputMode());
return unPtr;
}
protected:
// This private method will resolve all nodes in the current sensor tree that satisfy the "unit" pattern,
// and then verify whether "node" belongs to this set or not.
bool nodeBelongsToPattern(const string& node, const string& unit) {
set<string>* units = resolveNodeLevelString(unit, _navi->rootKey, false);
set<string>* units = resolveNodeLevelString(unit, SensorNavigator::rootKey, false);
if(!units->count(node)) {
delete units;
return false;
......@@ -304,7 +327,7 @@ protected:
delete sensors;
// If we are instantiating output sensors by unit, we generate mqtt topics by using the prefix
// associated to the respective node in the sensor tree, and the sensor suffix itself
if(u != _navi->rootKey) {
if(u != SensorNavigator::rootKey) {
uOut.setMqtt(_navi->buildTopicForNode(u, uOut.getMqtt()));
// Duplicating the file sink adding the name of each unit to the path
string sPath = uOut.getSinkPath();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment