Commit 4e33a5ff authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: job ID filtering support for job operators

parent e01274d8
...@@ -63,6 +63,7 @@ this framework are the following: ...@@ -63,6 +63,7 @@ this framework are the following:
| filter | Regular expression used to filter the set of sensors in the sensor tree. Everything that matches is included, the rest is discarded. | filter | Regular expression used to filter the set of sensors in the sensor tree. Everything that matches is included, the rest is discarded.
| jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to all nodes of the job's nodelist to extract certain information (e.g., rack or island). | jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to all nodes of the job's nodelist to extract certain information (e.g., rack or island).
| jobMatch | String against which the node names filtered through the _jobFilter_ are checked, to determine if a job is to be processed (see this [section](#jobOperators)). | jobMatch | String against which the node names filtered through the _jobFilter_ are checked, to determine if a job is to be processed (see this [section](#jobOperators)).
| jobIdFilter | Like the jobFilter, this is a regular expression used to filter out jobs that do not match it. In this case, the job ID is checked against the regex and the job is discarded if a match is not found.
| **operatorPlugins** | Block containing the specification of all data analytics plugin to be instantiated. | **operatorPlugins** | Block containing the specification of all data analytics plugin to be instantiated.
| plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdboperator_average.1.0) | plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdboperator_average.1.0)
| path | Specify the path where the plugin (the shared library) is located. If left empty, DCDB will look in the default lib-directories (usr/lib and friends) for the plugin file. | path | Specify the path where the plugin (the shared library) is located. If left empty, DCDB will look in the default lib-directories (usr/lib and friends) for the plugin file.
......
...@@ -64,7 +64,9 @@ public: ...@@ -64,7 +64,9 @@ public:
this->_dynamic = true; this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter(); this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch(); this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr); this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
} }
/** /**
...@@ -78,7 +80,9 @@ public: ...@@ -78,7 +80,9 @@ public:
this->_dynamic = true; this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter(); this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch(); this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr); this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
} }
/** /**
...@@ -90,7 +94,9 @@ public: ...@@ -90,7 +94,9 @@ public:
this->_dynamic = true; this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter(); this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch(); this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr); this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
return *this; return *this;
} }
...@@ -250,6 +256,10 @@ protected: ...@@ -250,6 +256,10 @@ protected:
for(auto& nodeName : jobData.nodes) for(auto& nodeName : jobData.nodes)
nodeName = MQTTChecker::formatTopic(nodeName) + std::string(1, MQTT_SEP); nodeName = MQTTChecker::formatTopic(nodeName) + std::string(1, MQTT_SEP);
// First, we apply the job ID filter, if configured
if(_jobIdFilterStr!="" && !boost::regex_search(jobData.jobId.c_str(), _match, _jobIdFilter))
return false;
// No filter was set - every job is accepted // No filter was set - every job is accepted
if(_jobFilterStr=="" || _jobMatchStr=="") if(_jobFilterStr=="" || _jobMatchStr=="")
return true; return true;
...@@ -350,6 +360,9 @@ protected: ...@@ -350,6 +360,9 @@ protected:
string _jobMatchStr; string _jobMatchStr;
boost::regex _jobFilter; boost::regex _jobFilter;
boost::cmatch _match; boost::cmatch _match;
// Filters for jobs based on their IDs
string _jobIdFilterStr;
boost::regex _jobIdFilter;
// Logger object // Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg; boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
...@@ -130,6 +130,17 @@ public: ...@@ -130,6 +130,17 @@ public:
*/ */
void setJobFilter(const string& jfilter) { _jobFilter = jfilter; } void setJobFilter(const string& jfilter) { _jobFilter = jfilter; }
/**
* @brief Set the current job ID filter
*
* This method sets the internal filter string used by job operators to identify
* the set of jobs for which they are responsible, based on their ID. All jobs whose ID does not
* match this filter are excluded.
*
* @param jidfilter String containing the new job ID filter
*/
void setJobIDFilter(const string& jidfilter) { _jobIdFilter = jidfilter; }
/** /**
* @brief Set the current job match string * @brief Set the current job match string
* *
...@@ -223,6 +234,13 @@ public: ...@@ -223,6 +234,13 @@ public:
*/ */
const string& getJobFilter() { return _jobFilter; } const string& getJobFilter() { return _jobFilter; }
/**
* @brief Returns the current job ID filter
*
* @return String containing the current job ID filter
*/
const string& getJobIdFilter() { return _jobIdFilter; }
/** /**
* @brief Returns the current job match string * @brief Returns the current job match string
* *
...@@ -405,6 +423,8 @@ private: ...@@ -405,6 +423,8 @@ private:
string _jobFilter; string _jobFilter;
// String storing the matching string resulting from the job filter for a job to be processed // String storing the matching string resulting from the job filter for a job to be processed
string _jobMatch; string _jobMatch;
// String storing the job ID filter to be used by job operators
string _jobIdFilter;
}; };
#endif //PROJECT_QUERYENGINE_H #endif //PROJECT_QUERYENGINE_H
...@@ -749,6 +749,7 @@ int main(int argc, char* const argv[]) { ...@@ -749,6 +749,7 @@ int main(int argc, char* const argv[]) {
queryEngine.setFilter(analyticsSettings.filter); queryEngine.setFilter(analyticsSettings.filter);
queryEngine.setJobFilter(analyticsSettings.jobFilter); queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setJobMatch(analyticsSettings.jobMatch); queryEngine.setJobMatch(analyticsSettings.jobMatch);
queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy); queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback); queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setGroupQueryCallback(sensorGroupQueryCallback); queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
...@@ -779,6 +780,7 @@ int main(int argc, char* const argv[]) { ...@@ -779,6 +780,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none"); LOG(info) << " Filter: " << (analyticsSettings.filter!="" ? analyticsSettings.filter : "none");
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none"); LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none"); LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
LOG(info) << " Job ID Filter: " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
LOG(info) << "Cassandra Driver Settings:"; LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port; LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
......
...@@ -94,6 +94,7 @@ public: ...@@ -94,6 +94,7 @@ public:
std::string filter = ""; std::string filter = "";
std::string jobFilter = ""; std::string jobFilter = "";
std::string jobMatch = ""; std::string jobMatch = "";
std::string jobIdFilter = "";
}; };
/** /**
......
...@@ -97,6 +97,8 @@ void GlobalConfiguration::readConfig() { ...@@ -97,6 +97,8 @@ void GlobalConfiguration::readConfig() {
analyticsSettings.jobFilter = global.second.data(); analyticsSettings.jobFilter = global.second.data();
} else if (boost::iequals(global.first, "jobMatch")) { } else if (boost::iequals(global.first, "jobMatch")) {
analyticsSettings.jobMatch = global.second.data(); analyticsSettings.jobMatch = global.second.data();
} else if (boost::iequals(global.first, "jobIdFilter")) {
analyticsSettings.jobIdFilter = global.second.data();
} else { } else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting"; LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
} }
......
...@@ -318,6 +318,7 @@ int main(int argc, char **argv) { ...@@ -318,6 +318,7 @@ int main(int argc, char **argv) {
_queryEngine.setFilter(analyticsSettings.filter); _queryEngine.setFilter(analyticsSettings.filter);
_queryEngine.setJobFilter(analyticsSettings.jobFilter); _queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setJobMatch(analyticsSettings.jobMatch); _queryEngine.setJobMatch(analyticsSettings.jobMatch);
_queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy); _queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback); _queryEngine.setQueryCallback(sensorQueryCallback);
_queryEngine.setGroupQueryCallback(sensorGroupQueryCallback); _queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
...@@ -380,6 +381,7 @@ int main(int argc, char **argv) { ...@@ -380,6 +381,7 @@ int main(int argc, char **argv) {
LOG(info) << " Filter: " << (analyticsSettings.filter != "" ? analyticsSettings.filter : "none"); LOG(info) << " Filter: " << (analyticsSettings.filter != "" ? analyticsSettings.filter : "none");
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none"); LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none"); LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
LOG(info) << " Job ID Filter: " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
if (restAPISettings.enabled) { if (restAPISettings.enabled) {
LOG(info) << "RestAPI Settings:"; LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port; LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
......
...@@ -144,7 +144,7 @@ dcdbquery -h <host> <Sensor 1> [<Sensor 2> ...] <Start> <End> ...@@ -144,7 +144,7 @@ dcdbquery -h <host> <Sensor 1> [<Sensor 2> ...] <Start> <End>
Like in dcdbconfig, the *-h* argument specifies the hostname associated to an Apache Cassandra instance. Then, the tool Like in dcdbconfig, the *-h* argument specifies the hostname associated to an Apache Cassandra instance. Then, the tool
accepts a list of sensor names to be queried (represented by the *\<Sensor\>* arguments) and a time range for the query accepts a list of sensor names to be queried (represented by the *\<Sensor\>* arguments) and a time range for the query
(represented by the *\<Start\>* and *\<End\>* arguments). The range of the query can either be supplied with a pair of 64-bit timestamps (represented by the *\<Start\>* and *\<End\>* arguments). The range of the query can either be supplied with a pair of 64-bit timestamps
(expressed in nanoseconds), or with two relative timestamps (e.g., *now-1h* as *Start* and *now* as *End*). Moreover, a (expressed in nanoseconds), or with two relative timestamps (e.g., *now-1h* as *\<Start\>* and *now* as *\<End\>*). Moreover, a
series of operations can be automatically performed over the returned set of sensor values. Operations can be specified series of operations can be automatically performed over the returned set of sensor values. Operations can be specified
like in the following: like in the following:
...@@ -158,8 +158,8 @@ The currently supported operations are *delta*, *delta_t*, *derivative* and *int ...@@ -158,8 +158,8 @@ The currently supported operations are *delta*, *delta_t*, *derivative* and *int
when using sensor operations. when using sensor operations.
> NOTE 2 &ensp;&ensp;&ensp;&ensp;&ensp; When using the *-j* option to query sensors for specific jobs, the *<Start>* and > NOTE 2 &ensp;&ensp;&ensp;&ensp;&ensp; When using the *-j* option to query sensors for specific jobs, the *\<Start\>* and
*<End>* arguments are omitted, since they are inferred automatically from the job's start and end times. *\<End\>* arguments are omitted, since they are inferred automatically from the job's start and end times.
## Examples <a name="dcdbqueryexamples"></a> ## Examples <a name="dcdbqueryexamples"></a>
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment