Commit 7cb4c7fb authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: added Job Analyzer Configurator

parent 4a7f20b6
......@@ -414,53 +414,12 @@ protected:
}
}
}
// Reading all derived attributes, if any
analyzer(an, config);
// Instantiating units and returning the result
// Instantiating units
if(!an.getTemplate()) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
!an.getStreaming(), an.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (an.getStreaming()) {
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
an.addUnit(u);
}
} else {
if (unit(*u)) {
an.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
}
}
}
delete units;
return readUnits(an, protoInputs, protoOutputs, inputMode);
} else {
// If the analyzer is a template, we add it to the related map
auto ret = _templateAnalyzers.insert(std::pair<std::string, Analyzer*>(an.getName(), &an));
......@@ -470,6 +429,7 @@ protected:
}
_templateProtoInputs.insert(std::pair<std::string, std::vector<shared_ptr<SBase>>>(an.getName(), protoInputs));
}
return true;
}
......@@ -526,6 +486,65 @@ protected:
return true;
}
/**
* @brief Instantiates all necessary units for a single analyzer
*
* This method will create and assign all unit objects for a single analyzer, given a set
* of prototype input sensors, prototype output sensors and an input mode. This method is
* virtual such as to allow for flexibility in case specific analyzers require different
* assignment policies (such as job analyzers).
*
* @param an The analyzer whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
!an.getStreaming(), an.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (an.getStreaming()) {
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
an.addUnit(u);
}
} else {
if (unit(*u)) {
an.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
}
}
}
delete units;
return true;
}
/**
* @brief Reads the global configuration block
......
//
// Created by Netti, Alessio on 07.06.19.
//
#ifndef PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#define PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
#include "AnalyzerConfiguratorTemplate.h"
#include "JobAnalyzerTemplate.h"
/**
* Template that implements a configurator for Job analyzer plugins.
*
* This template expands the standard AnalyzerConfiguratorTemplate, with very few changes to accomodate the different
* design of job analyzers.
*/
template <class Analyzer, class SBase = SensorBase>
class JobAnalyzerConfiguratorTemplate : public AnalyzerConfiguratorTemplate<Analyzer, SBase> {
// Verifying the types of input classes
static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
static_assert(std::is_base_of<AnalyzerInterface, Analyzer>::value, "Analyzer must derive from AnalyzerInterface!");
protected:
// For readability
using A_Ptr = std::shared_ptr<Analyzer>;
public:
/**
* @brief Class constructor
*/
JobAnalyzerConfiguratorTemplate() : AnalyzerConfiguratorTemplate<Analyzer, SBase>() {}
/**
* @brief Copy constructor is not available
*/
JobAnalyzerConfiguratorTemplate(const JobAnalyzerConfiguratorTemplate&) = delete;
/**
* @brief Assignment operator is not available
*/
JobAnalyzerConfiguratorTemplate& operator=(const JobAnalyzerConfiguratorTemplate&) = delete;
/**
* @brief Class destructor
*/
virtual ~JobAnalyzerConfiguratorTemplate() {}
protected:
/**
* @brief Instantiates all necessary units for a single (job) analyzer
*
* When using job analyzers, the only unit that is always instantiated is ALWAYS the template
* unit, similarly to ordinary analyzers in on-demand mode. Such unit then is used at runtime,
* even in streaming mode, to build dynamically all appropriate units for jobs that are
* currently running in the system.
*
* @param an The analyzer whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = this->_unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(this->_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
true, an.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << this->_analyzerName << " " << an.getName() << ": Error when creating template job unit: " << e.what();
delete units;
return false;
}
shared_ptr<UnitTemplate<SBase>> jobUnit = *units[0];
delete units;
an.clearUnits();
if(!constructSensorTopics(*jobUnit, an))
return false;
if (unit(*jobUnit)) {
an.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
} else {
LOG(error) << " Template job unit " << jobUnit->getName() << " did not pass the final check!";
return false;
}
return true;
}
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif //PROJECT_JOBANALYZERCONFIGURATORTEMPLATE_H
......@@ -32,7 +32,6 @@
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <unordered_set>
/**
* Template that implements features needed by Job Analyzers and complying to AnalyzerInterface.
......@@ -160,11 +159,11 @@ public:
if(pop) {
this->_baseUnits.clear();
this->_units.clear();
unordered_set<string> unitSet;
//unordered_set<string> unitSet;
U_Ptr u;
while(_jobUnitsQueue.pop(u))
if(unitSet.insert(u->getName()))
addUnit(u);
//if(unitSet.insert(u->getName()))
addUnit(u);
return this->_baseUnits;
} else
return _dummyBaseUnits;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment