Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit a9c9a1cc authored by Alessio Netti
Browse files

Job queries in QueryEngine

- Job data queries have been integrated in the QueryEngine
parent f1e6cc3d
......@@ -11,8 +11,18 @@
using namespace std;
/**
 * @brief Plain record describing a single job, as returned by job queries
 *
 * Scalar members are zero-initialized by default, so that a default-constructed
 * instance never exposes indeterminate values if a field is left unassigned.
 */
struct qeJobData {
    // Identifier of the job
    uint32_t jobId = 0;
    // Identifier of the user associated with the job
    uint32_t userId = 0;
    // Raw start timestamp of the job (presumably nanoseconds, like query timestamps - TODO confirm)
    uint64_t startTime = 0;
    // Raw end timestamp of the job (same unit as startTime)
    uint64_t endTime = 0;
    // Nodes associated with the job (presumably node names - TODO confirm)
    std::list<std::string> nodes;
};
//Typedef for the callback used to retrieve sensors
//Arguments, in the order used by querySensor: sensor name, start timestamp, end timestamp,
//output buffer (may be NULL), and the "rel" flag. Returns a pointer to a vector of readings.
//NOTE: the two declarations below name the same function pointer type; they differ only in
//the (optional) name of the last parameter.
typedef vector<reading_t>* (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>*, const bool rel);
typedef vector<reading_t>* (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>*, const bool);
//Typedef for the job retrieval callback
//Arguments, in the order used by queryJob: job ID, start timestamp, end timestamp,
//output buffer (may be NULL), the "rel" flag and the "range" flag. Returns a pointer to a
//vector of qeJobData records.
typedef vector<qeJobData>* (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>*, const bool, const bool);
/**
* Class that grants query access to local and remote sensors
......@@ -70,6 +80,17 @@ public:
*/
void setQueryCallback(QueryEngineCallback cb) {
    // Remember the function later invoked by querySensor to fetch sensor readings
    this->_callback = cb;
}
/**
* @brief Registers the callback used to retrieve job data
*
* The QueryEngine does not access job storage directly: queryJob forwards every request
* to the function registered here, which acts as an abstraction layer. The callback is
* expected to behave as specified for setQueryCallback. Until a callback has been
* registered, queryJob throws an exception.
*
* @param jcb Pointer to a function of type QueryEngineJobCallback
*/
void setJobQueryCallback(QueryEngineJobCallback jcb) {
    this->_jCallback = jcb;
}
/**
* @brief Returns the internal SensorNavigator objects
*
......@@ -104,7 +125,7 @@ public:
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
* @param endTs End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
* @param buffer Vector in which readings must be stored. If NULL, a new vector will be allocated
* @param true If true, the input timestamps are considered to be relative offset against "now"
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @return Pointer to a vector containing readings for the given query
*/
vector<reading_t>* querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>* buffer, const bool rel=true) {
......@@ -115,6 +136,38 @@ public:
return _callback(name, startTs, endTs, buffer, rel);
}
/**
* @brief Retrieves job information through the registered job callback
*
* Depending on "range", this method returns either the single job identified by jobId, or
* all jobs found in the time range delimited by startTs and endTs. The "buffer" vector can
* be supplied to re-use memory across successive queries. A callback must have been
* registered through setJobQueryCallback beforehand, otherwise an exception is thrown.
*
* The "rel" argument selects how the timestamps are interpreted: if true, startTs and endTs
* are offsets relative to "now", and therefore startTs must be >= endTs; if false, they are
* absolute timestamps and startTs must be <= endTs.
*
* NOTE(review): the previous documentation described "rel" in terms of sensor cache views
* (range computed statically in O(1) for relative queries, binary search in O(log(n)) for
* absolute ones, no effect with the Cassandra datastore). Whether any of this applies to
* job queries depends on the registered callback - confirm.
*
* NOTE(review): the time range is validated even when range=false, although startTs and
* endTs are documented as being used only if range=true.
*
* @param jobId ID of the job to be retrieved (only if range=false)
* @param startTs Start timestamp (in nanoseconds) of the time range for the query (only if range=true)
* @param endTs End timestamp (in nanoseconds) of the time range for the query (only if range=true)
* @param buffer Vector in which job info must be stored. If NULL, a new vector will be allocated
* @param rel If true, the input timestamps are considered to be relative offsets against "now"
* @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned
* @return Pointer to a vector containing job information for the given query
* @throws runtime_error if no job callback has been set
* @throws invalid_argument if the time range is inconsistent with "rel"
*/
vector<qeJobData>* queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel=true, const bool range=false) {
    if(_jCallback == NULL) {
        throw runtime_error("Query Engine: job callback not set!");
    }
    // Relative offsets count backwards from "now", so the ordering check is reversed
    const bool badRange = rel ? (startTs < endTs) : (startTs > endTs);
    if(badRange) {
        throw invalid_argument("Query Engine: invalid time range!");
    }
    return _jCallback(jobId, startTs, endTs, buffer, rel, range);
}
void triggerUpdate() {
updating.store(false);
updated.store(true);
......@@ -132,6 +185,7 @@ private:
// Private constructor: starts with no navigator and no callbacks registered,
// and with both update flags cleared
QueryEngine() {
    _jCallback = nullptr;
    _callback = nullptr;
    _navigator.reset();
    updated.store(false);
    updating.store(false);
}
......@@ -150,6 +204,8 @@ private:
shared_ptr<SensorNavigator> _navigator;
// Callback used to retrieve sensor data
QueryEngineCallback _callback;
// Callback used to retrieve job data
QueryEngineJobCallback _jCallback;
// String storing the current sensor hierarchy, used for convenience
string _sensorHierarchy;
};
......
......@@ -37,6 +37,7 @@
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/jobdatastore.h>
#include <dcdb/sensorconfig.h>
#include <dcdb/version.h>
#include <dcdb/sensor.h>
......@@ -65,11 +66,46 @@ SensorCache mySensorCache;
// Controller for the analytics subsystem; initialized in main()
AnalyticsController* analyticsController;
// Database connection shared by the data stores below; disconnected and deleted at shutdown
DCDB::Connection* dcdbConn;
// Sensor data store, created in main() on top of dcdbConn
DCDB::SensorDataStore *mySensorDataStore;
// Job data store, created in main() on top of dcdbConn and queried by jobQueryCallback
DCDB::JobDataStore *myJobDataStore;
// Sensor configuration store, created in main() on top of dcdbConn
DCDB::SensorConfig *mySensorConfig;
DCDB::SCError err;
// Reference to the QueryEngine instance; its sensor and job callbacks are registered in main()
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
std::list<JobData> tempList;
JobData tempData;
qeJobData tempQeData;
JDError err;
if(range) {
// Getting a list of jobs in the given time range
uint64_t now = getTimestamp();
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
err = myJobDataStore->getJobsInIntervalExcl(tempList, start, end);
if(err != JD_OK) return NULL;
} else {
// Getting a single job by id
err = myJobDataStore->getJobById(tempData, jobId);
if(err != JD_OK) return NULL;
tempList.push_back(tempData);
}
if(!buffer)
buffer = new std::vector<qeJobData>();
buffer->clear();
for(auto& jd : tempList) {
tempQeData.jobId = jd.jobId;
tempQeData.userId = jd.userId;
tempQeData.startTime = jd.startTime.getRaw();
tempQeData.endTime = jd.endTime.getRaw();
tempQeData.nodes = jd.nodes;
buffer->push_back(tempQeData);
}
return buffer;
}
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
std::string topic;
// Getting the topic of the queried sensor from the Navigator
......@@ -609,6 +645,7 @@ int main(int argc, char* const argv[]) {
*/
mySensorDataStore = new DCDB::SensorDataStore(dcdbConn);
mySensorConfig = new DCDB::SensorConfig(dcdbConn);
myJobDataStore = new DCDB::JobDataStore(dcdbConn);
/*
* Set TTL for data store inserts if TTL > 0.
......@@ -622,6 +659,7 @@ int main(int argc, char* const argv[]) {
if(!analyticsController->initialize(settings, argv[argc - 1]))
return EXIT_FAILURE;
queryEngine.setQueryCallback(sensorQueryCallback);
queryEngine.setJobQueryCallback(jobQueryCallback);
LOG_LEVEL vLogLevel = settings.validateConfig ? LOG_LEVEL::info : LOG_LEVEL::debug;
LOG_VAR(vLogLevel) << "----- Configuration -----";
......@@ -752,6 +790,7 @@ int main(int argc, char* const argv[]) {
httpThread.join();
LOG(info) << "HTTP Server stopped...";
delete mySensorDataStore;
delete myJobDataStore;
delete mySensorConfig;
dcdbConn->disconnect();
delete dcdbConn;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment