Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit f4e0fda3 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: integration of the job domain ID into operators

parent 406c7fd9
......@@ -64,6 +64,7 @@ this framework are the following:
| jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to all nodes of the job's nodelist to extract certain information (e.g., rack or island).
| jobMatch | String against which the node names filtered through the _jobFilter_ are checked, to determine if a job is to be processed (see this [section](#jobOperators)).
| jobIdFilter | Like the jobFilter, this is a regular expression used to filter out jobs that do not match it. In this case, the job ID is checked against the regex and the job is discarded if a match is not found.
| jobDomainId | Specifies the job domain ID (e.g., HPC system, SLURM partition) to query for jobs.
| **operatorPlugins** | Block containing the specification of all data analytics plugins to be instantiated.
| plugin _name_ | The plugin name is used to build the corresponding lib-name (e.g. average --> libdcdboperator_average.1.0)
| path | Specify the path where the plugin (the shared library) is located. If left empty, DCDB will look in the default lib-directories (usr/lib and friends) for the plugin file.
......
......@@ -65,6 +65,7 @@ public:
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobDomainId = QueryEngine::getInstance().getJobDomainId();
this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
}
......@@ -81,6 +82,7 @@ public:
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobDomainId = QueryEngine::getInstance().getJobDomainId();
this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
}
......@@ -95,6 +97,7 @@ public:
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobMatchStr = QueryEngine::getInstance().getJobMatch();
this->_jobIdFilterStr = QueryEngine::getInstance().getJobIdFilter();
this->_jobDomainId = QueryEngine::getInstance().getJobDomainId();
this->_jobFilter = boost::regex(this->_jobFilterStr);
this->_jobIdFilter = boost::regex(this->_jobIdFilterStr);
return *this;
......@@ -149,7 +152,7 @@ public:
while( this->_onDemandLock.exchange(true) ) {}
std::string jobId = MQTTChecker::topicToJob(node);
_jobDataVec.clear();
if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) && !_jobDataVec.empty()) {
if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false, _jobDomainId) && !_jobDataVec.empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]);
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
......@@ -301,7 +304,7 @@ protected:
_jobDataVec.clear();
uint64_t queryTsEnd = !this->_scheduledTime ? getTimestamp() : this->_scheduledTime;
uint64_t queryTsStart = queryTsEnd - (this->_interval * 1000000);
if(this->_queryEngine.queryJob("", queryTsStart, queryTsEnd, _jobDataVec, false, true)) {
if(this->_queryEngine.queryJob("", queryTsStart, queryTsEnd, _jobDataVec, false, true, _jobDomainId)) {
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(auto& job : _jobDataVec) {
......@@ -363,6 +366,8 @@ protected:
// Filters for jobs based on their IDs
string _jobIdFilterStr;
boost::regex _jobIdFilter;
// Job domain ID to be used
string _jobDomainId;
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -36,6 +36,7 @@
using namespace std;
struct qeJobData {
std::string domainId;
std::string jobId;
std::string userId;
uint64_t startTime;
......@@ -48,7 +49,7 @@ typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_
//Typedef for the callback used to retrieve sensors
typedef bool (*QueryEngineGroupCallback)(const vector<string>&, const uint64_t, const uint64_t, vector<reading_t>&, const bool, const uint64_t);
//Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool, const string&);
//Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
......@@ -153,6 +154,16 @@ public:
*/
void setJobMatch(const string& jMatch) { _jobMatch = jMatch; }
/**
* @brief Set the current job domain ID
*
* This method sets the internal domain ID to be used to query jobs. Job operators will be
* able to work only on jobs belonging to this specific domain.
*
* @param jDomain String containing the new job domain ID
*/
void setJobDomainId(const string& jDomain) { _jobDomainId = jDomain; }
/**
* @brief Sets the internal callback to retrieve sensor data
*
......@@ -244,10 +255,17 @@ public:
/**
* @brief Returns the current job match string
*
* @return String containing the current job match string
* @return String containing the current job match
*/
const string& getJobMatch() { return _jobMatch; }
/**
* @brief Returns the current job domain ID string
*
* @return Const reference to the string containing the current job domain ID
*/
const string& getJobDomainId() { return _jobDomainId; }
/**
* @brief Perform a sensor query
*
......@@ -323,14 +341,15 @@ public:
* @param buffer Reference to a vector in which job info must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned
* @param domainId Job domain ID to be used for the query
* @return True if successful, false otherwise
*/
bool queryJob(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
bool queryJob(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false, const string& domainId="default") {
if(!_jCallback)
throw runtime_error("Query Engine: job callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
throw invalid_argument("Query Engine: invalid time range!");
return _jCallback(jobId, startTs, endTs, buffer, rel, range);
return _jCallback(jobId, startTs, endTs, buffer, rel, range, domainId);
}
/**
......@@ -425,6 +444,8 @@ private:
string _jobMatch;
// String storing the job ID filter to be used by job operators
string _jobIdFilter;
// String containing the job domain ID to be queried by job operators
string _jobDomainId;
};
#endif //PROJECT_QUERYENGINE_H
......@@ -104,7 +104,7 @@ DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
std::list<JobData> tempList;
JobData tempData;
qeJobData tempQeData;
......@@ -115,16 +115,17 @@ bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end, domainId);
if(err != JD_OK) return false;
} else {
// Getting a single job by id
err = myJobDataStore->getJobById(tempData, jobId);
err = myJobDataStore->getJobById(tempData, jobId, domainId);
if(err != JD_OK) return false;
tempList.push_back(tempData);
}
for(auto& jd : tempList) {
tempQeData.domainId = jd.domainId;
tempQeData.jobId = jd.jobId;
tempQeData.userId = jd.userId;
tempQeData.startTime = jd.startTime.getRaw();
......@@ -752,6 +753,7 @@ int main(int argc, char* const argv[]) {
queryEngine.setJobFilter(analyticsSettings.jobFilter);
queryEngine.setJobMatch(analyticsSettings.jobMatch);
queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
......@@ -783,6 +785,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
LOG(info) << " Job ID Filter: " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
LOG(info) << " Job Domain ID: " << analyticsSettings.jobDomainId;
LOG(info) << "Cassandra Driver Settings:";
LOG(info) << " Address: " << cassandraSettings.host << ":" << cassandraSettings.port;
......
......@@ -95,6 +95,7 @@ public:
std::string jobFilter = "";
std::string jobMatch = "";
std::string jobIdFilter = "";
std::string jobDomainId = "default";
};
/**
......
......@@ -99,6 +99,8 @@ void GlobalConfiguration::readConfig() {
analyticsSettings.jobMatch = global.second.data();
} else if (boost::iequals(global.first, "jobIdFilter")) {
analyticsSettings.jobIdFilter = global.second.data();
} else if (boost::iequals(global.first, "jobDomainId")) {
analyticsSettings.jobDomainId = global.second.data();
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -319,6 +319,7 @@ int main(int argc, char **argv) {
_queryEngine.setJobFilter(analyticsSettings.jobFilter);
_queryEngine.setJobMatch(analyticsSettings.jobMatch);
_queryEngine.setJobIDFilter(analyticsSettings.jobIdFilter);
_queryEngine.setJobDomainId(analyticsSettings.jobDomainId);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback);
_queryEngine.setGroupQueryCallback(sensorGroupQueryCallback);
......@@ -382,6 +383,7 @@ int main(int argc, char **argv) {
LOG(info) << " Job Filter: " << (analyticsSettings.jobFilter != "" ? analyticsSettings.jobFilter : "none");
LOG(info) << " Job Match: " << (analyticsSettings.jobMatch != "" ? analyticsSettings.jobMatch : "none");
LOG(info) << " Job ID Filter: " << (analyticsSettings.jobIdFilter != "" ? analyticsSettings.jobIdFilter : "none");
LOG(info) << " Job Domain ID: " << analyticsSettings.jobDomainId;
if (restAPISettings.enabled) {
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
......
......@@ -80,9 +80,9 @@ Showing all available information for a specific sensor:
dcdbconfig -h 127.0.0.1 sensor show /test/node1/AnonPages
```
Sample output:
Sample output:
```
```
dcdbconfig 0.4.135-gb57cedf (libdcdb 0.4.135-gb57cedf)
Details for public sensor /test/node1/AnonPages:
......@@ -93,7 +93,7 @@ Operations: -avg10,-avg300,-avg3600
Interval: 500000000
TTL: 0
Sensor Properties:
```
```
---
......@@ -103,15 +103,15 @@ Listing all jobs stored in the database that are currently running:
dcdbconfig -h 127.0.0.1 job running SYSTEM1
```
Sample output:
Sample output:
```
```
dcdbconfig 0.4.160-g48ce9d0 (libdcdb 0.4.161-gf25fe15)
Domain ID, Job ID, User ID, Start Time, End Time, #Nodes
SYSTEM1,800475,di726bwe,1597992340671657000,0,128
SYSTEM1,800499,ga526ppd,1597992340522786000,0,16
```
```
---
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment