In January 2021 we will introduce a 10 GB quota for project repositories. Higher limits for individual projects will be available on request. Please see https://doku.lrz.de/display/PUBLIC/GitLab for more information.

Commit 421dcf37 authored by Carla Guillen Carias's avatar Carla Guillen Carias

Making checks since different collectors will be concurrently updating the database

parent 0f2b3e30
......@@ -117,16 +117,13 @@ MariaDB::~MariaDB(){
}
bool MariaDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::lock_guard<std::mutex> lock(mut);
std::vector<std::string> notfound;
std::unique_lock<std::mutex> lock(mut);
lock.unlock();
for(auto & job_id_str: job_id_strings){
auto found = _jobCache.find(job_id_str);
if(found != _jobCache.end()){
job_id_map[job_id_str] = found->second.job_id_db;
lock.lock();
found->second.last_seen_timestamp = getTimestamp();
lock.unlock();
} else {
notfound.push_back(job_id_str);
}
......@@ -145,7 +142,6 @@ bool MariaDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<st
build_query << ")";
auto query = build_query.str();
LOG(debug)<< query;
lock.lock();
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
......@@ -166,7 +162,7 @@ bool MariaDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<st
return true;
}
void MariaDB::addJobToCache(std::string &job_id_string, std::string & job_id_db){
void MariaDB::addJobToCache(const std::string &job_id_string, std::string & job_id_db){
//remove one element before inserting (the last condition (_jobCache.size() > JOB_CACHE_MAX_SIZE) shouldn't really happen...
if(_jobCache.size() == JOB_CACHE_MAX_SIZE || _jobCache.size() > JOB_CACHE_MAX_SIZE){
using MyPairType = std::pair<std::string, MariaDB::Job_info_t>;
......@@ -222,19 +218,55 @@ bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix){
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
build_insert << uid << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug)<< query;
std::lock_guard<std::mutex> lock(mut);
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
//maybe another thread did this for us
auto found = _jobCache.find(job_id_string);
if(found != _jobCache.end()){
job_id_db = std::stoi(found->second.job_id_db);
found->second.last_seen_timestamp = getTimestamp();
return true;
}
//Also check that job was not inserted by another collector
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string ='";
build_query << job_id_string << "' AND user='" << uid <<"'";
auto select_query = build_query.str();
LOG(debug) << select_query;
if (mysql_real_query(_mysql, select_query.c_str(), select_query.size())) {
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
bool job_found_in_db = false;
SQLResult result(_mysql);
if (result.get()) {
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
std::string db_job_id = row[0];
addJobToCache(job_id_string, db_job_id);
job_found_in_db=true;
job_id_db = std::stoi(db_job_id);
}
}
}
if(!job_found_in_db) {
std::stringstream build_insert;
build_insert << "INSERT IGNORE INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
build_insert << uid << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug)<< query;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
}
return true;
}
......
......@@ -87,7 +87,7 @@ protected:
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
void addJobToCache(std::string &job_id_string, std::string & job_id_db);
void addJobToCache(const std::string &job_id_string, std::string & job_id_db);
public:
......
......@@ -199,7 +199,8 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
int job_id_db;
if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else { //Todo: error message?
} else {
LOG(error) << "Job insertion not possible, no job id db found for slurm job id" << job_id_string;
continue;
}
} else { //found
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment