Commit 0f38456f authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'origin/development' into development

parents 0b053866 3da788c6
...@@ -121,12 +121,24 @@ PerSystDB::~PerSystDB(){ ...@@ -121,12 +121,24 @@ PerSystDB::~PerSystDB(){
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) { bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::lock_guard<std::mutex> lock(mut); std::lock_guard<std::mutex> lock(mut);
std::vector<std::string> notfound;
for(auto & job_id_str: job_id_strings){
auto found = _jobCache.find(job_id_str);
if(found != _jobCache.end()){
job_id_map[job_id_str] = found->second.job_id_db;
found->second.last_seen_timestamp = getTimestamp();
} else {
notfound.push_back(job_id_str);
}
}
if(!notfound.size()){ //every job was found
return true;
}
std::stringstream build_query; std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN ("; build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < job_id_strings.size(); for (std::vector<std::string>::size_type i = 0; i < notfound.size(); ++i) {
++i) { build_query << "'" << notfound[i] << "'";
build_query << "'" << job_id_strings[i] << "'"; if (i != notfound.size() - 1) { //not last element
if (i != job_id_strings.size() - 1) { //not last element
build_query << ","; build_query << ",";
} }
} }
...@@ -143,13 +155,28 @@ bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map< ...@@ -143,13 +155,28 @@ bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<
MYSQL_ROW row; MYSQL_ROW row;
while ((row = result.fetch_row())) { while ((row = result.fetch_row())) {
if (row[0]) { if (row[0]) {
job_id_map[std::string(row[1])] = row[0]; std::string job_id_db = row[0];
std::string job_id_string = std::string(row[1]);
job_id_map[job_id_string] = job_id_db;
addJobToCache(job_id_string, job_id_db);
} }
} }
} }
return true; return true;
} }
void PerSystDB::addJobToCache(std::string &job_id_string, std::string & job_id_db){
if(_jobCache.size() == JOB_CACHE_MAX_SIZE){ //remove one element before inserting
using MyPairType = std::pair<std::string, PerSystDB::Job_info_t>;
auto smallest = std::min_element(_jobCache.begin(), _jobCache.end(),
[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
_jobCache.erase(smallest);
}
Job_info_t ji;
ji.job_id_db = job_id_db;
ji.last_seen_timestamp = getTimestamp();
_jobCache[job_id_string] = ji;
}
bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
if(_end_aggregate_timestamp){ if(_end_aggregate_timestamp){
......
...@@ -36,64 +36,103 @@ ...@@ -36,64 +36,103 @@
#include <mutex> #include <mutex>
struct Aggregate_info_t { struct Aggregate_info_t {
std::string job_id_db; std::string job_id_db;
unsigned int timestamp; unsigned int timestamp;
unsigned int property_type_id; unsigned int property_type_id;
unsigned int num_of_observations; unsigned int num_of_observations;
float average; float average;
std::vector<float> quantiles; std::vector<float> quantiles;
float severity_average; float severity_average;
}; };
class PerSystDB {
private:
struct Job_info_t {
std::string job_id_db;
unsigned long long last_seen_timestamp;
};
class PerSystDB {
public: public:
enum Rotation_t { enum Rotation_t {
EVERY_YEAR, EVERY_YEAR, EVERY_MONTH, EVERY_XDAYS //number of days must be provided
EVERY_MONTH, };
EVERY_XDAYS //number of days must be provided
};
protected: protected:
PerSystDB(); PerSystDB();
virtual ~PerSystDB(); virtual ~PerSystDB();
MYSQL *_mysql; MYSQL *_mysql;
Rotation_t _rotation; Rotation_t _rotation;
unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen
unsigned long long _end_aggregate_timestamp; unsigned long long _end_aggregate_timestamp;
std::string _current_table_suffix; std::string _current_table_suffix;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg; boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
static PerSystDB * instance; static PerSystDB * instance;
static std::mutex mut; static std::mutex mut;
bool _initialized; bool _initialized;
std::map<std::string, Job_info_t> _jobCache;
const std::size_t JOB_CACHE_MAX_SIZE = 10000;
void print_error();
/** print error.
* Prints the mysql error message. If connection is gone (Error 2006) then we also close the connection.
* Please check with isInitialized() to initialize it again.
*/
void print_error();
bool getCurrentSuffixAggregateTable(std::string & new_suffix); bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool createNewAggregate(std::string& new_suffix); bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
void addJobToCache(std::string &job_id_string, std::string & job_id_db);
public: public:
bool initializeConnection(const std::string & host,
const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port =3306, unsigned int every_x_days = 0);
bool finalizeConnection();
/**
* Connect to database.
*/
bool initializeConnection(const std::string & host, const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port = 3306, unsigned int every_x_days = 0);
bool isInitialized(){
return _initialized;
}
/** /**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting. * Disconnect
* @param job_id_map job_id_string to job_id (db) map */
*/ bool finalizeConnection();
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
/**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_strings job id strings including array jobid.
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
/**
* Insert job in the accounting table.
*/
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix); bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
/**
* Insert performance data into the aggregate table (Aggregate_<suffix>
*/
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info); bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
/**
* Update the last suffix in the Accounting table
*/
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix); bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
/**
* Get the next or the current table suffix
*/
bool getTableSuffix(std::string & table_suffix); bool getTableSuffix(std::string & table_suffix);
/**
* Singleton object. Get here your instance!
*/
static PerSystDB * getInstance(); static PerSystDB * getInstance();
}; };
......
...@@ -116,6 +116,23 @@ void PerSystSqlOperator::printConfig(LOG_LEVEL ll) { ...@@ -116,6 +116,23 @@ void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory; LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
} }
bool PerSystSqlOperator::execOnStart(){
if( _backend == MARIADB ) {
if(!_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days)){
LOG(error) << "Database not initialized";
return false;
}
}
return true;
}
void PerSystSqlOperator::execOnStop(){
if( _backend == MARIADB ) {
_persystdb->finalizeConnection();
}
}
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) { void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated // Clearing the buffer, if already allocated
_buffer.clear(); _buffer.clear();
...@@ -150,8 +167,13 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) { ...@@ -150,8 +167,13 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
compute_internal(unit, _buffer, agg_info); compute_internal(unit, _buffer, agg_info);
if( _backend == MARIADB ) { if( _backend == MARIADB ) {
if(!_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days)) if (!_persystdb->isInitialized()
&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation,
_conn.port, _conn.every_x_days)) {
LOG(error) << "Database not initialized"; LOG(error) << "Database not initialized";
return;
}
std::stringstream jobidBuilder; std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId; jobidBuilder << jobData.jobId;
...@@ -185,7 +207,6 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) { ...@@ -185,7 +207,6 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
agg_info.timestamp = (my_timestamp/1e9); agg_info.timestamp = (my_timestamp/1e9);
_persystdb->insertInAggregateTable(table_suffix, agg_info); _persystdb->insertInAggregateTable(table_suffix, agg_info);
// _persystdb->finalizeConnection();
} }
} }
......
...@@ -148,6 +148,8 @@ protected: ...@@ -148,6 +148,8 @@ protected:
void compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t &agg_info); void compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t &agg_info);
double computeSeverityAverage(vector<double> & buffer); double computeSeverityAverage(vector<double> & buffer);
void convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer); void convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer);
bool execOnStart() override;
void execOnStop() override;
}; };
double severity_formula1(double metric, double threshold, double exponent); double severity_formula1(double metric, double threshold, double exponent);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment