Commit 2c32cf2d authored by Carla Guillen's avatar Carla Guillen
Browse files

Changes before merge

parent 63f27050
......@@ -70,12 +70,31 @@ void PerSystDB::print_error(){
//mysql_close(mysql);
}
// Default constructor: leaves the MySQL handle unset (NULL) so that the
// connection can be established later via initializeConnection().
PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _internal_connection(false), _end_aggregate_timestamp(0) {
}
// Wraps an externally owned MySQL connection. _internal_connection is set to
// false, so this object will not own or close the handle itself.
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days) :
_mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(
false), _end_aggregate_timestamp(0) {
}
bool PerSystDB::initializeConnection(const std::string & host, const std::string & user,
const std::string & password, const std::string & database_name,
Rotation_t rotation, int port, unsigned int every_x_days) {
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
return false;
}
return true;
}
bool PerSystDB::finalizeConnection(){
mysql_close(_mysql);
return true;
}
PerSystDB::PerSystDB(const std::string & host, const std::string & user,
const std::string & password, const std::string & database_name,
int port, Rotation_t rotation, unsigned int every_x_days) :
......@@ -191,8 +210,8 @@ bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long lo
bool PerSystDB::insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg){
std::stringstream build_insert;
build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES (\'" << agg.timestamp;
build_insert << "\', \'" << agg.job_id_db;
build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES ( FROM_UNIXTIME(\'" << agg.timestamp;
build_insert << "\'), \'" << agg.job_id_db;
build_insert << "\', \'" << agg.property_type_id;
build_insert << "\', \'" << agg.num_of_observations;
build_insert << "\', \'" << agg.average;
......
......@@ -35,7 +35,7 @@
struct Aggregate_info_t {
std::string job_id_db;
std::string timestamp;
unsigned int timestamp;
unsigned int property_type_id;
unsigned int num_of_observations;
float average;
......@@ -64,23 +64,31 @@ protected:
void print_error();
public:
PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days=0);
PerSystDB(const std::string & host, const std::string & user, const std::string & password, const std::string & database_name, int port, Rotation_t rotation, unsigned int every_x_days=0);
virtual ~PerSystDB();
PerSystDB(); //Used for later initialization of mysql
bool initializeConnection(const std::string & host,
const std::string & user, const std::string & password,
const std::string & database_name, Rotation_t rotation, int port =3306, unsigned int every_x_days = 0);
bool finalizeConnection();
PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days=0);
PerSystDB(const std::string & host, const std::string & user,
const std::string & password, const std::string & database_name,
int port, Rotation_t rotation, unsigned int every_x_days = 0);
virtual ~PerSystDB();
/**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix );
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool getTableSuffix(std::string & table_suffix);
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
bool insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix);
bool createNewAggregate(std::string& new_suffix);
void getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp);
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool getTableSuffix(std::string & table_suffix);
};
......
......@@ -108,7 +108,10 @@ void PerSystSqlConfigurator::operatorAttributes(PerSystSqlOperator& op, CFG_VAL
} else if(boost::iequals(val.first, "scaling_factor")){
auto scaling_factor = std::stoull(val.second.data());
op.setScalingFactor(scaling_factor);
}
} else if(boost::iequals(val.first, "property_id")){
auto property_id = std::stoi(val.second.data());
op.setPropertyId(property_id);
}
}
}
......
......@@ -37,6 +37,7 @@
#include <algorithm>
#include <cmath>
#include <cstring>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
......@@ -45,108 +46,179 @@
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"
int PerSystSqlOperator::_number_of_calls = 0;
// Default-initializes all aggregation/severity settings.
// NOTE: the diff residue contained the member-initializer list twice; this is
// the single reconstructed form. _property_id is explicitly zeroed: it is
// copied into Aggregate_info_t by compute_internal() and would otherwise be
// read uninitialized when the configuration never calls setPropertyId().
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
        OperatorTemplate(name), JobOperatorTemplate(name),
        _number_of_even_quantiles(0), _severity_formula(NOFORMULA),
        _severity_threshold(0), _severity_exponent(0), _severity_max_memory(0),
        _go_back_ns(0), _backend(DEFAULT), _scaling_factor(1), _property_id(0) {
}
// No explicit cleanup required: this class holds no raw resources of its own.
PerSystSqlOperator::~PerSystSqlOperator() {
}
// Job-aware compute: gathers one reading per input sensor of every sub-unit
// at a single point in time (now minus _go_back_ns) into _buffer, then hands
// the accumulated buffer to compute_internal().
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
_buffer.clear();
// NOTE(review): elCtr, jobEnd and jobStart are computed but never used below;
// presumably leftovers of an earlier aggregation-window implementation —
// confirm before removing.
size_t elCtr=0;
uint64_t my_timestamp = getTimestamp() - _go_back_ns;
// Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd = jobData.endTime!=0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
uint64_t jobStart = jobEnd-my_timestamp < jobData.startTime ? jobData.startTime : jobEnd-my_timestamp;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for(const auto& subUnit : unit->getSubUnits()) {
// Since we do not clear the internal buffer, all sensor readings of all
// sub-units accumulate in the same vector (_buffer).
for(const auto& in : subUnit->getInputs()) {
// Query a single instant: start == end == my_timestamp. Abort the whole
// computation if any sensor cannot be read.
if(!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)){
LOG(debug) << "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
}
}
compute_internal(unit, _buffer);
}
size_t elCtr = 0;
uint64_t my_timestamp = getTimestamp() - _go_back_ns;
// Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd =
jobData.endTime != 0 && my_timestamp > jobData.endTime ?
jobData.endTime : my_timestamp;
uint64_t jobStart =
jobEnd - my_timestamp < jobData.startTime ?
jobData.startTime : jobEnd - my_timestamp;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for (const auto& subUnit : unit->getSubUnits()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for (const auto& in : subUnit->getInputs()) {
if (!_queryEngine.querySensor(in->getName(), my_timestamp,
my_timestamp, _buffer, false)) {
LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
}
}
static bool persystdb_initialized = false;
if(_backend == MARIADB){
if (!persystdb_initialized) {
LOG(debug)<< "PerSystSQL Operator Connection information:";
LOG(debug) << "\tHost=" << _conn.host;
LOG(debug) << "\tUser=" << _conn.user;
LOG(debug) << "\tDatabase=" << _conn.database_name;
LOG(debug) << "\tPort=" << _conn.port;
LOG(debug) << "\tRotation=" << _conn.rotation;
LOG(debug) << "\tEvery_X_days=" << _conn.every_x_days;
bool persystdb_initialized = persystdb.initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
}
}
Aggregate_info_t agg_info;
std::string table_suffix;
if(_backend == MARIADB){
std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId;
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer) {
_quantileSensors.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp() - _go_back_ns;
std::vector<std::string> job_ids;
job_ids.push_back(jobidBuilder.str());
std::vector<double> douBuffer;
punToDoubles(buffer, douBuffer);
// Performing the actual aggregation operation
for(const auto& out : unit->getOutputs()) {
op = out->getOperation();
if(op!=AggregatorSensorBase::QTL) {
switch (op) {
case AggregatorSensorBase::SUM:
if(_backend == CASSANDRA){
reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0) * _scaling_factor;
} else {
reading.value = punDoubleToLL(std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0));
}
break;
case AggregatorSensorBase::AVG:
if(_backend == CASSANDRA){
reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size() * _scaling_factor;
} else {
reading.value = punDoubleToLL(std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size());
}
break;
case AggregatorSensorBase::OBS:
reading.value = computeObs(buffer);
break;
case AggregatorSensorBase::AVG_SEV:
if(_backend == CASSANDRA) {
reading.value = computeSeverityAverage(douBuffer)* _scaling_factor;
std::map<std::string, std::string> job_map;
if(!persystdb.getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
return;
}
if(!persystdb.getDBJobIDs(job_ids, job_map)){
return;
}
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(persystdb.insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
reading.value = punDoubleToLL(computeSeverityAverage(douBuffer));
continue;
}
break;
default:
LOG(warning) << _name << ": Operation " << op << " not supported!";
reading.value = 0;
break;
}
if(_backend == CASSANDRA){
out->storeReading(reading);
} else {
//ToDo
}
} else {
_quantileSensors.push_back(out);
}
}
}
}
agg_info.timestamp = (my_timestamp/1e9);
}
if(!_quantileSensors.empty()) {
vector<double> quantiles;
computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
for(unsigned idx=0; idx<quantiles.size(); idx++) {
reading.value = punDoubleToLL(quantiles[idx]);
_quantileSensors[idx]->storeReading(reading);
}
}
compute_internal(unit, _buffer, agg_info);
if(_backend == MARIADB){
persystdb.insertInAggregateTable(table_suffix, agg_info);
if(_number_of_calls % 10 == 0 && persystdb_initialized){
persystdb.finalizeConnection();
persystdb_initialized = false;
}
_number_of_calls++;
}
}
void PerSystSqlOperator::compute(U_Ptr unit){
/**
 * Core aggregation for one job unit.
 *
 * Converts the bit-punned sensor buffer to doubles, then applies each output
 * sensor's configured aggregation operation. With the CASSANDRA backend the
 * results are stored directly as sensor readings; otherwise they are
 * collected into agg_info for the SQL backend. Quantile (QTL) outputs are
 * gathered during the loop and filled in a single pass at the end.
 *
 * @param unit     job unit whose output sensors receive the results
 * @param buffer   accumulated raw readings of all input sensors
 * @param agg_info [out] aggregate record destined for the Aggregate_ table
 */
void PerSystSqlOperator::compute_internal(U_Ptr& unit,
        vector<reading_t>& buffer, Aggregate_info_t & agg_info) {
    _quantileSensors.clear();
    reading_t reading;
    AggregatorSensorBase::aggregationOps_t op;
    reading.timestamp = getTimestamp() - _go_back_ns;
    std::vector<double> douBuffer;
    punToDoubles(buffer, douBuffer);
    // Performing the actual aggregation operation
    for (const auto& out : unit->getOutputs()) {
        op = out->getOperation();
        if (op == AggregatorSensorBase::QTL) {
            // Quantiles are computed together after this loop.
            _quantileSensors.push_back(out);
            continue;
        }
        switch (op) {
        case AggregatorSensorBase::AVG: {
            // Guard the division: an empty buffer would yield NaN.
            double sum = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0);
            double avg = douBuffer.empty() ? 0.0 : sum / douBuffer.size();
            if (_backend == CASSANDRA) {
                reading.value = avg * _scaling_factor;
            } else {
                agg_info.average = avg;
            }
            break;
        }
        case AggregatorSensorBase::OBS: {
            // Compute once and reuse (the original called computeObs() twice).
            const auto observations = computeObs(buffer);
            reading.value = observations;
            agg_info.num_of_observations = observations;
            break;
        }
        case AggregatorSensorBase::AVG_SEV:
            if (_backend == CASSANDRA) {
                reading.value = computeSeverityAverage(douBuffer) * _scaling_factor;
            } else {
                agg_info.severity_average = computeSeverityAverage(douBuffer);
            }
            break;
        default:
            LOG(warning)<< _name << ": Operation " << op << " not supported!";
            reading.value = 0;
            break;
        }
        if (_backend == CASSANDRA) {
            out->storeReading(reading);
        }
    }
    if (!_quantileSensors.empty()) {
        vector<double> quantiles;
        computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
        if (_backend == CASSANDRA) {
            for (unsigned idx = 0; idx < quantiles.size(); idx++) {
                reading.value = quantiles[idx] * _scaling_factor;
                _quantileSensors[idx]->storeReading(reading);
            }
        } else {
            for (auto q : quantiles) {
                agg_info.quantiles.push_back(static_cast<float>(q));
            }
        }
    }
    agg_info.property_type_id = _property_id;
}
// The plain (non-job) compute is intentionally a no-op: this operator only
// aggregates through the job-aware overload.
void PerSystSqlOperator::compute(U_Ptr unit) {
//nothing here!
}
/**
 * Severity formula 1: distance above an absolute threshold, shaped by an
 * exponent and clamped to at most 1.
 * @param metric    observed value
 * @param threshold value above which severity starts to accumulate
 * @param exponent  shaping exponent applied to (metric - threshold)
 * @return pow(metric - threshold, exponent), capped at 1; 0 when the metric
 *         does not exceed the threshold.
 */
double severity_formula1(double metric, double threshold, double exponent) {
    double val = metric - threshold;
    if (val > 0) {
        double ret = (pow(val, exponent));
        if (ret > 1) {
            return 1;
        }
        return ret;
    }
    return 0;
}
/**
 * Severity formula 2: relative excess over the threshold
 * (metric/threshold - 1), shaped by an exponent and clamped to at most 1.
 * @param metric    observed value
 * @param threshold reference value; must be non-zero
 * @param exponent  shaping exponent
 * @return severity in [0,1]; 0 when metric does not exceed threshold;
 *         -1 to signal an invalid (zero) threshold.
 */
double severity_formula2(double metric, double threshold, double exponent) {
    if (!threshold) {
        return -1;
    }
    double val = metric / threshold - 1;
    if (val > 0) {
        double ret = (pow(val, exponent));
        if (ret > 1) {
            return 1;
        }
        return ret;
    }
    return 0;
}
/**
 * Severity formula 3: 1 - pow(metric/threshold, exponent), clamped to the
 * [0, 1] range — severity decreases as the metric approaches the threshold.
 * @param metric    observed value
 * @param threshold reference value; must be non-zero
 * @param exponent  shaping exponent
 * @return severity in [0,1]; 0 when metric/threshold is not positive;
 *         -1 to signal an invalid (zero) threshold.
 */
double severity_formula3(double metric, double threshold, double exponent) {
    if (!threshold) {
        return -1;
    }
    double val = metric / threshold;
    if (val > 0) {
        double ret = (1 - pow(val, exponent));
        if (ret > 1) {
            return 1;
        }
        if (ret < 0) {
            return 0;
        }
        return ret;
    }
    return 0;
}
/**
 * Memory severity: position of the metric within [threshold, max_memory],
 * clamped to [0, 1].
 * @param metric     observed memory value
 * @param threshold  lower bound (severity 0)
 * @param max_memory upper bound (severity 1)
 * @return (metric - threshold) / (max_memory - threshold) clamped to [0,1];
 *         -1 to signal an invalid range (max_memory == threshold).
 */
double severity_memory(double metric, double threshold, double max_memory) {
    double denominator = max_memory - threshold;
    double severity = -1;
    if (denominator != 0) {
        // BUGFIX: was `metric - threshold / (max_memory - threshold)`, which
        // by operator precedence divides only `threshold`. The `denominator`
        // variable and the [0,1] clamp show the intended full fraction.
        severity = (metric - threshold) / denominator;
        if (severity > 1) {
            severity = 1;
        } else if (severity < 0) {
            severity = 0;
        }
    }
    return severity;
}
double PerSystSqlOperator::computeSeverityAverage(std::vector<double> & buffer){
double PerSystSqlOperator::computeSeverityAverage(
std::vector<double> & buffer) {
std::vector<double> severities;
switch( _severity_formula ) {
case (FORMULA1):
for(auto val : buffer){
auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
severities.push_back(severity);
}
switch (_severity_formula) {
case (FORMULA1):
for (auto val : buffer) {
auto severity = severity_formula1(val, _severity_threshold,
_severity_exponent);
severities.push_back(severity);
}
break;
case (FORMULA2):
for(auto val: buffer){
auto severity = severity_formula2(val, _severity_threshold, _severity_exponent);
severities.push_back(severity);
}
case (FORMULA2):
for (auto val : buffer) {
auto severity = severity_formula2(val, _severity_threshold,
_severity_exponent);
severities.push_back(severity);
}
break;
case (FORMULA3):
for(auto val: buffer){
auto severity = severity_formula3(val, _severity_threshold, _severity_exponent);
severities.push_back(severity);
}
case (FORMULA3):
for (auto val : buffer) {
auto severity = severity_formula3(val, _severity_threshold,
_severity_exponent);
severities.push_back(severity);
}
break;
case (MEMORY_FORMULA):
for(auto val: buffer){
auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
severities.push_back(severity);
}
case (MEMORY_FORMULA):
for (auto val : buffer) {
auto severity = severity_memory(val, _severity_threshold,
_severity_max_memory);
severities.push_back(severity);
}
break;
case (NOFORMULA):
for(auto val: buffer){
severities.push_back(severity_noformula());
}
case (NOFORMULA):
for (auto val : buffer) {
severities.push_back(severity_noformula());
}
break;
default:
return 0.0;
break;
default:
return 0.0;
break;
}
if (severities.size()){
return (std::accumulate(severities.begin(),severities.end(), 0.0) / severities.size());
if (severities.size()) {
return (std::accumulate(severities.begin(), severities.end(), 0.0)
/ severities.size());
}
return 0.0;
}
void punToDoubles(std::vector<reading_t> & buffer, std::vector<double> & outDoubleVec){
for(auto & reading: buffer){
void punToDoubles(std::vector<reading_t> & buffer,
std::vector<double> & outDoubleVec) {
for (auto & reading : buffer) {
outDoubleVec.push_back(punLLToDouble(reading.value));
}
}
/**
 * Bit-level reinterpretation of a long long as a double.
 *
 * Uses memcpy instead of the previous pointer cast: dereferencing
 * (double*)&value violates strict aliasing and is undefined behavior; memcpy
 * is the well-defined idiom and compiles to the same code.
 */
double punLLToDouble(long long value) {
    static_assert(sizeof(double) == sizeof(long long), "pun requires equal sizes");
    double result;
    std::memcpy(&result, &value, sizeof(result));
    return result;
}
/**
 * Bit-level reinterpretation of a double as a long long (inverse of
 * punLLToDouble).
 *
 * Uses memcpy instead of the previous pointer cast: dereferencing
 * (long long*)&value violates strict aliasing and is undefined behavior.
 * Also drops the duplicated, unreachable second `return` statement present
 * in the original.
 */
long long punDoubleToLL(double value) {
    static_assert(sizeof(long long) == sizeof(double), "pun requires equal sizes");
    long long result;
    std::memcpy(&result, &value, sizeof(result));
    return result;
}
/**
 * Computes NUMBER_QUANTILES evenly spaced quantiles of `data` via linear
 * interpolation. `data` is sorted in place. On return, quantiles[0] is the
 * minimum and quantiles[NUMBER_QUANTILES] the maximum (NUMBER_QUANTILES + 1
 * entries in total). No-op when data is empty or NUMBER_QUANTILES is zero.
 * (The visible original was cut before its closing brace; reconstructed.)
 *
 * @param data             input values; sorted as a side effect
 * @param NUMBER_QUANTILES number of quantile intervals
 * @param quantiles        [out] resized and filled with the quantile values
 */
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
    if (data.empty() || NUMBER_QUANTILES == 0) {
        return;
    }
    std::sort(data.begin(), data.end());
    int elementNumber = data.size();
    quantiles.resize(NUMBER_QUANTILES + 1); // +1 slot for the minimum
    double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
    quantiles[0] = data[0]; // minimum
    quantiles[NUMBER_QUANTILES] = data[data.size() - 1]; // maximum
    for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
        if (elementNumber > 1) {
            // i*factor < elementNumber, so idx is always a valid index.
            int idx = static_cast<int>(std::floor(i * factor));
            if (idx == 0) {
                quantiles[i] = data[0];
            } else {
                // Linear interpolation between the two neighboring samples.
                double rest = (i * factor) - idx;
                quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
            }
        } else {
            // Single element: every quantile equals that element.
            quantiles[i] = data[0];
        }
    }
}
void computeEvenQuantiles(std::vector<double> &data,
const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
if (data.empty() || NUMBER_QUANTILES == 0) {