Commit 97bcb486 authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

PerSystDB: adding job cache to avoid querying mysql job ids. executeOnStart...

PerSystDB: add a job cache to avoid querying MySQL for job IDs that were already resolved. execOnStart and execOnStop are now used in the PerSystSqlOperator.
parent fed7252c
......@@ -121,12 +121,24 @@ PerSystDB::~PerSystDB(){
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::lock_guard<std::mutex> lock(mut);
std::vector<std::string> notfound;
for(auto & job_id_str: job_id_strings){
auto found = _jobCache.find(job_id_str);
if(found != _jobCache.end()){
job_id_map[job_id_str] = found->second.job_id_db;
found->second.last_seen_timestamp = getTimestamp();
} else {
notfound.push_back(job_id_str);
}
}
if(!notfound.size()){ //every job was found
return true;
}
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < job_id_strings.size();
++i) {
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size() - 1) { //not last element
for (std::vector<std::string>::size_type i = 0; i < notfound.size(); ++i) {
build_query << "'" << notfound[i] << "'";
if (i != notfound.size() - 1) { //not last element
build_query << ",";
}
}
......@@ -143,13 +155,28 @@ bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
job_id_map[std::string(row[1])] = row[0];
std::string job_id_db = row[0];
std::string job_id_string = std::string(row[1]);
job_id_map[job_id_string] = job_id_db;
addJobToCache(job_id_string, job_id_db);
}
}
}
return true;
}
void PerSystDB::addJobToCache(std::string &job_id_string, std::string & job_id_db){
if(_jobCache.size() == JOB_CACHE_MAX_SIZE){ //remove one element before inserting
using MyPairType = std::pair<std::string, PerSystDB::Job_info_t>;
auto smallest = std::min_element(_jobCache.begin(), _jobCache.end(),
[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
_jobCache.erase(smallest);
}
Job_info_t ji;
ji.job_id_db = job_id_db;
ji.last_seen_timestamp = getTimestamp();
_jobCache[job_id_string] = ji;
}
bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
if(_end_aggregate_timestamp){
......
......@@ -36,64 +36,103 @@
#include <mutex>
struct Aggregate_info_t {
std::string job_id_db;
unsigned int timestamp;
unsigned int property_type_id;
unsigned int num_of_observations;
float average;
std::vector<float> quantiles;
float severity_average;
std::string job_id_db;
unsigned int timestamp;
unsigned int property_type_id;
unsigned int num_of_observations;
float average;
std::vector<float> quantiles;
float severity_average;
};
class PerSystDB {
private:
/**
 * Value stored in the PerSystDB job cache (_jobCache), keyed by job id string.
 */
struct Job_info_t {
std::string job_id_db; // job id as returned by the database (job_id column of Accounting)
unsigned long long last_seen_timestamp; // getTimestamp() value of the last insert or cache hit; the smallest one is evicted first
};
class PerSystDB {
public:
enum Rotation_t {
EVERY_YEAR,
EVERY_MONTH,
EVERY_XDAYS //number of days must be provided
};
enum Rotation_t {
EVERY_YEAR, EVERY_MONTH, EVERY_XDAYS //number of days must be provided
};
protected:
PerSystDB();
virtual ~PerSystDB();
MYSQL *_mysql;
Rotation_t _rotation;
unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen
unsigned long long _end_aggregate_timestamp;
std::string _current_table_suffix;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
MYSQL *_mysql;
Rotation_t _rotation;
unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen
unsigned long long _end_aggregate_timestamp;
std::string _current_table_suffix;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
static PerSystDB * instance;
static std::mutex mut;
static PerSystDB * instance;
static std::mutex mut;
bool _initialized;
void print_error();
std::map<std::string, Job_info_t> _jobCache;
const std::size_t JOB_CACHE_MAX_SIZE = 10000;
/** print error.
* Prints the mysql error message. If connection is gone (Error 2006) then we also close the connection.
* Please check with isInitialized() to initialize it again.
*/
void print_error();
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
void addJobToCache(std::string &job_id_string, std::string & job_id_db);
public:
bool initializeConnection(const std::string & host,
const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port =3306, unsigned int every_x_days = 0);
bool finalizeConnection();
/**
* Connect to database.
*/
bool initializeConnection(const std::string & host, const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port = 3306, unsigned int every_x_days = 0);
bool isInitialized(){
return _initialized;
}
/**
* Look up the database job_id for each job id string. A job that is missing from the map was not found (e.g. it is not yet in the Accounting table).
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
/**
* Disconnect
*/
bool finalizeConnection();
/**
* Look up the database job_id for each job id string. A job that is missing from the map was not found (e.g. it is not yet in the Accounting table).
* @param job_id_strings job id strings including array jobid.
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
/**
* Insert job in the accounting table.
*/
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
/**
* Insert performance data into the aggregate table (Aggregate_<suffix>).
*/
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
/**
* Update the last suffix in the Accounting table
*/
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
/**
* Get the next or the current table suffix
*/
bool getTableSuffix(std::string & table_suffix);
/**
* Singleton object. Get here your instance!
*/
static PerSystDB * getInstance();
};
......
......@@ -116,6 +116,23 @@ void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}
/**
 * Start hook of the operator (overrides the base-class execOnStart).
 *
 * For the MARIADB backend the database connection is opened with the
 * configured connection parameters; any other backend needs no setup.
 *
 * @return false if the MARIADB connection could not be initialized,
 *         true otherwise.
 */
bool PerSystSqlOperator::execOnStart(){
    if (_backend != MARIADB) {
        // Nothing to set up for other backends.
        return true;
    }

    const bool connected = _persystdb->initializeConnection(
            _conn.host, _conn.user, _conn.password, _conn.database_name,
            _conn.rotation, _conn.port, _conn.every_x_days);
    if (!connected) {
        LOG(error) << "Database not initialized";
    }
    return connected;
}
/**
 * Stop hook of the operator (overrides the base-class execOnStop).
 *
 * Closes the database connection when the MARIADB backend is in use.
 * The result of finalizeConnection() is not checked.
 */
void PerSystSqlOperator::execOnStop(){
    if (_backend != MARIADB) {
        return;
    }
    _persystdb->finalizeConnection();
}
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
_buffer.clear();
......@@ -150,8 +167,13 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
compute_internal(unit, _buffer, agg_info);
if( _backend == MARIADB ) {
if(!_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days))
if (!_persystdb->isInitialized()
&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation,
_conn.port, _conn.every_x_days)) {
LOG(error) << "Database not initialized";
return;
}
std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId;
......@@ -185,7 +207,6 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
agg_info.timestamp = (my_timestamp/1e9);
_persystdb->insertInAggregateTable(table_suffix, agg_info);
// _persystdb->finalizeConnection();
}
}
......
......@@ -148,6 +148,8 @@ protected:
void compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t &agg_info);
double computeSeverityAverage(vector<double> & buffer);
void convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer);
bool execOnStart() override;
void execOnStop() override;
};
double severity_formula1(double metric, double threshold, double exponent);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment