Commit b0cdd32f authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

PerSyst data plugin to store the derived metrics per job, for the PerSyst MariaDB SQL Database.

parent 31d0c28b
template_jobtsaggregator def1 {
interval 1000
minValues 3
duplicate false
streaming true
}
jobtsaggregator cpi {
default def1
number_quantiles 10
input {
sensor "<bottomup 1>cpi"
}
output {
; In this case "bottomup 1" is the sensor tree level associated to compute nodes
sensor "<bottomup 1>deciles/cpi" {
mqttsuffix /decilesXX/cpi
operation quantiles
}
sensor "<bottomup 1>avg/cpi" {
mqttsuffix /avg/cpi
operation average
}
;per job??
sensor "<bottomup 1>numobs/cpi" {
mqttsuffix /numobs/cpi
operation numobs
}
sensor "<bottomup 2>avgseverity/cpi" {
mqttsuffix /avgseverity/cpi
operation average_severity
}
}
}
jobtsaggregator frequency {
default def1
number_quantiles 10
input {
sensor "<bottomup 1>cpi"
}
output {
; In this case "bottomup 1" is the sensor tree level associated to compute nodes
sensor "<bottomup 1>deciles/frequency" {
mqttsuffix /decilesXX/frequency
operation quantiles
}
sensor "<bottomup 1>avg/frequency" {
mqttsuffix /avg/frequency
operation average
}
sensor "<bottomup 1>numobs/frequency" {
mqttsuffix /numobs/frequency
operation numobs
}
}
}
//================================================================================
// Name : JobTSAggregatorConfigurator.cpp
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "JobTSAggregatorConfigurator.h"
#include <boost/algorithm/string/predicate.hpp>
#include <boost/foreach.hpp>
#include <boost/property_tree/detail/ptree_implementation.hpp>
#include <boost/property_tree/ptree_fwd.hpp>
#include <string>
#include <vector>
#include "../../includes/OperatorConfiguratorTemplate.h"
#include "../../includes/UnitInterface.h"
JobTSAggregatorConfigurator::JobTSAggregatorConfigurator(): JobOperatorConfiguratorTemplate() {
_operatorName = "persystsql";
_baseName = "sensor";
}
JobTSAggregatorConfigurator::~JobTSAggregatorConfigurator() {
}
void JobTSAggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if (opName == "sum")
s.setOperation(AggregatorSensorBase::SUM);
else if (opName == "average")
s.setOperation(AggregatorSensorBase::AVG);
else if (opName == "maximum")
s.setOperation(AggregatorSensorBase::MAX);
else if (opName == "minimum")
s.setOperation(AggregatorSensorBase::MIN);
else if (opName == "std")
s.setOperation(AggregatorSensorBase::STD);
else if (opName == "deciles" || opName == "percentiles")
s.setOperation(AggregatorSensorBase::QTL);
else if (opName == "observations")
s.setOperation(AggregatorSensorBase::OBS);
}
}
}
void JobTSAggregatorConfigurator::operatorAttributes(JobTSAggregatorOperator& op, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "number_quantiles")) {
unsigned int num_quantiles = std::stoul(val.second.data());
op.setNumberOfEvenQuantiles(num_quantiles);
}
}
}
bool JobTSAggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
return true;
}
bool JobTSAggregatorConfigurator::readUnits(JobTSAggregatorOperator& op,
std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs,
std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs, inputMode_t inputMode) {
bool succ = OperatorConfiguratorTemplate::readUnits(op, protoInputs, protoOutputs, inputMode);
int num_quantiles = op.getNumberOfEvenQuantiles();
for(int i = 0; i <= num_quantiles; ++i){
std::stringstream builder;
builder << "quantile" << i << " from " << num_quantiles << op.getName();
auto outputSensor = std::make_shared<AggregatorSensorBase>(builder.str());
outputSensor->setOperation(AggregatorSensorBase::QTL);
outputSensor->setPercentile(i);
protoOutputs.push_back(outputSensor);
op.setQuantileSensor(outputSensor);
}
return succ;
}
//================================================================================
// Name : JobTSAggregatorConfigurator.h
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATORCONFIGURATOR_H_
#define ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATORCONFIGURATOR_H_
#include "../../includes/JobOperatorConfiguratorTemplate.h"
#include "../../includes/UnitTemplate.h"
#include "../aggregator/AggregatorSensorBase.h"
#include "../aggregator/JobAggregatorOperator.h"
#include "../smucngperf/SMUCSensorBase.h"
#include "JobTSAggregatorOperator.h"
class JobTSAggregatorConfigurator: public JobOperatorConfiguratorTemplate<JobTSAggregatorOperator, AggregatorSensorBase> {
public:
JobTSAggregatorConfigurator();
virtual ~JobTSAggregatorConfigurator();
private:
void sensorBase(AggregatorSensorBase& s, CFG_VAL config) override;
void operatorAttributes(JobTSAggregatorOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<AggregatorSensorBase>& u) override;
bool readUnits(JobTSAggregatorOperator& op, std::vector<shared_ptr<AggregatorSensorBase>>& protoInputs, std::vector<shared_ptr<AggregatorSensorBase>>& protoOutputs, inputMode_t inputMode);
};
extern "C" OperatorConfiguratorInterface* create() {
return new JobTSAggregatorConfigurator;
}
extern "C" void destroy(OperatorConfiguratorInterface* c) {
delete c;
}
#endif /* ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATORCONFIGURATOR_H_ */
//================================================================================
// Name : JobTSAggregatorOperator.cpp
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "JobTSAggregatorOperator.h"
#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>
#include <stddef.h>
#include <sys/types.h>
#include <cstdint>
#include <string>
#include "../../../common/include/logging.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
JobTSAggregatorOperator::JobTSAggregatorOperator(const std::string& name) :
OperatorTemplate(name), JobOperatorTemplate(name), number_of_even_quantiles(0) {
}
JobTSAggregatorOperator::~JobTSAggregatorOperator() {
}
void JobTSAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
_buffer.clear();
size_t elCtr=0;
uint64_t my_timestamp = getTimestamp(); //TODO minus 10 seconds or so...
// Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd = jobData.endTime!=0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
uint64_t jobStart = jobEnd-my_timestamp < jobData.startTime ? jobData.startTime : jobEnd-my_timestamp;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for(const auto& subUnit : unit->getSubUnits()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer.size();
_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false);
if (_buffer.size() <= elCtr) {
LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
}
}
compute_internal(unit, _buffer);
}
void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> buffer) {
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp();
// Performing the actual aggregation operation
for(const auto& out : unit->getOutputs()) {
op = out->getOperation();
if(op!=AggregatorSensorBase::QTL) {
switch (op) {
case AggregatorSensorBase::SUM:
reading.value = computeSum(buffer);
break;
case AggregatorSensorBase::AVG:
reading.value = computeAvg(buffer);
break;
case AggregatorSensorBase::MIN:
reading.value = computeMin(buffer);
break;
case AggregatorSensorBase::MAX:
reading.value = computeMax(buffer);
break;
case AggregatorSensorBase::STD:
reading.value = computeStd(buffer);
break;
case AggregatorSensorBase::OBS:
reading.value = computeObs(buffer);
break;
default:
LOG(warning) << _name << ": Encountered unknown operation!";
reading.value = 0;
break;
}
out->storeReading(reading);
}
}
if(!_quantileSensors.empty()) {
vector<int64_t> quantiles;
computeEvenQuantiles(buffer, number_of_even_quantiles, quantiles);
for(unsigned idx=0; idx<quantiles.size(); idx++) {
reading.value = quantiles[idx];
_quantileSensors[idx]->storeReading(reading);
}
}
}
void JobTSAggregatorOperator::compute(U_Ptr unit){
//nothing here!
}
//================================================================================
// Name : JobTSAggregatorOperator.h
// Author : Carla Guillen
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATOROPERATOR_H_
#define ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATOROPERATOR_H_
#include <vector>
#include "../../../common/include/cacheentry.h"
#include "../../includes/JobOperatorTemplate.h"
#include "../aggregator/AggregatorSensorBase.h"
class JobTSAggregatorOperator: public JobOperatorTemplate<AggregatorSensorBase>{
public:
JobTSAggregatorOperator(const std::string& name);
virtual ~JobTSAggregatorOperator();
void compute(U_Ptr unit, qeJobData& jobData) override;
unsigned int getNumberOfEvenQuantiles() const {
return number_of_even_quantiles;
}
void setNumberOfEvenQuantiles(unsigned int numberOfEvenQuantiles) {
number_of_even_quantiles = numberOfEvenQuantiles;
}
void setQuantileSensor(AggregatorSBPtr qSensor){
_quantileSensors.push_back(qSensor);
}
private:
std::vector<reading_t> _buffer;
protected:
virtual void compute(U_Ptr unit) override;
void compute_internal(U_Ptr unit, vector<reading_t> buffer);
std::vector<AggregatorSBPtr> _quantileSensors;
unsigned int number_of_even_quantiles;
};
#endif /* ANALYTICS_OPERATORS_PERSYSTSQL_JOBTSAGGREGATOROPERATOR_H_ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment