Commit 37a44e0d authored by Alessio Netti's avatar Alessio Netti

Analytics: Job aggregator plugin

- Compiles fine but completely untested for the moment
- Also addressed many bugs for job analyzer templates
parent 49b9fdd6
......@@ -4,7 +4,7 @@ include ../config.mk
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator
ANALYZERS = aggregator job_aggregator
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -45,3 +45,6 @@ analyzers/%.o: CXXFLAGS+= $(PLUGINFLAGS)
libdcdbanalyzer_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbanalyzer_job_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/JobAggregatorAnalyzer.o analyzers/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......@@ -55,7 +55,10 @@ void AggregatorAnalyzer::compute(U_Ptr unit) {
return;
}
}
compute_internal(unit, _buffer);
}
void AggregatorAnalyzer::compute_internal(U_Ptr unit, vector<reading_t> *buffer) {
_quantileSensors.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
......@@ -87,7 +90,7 @@ void AggregatorAnalyzer::compute(U_Ptr unit) {
} else
_quantileSensors.push_back(out);
}
if(!_quantileSensors.empty()) {
vector<int64_t> result = computeQuantiles(_buffer);
for(unsigned idx=0; idx<result.size(); idx++) {
......
......@@ -32,7 +32,7 @@
#include <math.h>
#include <algorithm>
class AggregatorAnalyzer : public AnalyzerTemplate<AggregatorSensorBase> {
class AggregatorAnalyzer : virtual public AnalyzerTemplate<AggregatorSensorBase> {
public:
......@@ -45,9 +45,11 @@ public:
void printConfig(LOG_LEVEL ll) override;
private:
protected:
void compute(U_Ptr unit) override;
virtual void compute(U_Ptr unit) override;
// Internal method containing the actual logic of the analyzer
void compute_internal(U_Ptr unit, vector<reading_t> *buffer);
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
int64_t computeSum(vector<reading_t> *buffer);
......
......@@ -30,7 +30,7 @@
#include "../../includes/AnalyzerConfiguratorTemplate.h"
#include "AggregatorAnalyzer.h"
class AggregatorConfigurator : public AnalyzerConfiguratorTemplate<AggregatorAnalyzer, AggregatorSensorBase> {
class AggregatorConfigurator : virtual public AnalyzerConfiguratorTemplate<AggregatorAnalyzer, AggregatorSensorBase> {
public:
AggregatorConfigurator();
......
//================================================================================
// Name : JobAggregatorAnalyzer.cpp
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "JobAggregatorAnalyzer.h"
JobAggregatorAnalyzer::JobAggregatorAnalyzer(const std::string& name) :
AnalyzerTemplate(name),
AggregatorAnalyzer(name),
JobAnalyzerTemplate(name) {}
JobAggregatorAnalyzer::~JobAggregatorAnalyzer() {}
void JobAggregatorAnalyzer::compute(U_Ptr unit) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
size_t elCtr=0;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for(const auto& subUnit : unit->getSubUnits()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer == nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if (!_buffer || _buffer->size() <= elCtr) {
LOG(debug) << "Job Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
}
}
compute_internal(unit, _buffer);
}
\ No newline at end of file
//================================================================================
// Name : JobAggregatorAnalyzer.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOBAGGREGATORANALYZER_H
#define PROJECT_JOBAGGREGATORANALYZER_H
#include "AggregatorAnalyzer.h"
#include "../../includes/JobAnalyzerTemplate.h"
class JobAggregatorAnalyzer : public AggregatorAnalyzer, public JobAnalyzerTemplate<AggregatorSensorBase> {
public:
JobAggregatorAnalyzer(const std::string& name);
virtual ~JobAggregatorAnalyzer();
private:
void compute(U_Ptr unit) override;
};
#endif //PROJECT_JOBAGGREGATORANALYZER_H
//================================================================================
// Name : JobAggregatorConfigurator.cpp
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "JobAggregatorConfigurator.h"
JobAggregatorConfigurator::JobAggregatorConfigurator() : JobAnalyzerConfiguratorTemplate() {
_analyzerName = "aggregator";
_baseName = "sensor";
}
JobAggregatorConfigurator::~JobAggregatorConfigurator() {}
void JobAggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if (opName == "sum")
s.setOperation(AggregatorSensorBase::SUM);
else if (opName == "average")
s.setOperation(AggregatorSensorBase::AVG);
else if (opName == "maximum")
s.setOperation(AggregatorSensorBase::MAX);
else if (opName == "minimum")
s.setOperation(AggregatorSensorBase::MIN);
else if (opName == "std")
s.setOperation(AggregatorSensorBase::STD);
else if (opName == "percentiles")
s.setOperation(AggregatorSensorBase::QTL);
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
s.setQuantile(quantile);
}
}
}
void JobAggregatorConfigurator::analyzer(JobAggregatorAnalyzer& a, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "window"))
a.setWindow(stoull(val.second.data()) * 1000000);
}
}
bool JobAggregatorConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) { return true; }
\ No newline at end of file
//================================================================================
// Name : JobAggregatorConfigurator.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_JOBAGGREGATORCONFIGURATOR_H
#define PROJECT_JOBAGGREGATORCONFIGURATOR_H
#include "JobAggregatorAnalyzer.h"
#include "../../includes/JobAnalyzerConfiguratorTemplate.h"
class JobAggregatorConfigurator : public JobAnalyzerConfiguratorTemplate<JobAggregatorAnalyzer, AggregatorSensorBase> {
public:
JobAggregatorConfigurator();
~JobAggregatorConfigurator();
private:
void sensorBase(AggregatorSensorBase& s, CFG_VAL config) override;
void analyzer(JobAggregatorAnalyzer& a, CFG_VAL config) override;
bool unit(UnitTemplate<AggregatorSensorBase>& u) override;
};
extern "C" AnalyzerConfiguratorInterface* create() {
return new JobAggregatorConfigurator;
}
extern "C" void destroy(AnalyzerConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_JOBAGGREGATORCONFIGURATOR_H
......@@ -37,7 +37,7 @@
* design of job analyzers.
*/
template <class Analyzer, class SBase = SensorBase>
class JobAnalyzerConfiguratorTemplate : public AnalyzerConfiguratorTemplate<Analyzer, SBase> {
class JobAnalyzerConfiguratorTemplate : virtual public AnalyzerConfiguratorTemplate<Analyzer, SBase> {
// Verifying the types of input classes
static_assert(std::is_base_of<SensorBase, SBase>::value, "SBase must derive from SensorBase!");
......@@ -100,19 +100,19 @@ protected:
delete units;
return false;
}
if(*units.size() > 1) {
if(units->size() > 1) {
LOG(error) << this->_analyzerName << " " << an.getName() << ": Invalid job template unit, please check your configuration!";
delete units;
return false;
}
shared_ptr<UnitTemplate<SBase>> jobUnit = *units[0];
shared_ptr<UnitTemplate<SBase>> jobUnit = units->at(0);
delete units;
an.clearUnits();
if(!constructSensorTopics(*jobUnit, an))
if(!this->constructSensorTopics(*jobUnit, an))
return false;
if (unit(*jobUnit)) {
if (this->unit(*jobUnit)) {
an.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
} else {
......
......@@ -36,7 +36,7 @@
*
*/
template <typename S>
class JobAnalyzerTemplate : public AnalyzerTemplate<S> {
class JobAnalyzerTemplate : virtual public AnalyzerTemplate<S> {
// The template shall only be instantiated for classes which derive from SensorBase
static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
......@@ -81,6 +81,7 @@ public:
AnalyzerTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
return *this;
}
/**
......@@ -112,7 +113,7 @@ public:
*
* This method must be called anytime operations on units are performed through getUnits().
*/
virtual void releaseUnits() {
virtual void releaseUnits() override {
_unitAccess.store(false);
}
......@@ -144,9 +145,9 @@ public:
vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(buf[0]);
U_Ptr jobUnit = jobDataToUnit(buf->at(0));
compute(jobUnit);
this->compute(jobUnit);
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
......@@ -188,7 +189,7 @@ protected:
* @param jobData a qeJobData struct containing job information
* @return A shared pointer to a job unit object
*/
virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
virtual U_Ptr jobDataToUnit(const qeJobData& jobData) {
string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
U_Ptr jobUnit = nullptr;
if(!this->_unitCache)
......@@ -215,7 +216,7 @@ protected:
if (!s->isInit())
s->initSensor(this->_cacheSize);
addToUnitCache(jobUnit);
this->addToUnitCache(jobUnit);
}
return jobUnit;
}
......@@ -266,12 +267,12 @@ protected:
// Performing actual computation on each unit
for(const auto& ju : _tempUnits)
compute(ju);
this->compute(ju);
// Acquiring the spinlock to refresh the exposed units
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
addUnit(ju);
this->addUnit(ju);
_unitAccess.store(false);
_tempUnits.clear();
}
......
......@@ -312,8 +312,7 @@ public:
if(!isConsistent(outputs))
throw invalid_argument("JobUnitGenerator: Incoherent output levels!");
shared_ptr<UnitTemplate<SBase>> jobUnit = make_shared<UnitTemplate<SBase>>();
jobUnit->setName(j);
shared_ptr<UnitTemplate<SBase>> jobUnit = make_shared<UnitTemplate<SBase>>(j);
try {
for (const auto &nodeName : nodes) {
// The unit specified as input must belong to the domain of the outputs
......@@ -327,12 +326,12 @@ public:
}
// Mapping outputs
for(const auto& out : outputs) {
SBase uOut(*out);
uOut.setName(resolveJobString(uOut.getName(), j));
uOut.setMqtt(_navi->buildTopicForNode(j, uOut.getMqtt()));
shared_ptr<SBase> uOut = make_shared<SBase>(*out);
uOut->setName(resolveJobString(uOut->getName(), j));
uOut->setMqtt(_navi->buildTopicForNode(j, uOut->getMqtt()));
// Duplicating the file sink adding the name of each unit to the path
if(uOut.getSinkPath()!="")
uOut.setSinkPath(MQTTChecker::topicToFile(uOut.getMqtt(), uOut.getSinkPath()));
if(uOut->getSinkPath()!="")
uOut->setSinkPath(MQTTChecker::topicToFile(uOut->getMqtt(), uOut->getSinkPath()));
jobUnit->addOutput(uOut);
}
return jobUnit;
......
......@@ -47,7 +47,22 @@ protected:
using S_Ptr = std::shared_ptr<S>;
public:
/**
* @brief Class constructor
*
* @param name Name of this unit
*/
UnitTemplate(const std::string& name) :
UnitInterface(),
_name(name),
_inputMode(SELECTIVE) {
// base inputs and outputs vectors are constructed using iterators
_baseInputs = std::vector<SBasePtr>(_inputs.begin(), _inputs.end());
_baseOutputs = std::vector<SBasePtr>(_outputs.begin(), _outputs.end());
}
/**
* @brief Class constructor
*
......@@ -194,6 +209,13 @@ public:
*/
std::vector<S_Ptr>& getOutputs() { return _outputs; }
/**
* @brief Get the internal vector of sub-units of this unit
*
* @return A reference to the internal vector of sub-unit pointers
*/
std::vector<std::shared_ptr<UnitTemplate<S>>>& getSubUnits() { return _subUnits; }
/**
* @brief Set the inputs of this unit
*
......@@ -252,7 +274,7 @@ public:
* @param sUnit A shared pointer to a UnitTemplate object
*/
void addSubUnit(const std::shared_ptr<UnitTemplate<S>> sUnit) { _subUnits.push_back(sUnit); }
/**
* @brief Prints the current unit configuration
*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment