Commit d64b73e5 authored by Carla Guillen Carias

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents 40431896 47c5bde7
......@@ -58,7 +58,8 @@ this framework are the following:
| **analytics** | Wrapper structure for the data analytics-specific values.
| hierarchy | Space-separated sequence of regular expressions used to infer the local (DCDB Pusher) or global (DCDB Collect Agent) sensor hierarchy. This parameter should be wrapped in quotes to ensure proper parsing. See the Sensor Tree [section](#sensorTree) for more details.
| filter | Regular expression used to filter the set of sensors in the sensor tree. Everything that matches is included, the rest is discarded.
- | jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to the first node of the job's nodelist. If a match is found the job is processed, otherwise it is discarded.
| jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to all nodes of the job's nodelist to extract certain information (e.g., rack or island).
| jobMatch | String against which the node names filtered through the _jobFilter_ are checked, to determine if a job is to be processed (see this [section](#jobOperators)).
| **operatorPlugins** | Block containing the specification of all data analytics plugins to be instantiated.
| plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdboperator_average.1.0)
| path | Specify the path where the plugin (the shared library) is located. If left empty, DCDB will look in the default lib-directories (/usr/lib and friends) for the plugin file.
......@@ -466,6 +467,13 @@ Job operators also support the _streaming_ and _on-demand_ modes, which work lik
from the last computation to the present; it will then build one job unit for each of them, and subsequently perform computation;
* In **on-demand** mode, users can query a specific job id, for which a job unit is built and computation is performed.
A filtering mechanism can also be applied to select which jobs an operator should process. The default filtering policy uses
two parameters: a job _filter_ regular expression and a job _match_ string. When a job first appears in the system, the
job filter regex is applied to each node name in its nodelist. This regex could extract, for example, the portion
of the node name that encodes a certain _rack_ or _island_ in an HPC system. The occurrences of each extracted string are
then counted, and the most frequent one (the mode) is selected; ties are broken in favor of the lexicographically greater
string. If the mode equals the job _match_ string, the job is assigned to the operator. If either parameter is left empty,
every job with a non-empty nodelist is accepted. This policy can be overridden and changed on a per-plugin basis.
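The default policy can be illustrated with a minimal, stand-alone sketch. It is not DCDB code: the helper name `jobMatchesMode` and the node names used afterwards are hypothetical, and `std::regex` is used in place of `boost::regex` so that the snippet only needs the standard library.

```cpp
#include <cstdint>
#include <map>
#include <regex>
#include <string>
#include <utility>
#include <vector>

// Hypothetical helper (not part of DCDB): returns true if the most frequent
// job-filter match across all node names equals the job match string.
bool jobMatchesMode(const std::vector<std::string>& nodes,
                    const std::string& jobFilter,
                    const std::string& jobMatch) {
    // A job with no nodes cannot be turned into a unit
    if (nodes.empty())
        return false;
    // Filter or match string not set: every job is accepted
    if (jobFilter.empty() || jobMatch.empty())
        return true;

    // Count how often each filtered string (e.g., a rack id) occurs
    const std::regex filter(jobFilter);
    std::map<std::string, std::uint64_t> matchCtr;
    std::smatch match;
    for (const auto& node : nodes) {
        if (std::regex_search(node, match, filter))
            ++matchCtr[match.str(0)];
    }

    // Pick the mode; on equal counts the lexicographically greater string wins
    std::pair<std::string, std::uint64_t> mode{"", 0};
    for (const auto& kv : matchCtr) {
        if (kv.second > mode.second ||
            (kv.second == mode.second && kv.first > mode.first))
            mode = kv;
    }
    return mode.first == jobMatch;
}
```

Assuming a job runs on the made-up nodes `/i01/r02/n01/`, `/i01/r02/n02/` and `/i01/r03/n01/`, a job filter of `r[0-9]+` extracts `r02` twice and `r03` once. The job is therefore accepted by an operator whose job match is `r02` and discarded by one whose job match is `r03`.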
> NOTE       The _duplicate_ setting does not affect job operators.
> NOTE       In order to get units that operate at the _node_ level, the output sensors in the
......
......@@ -63,6 +63,7 @@ public:
_unitAccess.store(false);
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobFilter = boost::regex(this->_jobFilterStr);
}
......@@ -76,6 +77,7 @@ public:
_unitAccess.store(false);
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobFilter = boost::regex(this->_jobFilterStr);
}
......@@ -87,6 +89,7 @@ public:
OperatorTemplate<S>::operator=(other);
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobFilter = boost::regex(this->_jobFilterStr);
return *this;
}
......@@ -240,11 +243,35 @@ protected:
* @return True if the job should be processed, false otherwise
*/
virtual bool filterJob(qeJobData& jobData) {
// Job with no nodes - a unit cannot be built
if(jobData.nodes.empty())
return false;
// Filtering and formatting the node list
for(auto& nodeName : jobData.nodes)
nodeName = MQTTChecker::formatTopic(nodeName) + std::string(1, MQTT_SEP);
- return _jobFilterStr=="" || boost::regex_search(jobData.nodes.front().c_str(), _match, _jobFilter);
// No filter was set - every job is accepted
if(_jobFilterStr=="" || _jobMatchStr=="")
return true;
// Counting the different matches to the job filter - e.g., different racks, islands, etc.
std::map<std::string, uint64_t> matchCtr;
for(const auto& nodeName : jobData.nodes) {
if(boost::regex_search(nodeName.c_str(), _match, _jobFilter)) {
++matchCtr[_match.str(0)];
}
}
// Computing the actual mode - the filtered node name acts as a tie breaker
std::pair<std::string, uint64_t> mode = {"", 0};
for(const auto& kv : matchCtr) {
if (kv.second > mode.second || (kv.second == mode.second && kv.first > mode.first)) {
mode = kv;
}
}
// If the mode corresponds to the job match string, the check is successful.
return mode.first == _jobMatchStr;
}
/**
......@@ -317,6 +344,7 @@ protected:
vector<qeJobData> _jobDataVec;
// Regex object used to filter out jobs
string _jobFilterStr;
string _jobMatchStr;
boost::regex _jobFilter;
boost::cmatch _match;
// Logger object
......
......@@ -130,6 +130,18 @@ public:
*/
void setJobFilter(const string& jfilter) { _jobFilter = jfilter; }
/**
* @brief Set the current job match string
*
* The job match string is used to check which jobs must be processed by job operators.
* For each node in the nodelist of a job, its hostname is filtered through the job filter.
* If the mode of the filtered hostnames equals the job match string, the job
* is assigned to the job operator.
*
* @param jMatch String containing the new job match string
*/
void setJobMatch(const string& jMatch) { _jobMatch = jMatch; }
/**
* @brief Sets the internal callback to retrieve sensor data
*
......@@ -211,6 +223,13 @@ public:
*/
const string& getJobFilter() { return _jobFilter; }
/**
* @brief Returns the current job match string
*
* @return String containing the current job match string
*/
const string& getJobMatch() { return _jobMatch; }
/**
* @brief Perform a sensor query
*
......@@ -383,6 +402,8 @@ private:
string _filter;
// String storing the job filter to be used by job operators
string _jobFilter;
// String that the mode of the job-filtered node names must equal for a job to be processed
string _jobMatch;
};
#endif //PROJECT_QUERYENGINE_H
......@@ -733,6 +733,7 @@ int main(int argc, char* const argv[]) {
analyticsController->setMetadataStore(metadataStore);
queryEngine.setFilter(analyticsSettings.filter);
queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setJobMatch(analyticsSettings.jobMatch);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
......@@ -761,6 +762,8 @@ int main(int argc, char* const argv[]) {
LOG(info) << "Analytics Settings:";
LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy!="" ? analyticsSettings.hierarchy : "none");
LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
......
......@@ -86,6 +86,7 @@ public:
std::string hierarchy = "";
std::string filter = "";
std::string jobFilter = "";
std::string jobMatch = "";
};
/**
......
......@@ -85,6 +85,8 @@ bool GlobalConfiguration::readConfig() {
analyticsSettings.filter = global.second.data();
} else if (boost::iequals(global.first, "jobFilter")) {
analyticsSettings.jobFilter = global.second.data();
} else if (boost::iequals(global.first, "jobMatch")) {
analyticsSettings.jobMatch = global.second.data();
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -314,6 +314,7 @@ int main(int argc, char **argv) {
_queryEngine.setFilter(analyticsSettings.filter);
_queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setJobMatch(analyticsSettings.jobMatch);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback);
_queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
......@@ -374,6 +375,8 @@ int main(int argc, char **argv) {
LOG(info) << "Analytics Settings:";
LOG(info) << " Hierarchy: " << (analyticsSettings.hierarchy != "" ? analyticsSettings.hierarchy : "none");
LOG(info) << " Filter: " << (analyticsSettings.filter != "" ? analyticsSettings.filter : "none");
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
if (restAPISettings.enabled) {
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
......