Commit c03a71d6 authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

Fixing errors

parent 0c95e467
......@@ -8,27 +8,35 @@ streaming true
jobtsaggregator cpi {
default def1
number_quantiles 10
topic_quantiles
input {
;/.../computenode/cpux/cpi
sensor "<bottomup 1>cpi"
}
output {
; In this case "bottomup 1" is the sensor tree level associated to compute nodes
sensor "<bottomup 1>avg/cpi" {
; /slurmjobid/cpi/avg
sensor "<bottomup 1>/cpi.avg" {
mqttsuffix /avg/cpi
operation average
}
;per job??
sensor "<bottomup 1>numobs/cpi" {
mqttsuffix /numobs/cpi
sensor "<bottomup 1>cpi.numobs" {
mqttsuffix /cpi.numobs
operation numobs
}
sensor "<bottomup 2>avgseverity/cpi" {
mqttsuffix /avgseverity/cpi
sensor "<bottomup 1>cpi.avgseverity" {
mqttsuffix /cpi.avgseverity
operation average_severity
}
sensor "<bottomup 1>cpi.quantile" {
mqttsuffix /cpi.quantile
operation quantile
}
}
}
......
......@@ -47,7 +47,7 @@ class AggregatorSensorBase : public SensorBase {
public:
// Enum to identify aggregation operations
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3, STD = 4, QTL = 5, OBS = 6 };
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3, STD = 4, QTL = 5, OBS = 6, AVG_SEV = 7 };
// Constructor and destructor
AggregatorSensorBase(const std::string& name) : SensorBase(name) {
......
......@@ -65,6 +65,8 @@ void JobTSAggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL co
s.setOperation(AggregatorSensorBase::QTL);
else if (opName == "observations")
s.setOperation(AggregatorSensorBase::OBS);
else if (opName == "average_severity")
s.setOperation(AggregatorSensorBase::AVG_SEV);
}
}
}
......@@ -85,22 +87,30 @@ bool JobTSAggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
bool JobTSAggregatorConfigurator::readUnits(JobTSAggregatorOperator& op,
std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs,
std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs, inputMode_t inputMode) {
bool succ = OperatorConfiguratorTemplate::readUnits(op, protoInputs, protoOutputs, inputMode);
int num_quantiles = op.getNumberOfEvenQuantiles();
if(num_quantiles == 0){
return succ;
return false;
}
AggregatorSensorBase quantsensor;
for(auto &sensor: protoOutputs){
if(sensor->getOperation() == AggregatorSensorBase::QTL){
quantsensor = *(sensor.get());
sensor->setPercentile(0);
auto topic = sensor->getMqtt();
sensor->setMqtt(topic + "0");
break;
}
}
for(int i = 0; i <= num_quantiles; ++i){
std::stringstream builder;
builder << "quantile" << i << "/" << num_quantiles << "/" << op.getName();
auto outputSensor = std::make_shared<AggregatorSensorBase>(builder.str());
//ToDo how do we set the mqtt? here?
for(int i = 1; i <= num_quantiles; ++i){
auto outputSensor = std::make_shared<AggregatorSensorBase>(quantsensor);
outputSensor->setMqtt(outputSensor->getMqtt() + std::to_string(i));
outputSensor->setOperation(AggregatorSensorBase::QTL);
outputSensor->setPercentile(i);
protoOutputs.push_back(outputSensor);
op.setQuantileSensor(outputSensor);
}
return succ;
return JobOperatorConfiguratorTemplate::readUnits(op, protoInputs, protoOutputs, inputMode);
}
......@@ -73,6 +73,7 @@ void JobTSAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
}
void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> buffer) {
_quantileSensors.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp() - 10e9;
......@@ -105,6 +106,8 @@ void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> buf
break;
}
out->storeReading(reading);
} else {
_quantileSensors.push_back(out);
}
}
......@@ -121,3 +124,62 @@ void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> buf
void JobTSAggregatorOperator::compute(U_Ptr unit){
//nothing here!
}
float severity_formula1(float metric, float threshold, float exponent){
float val = metric - threshold;
if (val > 0) {
float ret = (pow(val, exponent));
if(ret > 1){
return 1;
}
return ret;
}
return 0;
}
float severity_formula2(float metric, float threshold, float exponent){
if(!threshold){
return -1;
}
float val = metric / threshold - 1;
if (val > 0) {
float ret= (pow(val, exponent));
if(ret > 1){
return 1;
}
return ret;
}
return 0;
}
float severity_formula3(float metric, float threshold, float exponent){
if (!threshold) {
return -1;
}
float val = metric / threshold;
if (val > 0) {
float ret= (1 - pow(val, exponent));
if(ret > 1 ){
return 1;
}
if( ret < 0 ){
return 0;
}
return ret;
}
return 0;
}
float severity_memory(float metric, float threshold, float max_memory){
float denominator = max_memory - threshold;
float severity = -1;
if(denominator){
severity = metric - threshold/(max_memory - threshold);
if(severity > 1) {
severity = 1;
} else if(severity < 0){
severity = 0;
}
}
return severity;
}
......@@ -62,50 +62,10 @@ protected:
unsigned int number_of_even_quantiles;
};
float severity_formula1(float metric, float threshold, float exponent){
float val = metric - threshold;
if (val > 0) {
float ret = (pow(val, exponent));
if(ret > 1){
return 1;
}
return ret;
}
return 0;
}
float severity_formula2(float metric, float threshold, float exponent){
if(threshold == 0){
return -1;
}
float val = metric / threshold - 1;
if (val > 0) {
float ret= (pow(val, exponent));
if(ret > 1){
return 1;
}
return ret;
}
return 0;
}
float severity_formula3(float metric, float threshold, float exponent){
if (threshold == 0) {
return -1;
}
float val = metric / threshold;
if (val > 0) {
float ret= (1 - pow(val, exponent));
if(ret > 1 ){
return 1;
}
if( ret < 0 ){
return 0;
}
return ret;
}
return 0;
}
float severity_formula1(float metric, float threshold, float exponent);
float severity_formula2(float metric, float threshold, float exponent);
float severity_formula3(float metric, float threshold, float exponent);
float severity_memory(float metric, float threshold, float max_memory);
constexpr float severity_noformula(){return 0;} //No severity
#endif /* ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATOROPERATOR_H_ */
......@@ -67,7 +67,8 @@ void PerSystDB::print_error(){
//mysql_close(mysql);
}
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days): _mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(false) {
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days):
_mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(false) {
}
......@@ -83,10 +84,6 @@ PerSystDB::~PerSystDB(){
}
}
/**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_map job_id_string to job_id (db) map
*/
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map ){
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
......@@ -291,32 +288,33 @@ using days = std::chrono::duration<int, std::ratio_multiply<std::ratio<24>, std:
using years = std::chrono::duration<int, std::ratio_multiply<std::ratio<146097, 400>, days::period>>;
using months = std::chrono::duration<int, std::ratio_divide<years::period, std::ratio<12>>>;
void PerSystDB::getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp){
begin_timestamp = last_end_timestamp;
std::tm tm = {};
std::stringstream ss(last_end_timestamp);
ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S");
auto tp = std::chrono::system_clock::from_time_t(std::mktime(&tm));
switch(_rotation) {
case EVERY_YEAR:
tp += years{1};
break;
case EVERY_MONTH:
tp += months{1};
break;
case EVERY_XDAYS:
tp += days{_every_x_days};
break;
default:
tp += months{1};
break;
};
auto in_time_t = std::chrono::system_clock::to_time_t(tp);
auto date_time = std::put_time(std::localtime(&in_time_t), "%Y-%m-%d %H:%M:%S");
std::stringstream build_date;
build_date << date_time;
end_timestamp = build_date.str();
void PerSystDB::getNewDates(const std::string& last_end_timestamp,
std::string & begin_timestamp, std::string & end_timestamp) {
begin_timestamp = last_end_timestamp;
std::tm tm = { };
std::stringstream ss(last_end_timestamp);
ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S");
auto tp = std::chrono::system_clock::from_time_t(std::mktime(&tm));
switch (_rotation) {
case EVERY_YEAR:
tp += years { 1 };
break;
case EVERY_MONTH:
tp += months { 1 };
break;
case EVERY_XDAYS:
tp += days { _every_x_days };
break;
default:
tp += months { 1 };
break;
};
auto in_time_t = std::chrono::system_clock::to_time_t(tp);
auto date_time = std::put_time(std::localtime(&in_time_t),"%Y-%m-%d %H:%M:%S");
std::stringstream build_date;
build_date << date_time;
end_timestamp = build_date.str();
}
......@@ -65,6 +65,10 @@ public:
PerSystDB(const std::string & host, const std::string & user, const std::string & password, const std::string & database_name, int port, Rotation_t rotation, unsigned int every_x_days=0);
virtual ~PerSystDB();
/**
* Check if job_id (db) exist. If map empty it doesn't exist/job not found is not yet on accounting.
* @param job_id_map job_id_string to job_id (db) map
*/
bool getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map);
bool getCurrentSuffixAggregateTable(std::string & new_suffix);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment