Commit fe6c230e authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: job operators now use string job IDs

parent 0f38456f
...@@ -138,7 +138,7 @@ public: ...@@ -138,7 +138,7 @@ public:
try { try {
// Getting exclusive access to the operator // Getting exclusive access to the operator
while( this->_onDemandLock.exchange(true) ) {} while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node); std::string jobId = MQTTChecker::topicToJob(node);
_jobDataVec.clear(); _jobDataVec.clear();
if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) && !_jobDataVec.empty()) { if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) && !_jobDataVec.empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]); U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]);
...@@ -262,7 +262,7 @@ protected: ...@@ -262,7 +262,7 @@ protected:
virtual void computeAsync() override { virtual void computeAsync() override {
try { try {
_jobDataVec.clear(); _jobDataVec.clear();
if(this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true)) { if(this->_queryEngine.queryJob("", this->_interval * 1000000, 0, _jobDataVec, true, true)) {
_tempUnits.clear(); _tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process // Producing units from the job data, discarding invalid jobs in the process
for(auto& job : _jobDataVec) { for(auto& job : _jobDataVec) {
......
...@@ -36,8 +36,8 @@ ...@@ -36,8 +36,8 @@
using namespace std; using namespace std;
struct qeJobData { struct qeJobData {
uint32_t jobId; std::string jobId;
uint32_t userId; std::string userId;
uint64_t startTime; uint64_t startTime;
uint64_t endTime; uint64_t endTime;
std::list<std::string> nodes; std::list<std::string> nodes;
...@@ -46,7 +46,7 @@ struct qeJobData { ...@@ -46,7 +46,7 @@ struct qeJobData {
//Typedef for the callback used to retrieve sensors //Typedef for the callback used to retrieve sensors
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool); typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback //Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool); typedef bool (*QueryEngineJobCallback)(const string&, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback //Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&); typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
...@@ -257,7 +257,7 @@ public: ...@@ -257,7 +257,7 @@ public:
* @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned * @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned
* @return True if successful, false otherwise * @return True if successful, false otherwise
*/ */
bool queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) { bool queryJob(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
if(!_jCallback) if(!_jCallback)
throw runtime_error("Query Engine: job callback not set!"); throw runtime_error("Query Engine: job callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel)) if((startTs > endTs && !rel) || (startTs < endTs && rel))
......
...@@ -219,11 +219,11 @@ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){ ...@@ -219,11 +219,11 @@ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
} }
bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix){ bool PerSystDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix){
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
std::stringstream build_insert; std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'"; build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid)); auto* pass = getpwuid(static_cast<uid_t>(std::stoull(uid)));
if (pass == nullptr) { if (pass == nullptr) {
LOG(error)<< "User " << uid << " not found in system."; LOG(error)<< "User " << uid << " not found in system.";
return false; return false;
......
...@@ -113,7 +113,7 @@ public: ...@@ -113,7 +113,7 @@ public:
/** /**
* Insert job in the accounting table. * Insert job in the accounting table.
*/ */
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix); bool insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix);
/** /**
* Insert performance data into the aggregate table (Aggregate_<suffix> * Insert performance data into the aggregate table (Aggregate_<suffix>
......
...@@ -92,7 +92,7 @@ DCDB::SCError err; ...@@ -92,7 +92,7 @@ DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance(); QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg; logger_t lg;
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) { bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
std::list<JobData> tempList; std::list<JobData> tempList;
JobData tempData; JobData tempData;
qeJobData tempQeData; qeJobData tempQeData;
......
...@@ -68,8 +68,8 @@ public: ...@@ -68,8 +68,8 @@ public:
* @param jobId The job ID value to be processed * @param jobId The job ID value to be processed
* @return The processed MQTT topic * @return The processed MQTT topic
*/ */
static std::string jobToTopic(uint32_t jobId) { static std::string jobToTopic(std::string jobId) {
return "/job" + std::to_string(jobId) + "/"; return "/job" + jobId + "/";
} }
/** /**
...@@ -78,13 +78,13 @@ public: ...@@ -78,13 +78,13 @@ public:
* @param topic The topic to be processed * @param topic The topic to be processed
* @return The numerical job ID * @return The numerical job ID
*/ */
static uint32_t topicToJob(const std::string& topic) { static std::string topicToJob(const std::string& topic) {
std::string jobKey(JOB_STR), jobId = topic; std::string jobKey(JOB_STR), jobId = topic;
jobId.erase(std::remove(jobId.begin(), jobId.end(), MQTT_SEP), jobId.end()); jobId.erase(std::remove(jobId.begin(), jobId.end(), MQTT_SEP), jobId.end());
size_t pos = jobId.find(jobKey); size_t pos = jobId.find(jobKey);
if (pos != std::string::npos) if (pos != std::string::npos)
jobId.erase(pos, jobKey.length()); jobId.erase(pos, jobKey.length());
return std::stoull(jobId); return jobId;
} }
/** /**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment