Commit 12af40ab authored by Alessio Netti's avatar Alessio Netti

Analytics: refactoring is complete

- Hierarchical units are now supported across the framework in all plugins
parent 501d19f6
......@@ -145,17 +145,7 @@ public:
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
this->compute(jobUnit, _jobDataVec[0]);
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
if(this->_flatten) {
for (const auto& su : jobUnit->getSubUnits())
for (const auto &o : su->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
}
this->retrieveAndFlush(outMap, jobUnit);
} else
throw std::runtime_error("Operator " + this->_name + ": cannot retrieve job data!");
} catch(const exception& e) {
......@@ -170,13 +160,7 @@ public:
for(const auto& u : this->_units)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(this->_flatten) {
for (const auto& su : u->getSubUnits())
for (const auto &o : su->getOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
this->retrieveAndFlush(outMap, u, false);
}
releaseUnits();
......@@ -235,12 +219,10 @@ protected:
U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, jobData.nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), this->_mqttPart, false, false, this->_relaxed);
// The job unit is generated as a hierarchical unit
jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize, this->_flatten);
jobUnit->init(this->_cacheSize);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......@@ -300,13 +282,8 @@ protected:
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
if(ju) {
if(ju)
this->addUnit(ju);
if(this->_flatten) {
for (const auto& su : ju->getSubUnits())
this->addUnit(su);
}
}
_unitAccess.store(false);
_tempUnits.clear();
}
......
......@@ -86,7 +86,6 @@ public:
_sync(true),
_dynamic(false),
_disabled(false),
_flatten(false),
_unitID(-1),
_keepRunning(0),
_minValues(1),
......@@ -113,7 +112,6 @@ public:
_sync(other._sync),
_dynamic(other._dynamic),
_disabled(other._disabled),
_flatten(other._flatten),
_unitID(other._unitID),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
......@@ -146,7 +144,6 @@ public:
_sync = other._sync;
_dynamic = other._dynamic;
_disabled = other._disabled;
_flatten = other._flatten;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
......@@ -266,7 +263,6 @@ public:
int getUnitID() const { return _unitID; }
bool getDynamic() const { return _dynamic; }
bool getDisabled() const { return _disabled; }
bool getFlatten() const { return _flatten; }
// Setter methods
void setName(const string& name) { _name = name; }
......@@ -346,8 +342,6 @@ protected:
bool _dynamic;
// If true, the operator is initialized but "disabled" and cannot be used
bool _disabled;
// Indicates whether the operator generates hierarchical units that must be flattened (their sub-units exposed)
bool _flatten;
// ID of the units this operator works on
int _unitID;
// Determines if the operator can keep running or must terminate
......
......@@ -167,9 +167,16 @@ public:
if (U_Ptr dUnit = dynamic_pointer_cast< UnitTemplate<S> >(u)) {
_units.push_back(dUnit);
_baseUnits.push_back(u);
if(dUnit->isTopUnit())
for(auto& subUnit : dUnit->getSubUnits()) {
if (U_Ptr dSubUnit = dynamic_pointer_cast< UnitTemplate<S> >(subUnit))
_baseUnits.push_back(dSubUnit);
else
LOG(error) << "Operator " << _name << ": Type mismatch when storing sub-unit! Will be omitted";
}
}
else
LOG(error) << "Operator " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
LOG(error) << "Operator " << _name << ": Type mismatch when storing unit! Will be omitted";
}
/**
......@@ -326,34 +333,30 @@ public:
throw std::runtime_error("No template unit in operator " + _name + "!");
LOG(debug) << "Operator " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, false, false, _relaxed);
tempUnit = unitGen.generateFromTemplate(uTemplate, node, list<string>(), _mqttPart, _enforceTopics, _relaxed);
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
tempUnit->init(_cacheSize);
compute(tempUnit);
for (const auto &o : tempUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
retrieveAndFlush(outMap, tempUnit);
} catch(const exception& e) {
_onDemandLock.store(false);
throw;
}
_onDemandLock.store(false);
} else if( _keepRunning && !_disabled ) {
bool found = false;
// We iterate over _baseUnits and not _units because it only contains the units this operator is working on
for(const auto& u : _baseUnits)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
if(!_duplicate) {
for(const auto &u : _units)
if(u->getName() == node) {
found = true;
retrieveAndFlush(outMap, u, false);
}
} else if(_unitID>=0 && node==_units[_unitID]->getName()) {
found = true;
retrieveAndFlush(outMap, _units[_unitID], false);
}
if(!found)
throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
......@@ -406,9 +409,37 @@ public:
}
_baseUnits.clear();
_baseUnits.push_back(_units[_unitID]);
// If the unit is hierarchical, we add its subunits as well
if(_units[_unitID]->isTopUnit())
for(auto& subUnit : _units[_unitID]->getSubUnits())
_baseUnits.push_back(subUnit);
}
protected:
/**
* @brief Retrieves output values of a unit and puts them in a map
*
* @param outMap string to reading_t map in which outputs must be stored
* @param unit Unit (flat or hierarchical) to be scanned
* @param flushQueues If true, the queues of outbound sensor values will be flushed as well
*/
void retrieveAndFlush(map<string, reading_t>& outMap, U_Ptr unit, bool flushQueues=true) {
// Retrieving top-level outputs
for (const auto &o : unit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(flushQueues)
o->clearReadingQueue();
}
// Retrieving sub-unit outputs (if any)
for (const auto &subUnit : unit->getSubUnits()) {
for (const auto &o : subUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(flushQueues)
o->clearReadingQueue();
}
}
}
/**
* @brief Returns the timestamp associated with the next compute task
......
......@@ -196,10 +196,10 @@ public:
throw invalid_argument("UnitGenerator: only root unit is supported for this template type!");
shared_ptr <UnitTemplate<SBase>> subUnit = tUnit->getSubUnits()[0];
return generateHierarchicalUnit(u, subNames, tUnit->getBaseOutputs(), subUnit->getBaseInputs(),
subUnit->getBaseOutputs(), tUnit->getInputMode(), mqttPrefix, false, enforceTopics, relaxed);
return generateHierarchicalUnit(u, subNames, tUnit->getOutputs(), subUnit->getInputs(),
subUnit->getOutputs(), tUnit->getInputMode(), mqttPrefix, false, enforceTopics, relaxed);
} else {
return generateUnit(u, tUnit->getBaseInputs(), tUnit->getBaseOutputs(), tUnit->getInputMode(),
return generateUnit(u, tUnit->getInputs(), tUnit->getOutputs(), tUnit->getInputMode(),
mqttPrefix, false, enforceTopics, relaxed);
}
}
......@@ -275,9 +275,15 @@ public:
try {
unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix, enforceTopics, relaxed));
} catch(const exception& e) {
LOG(error) << e.what();
LOG(error) << "UnitGenerator: cannot build unit " << u << "!";
continue;
if(units->size()>1) {
LOG(error) << e.what();
LOG(error) << "UnitGenerator: cannot build unit " << u << "!";
continue;
} else {
delete units;
delete unitObjects;
throw;
}
}
}
else {
......@@ -340,18 +346,24 @@ public:
std::string effPrefix;
if(u==SensorNavigator::rootKey) {
effPrefix = MQTTChecker::formatTopic(mqttPrefix);
topUnit->setName(SensorNavigator::rootKey);
} else {
effPrefix = MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(u);
topUnit->setName(effPrefix + "/");
topUnit->setName(MQTTChecker::formatTopic(u) + "/");
}
try {
topUnit->setSubUnits(*generateUnits(subNames, subInputs, subOutputs, inputMode, effPrefix, ondemand, enforceTopics, relaxed));
vector<shared_ptr<UnitTemplate<SBase>>>* units = generateUnits(subNames, subInputs, subOutputs, inputMode, effPrefix, ondemand, enforceTopics, relaxed);
topUnit->setSubUnits(*units);
delete units;
} catch (const invalid_argument &e) {
topUnit->clear();
LOG(error) << e.what();
LOG(error) << "HierarchicalUnitGenerator: cannot build unit " << u << "!";
continue;
if(uNames.size()>1) {
LOG(error) << e.what();
LOG(error) << "HierarchicalUnitGenerator: cannot build unit " << u << "!";
continue;
} else {
delete unitObjects;
throw;
}
}
// Mapping outputs
......@@ -598,12 +610,7 @@ protected:
unitOutputs.push_back(make_shared<SBase>(uOut));
}
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>(u, unitInputs, unitOutputs);
// If unit is non-root and enforceTopics is true, unit name is /prefix/unitName/
if(enforceTopics && u!=SensorNavigator::rootKey)
unPtr->setName(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(u) + "/");
// If unit is non-root and enforceTopics is false, unit name is /unitName/
// If unit is root, its name is left as __root__
else if(u!=SensorNavigator::rootKey)
if(u!=SensorNavigator::rootKey)
unPtr->setName(MQTTChecker::formatTopic(u) + "/");
unPtr->setInputMode(inputMode);
return unPtr;
......
......@@ -60,9 +60,8 @@ public:
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param flatten if true, sensors in sub-units are also initialized
*/
virtual void init(unsigned int cacheSize, bool flatten=false) = 0;
virtual void init(unsigned int cacheSize) = 0;
/**
* @brief Sets the name of this unit
......
......@@ -177,19 +177,16 @@ public:
/**
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param flatten if true, sensors in sub-units are also initialized
* @param cacheSize size of the sensor cache
*/
void init(unsigned int cacheSize, bool flatten=false) override {
void init(unsigned int cacheSize) override {
for(const auto s : _outputs)
if (!s->isInit())
s->initSensor(cacheSize);
if(flatten) {
for (const auto &su : _subUnits)
for (const auto s : su->getOutputs())
if (!s->isInit())
s->initSensor(cacheSize);
}
for (const auto &su : _subUnits)
for (const auto s : su->getOutputs())
if (!s->isInit())
s->initSensor(cacheSize);
}
/**
......
......@@ -73,11 +73,11 @@ void AggregatorConfigurator::operatorAttributes(AggregatorOperator& op, CFG_VAL
bool AggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
......
......@@ -71,11 +71,11 @@ void JobAggregatorConfigurator::operatorAttributes(JobAggregatorOperator& op, CF
bool JobAggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(!u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports hierarchical units!";
LOG(error) << " " << _operatorName << ": This operator type only supports hierarchical units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
......
......@@ -53,11 +53,11 @@ void FilesinkConfigurator::operatorAttributes(FilesinkOperator& op, CFG_VAL conf
bool FilesinkConfigurator::unit(UnitTemplate<FilesinkSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(!u.getOutputs().empty()) {
LOG(error) << _operatorName << ": This is a file sink, no output sensors can be defined!";
LOG(error) << " " << _operatorName << ": This is a file sink, no output sensors can be defined!";
return false;
}
return true;
......
......@@ -114,11 +114,11 @@ void PerSystSqlConfigurator::operatorAttributes(PerSystSqlOperator& op, CFG_VAL
bool PerSystSqlConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(!u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports hierarchical units!";
LOG(error) << " " << _operatorName << ": This operator type only supports hierarchical units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
......
......@@ -64,7 +64,7 @@ void RegressorConfigurator::operatorAttributes(RegressorOperator& op, CFG_VAL co
bool RegressorConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
bool targetSet=false;
......@@ -78,11 +78,11 @@ bool RegressorConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
}
}
if(!targetSet) {
LOG(error) << _operatorName << ": No regression target was specified!";
LOG(error) << " " << _operatorName << ": No regression target was specified!";
return false;
}
if(u.getOutputs().size()!=1) {
LOG(error) << _operatorName << ": Only one output sensor per unit is allowed!";
LOG(error) << " " << _operatorName << ": Only one output sensor per unit is allowed!";
return false;
}
return true;
......
......@@ -161,11 +161,11 @@ void SMUCNGPerfConfigurator::operatorAttributes(SMUCNGPerfOperator& op, CFG_VAL
bool SMUCNGPerfConfigurator::unit(UnitTemplate<SMUCSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
......
......@@ -50,7 +50,7 @@ void TesterOperatorConfigurator::operatorAttributes(TesterOperator& op, CFG_VAL
bool TesterOperatorConfigurator::unit(UnitTemplate<SensorBase>& u) {
if(u.getOutputs().size()!=1) {
LOG(error) << _operatorName << ": Only one output sensor per unit is allowed!";
LOG(error) << " " << _operatorName << ": Only one output sensor per unit is allowed!";
return false;
}
else
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment