Commit ca1364e0 authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

Adding concurrency protection for database.

parent d3561152
......@@ -64,6 +64,9 @@ public:
}
};
PerSystDB * PerSystDB::instance = nullptr;
std::mutex PerSystDB::mut;
void PerSystDB::print_error(){
LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
}
......@@ -74,7 +77,16 @@ PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0),
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days) :
_mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(
false), _end_aggregate_timestamp(0) {
}
PerSystDB * PerSystDB::getInstance(){
// no lock here
if (instance) return instance;
std::lock_guard<std::mutex> lock(mut);
if (instance) return instance;
return instance = new PerSystDB();
}
bool PerSystDB::initializeConnection(const std::string & host, const std::string & user,
......@@ -85,11 +97,13 @@ bool PerSystDB::initializeConnection(const std::string & host, const std::string
print_error();
return false;
}
LOG(debug) << "Successfully connected to mariadb";
return true;
}
bool PerSystDB::finalizeConnection(){
mysql_close(_mysql);
LOG(debug) << "Closed mariadb";
return true;
}
......@@ -101,6 +115,8 @@ PerSystDB::PerSystDB(const std::string & host, const std::string & user,
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
} else {
LOG(debug) << "Internal Connection: Successfully connected to mariadb";
}
}
......@@ -108,36 +124,38 @@ PerSystDB::PerSystDB(const std::string & host, const std::string & user,
PerSystDB::~PerSystDB(){
if(_internal_connection){
mysql_close(_mysql);
LOG(debug) << "Internal Connection: Disconnected from mariadb";
}
}
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map ){
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for(std::vector<std::string>::size_type i = 0; i < job_id_strings.size(); ++i){
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size()-1) { //not last element
build_query << ",";
}
}
build_query << ")";
auto query = build_query.str();
LOG(debug) << query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < job_id_strings.size();
++i) {
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size() - 1) { //not last element
build_query << ",";
}
}
build_query << ")";
auto query = build_query.str();
LOG(debug)<< query;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while((row = result.fetch_row())){
if(row[0]){
job_id_map[std::string(row[1])]= row[0];
}
}
}
return true;
SQLResult result(_mysql);
if (result.get()) {
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
job_id_map[std::string(row[1])] = row[0];
}
}
}
return true;
}
......@@ -182,20 +200,24 @@ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix){
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
build_insert << pass->pw_name << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug) << query;
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
if (pass == nullptr) {
LOG(error)<< "User " << uid << " not found in system.";
return false;
}
build_insert << pass->pw_name << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug)<< query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
return true;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
return true;
}
......
......@@ -33,6 +33,7 @@
#include <vector>
#include <string>
#include <map>
#include <mutex>
struct Aggregate_info_t {
std::string job_id_db;
......@@ -64,6 +65,8 @@ protected:
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
void print_error();
static PerSystDB * instance;
static std::mutex mut;
public:
PerSystDB(); //Used for later initialization of mysql
bool initializeConnection(const std::string & host,
......@@ -90,6 +93,7 @@ public:
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool getTableSuffix(std::string & table_suffix);
static PerSystDB * getInstance();
};
......
......@@ -47,13 +47,15 @@
#include "../../includes/UnitTemplate.h"
int PerSystSqlOperator::_number_of_calls = 0;
PerSystDB PerSystSqlOperator::persystdb;
bool PerSystSqlOperator::persystdb_initialized = false;
std::mutex PerSystSqlOperator::mut;
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
OperatorTemplate(name), JobOperatorTemplate(name), _number_of_even_quantiles(
0), _severity_formula(NOFORMULA), _severity_threshold(0), _severity_exponent(
0), _severity_max_memory(0), _go_back_ns(0), _backend(DEFAULT), _scaling_factor(
1), _property_id(0) {
_persystdb = PerSystDB::getInstance();
}
PerSystSqlOperator::~PerSystSqlOperator() {
......@@ -91,6 +93,7 @@ void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
this->_conn.rotation = other._conn.rotation;
this->_conn.user = other._conn.user;
this->_property_id = other._property_id;
this->_persystdb = other._persystdb;
}
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
......@@ -138,17 +141,24 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
}
}
}
static bool persystdb_initialized = false;
if ( _backend == MARIADB && !persystdb_initialized) {
bool persystdb_initialized = persystdb.initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
if(_buffer.size() == 0){
LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
return;
}
Aggregate_info_t agg_info;
std::string table_suffix;
if(_backend == MARIADB){
compute_internal(unit, _buffer, agg_info);
if( _backend == MARIADB ) {
std::lock_guard<std::mutex> lk(mut);
if (!persystdb_initialized) {
bool persystdb_initialized = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
}
std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId;
......@@ -156,35 +166,32 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
job_ids.push_back(jobidBuilder.str());
std::map<std::string, std::string> job_map;
if(!persystdb.getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
return;
}
if(!persystdb.getDBJobIDs(job_ids, job_map)){
return;
}
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(persystdb.insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
continue;
}
}
}
agg_info.timestamp = (my_timestamp/1e9);
}
std::string table_suffix;
if(!_persystdb->getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
return;
}
if(!_persystdb->getDBJobIDs(job_ids, job_map)){
return;
}
compute_internal(unit, _buffer, agg_info);
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
continue;
}
}
}
agg_info.timestamp = (my_timestamp/1e9);
if(_backend == MARIADB){
persystdb.insertInAggregateTable(table_suffix, agg_info);
_persystdb->insertInAggregateTable(table_suffix, agg_info);
if(_number_of_calls % 10 == 0 && persystdb_initialized){
persystdb.finalizeConnection();
_persystdb->finalizeConnection();
persystdb_initialized = false;
}
_number_of_calls++;
......
......@@ -28,13 +28,17 @@
#ifndef ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTSQLOPERATOR_H_
#define ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTSQLOPERATOR_H_
#include <string>
#include <vector>
#include <mutex>
#include "../../../common/include/cacheentry.h"
#include "../../../common/include/logging.h"
#include "../../includes/JobOperatorTemplate.h"
#include "../aggregator/AggregatorSensorBase.h"
#include "PerSystDB.h"
class PerSystSqlOperator: public JobOperatorTemplate<AggregatorSensorBase>{
public:
enum Formula {
......@@ -137,8 +141,10 @@ private:
MariaDB_conn_t _conn;
static int _number_of_calls;
int _property_id;
static PerSystDB persystdb;
PerSystDB *_persystdb;
const int SCALING_FACTOR_SEVERITY=1000000;
static bool persystdb_initialized;
static std::mutex mut;
protected:
virtual void compute(U_Ptr unit) override;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment