Commit 6baa5ecd authored by Alessio Netti's avatar Alessio Netti

DA: Aggregator plugin

- A nicer and refactored version of the "average" plugin
- Allows to perform arbitrary aggregation operations over sensors, like
sum, average, or maximum
parent ad431672
......@@ -4,7 +4,7 @@ include ../common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/common/include -I$(DCDBDEPLOYPATH)/include -DVERSION=\"$(VERSION)\"
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = average
ANALYZERS = aggregator
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -43,5 +43,5 @@ install: install_analyzer
analyzers/%.o: CXXFLAGS+= $(PLUGINFLAGS)
../common/src/sensornavigator.o: CXXFLAGS+= $(PLUGINFLAGS)
libdcdbanalyzer_average.$(LIBEXT): analyzers/average/AverageAnalyzer.o analyzers/average/AverageConfigurator.o ../common/src/sensornavigator.o
libdcdbanalyzer_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......@@ -385,16 +385,15 @@ PUT https://localhost:8000/analytics/average/avgAnalyzer1/compute?authkey=myToke
# Plugins <a name="plugins"></a>
Here we describe available plugins in DCDBAnalytics, and how to configure them.
## Average Plugin <a name="averagePlugin"></a>
The _Average_ plugin was developed for testing purposes and implements a simple data processing algorithm. Specifically,
this plugin computes and outputs the _sum_, _maximum_ and _average_ of its input sensors. Analyzers in the _Average_
plugin must have three outputs, which are mapped to the three metrics described above.
The configuration parameters specific to the _Average_ plugin are the following:
## Aggregator Plugin <a name="averagePlugin"></a>
The _Aggregator_ plugin implements simple data processing algorithms. Specifically, this plugin allows to perform basic
aggregation operations over a set of input sensors, which are then written as output to one sensor per analyzer.
The configuration parameters specific to the _Aggregator_ plugin are the following:
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one.
| operation | Operation to be performed over the input sensors. Can be "sum", "average", "maximum" or "minimum".
## Writing DCDBAnalytics Plugins <a name="writingPlugins"></a>
Generating a DCDBAnalytics plugin requires implementing a _Analyzer_ and _Configurator_ class which contain all logic
......
//
// Created by Netti, Alessio on 16.01.19.
//
#include "AggregatorAnalyzer.h"
const std::string AggregatorAnalyzer::opNames[] = {"sum", "average", "maximum", "minimum"};
AggregatorAnalyzer::AggregatorAnalyzer(const std::string& name) : AnalyzerTemplate(name) { _window = 0; _op = SUM; }
AggregatorAnalyzer::~AggregatorAnalyzer() {
if(_buffer)
delete _buffer;
}
void AggregatorAnalyzer::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
LOG_VAR(ll) << " Operation: " << opNames[_op];
AnalyzerTemplate<SensorBase>::printConfig(ll);
}
void AggregatorAnalyzer::compute(int unitID) {
switch(_op) {
case SUM:
computeSum(unitID);
break;
case AVG:
computeAvg(unitID);
break;
case MIN:
computeMin(unitID);
break;
case MAX:
computeMax(unitID);
break;
default:
break;
}
}
void AggregatorAnalyzer::computeSum(int unitID) {
uint64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer)
acc += v.value;
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeAvg(int unitID) {
uint64_t acc=0, ctr=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer) {
acc += v.value;
ctr++;
}
}
acc = ctr > 0 ? acc/ctr : acc;
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeMax(int unitID) {
uint64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer)
if( v.value>acc )
acc = v.value;
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeMin(int unitID) {
uint64_t acc=0;
bool minInit=false;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer)
if( v.value<acc || !minInit ) {
acc = v.value;
minInit = true;
}
}
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
}
......@@ -2,28 +2,41 @@
// Created by Netti, Alessio on 16.01.19.
//
#ifndef PROJECT_AVERAGEANALYZER_H
#define PROJECT_AVERAGEANALYZER_H
#ifndef PROJECT_AGGREGATORANALYZER_H
#define PROJECT_AGGREGATORANALYZER_H
#include "../../includes/AnalyzerTemplate.h"
class AverageAnalyzer : public AnalyzerTemplate<SensorBase> {
class AggregatorAnalyzer : public AnalyzerTemplate<SensorBase> {
public:
AverageAnalyzer(const std::string& name);
virtual ~AverageAnalyzer();
enum aggregationOps_t { SUM = 0, AVG = 1, MAX = 2, MIN = 3 };
const static std::string opNames[];
AggregatorAnalyzer(const std::string& name);
virtual ~AggregatorAnalyzer();
void setWindow(unsigned long long w) { _window = w; }
void setOperation(aggregationOps_t op) { _op = op; }
unsigned long long getWindow() { return _window; }
aggregationOps_t getOperation() { return _op; }
void printConfig(LOG_LEVEL ll) override;
private:
void compute(int unitID) override;
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
void computeSum(int unitID);
void computeAvg(int unitID);
void computeMax(int unitID);
void computeMin(int unitID);
vector<reading_t> *_buffer = NULL;
unsigned long long _window;
aggregationOps_t _op;
};
#endif //PROJECT_AVERAGEANALYZER_H
#endif //PROJECT_AGGREGATORANALYZER_H
//
// Created by Netti, Alessio on 16.01.19.
//
#include "AggregatorConfigurator.h"
AggregatorConfigurator::AggregatorConfigurator() : AnalyzerConfiguratorTemplate() {
_analyzerName = "aggregator";
_baseName = "sensor";
}
AggregatorConfigurator::~AggregatorConfigurator() {}
void AggregatorConfigurator::sensorBase(SensorBase& s, CFG_VAL config) {
}
void AggregatorConfigurator::analyzer(AggregatorAnalyzer& a, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "window"))
a.setWindow(stoull(val.second.data()) * 1000000);
else if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if(opName == AggregatorAnalyzer::opNames[0])
a.setOperation(AggregatorAnalyzer::SUM);
else if(opName == AggregatorAnalyzer::opNames[1])
a.setOperation(AggregatorAnalyzer::AVG);
else if(opName == AggregatorAnalyzer::opNames[2])
a.setOperation(AggregatorAnalyzer::MAX);
else if(opName == AggregatorAnalyzer::opNames[3])
a.setOperation(AggregatorAnalyzer::MIN);
}
}
}
bool AggregatorConfigurator::unit(UnitTemplate<SensorBase>& u) {
if(u.getOutputs().size() != _outputs) {
LOG(error) << "AggregatorAnalyzer Supports only 1 output per unit!";
return false;
}
else
return true;
}
......@@ -2,29 +2,29 @@
// Created by Netti, Alessio on 16.01.19.
//
#ifndef PROJECT_AVERAGECONFIGURATOR_H
#define PROJECT_AVERAGECONFIGURATOR_H
#ifndef PROJECT_AGGREGATORCONFIGURATOR_H
#define PROJECT_AGGREGATORCONFIGURATOR_H
#include "../../includes/AnalyzerConfiguratorTemplate.h"
#include "AverageAnalyzer.h"
#include "AggregatorAnalyzer.h"
class AverageConfigurator : public AnalyzerConfiguratorTemplate<AverageAnalyzer, SensorBase> {
class AggregatorConfigurator : public AnalyzerConfiguratorTemplate<AggregatorAnalyzer, SensorBase> {
public:
AverageConfigurator();
~AverageConfigurator();
AggregatorConfigurator();
~AggregatorConfigurator();
private:
void sensorBase(SensorBase& s, CFG_VAL config) override;
void analyzer(AverageAnalyzer& a, CFG_VAL config) override;
void analyzer(AggregatorAnalyzer& a, CFG_VAL config) override;
bool unit(UnitTemplate<SensorBase>& u) override;
const unsigned _outputs = 3;
const unsigned _outputs = 1;
};
extern "C" AnalyzerConfiguratorInterface* create() {
return new AverageConfigurator;
return new AggregatorConfigurator;
}
extern "C" void destroy(AnalyzerConfiguratorInterface* c) {
......
//
// Created by Netti, Alessio on 16.01.19.
//
#include "AverageAnalyzer.h"
AverageAnalyzer::AverageAnalyzer(const std::string& name) : AnalyzerTemplate(name) { _window = 0; }
AverageAnalyzer::~AverageAnalyzer() {
if(_buffer)
delete _buffer;
}
void AverageAnalyzer::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
AnalyzerTemplate<SensorBase>::printConfig(ll);
}
void AverageAnalyzer::compute(int unitID) {
long long max=0, sum=0, avg=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(_buffer->empty()) {
LOG(error) << "Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
for(const auto& v : *_buffer) {
sum += v.value;
if (v.value > max)
max = v.value;
}
}
reading_t out;
out.timestamp = getTimestamp();
out.value = sum;
_units[unitID]->getOutputs()[0]->storeReading(out);
out.value = max;
_units[unitID]->getOutputs()[1]->storeReading(out);
out.value = sum > 0 ? sum / _units[unitID]->getInputs().size() : 0;
_units[unitID]->getOutputs()[2]->storeReading(out);
}
//
// Created by Netti, Alessio on 16.01.19.
//
#include "AverageConfigurator.h"
AverageConfigurator::AverageConfigurator() : AnalyzerConfiguratorTemplate() {
_analyzerName = "average";
_baseName = "sensor";
}
AverageConfigurator::~AverageConfigurator() {}
void AverageConfigurator::sensorBase(SensorBase& s, CFG_VAL config) {
}
void AverageConfigurator::analyzer(AverageAnalyzer& a, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "window"))
a.setWindow(stoull(val.second.data()) * 1000000);
}
}
bool AverageConfigurator::unit(UnitTemplate<SensorBase>& u) {
if(u.getOutputs().size() != _outputs) {
LOG(error) << "AverageAnalyzer Supports only 2 outputs per unit!";
return false;
}
else
return true;
}
......@@ -2,7 +2,7 @@ global {
mqttPrefix /FF112233445566778899AAB
}
template_average def1 {
template_aggregator def1 {
interval 1000
minValues 3
mqttPart FF0
......@@ -10,9 +10,11 @@ duplicate false
streaming true
}
average avg1 {
aggregator avg1 {
default def1
mqttPart FF0
window 2000
operation sum
input {
......@@ -28,22 +30,15 @@ mqttPart FF0
mqttsuffix 76
}
sensor "<bottomup, filter cpu250>max" {
mqttsuffix 77
}
sensor "<bottomup, filter cpu250>avg" {
mqttsuffix 78
}
}
}
average avg2 {
aggregator avg2 {
default def1
interval 1500
mqttPart FF1
operation average
input {
......@@ -55,14 +50,6 @@ mqttPart FF1
output {
sensor "<bottomup 1>sum" {
mqttsuffix 76
}
sensor "<bottomup 1>max" {
mqttsuffix 77
}
sensor "<bottomup 1>avg" {
mqttsuffix 78
}
......@@ -71,10 +58,11 @@ mqttPart FF1
}
average avg3 {
aggregator avg3 {
default def1
interval 1500
mqttPart FF2
operation maximum
input {
......@@ -84,18 +72,10 @@ mqttPart FF2
output {
sensor "<bottomup 1>sumall" {
mqttsuffix 80
}
sensor "<bottomup 1>maxall" {
sensor "<bottomup 1>maxall" {
mqttsuffix 81
}
sensor "<bottomup 1>avgall" {
mqttsuffix 82
}
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment