Commit 501d19f6 authored by Alessio Netti's avatar Alessio Netti

Analytics: refactored OperatorConfiguratorTemplate (WIP)

- The configurator now accounts for hierarchical units
- Added checks in existing plugins to enforce flat/hierarchical units
- DISCLAIMER: this has likely broken the framework until I refactor OperatorTemplate
parent 9ed2230e
......@@ -84,36 +84,28 @@ protected:
* even in streaming mode, to build dynamically all appropriate units for jobs that are
* currently running in the system.
*
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param protoGlobalOutputs The vector of prototype global output sensors, if any
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
// Forcing the job operator to not be duplicated
op.setDuplicate(false);
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
shared_ptr<UnitTemplate<SBase>> jobUnit;
try {
units = this->_unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, false, op.getRelaxed());
jobUnit = this->_unitGen.generateHierarchicalUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs,
protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Error when creating template job unit: " << e.what();
delete units;
return false;
}
if(units->size() > 1) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Invalid job template unit, please check your configuration!";
delete units;
return false;
}
shared_ptr<UnitTemplate<SBase>> jobUnit = units->at(0);
delete units;
op.clearUnits();
//if(!this->constructSensorTopics(*jobUnit, op))
// return false;
if (this->unit(*jobUnit)) {
op.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
......
......@@ -72,8 +72,11 @@ protected:
const char DASH = '-';
// Keywords used to identify input and output sensor blocks
const string INPUT_BLOCK = "input";
const string OUTPUT_BLOCK = "output";
const string INPUT_BLOCK_LEGACY = "input";
const string OUTPUT_BLOCK_LEGACY = "output";
const string INPUT_BLOCK = "unitInput";
const string OUTPUT_BLOCK = "unitOutput";
const string GLOBAL_OUTPUT_BLOCK = "globalOutput";
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
......@@ -353,7 +356,7 @@ protected:
*/
bool readOperator(Operator& op, CFG_VAL config) {
// Vectors containing "prototype" inputs and outputs to be modified with the actual compute units
std::vector<shared_ptr<SBase>> protoInputs, protoOutputs;
std::vector<shared_ptr<SBase>> protoInputs, protoOutputs, protoGlobalOutputs;
inputMode_t inputMode = SELECTIVE;
// Check for the existence of a template definition to initialize the operator
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
......@@ -383,6 +386,8 @@ protected:
op.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "mqttPart")) {
op.setMqttPart(val.second.data());
} else if (boost::iequals(val.first, "enforceTopics")) {
op.setEnforceTopics(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "sync")) {
op.setSync(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "disabled")) {
......@@ -397,8 +402,8 @@ protected:
op.setUnitCacheLimit(stoull(val.second.data()));
} else if (boost::iequals(val.first, "streaming")) {
op.setStreaming(to_bool(val.second.data()));
} else if (boost::iequals(val.first, INPUT_BLOCK) || boost::iequals(val.first, OUTPUT_BLOCK)) {
// Instantiating all sensors contained within the "input" or "output" block
} else if (isInputBlock(val.first) || isOutputBlock(val.first) || isGlobalOutputBlock(val.first)) {
// Instantiating all sensors contained within the "unitInput", "unitOutput" or "globalOutput" block
BOOST_FOREACH(boost::property_tree::iptree::value_type &valInner, val.second)
{
if (boost::iequals(valInner.first, _baseName)) {
......@@ -406,13 +411,18 @@ protected:
SBase sensor(valInner.second.data());
if (readSensorBase(sensor, valInner.second, false)) {
shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
val.first==INPUT_BLOCK ? protoInputs.push_back(sensorPtr) : protoOutputs.push_back(sensorPtr);
if(isInputBlock(val.first))
protoInputs.push_back(sensorPtr);
else if(isOutputBlock(val.first))
protoOutputs.push_back(sensorPtr);
else
protoGlobalOutputs.push_back(sensorPtr);
} else {
LOG(warning) << "I/O " << _baseName << " " << op.getName() << "::" << sensor.getName() << " could not be read! Omitting";
}
// An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
// unit to be picked
} else if (boost::iequals(val.first, INPUT_BLOCK) && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
// An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
// unit to be picked
} else if (isInputBlock(val.first) && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
inputMode = boost::iequals(valInner.first, ALL_CLAUSE) ? ALL : ALL_RECURSIVE;
} else {
LOG(error) << "\"" << valInner.first << "\": unknown I/O construct!";
......@@ -427,7 +437,7 @@ protected:
// Instantiating units
if(!op.getTemplate()) {
op.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(op.getMqttPart()));
return readUnits(op, protoInputs, protoOutputs, inputMode);
return readUnits(op, protoInputs, protoOutputs, protoGlobalOutputs, inputMode);
} else {
// If the operator is a template, we add it to the related map
auto ret = _templateOperators.insert(std::pair<std::string, Operator*>(op.getName(), &op));
......@@ -502,19 +512,25 @@ protected:
* virtual such as to allow for flexibility in case specific operators require different
* assignment policies (such as job operators).
*
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param protoGlobalOutputs The vector of prototype global output sensors, if any
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
if(protoOutputs.empty())
LOG(debug) << " No output specified, generating sink unit.";
// If we employ a hierarchical unit (which will be the root unit) we disable duplication
if(!protoGlobalOutputs.empty())
op.setDuplicate(false);
try {
units = _unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), false, op.getRelaxed());
units = _unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
......@@ -594,8 +610,41 @@ protected:
// Performing name construction
for(auto& s: u.getOutputs())
s->setName(s->getMqtt());
for(auto& subUnit: u.getSubUnits())
for(auto& s : subUnit->getOutputs())
s->setName(s->getMqtt());
return true;
}
/**
* @brief Returns true if the input string describes an input block
*
* @param s The string to be checked
* @return True if s is a input block, false otherwise
*/
bool isInputBlock(const std::string& s ) {
return boost::iequals(s, INPUT_BLOCK) || boost::iequals(s, INPUT_BLOCK_LEGACY);
}
/**
* @brief Returns true if the input string describes an output block
*
* @param s The string to be checked
* @return True if s is a output block, false otherwise
*/
bool isOutputBlock(const std::string& s ) {
return boost::iequals(s, OUTPUT_BLOCK) || boost::iequals(s, OUTPUT_BLOCK_LEGACY);
}
/**
* @brief Returns true if the input string describes a global output block
*
* @param s The string to be checked
* @return True if s is a global output block, false otherwise
*/
bool isGlobalOutputBlock(const std::string& s ) {
return boost::iequals(s, GLOBAL_OUTPUT_BLOCK);
}
// Instance of a QueryEngine object
QueryEngine& _queryEngine;
......
......@@ -273,7 +273,7 @@ public:
void setMqttPart(const string& mqttPart) { _mqttPart = mqttPart; }
void setTemplate(bool t) { _isTemplate = t; }
void setRelaxed(bool r) { _relaxed = r; }
void setEnforcetopics(bool e) { _enforceTopics = e; }
void setEnforceTopics(bool e) { _enforceTopics = e; }
void setSync(bool sync) { _sync = sync; }
void setUnitID(int u) { _unitID = u; }
void setStreaming(bool streaming) { _streaming = streaming; }
......
......@@ -192,6 +192,8 @@ public:
if (tUnit->isTopUnit()) {
if (tUnit->getSubUnits().size() != 1 || tUnit->getBaseOutputs().empty())
throw invalid_argument("UnitGenerator: hierarchical template unit is malformed!");
if(subNames.empty() && u!=SensorNavigator::rootKey)
throw invalid_argument("UnitGenerator: only root unit is supported for this template type!");
shared_ptr <UnitTemplate<SBase>> subUnit = tUnit->getSubUnits()[0];
return generateHierarchicalUnit(u, subNames, tUnit->getBaseOutputs(), subUnit->getBaseInputs(),
......
......@@ -71,4 +71,14 @@ void AggregatorConfigurator::operatorAttributes(AggregatorOperator& op, CFG_VAL
}
}
bool AggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) { return true; }
bool AggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
}
......@@ -70,6 +70,10 @@ void JobAggregatorConfigurator::operatorAttributes(JobAggregatorOperator& op, CF
}
bool JobAggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(!u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports hierarchical units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
......
......@@ -52,6 +52,10 @@ void FilesinkConfigurator::operatorAttributes(FilesinkOperator& op, CFG_VAL conf
}
bool FilesinkConfigurator::unit(UnitTemplate<FilesinkSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(!u.getOutputs().empty()) {
LOG(error) << _operatorName << ": This is a file sink, no output sensors can be defined!";
return false;
......
......@@ -113,12 +113,22 @@ void PerSystSqlConfigurator::operatorAttributes(PerSystSqlOperator& op, CFG_VAL
}
bool PerSystSqlConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
if(!u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports hierarchical units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
}
bool PerSystSqlConfigurator::readUnits(PerSystSqlOperator& op,
std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs,
std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs, inputMode_t inputMode) {
std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs,
std::vector<shared_ptr<AggregatorSensorBase>>& protoGlobalOutputs,
inputMode_t inputMode) {
int num_quantiles = op.getNumberOfEvenQuantiles();
......@@ -127,7 +137,7 @@ bool PerSystSqlConfigurator::readUnits(PerSystSqlOperator& op,
}
bool quantile_found = false;
AggregatorSensorBase quantsensor("");
for(auto &sensor: protoOutputs){
for(auto &sensor: protoGlobalOutputs) {
if(sensor->getOperation() == AggregatorSensorBase::QTL){
quantile_found = true;
quantsensor = *(sensor.get());
......@@ -138,13 +148,13 @@ bool PerSystSqlConfigurator::readUnits(PerSystSqlOperator& op,
}
}
for(int i = 1; i <= num_quantiles && quantile_found; ++i){
for(int i = 1; i <= num_quantiles && quantile_found; ++i) {
auto outputSensor = std::make_shared<AggregatorSensorBase>(quantsensor);
outputSensor->setMqtt(outputSensor->getMqtt() + std::to_string(i));
outputSensor->setOperation(AggregatorSensorBase::QTL);
outputSensor->setPercentile(i);
protoOutputs.push_back(outputSensor);
protoGlobalOutputs.push_back(outputSensor);
}
return JobOperatorConfiguratorTemplate::readUnits(op, protoInputs, protoOutputs, inputMode);
return JobOperatorConfiguratorTemplate::readUnits(op, protoInputs, protoOutputs, protoGlobalOutputs, inputMode);
}
......@@ -43,7 +43,8 @@ private:
void sensorBase(AggregatorSensorBase& s, CFG_VAL config) override;
void operatorAttributes(PerSystSqlOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<AggregatorSensorBase>& u) override;
bool readUnits(PerSystSqlOperator& op, std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs, std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs, inputMode_t inputMode) override;
bool readUnits(PerSystSqlOperator& op, std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs, std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs,
std::vector<shared_ptr<AggregatorSensorBase>>& protoGlobalOutputs, inputMode_t inputMode) override;
};
extern "C" OperatorConfiguratorInterface* create() {
......
......@@ -62,7 +62,11 @@ void RegressorConfigurator::operatorAttributes(RegressorOperator& op, CFG_VAL co
}
}
bool RegressorConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
bool RegressorConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
return false;
}
bool targetSet=false;
for(const auto& in : u.getInputs())
if(in->getTrainingTarget()) {
......
......@@ -160,5 +160,13 @@ void SMUCNGPerfConfigurator::operatorAttributes(SMUCNGPerfOperator& op, CFG_VAL
}
bool SMUCNGPerfConfigurator::unit(UnitTemplate<SMUCSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment