Commit 45d0f53d authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

Class for persystsql operator. Performs inserts into the database.

parent e2fc04b5
//================================================================================
// Name : PerSystDB.cpp
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "PerSystDB.h"
#include <string>
#include <sstream>
#include <iostream>
#include <vector>
#include <map>
#include <sys/types.h>
#include <chrono> // chrono::system_clock
#include <ctime> // localtime
#include <iomanip> // put_time
#include <sys/types.h> // uid_t
#include <pwd.h> // getpwuid
#include "PerSystDB.h"
class SQLResult {
private:
MYSQL_RES * _result;
public:
SQLResult(MYSQL * mysql){
_result= mysql_store_result(mysql);
}
~SQLResult(){
mysql_free_result(_result);
}
MYSQL_RES * get(){
return _result;
}
MYSQL_ROW fetch_row(){
return mysql_fetch_row(_result);
}
};
void PerSystDB::print_error(){
std::cout << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" << std::endl;
//mysql_close(mysql);
}
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days): _mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(false) {
}
PerSystDB::PerSystDB(const std::string & host, const std::string & user, const std::string & password, const std::string & database_name, int port, Rotation_t rotation, unsigned int every_x_days): _rotation(rotation), _every_x_days(every_x_days), _internal_connection(true) {
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
}
}
PerSystDB::~PerSystDB(){
if(_internal_connection){
mysql_close(_mysql);
}
}
/**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_map job_id_string to job_id (db) map
*/
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map ){
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for(int i = 0; i < job_id_strings.size(); ++i){
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size()-1) { //not last element
build_query << ",";
}
}
build_query << ")";
auto query = build_query.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while(row = result.fetch_row()){
if(row[0]){
job_id_map[std::string(row[1])]= row[0];
}
}
}
return true;
}
bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
auto date_time = std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S");
std::stringstream build_query;
build_query << "SELECT suffix FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
auto query = build_query.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while(row = result.fetch_row()){
if(row[0]){
suffix = std::string(row[0]);
return true;
} else {
return false;
}
}
}
return false;
}
bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix){
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
build_insert << pass->pw_name << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
return true;
}
bool PerSystDB::insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg){
std::stringstream build_insert;
build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES (\'" << agg.timestamp;
build_insert << "\', \'" << agg.job_id_db;
build_insert << "\', \'" << agg.property_type_id;
build_insert << "\', \'" << agg.num_of_observations;
build_insert << "\', \'" << agg.average;
for(auto quant: agg.quantiles){
build_insert << "\', \'" << quant;
}
build_insert << "\', \'" << agg.severity_average << "\')";
std::string query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
return true;
}
bool PerSystDB::createNewAggregate(std::string& new_suffix){
std::string select = "SELECT suffix, end_timestamp FROM SuffixToAggregateTable ORDER BY end_timestamp DESC LIMIT 1";
if (mysql_real_query(_mysql, select.c_str(), select.size())){
print_error();
}
std::string last_suffix = "0";
std::string end_timestamp = "";
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while (row = result.fetch_row()){
if(row[0]){
last_suffix = std::string(row[0]);
}
if(row[1]){
end_timestamp = std::string(row[1]);
}
}
}
if(end_timestamp.size() == 0){
auto now = std::chrono::system_clock::now();
auto in_time_t = std::chrono::system_clock::to_time_t(now);
auto datetime = std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S");
std::stringstream dt;
dt << datetime;
end_timestamp = dt.str().substr(0,11);
end_timestamp = end_timestamp + "00:00:00";
}
int new_suff = std::stoi(last_suffix) + 1;
std::string new_begin_timestamp, new_end_timestamp;
getNewDates(end_timestamp, new_begin_timestamp, new_end_timestamp);
std::stringstream build_insert;
build_insert << "INSERT INTO SuffixToAggregateTable VALUES(\'" << new_suff;
build_insert << "\', \'" << new_begin_timestamp << "\', \'" << new_end_timestamp << "\')";
auto query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if (mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
std::stringstream build_create;
build_create << "CREATE TABLE Aggregate_" << new_suff << " LIKE Aggregate";
auto query2 = build_create.str();
#if DEBUG
std::cout << query2 << std::endl;
#endif
if(mysql_real_query(_mysql, query2.c_str(), query2.size() )){
print_error();
return false;
}
new_suffix = std::to_string(new_suff);
return true;
}
bool PerSystDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix){
std::stringstream build_update;
build_update << "UPDATE Accounting SET aggregate_last_suffix=\'" << suffix << "\' WHERE job_id IN (";
unsigned int count = 0;
for(auto & kv: job_map){
build_update << kv.second;
if(count + 1 != job_map.size() ){
build_update << ",";
} else {
build_update << ")";
}
count++;
}
auto query = build_update.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if (mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
}
}
bool PerSystDB::getTableSuffix(std::string & table_suffix){
if(!getCurrentSuffixAggregateTable(table_suffix) &&
!createNewAggregate(table_suffix)){
return false;
}
return true;
}
using days = std::chrono::duration<int, std::ratio_multiply<std::ratio<24>, std::chrono::hours::period>>;
using years = std::chrono::duration<int, std::ratio_multiply<std::ratio<146097, 400>, days::period>>;
using months = std::chrono::duration<int, std::ratio_divide<years::period, std::ratio<12>>>;
void PerSystDB::getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp){
begin_timestamp = last_end_timestamp;
std::tm tm = {};
std::stringstream ss(last_end_timestamp);
ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S");
auto tp = std::chrono::system_clock::from_time_t(std::mktime(&tm));
switch(_rotation) {
case EVERY_YEAR:
tp += years{1};
break;
case EVERY_MONTH:
tp += months{1};
break;
case EVERY_XDAYS:
tp += days{_every_x_days};
break;
default:
tp += months{1};
break;
};
auto in_time_t = std::chrono::system_clock::to_time_t(tp);
auto date_time = std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S");
std::stringstream build_date;
build_date << date_time;
end_timestamp = build_date.str();
}
//================================================================================
// Name : PerSystDB.h
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTDB_H_
#define ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTDB_H_
#include <mysql.h>
#include <vector>
#include <string>
#include <map>
struct Aggregate_info_t {
std::string job_id_db;
std::string timestamp;
unsigned int property_type_id;
unsigned int num_of_observations;
float average;
std::vector<float> quantiles;
float severity_average;
};
class PerSystDB {
public:
enum Rotation_t {
EVERY_YEAR,
EVERY_MONTH,
EVERY_XDAYS //number of days must be provided
};
protected:
MYSQL *_mysql;
Rotation_t _rotation;
unsigned int _every_x_days; //ignored except when EVERY_XDAYS is chosen
bool _internal_connection;
void print_error();
public:
PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days=0);
PerSystDB(const std::string & host, const std::string & user, const std::string & password, const std::string & database_name, int port, Rotation_t rotation, unsigned int every_x_days=0);
virtual ~PerSystDB();
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix );
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool getTableSuffix(std::string & table_suffix);
};
#endif /* ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTDB_H_ */
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment