Commit 8ea5658d authored by Alessio Netti's avatar Alessio Netti

Analytics: flattening of hierarchical units

- For plugins that use hierarchical units (e.g., job analyzers) a
"flatten" plugin option was added. If enabled, output sensors in the
sub-units of each unit will be exposed as well, and their output values
propagated
- Minor fixes here and there
parent 572c6921
......@@ -225,7 +225,7 @@ std::string RegressorAnalyzer::getImportances() {
std::vector<ImportancePair> impLabels;
cv::Mat_<float> impValues;
_rForest->getVarImportance().convertTo(impValues, CV_32F);
if(impValues.empty() || _units.empty())
if(impValues.empty() || _units.empty() || impValues.total()!=REG_NUMFEATURES*_units[0]->getInputs().size())
return "Analyzer " + _name + ": error when computing feature importances.";
for(size_t idx=0; idx<impValues.total(); idx++) {
......
......@@ -421,6 +421,7 @@ protected:
// Reading all derived attributes, if any
analyzer(an, config);
an.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()));
// Instantiating units
if(!an.getTemplate()) {
return readUnits(an, protoInputs, protoOutputs, inputMode);
......@@ -509,9 +510,7 @@ protected:
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
!an.getStreaming(), an.getRelaxed());
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, an.getMqttPart(), !an.getStreaming(), an.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
......
......@@ -84,6 +84,7 @@ public:
_streaming(true),
_sync(true),
_dynamic(false),
_flatten(false),
_unitID(-1),
_keepRunning(0),
_minValues(1),
......@@ -108,6 +109,7 @@ public:
_streaming(other._streaming),
_sync(other._sync),
_dynamic(other._dynamic),
_flatten(other._flatten),
_unitID(other._unitID),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
......@@ -138,6 +140,7 @@ public:
_streaming = other._streaming;
_sync = other._sync;
_dynamic = other._dynamic;
_flatten = other._flatten;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
......@@ -245,20 +248,21 @@ public:
virtual void printConfig(LOG_LEVEL ll) = 0;
// Getter methods
const string& getName() const { return _name; }
const string& getMqttPart() const { return _mqttPart; }
bool getTemplate() const { return _isTemplate; }
bool getRelaxed() const { return _relaxed; }
bool getSync() const { return _sync; }
bool getDuplicate() const { return _duplicate; }
bool getStreaming() const { return _streaming; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getCacheSize() const { return _cacheSize; }
const string& getName() const { return _name; }
const string& getMqttPart() const { return _mqttPart; }
bool getTemplate() const { return _isTemplate; }
bool getRelaxed() const { return _relaxed; }
bool getSync() const { return _sync; }
bool getDuplicate() const { return _duplicate; }
bool getStreaming() const { return _streaming; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getUnitCacheLimit() const { return _unitCacheLimit; }
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
bool getDynamic() { return _dynamic; }
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
bool getDynamic() const { return _dynamic; }
bool getFlatten() const { return _flatten; }
// Setter methods
void setName(const string& name) { _name = name; }
......@@ -306,6 +310,8 @@ protected:
bool _sync;
// Indicates whether the analyzer generates units dynamically at runtime, or only at initialization
bool _dynamic;
// Indicates whether the analyzer generates hierarchical units that must be flattened (their sub-units exposed)
bool _flatten;
// ID of the units this analyzer works on
int _unitID;
// Determines if the analyzer can keep running or must terminate
......
......@@ -136,7 +136,7 @@ public:
*/
virtual void printConfig(LOG_LEVEL ll) override {
if(_mqttPart!="")
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
LOG_VAR(ll) << " MQTT prefix: " << _mqttPart;
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
......@@ -206,10 +206,8 @@ public:
virtual void init(boost::asio::io_service& io) override {
AnalyzerInterface::init(io);
for(const auto u : _units) {
for(const auto s : u->getOutputs())
s->initSensor(_cacheSize);
}
for(const auto u : _units)
u->init(_cacheSize);
}
/**
......@@ -300,14 +298,12 @@ public:
throw std::runtime_error("No template unit in analyzer " + _name + "!");
LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), "", _relaxed);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, _relaxed);
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
for (const auto s : tempUnit->getOutputs())
if (!s->isInit())
s->initSensor(_cacheSize);
tempUnit->init(_cacheSize);
compute(tempUnit);
for (const auto &o : tempUnit->getOutputs()) {
......@@ -345,7 +341,7 @@ public:
* overall overhead. The cache has a limited size: once this size is reached, at every insertion
* the oldest entry in the cache is removed.
*
* @param unit Shared pointer to the Unit objecy to be added to the cache
* @param unit Shared pointer to the Unit object to be added to the cache
*/
void addToUnitCache(U_Ptr unit) {
if(!_unitCache) {
......@@ -376,7 +372,7 @@ public:
*
*/
virtual void collapseUnits() {
if(_unitID < 0 || _units.empty()) {
if (_unitID < 0 || _units.empty()) {
LOG(error) << "Analyzer " << _name << ": Cannot collapse units!";
return;
}
......
......@@ -158,6 +158,13 @@ public:
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
if(this->_flatten) {
for (const auto& su : jobUnit->getSubUnits())
for (const auto &o : su->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
}
} else
throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
} catch(const exception& e) {
......@@ -167,11 +174,18 @@ public:
this->_onDemandLock.store(false);
} else if( this->_keepRunning ) {
bool found = false;
for(const auto& u : getUnits())
//Spinning explicitly as we need to iterate on the derived Unit objects
while(_unitAccess.exchange(true)) {}
for(const auto& u : this->_units)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(this->_flatten) {
for (const auto& su : u->getSubUnits())
for (const auto &o : su->getOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
}
releaseUnits();
......@@ -231,13 +245,13 @@ protected:
vector<string> nodes;
for (const auto &n : jobData.nodes)
nodes.push_back(translateNodeName(n));
jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
// Initializing sensors if necessary
for (const auto s : jobUnit->getOutputs())
if (!s->isInit())
s->initSensor(this->_cacheSize);
jobUnit->init(this->_cacheSize, this->_flatten);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......@@ -300,8 +314,13 @@ protected:
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
if(ju)
if(ju) {
this->addUnit(ju);
if(this->_flatten) {
for (const auto& su : ju->getSubUnits())
this->addUnit(su);
}
}
_unitAccess.store(false);
_tempUnits.clear();
}
......
......@@ -161,28 +161,7 @@ public:
}
return sensors;
}
/**
* @brief Resolves and formats a string encoding a tree level in function of a job id
*
* This method takes as input strings in the format specified for parseNodeLevelString(). It then
* takes as input also a job Id. The method will then return the one sensor encoded by s that is
* associated to the input job ID. This method is meant to be a simplified version of
* resolveNodeLevelString.
*
* @param s String to be parsed
* @param job Job ID to be used
* @return Resulting sensor name associated to the input job id
*/
string resolveJobString(const string& s, const string& job) {
string newName = s;
if(!boost::regex_search(s.c_str(), _match, _blockRx))
throw invalid_argument("JobUnitGenerator: sensor string is incorrectly formatted!");
else
newName = boost::regex_replace(newName, _blockRx, job);
return newName;
}
/**
* @brief Computes and instantiates units associated to the input analyzer
*
......@@ -268,7 +247,7 @@ public:
* @param relaxed If True, checks on the existence of input sensors are ignored
* @return A shared pointer to the generated unit object
*/
shared_ptr<UnitTemplate<SBase>> generateUnit(const string& u, vector<shared_ptr<SBase>>& inputs, vector<shared_ptr<SBase>>& outputs,
shared_ptr<UnitTemplate<SBase>> generateUnit(const string& u, vector<shared_ptr<SBase>>& inputs, vector<shared_ptr<SBase>>& outputs,
inputMode_t inputMode, string mqttPrefix="", bool relaxed=false) {
// If no outputs are defined, no units can be instantiated
......@@ -287,57 +266,59 @@ public:
}
/**
* @brief Computes and instantiates a job unit
* @brief Computes and instantiates a hierarchical unit
*
* Job units slightly differ from conventional units. First, they are hierarchical: the top
* unit is related to the job itself, and contains all job-related outputs that are
* eventually propagated. This unit does not have inputs, as in, sensors. Then, there are as
* many children units as nodes on which the job was executed: for each of these, all of the
* inputs that were specified are present, together with node-related outputs.
* Hierarchical units differ from conventional units. First, they have two levels: s top
* unit which contains all "global" outputs that are eventually propagated. This unit does
* not have inputs, as in, sensors. Then, there are as many children units as specified
* tree nodes: for each of these, all of the inputs that were specified are present,
* together with node-related outputs.
*
* @param j Job identifier used to name the unit
* @param nodes Vector of node identifiers on which the job was executed
* @param inputs The vector of "prototype" sensor objects for inputs
* @param outputs The vector of "prototype" sensor objects for outputs
* @param name Identifier used to name the unit
* @param nodes Vector of node identifiers used to build the sub-units
* @param outputs The vector of "prototype" sensor objects for the top level unit's outputs
* @param subInputs The vector of "prototype" sensor objects for the sub-units' inputs
* @param subOutputs The vector of "prototype" sensor objects for the sub-units' outputs
* @param inputMode Defines the method with which input sensors are instantiated for each unit
* @param mqttPrefix MQTT prefix to use for the output sensors at the job level
* @param mqttPrefix MQTT prefix to use for the output sensors at the top level
* @param relaxed If True, checks on the existence of input sensors are ignored
* @return A shared pointers to the generated job unit object
* @return A shared pointers to the generated hierarchical unit object
*/
shared_ptr<UnitTemplate<SBase>> generateJobUnit(const string& j, vector<std::string>& nodes, vector<shared_ptr<SBase>>& inputs,
vector<shared_ptr<SBase>>& outputs, inputMode_t inputMode, string mqttPrefix="", bool relaxed=false) {
shared_ptr<UnitTemplate<SBase>> generateHierarchicalUnit(const string& name, vector<std::string>& nodes,
vector<shared_ptr<SBase>>& outputs, vector<shared_ptr<SBase>>& subInputs, vector<shared_ptr<SBase>>& subOutputs,
inputMode_t inputMode, string mqttPrefix="", bool relaxed=false) {
// If no outputs are defined, no units can be instantiated
if((inputs.size()==0 && inputMode==SELECTIVE) || outputs.size() == 0)
throw invalid_argument("JobUnitGenerator: Invalid inputs or outputs!");
if((subInputs.size()==0 && inputMode==SELECTIVE) || subOutputs.size()==0)
throw invalid_argument("HiUnitGenerator: Invalid inputs or outputs!");
// Output sensors must share the same unit pattern
if(!isConsistent(outputs))
throw invalid_argument("JobUnitGenerator: Incoherent output levels!");
if(!isConsistent(subOutputs))
throw invalid_argument("HiUnitGenerator: Incoherent output levels!");
shared_ptr<UnitTemplate<SBase>> jobUnit = make_shared<UnitTemplate<SBase>>(j);
shared_ptr<UnitTemplate<SBase>> topUnit = make_shared<UnitTemplate<SBase>>(name);
try {
for (const auto &nodeName : nodes) {
// The unit specified as input must belong to the domain of the outputs
if (!nodeBelongsToPattern(nodeName, outputs[0]->getName()))
throw domain_error("JobUnitGenerator: Node " + nodeName + " does not belong to this unit domain!");
jobUnit->addSubUnit(_generateUnit(nodeName, inputs, outputs, inputMode, mqttPrefix, relaxed));
if (!nodeBelongsToPattern(nodeName, subOutputs[0]->getName()))
throw domain_error("HiUnitGenerator: Node " + nodeName + " does not belong to this unit domain!");
topUnit->addSubUnit(_generateUnit(nodeName, subInputs, subOutputs, inputMode, mqttPrefix, relaxed));
}
} catch(const invalid_argument& e) {
jobUnit->clear();
throw invalid_argument("JobUnitGenerator: One or more node sub-units for job " + j + " could not be generated!");
topUnit->clear();
throw invalid_argument("HiUnitGenerator: One or more node sub-units for unit" + name + " could not be generated!");
}
// Mapping outputs
for(const auto& out : outputs) {
shared_ptr<SBase> uOut = make_shared<SBase>(*out);
uOut->setName(resolveJobString(uOut->getName(), j));
uOut->setMqtt(MQTTChecker::formatTopic(j) + MQTTChecker::formatTopic(uOut->getMqtt()));
uOut->setMqtt(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(name) + MQTTChecker::formatTopic(uOut->getMqtt()));
uOut->setName(uOut->getMqtt());
// Duplicating the file sink adding the name of each unit to the path
if(uOut->getSinkPath()!="")
uOut->setSinkPath(MQTTChecker::topicToFile(uOut->getMqtt(), uOut->getSinkPath()));
jobUnit->addOutput(uOut);
topUnit->addOutput(uOut);
}
return jobUnit;
return topUnit;
}
/**
......@@ -415,6 +396,7 @@ protected:
for (const auto &s : *sensors) {
if (!addedSensors.count(s)) {
SBase uIn(*in);
uIn.setMqtt(_navi->getNodeTopic(s));
uIn.setName(s);
if (!(_navi->sensorExists(uIn.getName()) || relaxed)) {
delete sensors;
......@@ -443,8 +425,8 @@ protected:
// Mapping outputs
for(const auto& out : outputs) {
SBase uOut(*out);
// Checking if the unit's node exists (throws an exception if it doesn't)
sensors = resolveNodeLevelString(uOut.getName(), u);
uOut.setName(*sensors->begin());
delete sensors;
// If we are instantiating output sensors by unit, we generate mqtt topics by using the prefix
// associated to the respective node in the sensor tree, and the sensor suffix itself
......@@ -453,6 +435,8 @@ protected:
// If we are not using units (only unit is root, out of the hierarchy) we build sensors like in samplers
else
uOut.setMqtt(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(uOut.getMqtt()));
// Setting the name back to the MQTT topic
uOut.setName(uOut.getMqtt());
// Duplicating the file sink adding the name of each unit to the path
if(uOut.getSinkPath()!="")
uOut.setSinkPath(MQTTChecker::topicToFile(uOut.getMqtt(), uOut.getSinkPath()));
......
......@@ -56,6 +56,14 @@ public:
*/
virtual ~UnitInterface() {}
/**
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param flatten if true, sensors in sub-units are also initialized
*/
virtual void init(unsigned int cacheSize, bool flatten=false) = 0;
/**
* @brief Sets the name of this unit
*
......
......@@ -153,6 +153,24 @@ public:
_subUnits.clear();
}
/**
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param flatten if true, sensors in sub-units are also initialized
*/
void init(unsigned int cacheSize, bool flatten=false) override {
for(const auto s : _outputs)
if (!s->isInit())
s->initSensor(cacheSize);
if(flatten) {
for (const auto &su : _subUnits)
for (const auto s : su->getOutputs())
if (!s->isInit())
s->initSensor(cacheSize);
}
}
/**
* @brief Sets the name of this unit
*
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment