//================================================================================
// Name        : PerSystSqlOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Operator computing per-job aggregates, quantiles and severities.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================

#include "PerSystSqlOperator.h"

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <cstring>
#include <numeric>
#include <string>
#include <vector>

#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"

PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
        OperatorTemplate(name),
        JobOperatorTemplate(name),
        _number_of_even_quantiles(0),
        _severity_formula(NOFORMULA),
        _severity_threshold(0),
        _severity_exponent(0),
        _severity_max_memory(0),
        _go_back_ns(0) {
}

PerSystSqlOperator::~PerSystSqlOperator() {
}

void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
    // Clearing the buffer, if already allocated
    _buffer.clear();
    size_t elCtr = 0;
    uint64_t my_timestamp = getTimestamp() - _go_back_ns;
    // Making sure that the aggregation boundaries do not go past the job start/end time
    uint64_t jobEnd   = jobData.endTime != 0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
    uint64_t jobStart = jobEnd - my_timestamp < jobData.startTime ? jobData.startTime : jobEnd - my_timestamp;
    // Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
    for(const auto& subUnit : unit->getSubUnits()) {
        // Getting the most recent values as specified in _window
        // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
        for(const auto& in : subUnit->getInputs()) {
            if(!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
                LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
                return;
            }
        }
    }
    compute_internal(unit, _buffer);
}

void PerSystSqlOperator::compute_internal(U_Ptr& unit, std::vector<reading_t>& buffer) {
    _quantileSensors.clear();
    reading_t reading;
    AggregatorSensorBase::aggregationOps_t op;
    reading.timestamp = getTimestamp() - _go_back_ns;
    std::vector<double> douBuffer;
    punToDoubles(buffer, douBuffer);
    // Performing the actual aggregation operation
    for(const auto& out : unit->getOutputs()) {
        op = out->getOperation();
        if(op != AggregatorSensorBase::QTL) {
            switch(op) {
                case AggregatorSensorBase::SUM:
                    reading.value = punDoubleToLL(std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0));
                    break;
                case AggregatorSensorBase::AVG:
                    reading.value = punDoubleToLL(std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0) / douBuffer.size());
                    break;
                case AggregatorSensorBase::OBS:
                    reading.value = computeObs(buffer);
                    break;
                case AggregatorSensorBase::AVG_SEV:
                    reading.value = punDoubleToLL(computeSeverityAverage(douBuffer));
                    break;
                default:
                    LOG(warning) << _name << ": Operation " << op << " not supported!";
                    reading.value = 0;
                    break;
            }
            out->storeReading(reading);
        } else {
            _quantileSensors.push_back(out);
        }
    }
    if(!_quantileSensors.empty()) {
        std::vector<double> quantiles;
        computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
        // One output sensor per quantile: store each computed quantile in its sensor
        for(unsigned idx = 0; idx < quantiles.size(); idx++) {
            reading.value = punDoubleToLL(quantiles[idx]);
            _quantileSensors[idx]->storeReading(reading);
        }
    }
}

void PerSystSqlOperator::compute(U_Ptr unit) {
    // Nothing to do here: this operator only works on job units, see compute(U_Ptr, qeJobData&).
}
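
// The severity helpers below map a metric value m against a threshold t (and an
// exponent e, or a configured maximum _severity_max_memory M) onto a severity
// score in [0, 1]; a return value of -1 signals an invalid configuration:
//
//   severity_formula1 : min(1, (m - t)^e)             if m - t > 0,   else 0
//   severity_formula2 : min(1, (m/t - 1)^e)           if m/t - 1 > 0, else 0  (-1 if t == 0)
//   severity_formula3 : clamp(1 - (m/t)^e, 0, 1)      if m/t > 0,     else 0  (-1 if t == 0)
//   severity_memory   : clamp((m - t)/(M - t), 0, 1)                          (-1 if M == t)
//
// computeSeverityAverage() applies the formula selected by _severity_formula to
// every reading in the buffer and returns the mean severity.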
double severity_formula1(double metric, double threshold, double exponent) {
    double val = metric - threshold;
    if(val > 0) {
        double ret = pow(val, exponent);
        if(ret > 1) {
            return 1;
        }
        return ret;
    }
    return 0;
}

double severity_formula2(double metric, double threshold, double exponent) {
    if(!threshold) {
        return -1;
    }
    double val = metric / threshold - 1;
    if(val > 0) {
        double ret = pow(val, exponent);
        if(ret > 1) {
            return 1;
        }
        return ret;
    }
    return 0;
}

double severity_formula3(double metric, double threshold, double exponent) {
    if(!threshold) {
        return -1;
    }
    double val = metric / threshold;
    if(val > 0) {
        double ret = 1 - pow(val, exponent);
        if(ret > 1) {
            return 1;
        }
        if(ret < 0) {
            return 0;
        }
        return ret;
    }
    return 0;
}

double severity_memory(double metric, double threshold, double max_memory) {
    double denominator = max_memory - threshold;
    double severity = -1;
    if(denominator) {
        // Severity is the metric's position between threshold and maximum memory, clamped to [0, 1]
        severity = (metric - threshold) / denominator;
        if(severity > 1) {
            severity = 1;
        } else if(severity < 0) {
            severity = 0;
        }
    }
    return severity;
}

double PerSystSqlOperator::computeSeverityAverage(std::vector<double>& buffer) {
    std::vector<double> severities;
    switch(_severity_formula) {
        case FORMULA1:
            for(auto val : buffer) {
                severities.push_back(severity_formula1(val, _severity_threshold, _severity_exponent));
            }
            break;
        case FORMULA2:
            for(auto val : buffer) {
                severities.push_back(severity_formula2(val, _severity_threshold, _severity_exponent));
            }
            break;
        case FORMULA3:
            for(auto val : buffer) {
                severities.push_back(severity_formula3(val, _severity_threshold, _severity_exponent));
            }
            break;
        case MEMORY_FORMULA:
            for(auto val : buffer) {
                severities.push_back(severity_memory(val, _severity_threshold, _severity_max_memory));
            }
            break;
        case NOFORMULA:
            for(auto val : buffer) {
                (void) val; // value unused: NOFORMULA yields a constant severity per reading
                severities.push_back(severity_noformula());
            }
            break;
        default:
            return 0.0;
    }
    if(severities.size()) {
        return std::accumulate(severities.begin(), severities.end(), 0.0) / severities.size();
    }
    return 0.0;
}

void punToDoubles(std::vector<reading_t>& buffer, std::vector<double>& outDoubleVec) {
    for(auto& reading : buffer) {
        outDoubleVec.push_back(punLLToDouble(reading.value));
    }
}

double punLLToDouble(long long value) {
    // Reinterpret the 64-bit integer stored in a reading as the double it encodes.
    // memcpy avoids the undefined behavior of casting between unrelated pointer types.
    double result;
    std::memcpy(&result, &value, sizeof(result));
    return result;
}

long long punDoubleToLL(double value) {
    // Inverse of punLLToDouble(): store a double bit-for-bit in a 64-bit integer.
    long long result;
    std::memcpy(&result, &value, sizeof(result));
    return result;
}

void computeEvenQuantiles(std::vector<double>& data, const unsigned int NUMBER_QUANTILES, std::vector<double>& quantiles) {
    if(data.empty() || NUMBER_QUANTILES == 0) {
        return;
    }
    std::sort(data.begin(), data.end());
    int elementNumber = data.size();
    quantiles.resize(NUMBER_QUANTILES + 1); // +1 to also store the minimum
    double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
    quantiles[0] = data[0];                               // minimum
    quantiles[NUMBER_QUANTILES] = data[data.size() - 1];  // maximum
    for(unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
        if(elementNumber > 1) {
            int idx = static_cast<int>(std::floor(i * factor));
            if(idx == 0) {
                quantiles[i] = data[0];
            } else {
                // Linear interpolation between the two neighboring sorted values
                double rest = (i * factor) - idx;
                quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]); // ToDo scaling factor??
            }
        } else {
            // Only one element: every quantile collapses to that value
            quantiles[i] = data[0];
        }
    }
}
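
#ifdef PERSYSTSQL_STANDALONE_EXAMPLE
// Minimal standalone sketch, not part of the plugin: it illustrates how
// computeEvenQuantiles() interpolates even quantiles over a small sample and how
// the pun helpers round-trip a double through the 64-bit integer storage used for
// readings. The guard macro PERSYSTSQL_STANDALONE_EXAMPLE is hypothetical and only
// exists so this example is skipped in regular plugin builds.
#include <iostream>

int main() {
    std::vector<double> sample = {4.0, 1.0, 3.0, 2.0, 5.0};
    std::vector<double> quantiles;
    // Request 4 even quantiles; the result holds 5 values: minimum, 3 interpolated quantiles, maximum
    computeEvenQuantiles(sample, 4, quantiles);
    for(size_t i = 0; i < quantiles.size(); i++) {
        std::cout << "quantile[" << i << "] = " << quantiles[i] << std::endl;
    }

    // Round-trip a double through the integer representation used for sensor readings
    long long stored = punDoubleToLL(3.14);
    std::cout << "round-trip: " << punLLToDouble(stored) << std::endl;
    return 0;
}
#endif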