
Commit 8a184a8b authored by Carla Guillen Carias


Adding more uniqueness for job ids from slurm (now user and number of nodes have to match). Refactored a little as well.
parent fad2ae88
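For reference, a minimal sketch of the composite cache key this commit introduces; the delimiter and field order follow createIdJobCache() in the diff below, while the concrete job id, user and node count are hypothetical:

#include <iostream>
#include <sstream>
#include <string>

// Hypothetical values; the key layout mirrors createIdJobCache():
// <job_id_string>|<uid>|<number_of_nodes>
int main() {
    const std::string DELIMITER = "|";
    const std::string job_id_string = "123456";  // slurm job id string (hypothetical)
    const std::string uid = "di12abc";           // user id (hypothetical)
    const int number_of_nodes = 4;               // node count (hypothetical)
    std::stringstream id;
    id << job_id_string << DELIMITER << uid << DELIMITER << number_of_nodes;
    std::cout << id.str() << std::endl;          // prints "123456|di12abc|4"
    return 0;
}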
@@ -39,7 +39,7 @@
#include "MariaDB.h"
#include "MariaDB.h"
/**************SQLResult****************/
class SQLResult {
private:
@@ -61,6 +61,41 @@ public:
}
};
/**************JobCache****************/
const std::string DELIMITER = "|";
std::string createIdJobCache(const std::string uid, int number_of_nodes, const std::string &job_id_string){
std::stringstream id;
id << job_id_string << DELIMITER << uid << DELIMITER << number_of_nodes;
return id.str();
}
void JobCache::addJobToCache(const std::string uid, int number_of_nodes, const std::string &job_id_string, const std::string & job_id_db){
//Evict the least recently seen entry before inserting once the cache is full (the size should never actually exceed JOB_CACHE_MAX_SIZE)
if(_jobCacheMap.size() >= JOB_CACHE_MAX_SIZE){
using MyPairType = std::pair<std::string, Job_info_t>;
auto smallest = std::min_element(_jobCacheMap.begin(), _jobCacheMap.end(),
[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
_jobCacheMap.erase(smallest);
}
Job_info_t ji;
ji.job_id_db = job_id_db;
ji.last_seen_timestamp = getTimestamp();
_jobCacheMap[createIdJobCache(uid, number_of_nodes, job_id_string)] = ji;
}
//Note: job_info is passed as a reference to a pointer so that the caller receives the address of the cached entry
bool JobCache::findJobInCache(const std::string uid, int number_of_nodes, const std::string &job_id_string, Job_info_t *& job_info){
auto found = _jobCacheMap.find(createIdJobCache(uid, number_of_nodes, job_id_string));
if(found != _jobCacheMap.end()){ //found
job_info = &(found->second);
job_info->last_seen_timestamp = getTimestamp();
return true;
}
return false;
}
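A hedged usage sketch of the JobCache interface defined in this commit (the cache instance and the concrete values are assumptions; the lookup-then-insert pattern mirrors MariaDB::getDBJobID below):

// Sketch only: assumes a JobCache instance named cache and the corrected
// out-parameter signature (pointer passed by reference) used above.
JobCache cache;
Job_info_t *job_info = nullptr;
if (cache.findJobInCache("di12abc", 4, "123456", job_info)) {
    // Cache hit: job_info points at the cached entry, timestamp refreshed.
    std::string job_id_db = job_info->job_id_db;
} else {
    // Cache miss: resolve the database id elsewhere (e.g. via SELECT),
    // then remember it under the same composite key.
    cache.addJobToCache("di12abc", 4, "123456", "42");
}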
/**************MariaDB****************/
MariaDB * MariaDB::instance = nullptr;
std::mutex MariaDB::mut;
std::once_flag MariaDB::init_once;
@@ -116,30 +151,17 @@ MariaDB::~MariaDB(){
}
bool MariaDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
bool MariaDB::getDBJobID(const std::string & job_id_string, std::string& job_db_id, const std::string & user, int number_nodes) {
std::lock_guard<std::mutex> lock(mut);
std::vector<std::string> notfound;
for(auto & job_id_str: job_id_strings){
auto found = _jobCache.find(job_id_str);
if(found != _jobCache.end()){
job_id_map[job_id_str] = found->second.job_id_db;
found->second.last_seen_timestamp = getTimestamp();
} else {
notfound.push_back(job_id_str);
}
}
if(!notfound.size()){ //every job was found
return true;
Job_info_t *job_info=nullptr;
if(_jobCache.findJobInCache(user, number_nodes, job_id_string, job_info)){ //found
job_db_id = job_info->job_id_db;
return true; //job found
}
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < notfound.size(); ++i) {
build_query << "'" << notfound[i] << "'";
if (i != notfound.size() - 1) { //not last element
build_query << ",";
}
}
build_query << ")";
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string LIKE '" << job_id_string << "' AND user LIKE '" << user << "'";
build_query << " AND nodes=" << number_nodes;
auto query = build_query.str();
LOG(debug)<< query;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
@@ -152,28 +174,14 @@ bool MariaDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<st
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
std::string job_id_db = row[0];
job_db_id = row[0];
std::string job_id_string = std::string(row[1]);
job_id_map[job_id_string] = job_id_db;
addJobToCache(job_id_string, job_id_db);
_jobCache.addJobToCache(user, number_nodes, job_id_string, job_db_id);
return true; //found
}
}
}
return true;
}
void MariaDB::addJobToCache(const std::string &job_id_string, std::string & job_id_db){
//Evict the least recently seen entry before inserting once the cache is full (the size should never actually exceed JOB_CACHE_MAX_SIZE)
if(_jobCache.size() >= JOB_CACHE_MAX_SIZE){
using MyPairType = std::pair<std::string, MariaDB::Job_info_t>;
auto smallest = std::min_element(_jobCache.begin(), _jobCache.end(),
[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
_jobCache.erase(smallest);
}
Job_info_t ji;
ji.job_id_db = job_id_db;
ji.last_seen_timestamp = getTimestamp();
_jobCache[job_id_string] = ji;
return false;
}
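For illustration only, the lookup query getDBJobID now builds, shown for hypothetical values (user and node count narrow the match in addition to the slurm job id string):

// SELECT job_id, job_id_string FROM Accounting
//   WHERE job_id_string LIKE '123456' AND user LIKE 'di12abc' AND nodes=4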
bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
@@ -217,20 +225,19 @@ bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
}
bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix){
bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, std::string & job_id_db, const std::string & suffix, int number_nodes){
std::lock_guard<std::mutex> lock(mut);
//maybe another thread did this for us
auto found = _jobCache.find(job_id_string);
if(found != _jobCache.end()){
job_id_db = std::stoi(found->second.job_id_db);
found->second.last_seen_timestamp = getTimestamp();
Job_info_t *job_info=nullptr;
if(_jobCache.findJobInCache(uid,number_nodes, job_id_string, job_info )){
job_id_db = job_info->job_id_db;
return true;
}
//Also check that job was not inserted by another collector
//Also check that the job was not inserted by another collector (shouldn't really happen, but better safe than sorry)
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string ='";
build_query << job_id_string << "' AND user='" << uid <<"'";
build_query << job_id_string << "' AND user='" << uid <<"' AND nodes=" << number_nodes;
auto select_query = build_query.str();
LOG(debug) << select_query;
@@ -245,18 +252,18 @@ bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string&
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
std::string db_job_id = row[0];
addJobToCache(job_id_string, db_job_id);
job_id_db = row[0];
_jobCache.addJobToCache(uid, number_nodes, job_id_string, job_id_db);
job_found_in_db=true;
job_id_db = std::stoi(db_job_id);
}
}
}
if(!job_found_in_db) {
std::stringstream build_insert;
build_insert << "INSERT IGNORE INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
build_insert << "INSERT IGNORE INTO Accounting (job_id_string, user, nodes, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
build_insert << uid << "\',\'";
build_insert << number_nodes << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug)<< query;
@@ -265,7 +272,7 @@ bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string&
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
job_id_db = std::to_string(mysql_insert_id(_mysql));
}
return true;
}
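Likewise, a sketch of the statement insertIntoJob issues when the job is not yet in the Accounting table; the values are hypothetical and the suffix placeholder stands in for whatever getTableSuffix returns:

// INSERT IGNORE INTO Accounting (job_id_string, user, nodes, aggregate_first_suffix, aggregate_last_suffix)
//   VALUES ('123456', 'di12abc', '4', '<suffix>', '<suffix>')
// On success the new database id is read back via mysql_insert_id().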
@@ -339,7 +346,7 @@ bool MariaDB::createNewAggregate(std::string& new_suffix){
if(mysql_real_query(_mysql, query2.c_str(), query2.size() )){
if(mysql_errno(_mysql) == 1050){
return true; //table exists!
}
print_error();
return false;
@@ -350,39 +357,24 @@ bool MariaDB::createNewAggregate(std::string& new_suffix){
}
bool MariaDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix){
bool MariaDB::updateJobsLastSuffix(const std::string & job_id_string, const std::string & user, int number_nodes, const std::string& job_id_db, std::string & suffix){
std::lock_guard<std::mutex> lock(mut);
std::vector<std::string> db_update_jobs;
for(auto &job: job_map){
auto found = _jobCache.find(job.first);
if(found != _jobCache.end()){
if(found->second.job_current_table_suffix.size() == 0 || found->second.job_current_table_suffix != suffix){
found->second.job_current_table_suffix = suffix; //set new suffix
db_update_jobs.push_back(job.second); //write on vector to update
}
Job_info_t* job_info = nullptr;
if(_jobCache.findJobInCache(user, number_nodes, job_id_string, job_info)){ //found
if(job_info->job_current_table_suffix.empty() || job_info->job_current_table_suffix != suffix){
job_info->job_current_table_suffix = suffix; //set new suffix
//must update;
} else {
//no need to update in database
return true;
}
}
} //not found, must update
if(db_update_jobs.empty()) {
return true;
}
std::stringstream build_update;
build_update << "UPDATE Accounting SET aggregate_last_suffix=\'" << suffix
<< "\' WHERE job_id IN (";
unsigned int count = 0;
for (auto & job_db_id : db_update_jobs) {
build_update << job_db_id;
if (count + 1 != db_update_jobs.size()) {
build_update << ",";
} else {
build_update << ")";
}
count++;
}
build_update << "UPDATE Accounting SET aggregate_last_suffix=\'" << suffix << "\' WHERE job_id=" << job_id_db;
auto query = build_update.str();
LOG(debug)<< query;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
@@ -393,8 +385,7 @@ bool MariaDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map,
bool MariaDB::getTableSuffix(std::string & table_suffix){
std::lock_guard<std::mutex> lock(mut);
if (!getCurrentSuffixAggregateTable(table_suffix)
&& !createNewAggregate(table_suffix)) {
if (!getCurrentSuffixAggregateTable(table_suffix) && !createNewAggregate(table_suffix)) {
return false;
}
return true;
@@ -45,14 +45,22 @@ struct Aggregate_info_t {
float severity_average;
};
class MariaDB {
private:
struct Job_info_t {
struct Job_info_t {
std::string job_id_db;
unsigned long long last_seen_timestamp;
std::string job_current_table_suffix;
};
};
class JobCache {
private:
std::map<std::string, Job_info_t> _jobCacheMap; //< Job id string to job data
const std::size_t JOB_CACHE_MAX_SIZE = 10000;
public:
/** Adds a job to the cache; evicts the least recently seen entry when the cache is full. */
void addJobToCache(const std::string uid, int number_of_nodes, const std::string &job_id_string, const std::string & job_id_db);
/** Looks up a job; on a hit sets job_info to point at the cached entry and refreshes its timestamp. */
bool findJobInCache(const std::string uid, int number_of_nodes, const std::string &job_id_string, Job_info_t *& job_info);
};
class MariaDB {
public:
enum Rotation_t {
@@ -76,8 +84,7 @@ protected:
static std::mutex mut;
static std::once_flag init_once;
bool _initialized;
std::map<std::string, Job_info_t> _jobCache; //< Job id string to job data
const std::size_t JOB_CACHE_MAX_SIZE = 10000;
JobCache _jobCache;
/** print error.
* Prints the mysql error message. If connection is gone (Error 2006) then we also close the connection.
@@ -87,7 +94,6 @@ protected:
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
void addJobToCache(const std::string &job_id_string, std::string & job_id_db);
public:
@@ -111,12 +117,12 @@ public:
* @param job_id_string job id string including array jobid.
* @param job_db_id job id (db) matching the job id string, user and node count
* @param user user id owning the job
* @param number_nodes number of nodes of the job
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getDBJobID(const std::string & job_id_string, std::string& job_db_id, const std::string & user, int number_nodes);
/**
* Insert job in the accounting table.
*/
bool insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix);
bool insertIntoJob(const std::string& job_id_string, const std::string& uid, std::string & job_id_db, const std::string & suffix, int number_nodes);
/**
* Insert performance data into the aggregate table (Aggregate_<suffix>
@@ -128,7 +134,7 @@ public:
* @param job_id_string job id string including array jobid.
* @param user user id owning the job
* @param number_nodes number of nodes of the job
* @param job_id_db job id (db) whose last suffix is updated
* @param suffix Aggregate table suffix
*/
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool updateJobsLastSuffix(const std::string & job_id_string, const std::string & user, int number_nodes, const std::string& job_id_db, std::string & suffix);
/**
* Get the next or the current table suffix
@@ -179,35 +179,19 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
return;
}
std::vector<std::string> job_ids = {jobData.jobId};
std::map<std::string, std::string> job_map;
std::string table_suffix;
if(!_persystdb->getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
LOG(error) << "Failed to create Aggregate table!";
return;
}
if(!_persystdb->getDBJobIDs(job_ids, job_map)){
return;
if(!_persystdb->getDBJobID(jobData.jobId, agg_info.job_id_db, jobData.userId, jobData.nodes.size())){
if(!_persystdb->insertIntoJob(jobData.jobId, jobData.userId, agg_info.job_id_db, table_suffix, jobData.nodes.size())){
LOG(error) << "Job insertion not possible, no job id db available for slurm job id" << jobData.jobId;
return;
}
}
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
LOG(error) << "Job insertion not possible, no job id db found for slurm job id" << job_id_string;
continue;
}
} else { //found
agg_info.job_id_db = search->second;
}
}
_persystdb->updateJobsLastSuffix(job_map, table_suffix);
_persystdb->updateJobsLastSuffix(jobData.jobId, jobData.userId, jobData.nodes.size(), agg_info.job_id_db, table_suffix);
agg_info.timestamp = (my_timestamp/1e9);
_persystdb->insertInAggregateTable(table_suffix, agg_info);
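Since removed and added lines are interleaved in the hunk above, here is a condensed sketch of the resulting flow in PerSystSqlOperator::compute() (names taken from the diff; error handling abbreviated):

std::string table_suffix;
if (!_persystdb->getTableSuffix(table_suffix)) {
    LOG(error) << "Failed to create Aggregate table!";
    return;
}
// Resolve the database job id by slurm job id + user + node count;
// insert the job only if neither the cache nor the database knows it yet.
if (!_persystdb->getDBJobID(jobData.jobId, agg_info.job_id_db, jobData.userId, jobData.nodes.size())) {
    if (!_persystdb->insertIntoJob(jobData.jobId, jobData.userId, agg_info.job_id_db, table_suffix, jobData.nodes.size())) {
        return; // no database job id available for this slurm job
    }
}
_persystdb->updateJobsLastSuffix(jobData.jobId, jobData.userId, jobData.nodes.size(), agg_info.job_id_db, table_suffix);
agg_info.timestamp = (my_timestamp / 1e9);
_persystdb->insertInAggregateTable(table_suffix, agg_info);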