Commit ea761e77 authored by Carla Guillen's avatar Carla Guillen
Browse files

Making PerSystDB singleton and adding there the thread locks.

parent 954c8f1d
......@@ -68,10 +68,13 @@ PerSystDB * PerSystDB::instance = nullptr;
std::mutex PerSystDB::mut;
void PerSystDB::print_error(){
LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
if(mysql_errno(_mysql) == 2006){
_initialized = false;
}
}
PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _end_aggregate_timestamp(0) {
PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _end_aggregate_timestamp(0), _initialized(false) {
}
PerSystDB * PerSystDB::getInstance(){
......@@ -85,20 +88,28 @@ PerSystDB * PerSystDB::getInstance(){
}
bool PerSystDB::initializeConnection(const std::string & host, const std::string & user,
const std::string & password, const std::string & database_name,
Rotation_t rotation, int port, unsigned int every_x_days) {
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
return false;
const std::string & password, const std::string & database_name,
Rotation_t rotation, int port, unsigned int every_x_days) {
std::lock_guard<std::mutex> lock(mut);
if (!_initialized) {
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
return false;
}
LOG(debug) << "Successfully connected to mariadb";
_initialized = true;
}
LOG(debug) << "Successfully connected to mariadb";
return true;
}
bool PerSystDB::finalizeConnection(){
mysql_close(_mysql);
LOG(debug) << "Closed mariadb";
std::lock_guard<std::mutex> lock(mut);
if(_initialized){
mysql_close(_mysql);
LOG(debug) << "Closed mariadb";
_initialized = false;
}
return true;
}
......@@ -108,6 +119,7 @@ PerSystDB::~PerSystDB(){
}
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::lock_guard<std::mutex> lock(mut);
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < job_id_strings.size();
......@@ -139,47 +151,48 @@ bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<
bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
if(_end_aggregate_timestamp){
auto now_uts = getTimestamp();
if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
suffix = _current_table_suffix;
return true;
}
}
auto right_now = boost::posix_time::second_clock::local_time();
auto date_time = boost::posix_time::to_iso_extended_string(right_now);
std::replace( date_time.begin(), date_time.end(), 'T', ' ');
std::stringstream build_query;
build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
auto query = build_query.str();
LOG(debug) << query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while((row = result.fetch_row())){
if(row[0] && row[1]){
suffix = std::string(row[0]);
_current_table_suffix = suffix;
std::string row1(row[1]);
_end_aggregate_timestamp = std::stoull(row1) * 1e9;
return true;
} else {
return false;
}
}
}
return false;
if(_end_aggregate_timestamp){
auto now_uts = getTimestamp();
if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
suffix = _current_table_suffix;
return true;
}
}
auto right_now = boost::posix_time::second_clock::local_time();
auto date_time = boost::posix_time::to_iso_extended_string(right_now);
std::replace( date_time.begin(), date_time.end(), 'T', ' ');
std::stringstream build_query;
build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
auto query = build_query.str();
LOG(debug) << query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while((row = result.fetch_row())){
if(row[0] && row[1]){
suffix = std::string(row[0]);
_current_table_suffix = suffix;
std::string row1(row[1]);
_end_aggregate_timestamp = std::stoull(row1) * 1e9;
return true;
} else {
return false;
}
}
}
return false;
}
bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix){
std::lock_guard<std::mutex> lock(mut);
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
......@@ -202,6 +215,7 @@ bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long lo
bool PerSystDB::insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg){
std::lock_guard<std::mutex> lock(mut);
std::stringstream build_insert;
build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES ( FROM_UNIXTIME(\'" << agg.timestamp;
build_insert << "\'), \'" << agg.job_id_db;
......@@ -303,10 +317,11 @@ bool PerSystDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map
bool PerSystDB::getTableSuffix(std::string & table_suffix){
if(!getCurrentSuffixAggregateTable(table_suffix) && !createNewAggregate(table_suffix)){
return false;
}
return true;
std::lock_guard<std::mutex> lock(mut);
if(!getCurrentSuffixAggregateTable(table_suffix) && !createNewAggregate(table_suffix)){
return false;
}
return true;
}
void PerSystDB::getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp){
......
......@@ -56,6 +56,10 @@ public:
};
protected:
PerSystDB();
virtual ~PerSystDB();
MYSQL *_mysql;
Rotation_t _rotation;
unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen
......@@ -63,14 +67,16 @@ protected:
std::string _current_table_suffix;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
void print_error();
static PerSystDB * instance;
static std::mutex mut;
virtual ~PerSystDB();
bool _initialized;
void print_error();
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool createNewAggregate(std::string& new_suffix);
public:
PerSystDB();
bool initializeConnection(const std::string & host,
const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port =3306, unsigned int every_x_days = 0);
......@@ -81,7 +87,7 @@ public:
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
......
......@@ -46,9 +46,6 @@
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"
int PerSystSqlOperator::_number_of_calls = 0;
bool PerSystSqlOperator::persystdb_initialized = false;
std::mutex PerSystSqlOperator::mut;
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
OperatorTemplate(name), JobOperatorTemplate(name), _number_of_even_quantiles(
......@@ -153,14 +150,8 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
compute_internal(unit, _buffer, agg_info);
if( _backend == MARIADB ) {
std::lock_guard<std::mutex> lk(mut);
if (!persystdb_initialized) {
bool persystdb_initialized = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
}
if(!_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days))
LOG(error) << "Database not initialized";
std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId;
......@@ -194,11 +185,7 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
agg_info.timestamp = (my_timestamp/1e9);
_persystdb->insertInAggregateTable(table_suffix, agg_info);
if(_number_of_calls % 10 == 0 && persystdb_initialized){
_persystdb->finalizeConnection();
persystdb_initialized = false;
}
_number_of_calls++;
// _persystdb->finalizeConnection();
}
}
......
......@@ -139,12 +139,9 @@ private:
};
MariaDB_conn_t _conn;
static int _number_of_calls;
int _property_id;
PerSystDB *_persystdb;
const int SCALING_FACTOR_SEVERITY=1000000;
static bool persystdb_initialized;
static std::mutex mut;
protected:
virtual void compute(U_Ptr unit) override;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment