Commit 6d2b07a0 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: Aggregator plugin re-worked

- Now supports also standard deviation and percentile computation
- Multiple output sensors per analyzer can be defined, each performing
a different aggregation task
- QueryEngine has been slightly modified so as not to clear the input
sensor reading buffer (if any), thus allowing to accumulate sensor
values with subsequent calls
parent e629215b
......@@ -587,13 +587,19 @@ Here we describe available plugins in DCDBAnalytics, and how to configure them.
## Aggregator Plugin <a name="averagePlugin"></a>
The _Aggregator_ plugin implements simple data processing algorithms. Specifically, this plugin allows to perform basic
aggregation operations over a set of input sensors, which are then written as output to one sensor per analyzer.
aggregation operations over a set of input sensors, which are then written as output.
The configuration parameters specific to the _Aggregator_ plugin are the following:
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one.
| operation | Operation to be performed over the input sensors. Can be "sum", "average", "maximum" or "minimum".
Additionally, output sensors in analyzers of the Aggregator plugin accept the following parameters:
| Value | Explanation |
|:----- |:----------- |
| operation | Operation to be performed over the input sensors. Can be "sum", "average", "maximum", "minimum", "std" or "percentiles".
| percentile | Specific percentile to be computed when using the "percentiles" operation. Can be an integer in the (0,100) range.
## Writing DCDBAnalytics Plugins <a name="writingPlugins"></a>
Generating a DCDBAnalytics plugin requires implementing a _Analyzer_ and _Configurator_ class which contain all logic
......
......@@ -26,9 +26,9 @@
#include "AggregatorAnalyzer.h"
const std::string AggregatorAnalyzer::opNames[] = {"sum", "average", "maximum", "minimum"};
AggregatorAnalyzer::AggregatorAnalyzer(const std::string& name) : AnalyzerTemplate(name) { _window = 0; _op = SUM; }
AggregatorAnalyzer::AggregatorAnalyzer(const std::string& name) : AnalyzerTemplate(name) {
_window = 0;
}
AggregatorAnalyzer::~AggregatorAnalyzer() {
if(_buffer)
......@@ -37,119 +37,123 @@ AggregatorAnalyzer::~AggregatorAnalyzer() {
void AggregatorAnalyzer::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
LOG_VAR(ll) << " Operation: " << opNames[_op];
AnalyzerTemplate<SensorBase>::printConfig(ll);
AnalyzerTemplate<AggregatorSensorBase>::printConfig(ll);
}
void AggregatorAnalyzer::compute(U_Ptr unit) {
switch(_op) {
case SUM:
computeSum(unit);
break;
case AVG:
computeAvg(unit);
break;
case MIN:
computeMin(unit);
break;
case MAX:
computeMax(unit);
break;
default:
break;
}
}
void AggregatorAnalyzer::computeSum(U_Ptr unit) {
int64_t acc=0;
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
size_t elCtr=0;
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
elCtr = _buffer==nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
if(!_buffer || _buffer->size()<=elCtr) {
LOG(debug) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer)
acc += v.value;
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
unit->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeAvg(U_Ptr unit) {
int64_t acc=0, ctr=0;
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
LOG(debug) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
_quantileSensors.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp();
// Performing the actual aggregation operation
for(const auto& out : unit->getOutputs()) {
op = out->getOperation();
if(op!=AggregatorSensorBase::QTL) {
switch (op) {
case AggregatorSensorBase::SUM:
reading.value = computeSum(_buffer);
break;
case AggregatorSensorBase::AVG:
reading.value = computeAvg(_buffer);
break;
case AggregatorSensorBase::MIN:
reading.value = computeMin(_buffer);
break;
case AggregatorSensorBase::MAX:
reading.value = computeMax(_buffer);
break;
case AggregatorSensorBase::STD:
reading.value = computeStd(_buffer);
break;
default:
break;
}
out->storeReading(reading);
} else
_quantileSensors.push_back(out);
}
if(!_quantileSensors.empty()) {
vector<int64_t> result = computeQuantiles(_buffer);
for(unsigned idx=0; idx<result.size(); idx++) {
reading.value = result[idx];
_quantileSensors[idx]->storeReading(reading);
}
}
}
for(const auto& v : *_buffer) {
int64_t AggregatorAnalyzer::computeSum(vector<reading_t> *buffer) {
int64_t acc=0;
for(const auto& v : *buffer)
acc += v.value;
ctr++;
}
}
return acc;
}
int64_t AggregatorAnalyzer::computeAvg(vector<reading_t> *buffer) {
int64_t acc=0, ctr=buffer->size();
for(const auto& v : *buffer)
acc += v.value;
acc = ctr > 0 ? acc/ctr : acc;
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
unit->getOutputs()[0]->storeReading(out);
return acc;
}
void AggregatorAnalyzer::computeMax(U_Ptr unit) {
int64_t AggregatorAnalyzer::computeMax(vector<reading_t> *buffer) {
int64_t acc=0;
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
LOG(debug) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
bool maxInit=false;
for(const auto& v : *_buffer)
if(v.value>acc || !maxInit) {
acc = v.value;
maxInit = true;
}
for(const auto& v : *_buffer)
if( v.value>acc )
acc = v.value;
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
unit->getOutputs()[0]->storeReading(out);
return acc;
}
void AggregatorAnalyzer::computeMin(U_Ptr unit) {
int64_t AggregatorAnalyzer::computeMin(vector<reading_t> *buffer) {
int64_t acc=0;
bool minInit=false;
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
LOG(debug) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
for(const auto& v : *_buffer)
if(v.value<acc || !minInit) {
acc = v.value;
minInit = true;
}
return acc;
}
for(const auto& v : *_buffer)
if( v.value<acc || !minInit ) {
acc = v.value;
minInit = true;
}
int64_t AggregatorAnalyzer::computeStd(vector<reading_t> *buffer) {
int64_t avg = computeAvg(buffer);
int64_t acc=0, val=0, ctr=buffer->size();
for(const auto& v : *buffer) {
val = v.value - avg;
acc += val*val;
}
acc = ctr > 0 ? sqrt(acc/ctr) : sqrt(acc);
return acc;
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
unit->getOutputs()[0]->storeReading(out);
vector<int64_t> AggregatorAnalyzer::computeQuantiles(vector<reading_t> *buffer) {
size_t idx, mod;
vector<int64_t> result;
// Sorting the sensor reading buffer to extract quantiles
std::sort(buffer->begin(), buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
for(const auto& q : _quantileSensors) {
idx = (buffer->size() * q->getQuantile()) / 100;
mod = (buffer->size() * q->getQuantile()) % 100;
result.push_back((mod==0 || idx==buffer->size()-1) ? buffer->at(idx).value : (buffer->at(idx).value + buffer->at(idx+1).value)/2);
}
return result;
}
......@@ -28,21 +28,21 @@
#define PROJECT_AGGREGATORANALYZER_H
#include "../../includes/AnalyzerTemplate.h"
#include "AggregatorSensorBase.h"
#include <math.h>
#include <algorithm>
class AggregatorAnalyzer : public AnalyzerTemplate<SensorBase> {
class AggregatorAnalyzer : public AnalyzerTemplate<AggregatorSensorBase> {
public:
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3 };
const static std::string opNames[];
AggregatorAnalyzer(const std::string& name);
virtual ~AggregatorAnalyzer();
void setWindow(unsigned long long w) { _window = w; }
void setOperation(aggregationOps_t op) { _op = op; }
unsigned long long getWindow() { return _window; }
aggregationOps_t getOperation() { return _op; }
void setWindow(unsigned long long w) { _window = w; }
unsigned long long getWindow() { return _window; }
void printConfig(LOG_LEVEL ll) override;
private:
......@@ -50,15 +50,17 @@ private:
void compute(U_Ptr unit) override;
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
void computeSum(U_Ptr unit);
void computeAvg(U_Ptr unit);
void computeMax(U_Ptr unit);
void computeMin(U_Ptr unit);
int64_t computeSum(vector<reading_t> *buffer);
int64_t computeAvg(vector<reading_t> *buffer);
int64_t computeMax(vector<reading_t> *buffer);
int64_t computeMin(vector<reading_t> *buffer);
int64_t computeStd(vector<reading_t> *buffer);
vector<int64_t> computeQuantiles(vector<reading_t> *buffer);
vector<reading_t> *_buffer = NULL;
vector<AggregatorSBPtr> _quantileSensors;
unsigned long long _window;
aggregationOps_t _op;
};
#endif //PROJECT_AGGREGATORANALYZER_H
......@@ -33,8 +33,29 @@ AggregatorConfigurator::AggregatorConfigurator() : AnalyzerConfiguratorTemplate(
AggregatorConfigurator::~AggregatorConfigurator() {}
void AggregatorConfigurator::sensorBase(SensorBase& s, CFG_VAL config) {
void AggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if (opName == "sum")
s.setOperation(AggregatorSensorBase::SUM);
else if (opName == "average")
s.setOperation(AggregatorSensorBase::AVG);
else if (opName == "maximum")
s.setOperation(AggregatorSensorBase::MAX);
else if (opName == "minimum")
s.setOperation(AggregatorSensorBase::MIN);
else if (opName == "std")
s.setOperation(AggregatorSensorBase::STD);
else if (opName == "percentiles")
s.setOperation(AggregatorSensorBase::QTL);
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
s.setQuantile(quantile);
}
}
}
void AggregatorConfigurator::analyzer(AggregatorAnalyzer& a, CFG_VAL config) {
......@@ -42,25 +63,7 @@ void AggregatorConfigurator::analyzer(AggregatorAnalyzer& a, CFG_VAL config) {
{
if (boost::iequals(val.first, "window"))
a.setWindow(stoull(val.second.data()) * 1000000);
else if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if(opName == AggregatorAnalyzer::opNames[0])
a.setOperation(AggregatorAnalyzer::SUM);
else if(opName == AggregatorAnalyzer::opNames[1])
a.setOperation(AggregatorAnalyzer::AVG);
else if(opName == AggregatorAnalyzer::opNames[2])
a.setOperation(AggregatorAnalyzer::MAX);
else if(opName == AggregatorAnalyzer::opNames[3])
a.setOperation(AggregatorAnalyzer::MIN);
}
}
}
bool AggregatorConfigurator::unit(UnitTemplate<SensorBase>& u) {
if(u.getOutputs().size() != _outputs) {
LOG(error) << "AggregatorAnalyzer Supports only 1 output per unit!";
return false;
}
else
return true;
}
bool AggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) { return true; }
......@@ -30,19 +30,17 @@
#include "../../includes/AnalyzerConfiguratorTemplate.h"
#include "AggregatorAnalyzer.h"
class AggregatorConfigurator : public AnalyzerConfiguratorTemplate<AggregatorAnalyzer, SensorBase> {
class AggregatorConfigurator : public AnalyzerConfiguratorTemplate<AggregatorAnalyzer, AggregatorSensorBase> {
public:
AggregatorConfigurator();
~AggregatorConfigurator();
private:
void sensorBase(SensorBase& s, CFG_VAL config) override;
void sensorBase(AggregatorSensorBase& s, CFG_VAL config) override;
void analyzer(AggregatorAnalyzer& a, CFG_VAL config) override;
bool unit(UnitTemplate<SensorBase>& u) override;
const unsigned _outputs = 1;
bool unit(UnitTemplate<AggregatorSensorBase>& u) override;
};
extern "C" AnalyzerConfiguratorInterface* create() {
......
//================================================================================
// Name : AggregatorSensorBase.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_AGGREGATORSENSORBASE_H
#define PROJECT_AGGREGATORSENSORBASE_H
#include "sensorbase.h"
class AggregatorSensorBase : public SensorBase {
public:
// Enum to identify aggregation operations
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3, STD = 4, QTL = 5 };
// Constructor and destructor
AggregatorSensorBase(const std::string& name) : SensorBase(name) {
_opType = SUM;
_quantile = 50;
}
// Copy constructor
AggregatorSensorBase(AggregatorSensorBase& other) : SensorBase(other) {
_opType = other._opType;
_quantile = other._quantile;
}
virtual ~AggregatorSensorBase() {}
void setOperation(aggregationOps_t op) { _opType = op; }
void setQuantile(size_t q) { _quantile = q; }
aggregationOps_t getOperation() { return _opType; }
size_t getQuantile() { return _quantile; }
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
SensorBase::printConfig(ll, lg, leadingSpaces);
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Operation: " << getOpString(_opType);
if(_opType==QTL)
LOG_VAR(ll) << leading << " Percentile: " << _quantile;
}
protected:
std::string getOpString(aggregationOps_t op) {
std::string opString;
switch(op) {
case SUM:
opString = "sum";
break;
case MAX:
opString = "maximum";
break;
case MIN:
opString = "minimum";
break;
case AVG:
opString = "average";
break;
case STD:
opString = "std";
break;
case QTL:
opString = "percentiles";
break;
default:
opString = "invalid";
break;
}
return opString;
}
aggregationOps_t _opType;
size_t _quantile;
};
using AggregatorSBPtr = std::shared_ptr<AggregatorSensorBase>;
#endif //PROJECT_AGGREGATORSENSORBASE_H
......@@ -12,7 +12,6 @@ streaming true
aggregator avg1 {
default def1
window 2000
operation sum
input {
......@@ -26,6 +25,7 @@ operation sum
sensor "<bottomup, filter cpu250>sum" {
mqttsuffix /sum
operation sum
}
}
......@@ -35,7 +35,6 @@ operation sum
aggregator avg2 {
default def1
interval 1500
operation average
input {
......@@ -49,6 +48,7 @@ operation average
sensor "<bottomup 1>avg" {
mqttsuffix /avg
operation average
}
}
......@@ -59,7 +59,6 @@ aggregator avg3 {
default def1
interval 1500
mqttPart /mypart
operation maximum
input {
......@@ -71,6 +70,7 @@ operation maximum
sensor "<bottomup 1>maxall" {
mqttsuffix /maxall
operation maximum
}
}
......
......@@ -397,7 +397,7 @@ protected:
{
if (boost::iequals(valInner.first, _baseName)) {
LOG(debug) << " I/O " << _baseName << " " << valInner.second.data();
SBase sensor = SBase(valInner.second.data());
SBase sensor(valInner.second.data());
if (readSensorBase(sensor, valInner.second, false)) {
shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
val.first==INPUT_BLOCK ? protoInputs.push_back(sensorPtr) : protoOutputs.push_back(sensorPtr);
......
......@@ -263,7 +263,7 @@ public:
LOG_VAR(ll) << " Unit: " << _name;
LOG_VAR(ll) << " Inputs: ";
for (const auto &i : _inputs)
LOG_VAR(ll) << " " << i->getName();
LOG_VAR(ll) << " " << i->getName();
LOG_VAR(ll) << " Outputs: ";
for (const auto &o : _outputs)
o->printConfig(ll, lg, 20);
......
......@@ -91,7 +91,7 @@ std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t st
if(!buffer)
buffer = new std::vector<qeJobData>();
buffer->clear();
//buffer->clear();
for(auto& jd : tempList) {
tempQeData.jobId = jd.jobId;
tempQeData.userId = jd.userId;
......@@ -117,8 +117,10 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
// Counting the number of elements in the buffer before accessing the cache
size_t elCtr = (buffer==nullptr) ? 0 : buffer->size();
output = entry.getView(startTs, endTs, buffer, rel);
if (output->size() > 0)
if (output->size() > elCtr)
return output;
}
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
......@@ -135,13 +137,8 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
// Dealing with allocations that may have been performed by the cache search
if(output)
output->clear();
else if(buffer) {
buffer->clear();
output = buffer;
} else
output = new std::vector<reading_t>();
if(!output)