Commit a3f138ad authored by Carla Guillen's avatar Carla Guillen
Browse files

Automatic merge from git

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development
parents 2c32cf2d 0d3ebfb8
......@@ -283,7 +283,7 @@ bool OperatorManager::checkTopics(op_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& op : p.configurator->getOperators()) {
if (!mqttCheck.checkGroup(op->getName()) && !(op->getStreaming() && op->getDuplicate()))
if (!(op->getStreaming() && op->getDuplicate()) && !mqttCheck.checkGroup(op->getName()))
validTopics = false;
if (op->getStreaming()) {
for (const auto &u : op->getUnits())
......
This diff is collapsed.
......@@ -9,7 +9,7 @@ aggregator avg1 {
default def1
window 2000
input {
unitInput {
sensor "<bottomup>col_user"
......@@ -17,10 +17,9 @@ window 2000
}
output {
globalOutput {
; In this case "bottomup 1" is the sensor tree level associated to compute nodes
sensor "<bottomup 1>sum" {
sensor sum {
mqttsuffix /sum
operation sum
}
......
......@@ -14,20 +14,25 @@ default def1
window 2000
relative false
input {
unitInput {
sensor "<bottomup>col_user"
sensor "<topdown>MemFree"
sensor "<bottomup, filter cpu>col_user"
sensor "<bottomup 1>MemFree"
}
output {
unitOutput {
sensor "<bottomup>queries" {
mqttsuffix /queries
}
}
; If globalOutput is defined, all units described by the unitInput and unitOutput blocks will be grouped
; in a single hierarchical unit
globalOutput {
sensor totalQueries {
mqttsuffix /totalQueries
}
}
}
......
......@@ -84,36 +84,28 @@ protected:
* even in streaming mode, to build dynamically all appropriate units for jobs that are
* currently running in the system.
*
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param protoGlobalOutputs The vector of prototype global output sensors, if any
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
// Forcing the job operator to not be duplicated
op.setDuplicate(false);
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
shared_ptr<UnitTemplate<SBase>> jobUnit;
try {
units = this->_unitGen.generateUnits(protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, op.getRelaxed());
jobUnit = this->_unitGen.generateHierarchicalUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs,
protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Error when creating template job unit: " << e.what();
delete units;
return false;
}
if(units->size() > 1) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Invalid job template unit, please check your configuration!";
delete units;
return false;
}
shared_ptr<UnitTemplate<SBase>> jobUnit = units->at(0);
delete units;
op.clearUnits();
//if(!this->constructSensorTopics(*jobUnit, op))
// return false;
if (this->unit(*jobUnit)) {
op.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
......
......@@ -145,17 +145,7 @@ public:
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
this->compute(jobUnit, _jobDataVec[0]);
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
if(this->_flatten) {
for (const auto& su : jobUnit->getSubUnits())
for (const auto &o : su->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
}
this->retrieveAndFlush(outMap, jobUnit);
} else
throw std::runtime_error("Operator " + this->_name + ": cannot retrieve job data!");
} catch(const exception& e) {
......@@ -170,13 +160,7 @@ public:
for(const auto& u : this->_units)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(this->_flatten) {
for (const auto& su : u->getSubUnits())
for (const auto &o : su->getOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
this->retrieveAndFlush(outMap, u, false);
}
releaseUnits();
......@@ -235,12 +219,10 @@ protected:
U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, jobData.nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), this->_mqttPart, this->_relaxed);
// The job unit is generated as a hierarchical unit
jobUnit = unitGen.generateFromTemplate(uTemplate, jobTopic, jobData.nodes, this->_mqttPart, this->_enforceTopics, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize, this->_flatten);
jobUnit->init(this->_cacheSize);
this->addToUnitCache(jobUnit);
}
return jobUnit;
......@@ -300,13 +282,8 @@ protected:
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
if(ju) {
if(ju)
this->addUnit(ju);
if(this->_flatten) {
for (const auto& su : ju->getSubUnits())
this->addUnit(su);
}
}
_unitAccess.store(false);
_tempUnits.clear();
}
......
......@@ -72,8 +72,11 @@ protected:
const char DASH = '-';
// Keywords used to identify input and output sensor blocks
const string INPUT_BLOCK = "input";
const string OUTPUT_BLOCK = "output";
const string INPUT_BLOCK_LEGACY = "input";
const string OUTPUT_BLOCK_LEGACY = "output";
const string INPUT_BLOCK = "unitInput";
const string OUTPUT_BLOCK = "unitOutput";
const string GLOBAL_OUTPUT_BLOCK = "globalOutput";
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
......@@ -353,7 +356,7 @@ protected:
*/
bool readOperator(Operator& op, CFG_VAL config) {
// Vectors containing "prototype" inputs and outputs to be modified with the actual compute units
std::vector<shared_ptr<SBase>> protoInputs, protoOutputs;
std::vector<shared_ptr<SBase>> protoInputs, protoOutputs, protoGlobalOutputs;
inputMode_t inputMode = SELECTIVE;
// Check for the existence of a template definition to initialize the operator
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
......@@ -383,6 +386,8 @@ protected:
op.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "mqttPart")) {
op.setMqttPart(val.second.data());
} else if (boost::iequals(val.first, "enforceTopics")) {
op.setEnforceTopics(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "sync")) {
op.setSync(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "disabled")) {
......@@ -397,8 +402,8 @@ protected:
op.setUnitCacheLimit(stoull(val.second.data()));
} else if (boost::iequals(val.first, "streaming")) {
op.setStreaming(to_bool(val.second.data()));
} else if (boost::iequals(val.first, INPUT_BLOCK) || boost::iequals(val.first, OUTPUT_BLOCK)) {
// Instantiating all sensors contained within the "input" or "output" block
} else if (isInputBlock(val.first) || isOutputBlock(val.first) || isGlobalOutputBlock(val.first)) {
// Instantiating all sensors contained within the "unitInput", "unitOutput" or "globalOutput" block
BOOST_FOREACH(boost::property_tree::iptree::value_type &valInner, val.second)
{
if (boost::iequals(valInner.first, _baseName)) {
......@@ -406,13 +411,18 @@ protected:
SBase sensor(valInner.second.data());
if (readSensorBase(sensor, valInner.second, false)) {
shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
val.first==INPUT_BLOCK ? protoInputs.push_back(sensorPtr) : protoOutputs.push_back(sensorPtr);
if(isInputBlock(val.first))
protoInputs.push_back(sensorPtr);
else if(isOutputBlock(val.first))
protoOutputs.push_back(sensorPtr);
else
protoGlobalOutputs.push_back(sensorPtr);
} else {
LOG(warning) << "I/O " << _baseName << " " << op.getName() << "::" << sensor.getName() << " could not be read! Omitting";
}
// An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
// unit to be picked
} else if (boost::iequals(val.first, INPUT_BLOCK) && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
// An "all" or "all-recursive" statement in the input block causes all sensors related to the specific
// unit to be picked
} else if (isInputBlock(val.first) && (boost::iequals(valInner.first, ALL_CLAUSE) || boost::iequals(valInner.first, ALL_REC_CLAUSE))) {
inputMode = boost::iequals(valInner.first, ALL_CLAUSE) ? ALL : ALL_RECURSIVE;
} else {
LOG(error) << "\"" << valInner.first << "\": unknown I/O construct!";
......@@ -427,7 +437,7 @@ protected:
// Instantiating units
if(!op.getTemplate()) {
op.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(op.getMqttPart()));
return readUnits(op, protoInputs, protoOutputs, inputMode);
return readUnits(op, protoInputs, protoOutputs, protoGlobalOutputs, inputMode);
} else {
// If the operator is a template, we add it to the related map
auto ret = _templateOperators.insert(std::pair<std::string, Operator*>(op.getName(), &op));
......@@ -502,19 +512,25 @@ protected:
* virtual such as to allow for flexibility in case specific operators require different
* assignment policies (such as job operators).
*
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
* @param op The operator whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param protoGlobalOutputs The vector of prototype global output sensors, if any
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
virtual bool readUnits(Operator& op, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs,
std::vector<shared_ptr<SBase>>& protoGlobalOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
if(protoOutputs.empty())
LOG(debug) << " No output specified, generating sink unit.";
// If we employ a hierarchical unit (which will be the root unit) we disable duplication
if(!protoGlobalOutputs.empty())
op.setDuplicate(false);
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getRelaxed());
units = _unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
......@@ -594,8 +610,41 @@ protected:
// Performing name construction
for(auto& s: u.getOutputs())
s->setName(s->getMqtt());
for(auto& subUnit: u.getSubUnits())
for(auto& s : subUnit->getOutputs())
s->setName(s->getMqtt());
return true;
}
/**
* @brief Returns true if the input string describes an input block
*
* @param s The string to be checked
* @return True if s is a input block, false otherwise
*/
bool isInputBlock(const std::string& s ) {
return boost::iequals(s, INPUT_BLOCK) || boost::iequals(s, INPUT_BLOCK_LEGACY);
}
/**
* @brief Returns true if the input string describes an output block
*
* @param s The string to be checked
* @return True if s is a output block, false otherwise
*/
bool isOutputBlock(const std::string& s ) {
return boost::iequals(s, OUTPUT_BLOCK) || boost::iequals(s, OUTPUT_BLOCK_LEGACY);
}
/**
* @brief Returns true if the input string describes a global output block
*
* @param s The string to be checked
* @return True if s is a global output block, false otherwise
*/
bool isGlobalOutputBlock(const std::string& s ) {
return boost::iequals(s, GLOBAL_OUTPUT_BLOCK);
}
// Instance of a QueryEngine object
QueryEngine& _queryEngine;
......
......@@ -80,12 +80,12 @@ public:
_mqttPart(""),
_isTemplate(false),
_relaxed(false),
_enforceTopics(false),
_duplicate(false),
_streaming(true),
_sync(true),
_dynamic(false),
_disabled(false),
_flatten(false),
_unitID(-1),
_keepRunning(0),
_minValues(1),
......@@ -106,12 +106,12 @@ public:
_mqttPart(other._mqttPart),
_isTemplate(other._isTemplate),
_relaxed(other._relaxed),
_enforceTopics(other._enforceTopics),
_duplicate(other._duplicate),
_streaming(other._streaming),
_sync(other._sync),
_dynamic(other._dynamic),
_disabled(other._disabled),
_flatten(other._flatten),
_unitID(other._unitID),
_keepRunning(other._keepRunning),
_minValues(other._minValues),
......@@ -137,13 +137,13 @@ public:
_mqttPart = other._mqttPart;
_isTemplate = other._isTemplate;
_relaxed = other._relaxed;
_enforceTopics = other._enforceTopics;
_unitID = other._unitID;
_duplicate = other._duplicate;
_streaming = other._streaming;
_sync = other._sync;
_dynamic = other._dynamic;
_disabled = other._disabled;
_flatten = other._flatten;
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
......@@ -251,6 +251,7 @@ public:
const string& getMqttPart() const { return _mqttPart; }
bool getTemplate() const { return _isTemplate; }
bool getRelaxed() const { return _relaxed; }
bool getEnforceTopics() const { return _enforceTopics; }
bool getSync() const { return _sync; }
bool getDuplicate() const { return _duplicate; }
bool getStreaming() const { return _streaming; }
......@@ -262,13 +263,13 @@ public:
int getUnitID() const { return _unitID; }
bool getDynamic() const { return _dynamic; }
bool getDisabled() const { return _disabled; }
bool getFlatten() const { return _flatten; }
// Setter methods
void setName(const string& name) { _name = name; }
void setMqttPart(const string& mqttPart) { _mqttPart = mqttPart; }
void setTemplate(bool t) { _isTemplate = t; }
void setRelaxed(bool r) { _relaxed = r; }
void setEnforceTopics(bool e) { _enforceTopics = e; }
void setSync(bool sync) { _sync = sync; }
void setUnitID(int u) { _unitID = u; }
void setStreaming(bool streaming) { _streaming = streaming; }
......@@ -329,6 +330,8 @@ protected:
bool _isTemplate;
// If the operator's units must be built in relaxed mode
bool _relaxed;
// If true, when building the units of this operator all output sensors will have _mqttPart prepended to them
bool _enforceTopics;
// If true, the operator is a duplicate of another
bool _duplicate;
// If true, the operator performs computation in streaming
......@@ -339,8 +342,6 @@ protected:
bool _dynamic;
// If true, the operator is initialized but "disabled" and cannot be used
bool _disabled;
// Indicates whether the operator generates hierarchical units that must be flattened (their sub-units exposed)
bool _flatten;
// ID of the units this operator works on
int _unitID;
// Determines if the operator can keep running or must terminate
......
......@@ -147,8 +147,11 @@ public:
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
for (auto u : _units)
u->printConfig(ll, lg);
if(_unitID<0)
for (auto u : _units)
u->printConfig(ll, lg);
else
_units[_unitID]->printConfig(ll, lg);
} else
LOG_VAR(ll) << " Units: none";
}
......@@ -164,9 +167,16 @@ public:
if (U_Ptr dUnit = dynamic_pointer_cast< UnitTemplate<S> >(u)) {
_units.push_back(dUnit);
_baseUnits.push_back(u);
if(dUnit->isTopUnit())
for(auto& subUnit : dUnit->getSubUnits()) {
if (U_Ptr dSubUnit = dynamic_pointer_cast< UnitTemplate<S> >(subUnit))
_baseUnits.push_back(dSubUnit);
else
LOG(error) << "Operator " << _name << ": Type mismatch when storing sub-unit! Will be omitted";
}
}
else
LOG(error) << "Operator " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
LOG(error) << "Operator " << _name << ": Type mismatch when storing unit! Will be omitted";
}
/**
......@@ -323,34 +333,30 @@ public:
throw std::runtime_error("No template unit in operator " + _name + "!");
LOG(debug) << "Operator " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, _relaxed);
tempUnit = unitGen.generateFromTemplate(uTemplate, node, list<string>(), _mqttPart, _enforceTopics, _relaxed);
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
tempUnit->init(_cacheSize);
compute(tempUnit);
for (const auto &o : tempUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
retrieveAndFlush(outMap, tempUnit);
} catch(const exception& e) {
_onDemandLock.store(false);
throw;
}
_onDemandLock.store(false);
} else if( _keepRunning && !_disabled ) {
bool found = false;
// We iterate over _baseUnits and not _units because it only contains the units this operator is working on
for(const auto& u : _baseUnits)
if(u->getName() == node) {
found = true;
for(const auto& o : u->getBaseOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
}
if(!_duplicate) {
for(const auto &u : _units)
if(u->getName() == node) {
found = true;
retrieveAndFlush(outMap, u, false);
}
} else if(_unitID>=0 && node==_units[_unitID]->getName()) {
found = true;
retrieveAndFlush(outMap, _units[_unitID], false);
}
if(!found)
throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
......@@ -403,9 +409,37 @@ public:
}
_baseUnits.clear();
_baseUnits.push_back(_units[_unitID]);
// If the unit is hierarchical, we add its subunits as well
if(_units[_unitID]->isTopUnit())
for(auto& subUnit : _units[_unitID]->getSubUnits())
_baseUnits.push_back(subUnit);
}
protected:
/**
* @brief Retrieves output values of a unit and puts them in a map
*
* @param outMap string to reading_t map in which outputs must be stored
* @param unit Unit (flat or hierarchical) to be scanned
* @param flushQueues If true, the queues of outbound sensor values will be flushed as well
*/
void retrieveAndFlush(map<string, reading_t>& outMap, U_Ptr unit, bool flushQueues=true) {
// Retrieving top-level outputs
for (const auto &o : unit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(flushQueues)
o->clearReadingQueue();
}
// Retrieving sub-unit outputs (if any)
for (const auto &subUnit : unit->getSubUnits()) {
for (const auto &o : subUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
if(flushQueues)
o->clearReadingQueue();
}
}
}
/**
* @brief Returns the timestamp associated with the next compute task
......
This diff is collapsed.
......@@ -60,9 +60,8 @@ public:
* @brief Initializes the sensors in the unit
*
* @param cacheSize size of the sensor cache
* @param flatten if true, sensors in sub-units are also initialized
*/
virtual void init(unsigned int cacheSize, bool flatten=false) = 0;
virtual void init(unsigned int cacheSize) = 0;
/**
* @brief Sets the name of this unit
......@@ -113,10 +112,11 @@ public:
/**
* @brief Prints the current unit configuration
*
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param leadingSpaces Number of leading spaces to pre-pend
*/
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) = 0;
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) = 0;
};
......
......@@ -60,12 +60,8 @@ public:
UnitTemplate(const std::string& name) :
UnitInterface(),
_name(name),
_inputMode(SELECTIVE) {
// base inputs and outputs vectors are constructed using iterators
_baseInputs = std::vector<SBasePtr>(_inputs.begin(), _inputs.end());
_baseOutputs = std::vector<SBasePtr>(_outputs.begin(), _outputs.end());
}
_inputMode(SELECTIVE),
_parent(nullptr) {}
/**
* @brief Class constructor
......@@ -79,7 +75,8 @@ public:
_name(name),
_inputMode(SELECTIVE),
_inputs(inputs),
_outputs(outputs) {
_outputs(outputs),
_parent(nullptr) {
// base inputs and outputs vectors are constructed using iterators
_baseInputs = std::vector<SBasePtr>(_inputs.begin(), _inputs.end());
......@@ -91,7 +88,8 @@ public:
*/
UnitTemplate(const UnitTemplate& other) :
_name(other._name),
_inputMode(other._inputMode) {