Commit e60c5ab9 authored by Carla Guillen's avatar Carla Guillen
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents bb3f4462 584093ba
......@@ -48,9 +48,11 @@ Additional parameters specific to this framework are the following:
| Value | Explanation |
|:----- |:----------- |
| analytics | Wrapper structure for the data analytics-specific values.
| **analytics** | Wrapper structure for the data analytics-specific values.
| hierarchy | Space-separated sequence of regular expressions used to infer the local (DCDBPusher) or global (DCDBCollectAgent) sensor hierarchy. This parameter should be wrapped in quotes to ensure proper parsing. See the Sensor Tree [section](#sensorTree) for more details.
| operatorPlugins | Block containing the specification of all data analytics plugins to be instantiated.
| filter | Regular expression used to filter the set of sensors in the sensor tree. Everything that matches is included, the rest is discarded.
| jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to the first node of the job's nodelist. If a match is found the job is processed, otherwise it is discarded. This behavior can be changed at the plugin level.
| **operatorPlugins** | Block containing the specification of all data analytics plugins to be instantiated.
| plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdboperator_average.1.0)
| path | Specify the path where the plugin (the shared library) is located. If left empty, DCDB will look in the default lib-directories (usr/lib and friends) for the plugin file.
| config | One can specify a separate config-file (including path to it) for the plugin to use. If not specified, DCDB will look up pluginName.conf (e.g. average.conf) in the same directory where global.conf is located.
......
......@@ -63,6 +63,8 @@ public:
_unitAccess.store(false);
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
}
/**
......@@ -75,6 +77,8 @@ public:
_unitAccess.store(false);
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
}
/**
......@@ -85,6 +89,8 @@ public:
OperatorTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
return *this;
}
......@@ -145,7 +151,8 @@ public:
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec->at(0));
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
this->compute(jobUnit, _jobDataVec->at(0));
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
......@@ -216,7 +223,7 @@ protected:
* @param jobData a qeJobData struct containing job information
* @return A shared pointer to a job unit object
*/
virtual U_Ptr jobDataToUnit(const qeJobData& jobData) {
virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
U_Ptr jobUnit = nullptr;
if(!this->_unitCache)
......@@ -232,17 +239,15 @@ protected:
throw std::runtime_error("No template unit in operator " + this->_name + "!");
if(!this->_streaming)
LOG(debug) << "Operator " << this->_name << ": cache miss for unit " << jobTopic << ".";
if(!this->filterJob(jobData))
return nullptr;
U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
vector<string> nodes;
for (const auto &n : jobData.nodes)
nodes.push_back(translateNodeName(n));
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, jobData.nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize, this->_flatten);
this->addToUnitCache(jobUnit);
......@@ -251,17 +256,23 @@ protected:
}
/**
* @brief Translates a node name as returned by the resource manager to an internal representation
* @brief Tests the job against the internal filter
*
* The external node name is usually just the hostname associated to the machine. This
* representation usually needs to be converted to an internal one that reflects the hierarchy
* described by the sensor navigator. Since this logic is system-dependent, users can freely
* override this method.
* This method is used to filter out jobs for which this operator is not responsible. By default,
* the operator checks the first node in the nodelist of the job, and if its hostname matches
* with the internal job filter regex, the job is accepted. This method can be overridden to
* implement more complex filtering policies.
*
* @param n Raw node hostname
* @return Converted sensor navigator-friendly node name
* @param jobData a qeJobData struct containing job information
* @return True if the job should be processed, false otherwise
*/
virtual string translateNodeName(string n) {
    // The formatted name is always terminated by a single MQTT separator character
    const std::string separator(1, MQTT_SEP);
    return MQTTChecker::formatTopic(n) + separator;
}
virtual bool filterJob(qeJobData& jobData) {
    // A job with an empty nodelist is always rejected
    if(jobData.nodes.empty()) {
        return false;
    }

    // Every node name is rewritten in place: it is passed through MQTTChecker::formatTopic
    // and terminated by one MQTT separator. The caller's jobData is modified even if the
    // job is later rejected by the filter.
    const std::string separator(1, MQTT_SEP);
    for(auto it = jobData.nodes.begin(); it != jobData.nodes.end(); ++it) {
        *it = MQTTChecker::formatTopic(*it) + separator;
    }

    // An empty filter string accepts every job
    if(_jobFilterStr.empty()) {
        return true;
    }

    // Otherwise only the first (already rewritten) node is searched against the job filter regex
    return boost::regex_search(jobData.nodes.front().c_str(), _match, _jobFilter);
}
/**
* @brief Performs a compute task
......@@ -290,7 +301,7 @@ protected:
_jobDataVec = buf;
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(const auto& job : *_jobDataVec) {
for(auto& job : *_jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) {
......@@ -338,6 +349,10 @@ protected:
atomic<bool> _unitAccess;
// Vector of job data structures used to retrieve job data at runtime
vector<qeJobData>* _jobDataVec;
// Regex object used to filter out jobs
string _jobFilterStr;
boost::regex _jobFilter;
boost::cmatch _match;
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -115,6 +115,16 @@ public:
*/
void setFilter(const string& filter) { _filter = filter; }
/**
* @brief Set the current job filter
*
* This method sets the internal filter string used by job operators to identify
* the set of jobs for which they are responsible, based on their nodelist.
*
* @param filter String containing the new job filter
*/
void setJobFilter(const string& jfilter) { _jobFilter = jfilter; }
/**
* @brief Sets the internal callback to retrieve sensor data
*
......@@ -166,6 +176,13 @@ public:
*/
const string& getFilter() {
    // Hands out a reference to the stored filter string; no copy is made
    return this->_filter;
}
/**
* @brief Returns the current job filter
*
* The returned reference points to the string stored inside the engine,
* so no copy is made.
*
* @return Reference to the string containing the current job filter
*/
const string& getJobFilter() {
    return this->_jobFilter;
}
/**
* @brief Perform a sensor query
*
......@@ -293,6 +310,8 @@ private:
string _sensorHierarchy;
// String storing the filter to be used when building a sensor navigator
string _filter;
// String storing the job filter to be used by job operators
string _jobFilter;
};
#endif //PROJECT_QUERYENGINE_H
......@@ -29,6 +29,7 @@
#define PROJECT_UNITGENERATOR_H
#include <set>
#include <list>
#include <boost/regex.hpp>
#include <boost/algorithm/string.hpp>
#include "sensornavigator.h"
......@@ -275,7 +276,7 @@ public:
* together with node-related outputs.
*
* @param name Identifier used to name the unit
* @param nodes Vector of node identifiers used to build the sub-units
* @param nodes List of node identifiers used to build the sub-units
* @param outputs The vector of "prototype" sensor objects for the top level unit's outputs
* @param subInputs The vector of "prototype" sensor objects for the sub-units' inputs
* @param subOutputs The vector of "prototype" sensor objects for the sub-units' outputs
......@@ -284,7 +285,7 @@ public:
* @param relaxed If True, checks on the existence of input sensors are ignored
* @return A shared pointer to the generated hierarchical unit object
*/
shared_ptr<UnitTemplate<SBase>> generateHierarchicalUnit(const string& name, vector<std::string>& nodes,
shared_ptr<UnitTemplate<SBase>> generateHierarchicalUnit(const string& name, const list<std::string>& nodes,
vector<shared_ptr<SBase>>& outputs, vector<shared_ptr<SBase>>& subInputs, vector<shared_ptr<SBase>>& subOutputs,
inputMode_t inputMode, string mqttPrefix="", bool relaxed=false) {
......
......@@ -494,6 +494,7 @@ int main(int argc, char* const argv[]) {
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
analyticsController->setCache(&mySensorCache);
queryEngine.setFilter(analyticsSettings.filter);
queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
......
......@@ -84,6 +84,7 @@ public:
// Default constructor: does no work of its own, the members below carry their own in-class initializers.
analyticsSettings_t() {}
std::string hierarchy = "";
std::string filter = "";
std::string jobFilter = "";
};
/**
......
......@@ -83,6 +83,8 @@ bool GlobalConfiguration::readConfig() {
analyticsSettings.hierarchy = global.second.data();
} else if (boost::iequals(global.first, "filter")) {
analyticsSettings.filter = global.second.data();
} else if (boost::iequals(global.first, "jobFilter")) {
analyticsSettings.jobFilter = global.second.data();
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -285,6 +285,7 @@ int main(int argc, char** argv) {
}
_queryEngine.setFilter(analyticsSettings.filter);
_queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment