Commit 5549140f authored by Alessio Netti's avatar Alessio Netti

Analytics: Regression plugin

- An OpenCV-based regression plugin using Random Forest models
- Also added a few missing copy constructors
parent cc8c0d49
include ../config.mk
#include ../common.mk
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPLOYPATH)/include/opencv4
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator job_aggregator
ANALYZERS = aggregator regressor job_aggregator
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -46,5 +46,8 @@ analyzers/%.o: CXXFLAGS+= $(PLUGINFLAGS)
libdcdbanalyzer_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/AggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbanalyzer_regressor.$(LIBEXT): analyzers/regressor/RegressorAnalyzer.o analyzers/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdbanalyzer_job_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyzer.o analyzers/aggregator/JobAggregatorAnalyzer.o analyzers/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......@@ -30,6 +30,10 @@ AggregatorAnalyzer::AggregatorAnalyzer(const std::string& name) : AnalyzerTempla
_window = 0;
}
/**
 * Copy constructor.
 *
 * Copies the aggregation window from `other`. The readings buffer is not
 * copied: the new instance starts without one. `_buffer` is set explicitly
 * because the destructor deletes it whenever it is non-null, so it must
 * never be left indeterminate.
 */
AggregatorAnalyzer::AggregatorAnalyzer(const AggregatorAnalyzer& other) : AnalyzerTemplate(other) {
    _window = other._window;
    _buffer = nullptr;
}
AggregatorAnalyzer::~AggregatorAnalyzer() {
if(_buffer)
delete _buffer;
......
......@@ -42,6 +42,8 @@ class AggregatorAnalyzer : virtual public AnalyzerTemplate<AggregatorSensorBase>
public:
AggregatorAnalyzer(const std::string& name);
AggregatorAnalyzer(const AggregatorAnalyzer& other);
virtual ~AggregatorAnalyzer();
void setWindow(unsigned long long w) { _window = w; }
......
......@@ -26,11 +26,13 @@
#include "JobAggregatorAnalyzer.h"
JobAggregatorAnalyzer::JobAggregatorAnalyzer(const std::string& name) :
// Builds a job aggregator called `name`; the same name is forwarded to each base-class constructor.
JobAggregatorAnalyzer::JobAggregatorAnalyzer(const std::string& name) : AnalyzerTemplate(name), AggregatorAnalyzer(name), JobAnalyzerTemplate(name) {}
// Copy constructor: the AggregatorAnalyzer part is copy-constructed from `other`,
// whereas AnalyzerTemplate and JobAnalyzerTemplate are built from other's name only.
JobAggregatorAnalyzer::JobAggregatorAnalyzer(const JobAggregatorAnalyzer& other) :
    AnalyzerTemplate(other._name),
    AggregatorAnalyzer(other),
    JobAnalyzerTemplate(other._name) {}
// Nothing to release at this level; base-class destructors do the cleanup.
JobAggregatorAnalyzer::~JobAggregatorAnalyzer() = default;
void JobAggregatorAnalyzer::compute(U_Ptr unit, qeJobData& jobData) {
......
......@@ -39,6 +39,8 @@ class JobAggregatorAnalyzer : public AggregatorAnalyzer, public JobAnalyzerTempl
public:
JobAggregatorAnalyzer(const std::string& name);
JobAggregatorAnalyzer(const JobAggregatorAnalyzer& other);
virtual ~JobAggregatorAnalyzer();
protected:
......
//================================================================================
// Name : RegressorAnalyzer.cpp
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "RegressorAnalyzer.h"
/**
 * Creates a regressor with default settings: no model paths, an aggregation
 * window of 0, a target distance of 1, 256 training samples and training
 * pending. All owned buffers start out unallocated.
 *
 * Members are listed in their declaration order.
 */
RegressorAnalyzer::RegressorAnalyzer(const std::string& name) :
    AnalyzerTemplate(name),
    _modelOut(""),
    _modelIn(""),
    _aggregationWindow(0),
    _trainingSamples(256),
    _targetDistance(1),
    _trainingPending(true),
    _buffer(nullptr),
    _trainingSet(nullptr),
    _responseSet(nullptr),
    _currentfVector(nullptr) {}
/**
 * Copy constructor.
 *
 * Copies the configuration (input model path, aggregation window, target
 * distance, number of training samples) from `other`. Runtime state is NOT
 * copied: the new instance owns no buffers, holds no model, and has training
 * pending, exactly like a freshly constructed analyzer.
 *
 * The owning pointers must be reset here because the destructor deletes
 * them; leaving them indeterminate would make destruction undefined.
 */
RegressorAnalyzer::RegressorAnalyzer(const RegressorAnalyzer& other) : AnalyzerTemplate(other) {
    _modelIn = other._modelIn;
    // _modelOut is deliberately not copied and stays empty
    // NOTE(review): presumably so that copies do not write to the same file - confirm
    _aggregationWindow = other._aggregationWindow;
    _targetDistance = other._targetDistance;
    _trainingSamples = other._trainingSamples;
    _trainingPending = true;
    _trainingSet = nullptr;
    _responseSet = nullptr;
    _currentfVector = nullptr;
    _buffer = nullptr;
}
/**
 * Frees the training/response sets, the current feature vector and the
 * readings buffer, then drops this instance's reference to the model.
 */
RegressorAnalyzer::~RegressorAnalyzer() {
    // delete on a null pointer is a no-op, so no explicit guards are required
    delete _trainingSet;
    _trainingSet = nullptr;
    delete _responseSet;
    _responseSet = nullptr;
    delete _currentfVector;
    _currentfVector = nullptr;
    delete _buffer;
    _buffer = nullptr;
    _rForest.release();
}
/**
 * Handles plugin-specific REST actions.
 *
 * The only supported action is "train", which flags the model for
 * re-training and reports that in the response.
 *
 * @param action   Name of the requested action.
 * @param queries  Query parameters of the request (not used here).
 * @return         Response whose text confirms that re-training was triggered.
 * @throws invalid_argument if `action` is anything other than "train".
 */
restResponse_t RegressorAnalyzer::REST(const string& action, const unordered_map<string, string>& queries) {
    if(action != "train")
        throw invalid_argument("Unknown plugin action " + action + " requested!");

    restResponse_t reply;
    reply.response = "Re-training triggered for regressor " + this->_name + "!";
    this->_trainingPending = true;
    return reply;
}
/**
 * Prepares the random forest model and then runs the base-class init.
 *
 * If an input path is configured, the model is loaded from it. The loaded
 * model is kept only if it is trained and its number of variables equals
 * REG_NUMFEATURES times the number of inputs of the first unit; in that case
 * no training is pending. In every other case (no path, load failure,
 * incompatible model) a fresh, untrained model is created instead.
 *
 * @param io  Passed through to AnalyzerTemplate::init.
 */
void RegressorAnalyzer::init(boost::asio::io_service& io) {
    bool useDefault = true;
    if(!_modelIn.empty()) {
        try {
            _rForest = cv::ml::RTrees::load(_modelIn);
            // load() can return an empty pointer instead of throwing (e.g. when the file
            // holds no usable model), so it must be checked before being dereferenced
            if(_rForest.empty() || !_rForest->isTrained() || _units.empty() || _units[0]->getInputs().size()*REG_NUMFEATURES!=_rForest->getVarCount())
                LOG(error) << "Analyzer " + _name + ": incompatible model, falling back to default!";
            else {
                _trainingPending = false;
                useDefault = false;
            }
        } catch(const std::exception&) {
            LOG(error) << "Analyzer " + _name + ": cannot load model from file, falling back to default!";
        }
    }
    if(useDefault)
        _rForest = cv::ml::RTrees::create();
    AnalyzerTemplate<RegressorSensorBase>::init(io);
}
/**
 * Logs the regressor-specific settings at level `ll`, followed by the
 * base-class configuration. Unset model paths are printed as "none".
 */
void RegressorAnalyzer::printConfig(LOG_LEVEL ll) {
    const std::string inputPath = _modelIn.empty() ? std::string("none") : _modelIn;
    const std::string outputPath = _modelOut.empty() ? std::string("none") : _modelOut;

    LOG_VAR(ll) << " Window: " << _aggregationWindow;
    LOG_VAR(ll) << " Target Distance: " << _targetDistance;
    LOG_VAR(ll) << " Training Sample: " << _trainingSamples;
    LOG_VAR(ll) << " Input Path: " << inputPath;
    LOG_VAR(ll) << " Output Path: " << outputPath;
    AnalyzerTemplate<RegressorSensorBase>::printConfig(ll);
}
/**
 * Performs one computation step for `unit`.
 *
 * The feature vector is refreshed first. While training is pending in
 * streaming mode, the vector and the current target are appended to the
 * training and response sets, and training starts once enough rows
 * (training samples + target distance) have been collected. Finally, if the
 * model is trained, its prediction is stored in the unit's first output.
 *
 * @throws std::runtime_error if there is no model, or if the model is
 *         untrained and no training data is being collected.
 */
void RegressorAnalyzer::compute(U_Ptr unit) {
    computeFeatureVector(unit);

    if (_trainingPending && _streaming) {
        if (_trainingSet == nullptr)
            _trainingSet = new cv::Mat();
        if (_responseSet == nullptr)
            _responseSet = new cv::Mat();
        _trainingSet->push_back(*_currentfVector);
        _responseSet->push_back(_currentTarget);
        if (_trainingSet->size().height >= _trainingSamples + _targetDistance)
            trainRandomForest();
    }

    if (_rForest.empty())
        throw std::runtime_error("Analyzer " + _name + ": cannot perform prediction, the model is untrained!");

    if (!_rForest->isTrained()) {
        // An untrained model is acceptable only while samples are still being collected
        if (!(_trainingPending && _streaming))
            throw std::runtime_error("Analyzer " + _name + ": cannot perform prediction, the model is untrained!");
        return;
    }

    reading_t prediction;
    prediction.value = static_cast<int64_t>(_rForest->predict(*_currentfVector));
    prediction.timestamp = getTimestamp();
    unit->getOutputs()[0]->storeReading(prediction);
}
void RegressorAnalyzer::trainRandomForest() {
if(!_trainingSet || _rForest.empty())
throw std::runtime_error("Analyzer " + _name + ": cannot perform training, missing model!");
if(_responseSet->size().height <= _targetDistance)
throw std::runtime_error("Analyzer " + _name + ": cannot perform training, insufficient data!");
// Shifting the training and response sets so as to obtain the desired prediction distance
*_responseSet = _responseSet->rowRange(_targetDistance, _responseSet->size().height-1);
*_trainingSet = _trainingSet->rowRange(0, _trainingSet->size().height-1-_targetDistance);
//LOG(error) << "Training set:";
//LOG(error) << *_trainingSet;
//LOG(error) << "Response set:";
//LOG(error) << *_responseSet;
if(!_rForest->train(*_trainingSet, cv::ml::ROW_SAMPLE, *_responseSet))
throw std::runtime_error("Analyzer " + _name + ": model training failed!");
delete _trainingSet;
_trainingSet = nullptr;
delete _responseSet;
_responseSet = nullptr;
_trainingPending = false;
LOG(info) << "Analyzer " + _name + ": model training performed.";
if(_modelOut!="") {
try {
_rForest->save(_modelOut);
} catch(const std::exception& e) {
LOG(error) << "Analyzer " + _name + ": cannot save model to file!"; }
}
}
/**
 * Fills `_currentfVector` with REG_NUMFEATURES statistical features for
 * each input sensor of `unit`, computed over the readings returned by the
 * query engine for the aggregation window: mean, standard deviation, sum of
 * consecutive differences, 25th percentile and 75th percentile.
 *
 * For the input flagged as training target, the most recent reading is
 * additionally stored in `_currentTarget`.
 *
 * All statistics are kept in 64-bit integer members, so fractional parts are
 * truncated before the values are stored as floats.
 *
 * @throws std::runtime_error if no readings can be obtained for an input.
 */
void RegressorAnalyzer::computeFeatureVector(U_Ptr unit) {
    if(!_currentfVector)
        _currentfVector = new cv::Mat(1, unit->getInputs().size()*REG_NUMFEATURES, CV_32F);
    int64_t val;
    size_t qId, qMod, idx, fIdx;
    std::vector<RegressorSBPtr>& inputs = unit->getInputs();
    for(idx=0; idx<inputs.size(); idx++) {
        _mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0;
        if(_buffer)
            _buffer->clear();
        _buffer = _queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer);
        if(!_buffer || _buffer->empty())
            throw std::runtime_error("Analyzer " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
        if (inputs[idx]->getTrainingTarget())
            _currentTarget = static_cast<float>(_buffer->back().value);
        // Signed element count: dividing an int64_t by the unsigned size() directly would
        // convert a negative sum to unsigned and produce a meaningless mean
        const int64_t count = static_cast<int64_t>(_buffer->size());
        // Computing MEAN and SUM OF DIFFERENCES
        val = _buffer->front().value;
        for(const auto& v : *_buffer) {
            _mean += v.value;
            _diffsum += v.value - val;
            val = v.value;
        }
        _mean /= count;
        // Computing STD
        for(const auto& v : *_buffer) {
            val = v.value - _mean;
            _std += val*val;
        }
        _std = sqrt(_std/count);
        // Sorting costs O(N*log(N)); here, we assume that the aggregation window of sensor data is going to be
        // relatively small, so that this cost stays negligible in practice
        std::sort(_buffer->begin(), _buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
        // Computing 25th PERCENTILE: when the position is not exact (and a next element exists),
        // the two neighbouring values are averaged
        qId = (_buffer->size() * 25) / 100;
        qMod = (_buffer->size() * 25) % 100;
        _qtl25 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
        // Computing 75th PERCENTILE, same scheme as above
        qId = (_buffer->size() * 75) / 100;
        qMod = (_buffer->size() * 75) % 100;
        _qtl75 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
        fIdx = idx * REG_NUMFEATURES;
        // Casting and storing the statistical features
        _currentfVector->at<float>(fIdx) = static_cast<float>(_mean);
        _currentfVector->at<float>(fIdx+1) = static_cast<float>(_std);
        _currentfVector->at<float>(fIdx+2) = static_cast<float>(_diffsum);
        _currentfVector->at<float>(fIdx+3) = static_cast<float>(_qtl25);
        _currentfVector->at<float>(fIdx+4) = static_cast<float>(_qtl75);
    }
    //LOG(error) << "Target: " << _currentTarget;
    //LOG(error) << "Vector: ";
    //for(idx=0; idx<_currentfVector->size().width;idx++)
    //    LOG(error) << _currentfVector->at<float>(idx);
}
//================================================================================
// Name : RegressorAnalyzer.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_REGRESSORANALYZER_H
#define PROJECT_REGRESSORANALYZER_H
#include "../../includes/AnalyzerTemplate.h"
#include "RegressorSensorBase.h"
#include "opencv4/opencv2/core/mat.hpp"
#include "opencv4/opencv2/core/cvstd.hpp"
#include "opencv4/opencv2/ml.hpp"
#include <math.h>
#define REG_NUMFEATURES 5
/**
* @brief Regressor analyzer plugin.
*
* @ingroup regressor
*/
class RegressorAnalyzer : virtual public AnalyzerTemplate<RegressorSensorBase> {
public:
    /// Creates a regressor named `name` with default settings.
    RegressorAnalyzer(const std::string& name);
    /// Copies the configuration of `other`.
    RegressorAnalyzer(const RegressorAnalyzer& other);
    virtual ~RegressorAnalyzer();

    /// Handles plugin-specific REST actions.
    virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override;
    /// Sets up the random forest model, then runs the base-class init.
    virtual void init(boost::asio::io_service& io) override;

    // Setters
    void setInputPath(std::string in) { _modelIn = in; }
    void setOutputPath(std::string out) { _modelOut = out; }
    void setAggregationWindow(unsigned long long a) { _aggregationWindow = a; }
    void setTrainingSamples(unsigned long long t) { _trainingSamples = t; }
    void setTargetDistance(unsigned long long d) { _targetDistance = d; }
    /// Flags the model for (re-)training.
    void triggerTraining() { _trainingPending = true;}

    // Getters
    std::string getInputPath() const { return _modelIn;}
    std::string getOutputPath() const { return _modelOut; }
    unsigned long long getAggregationWindow() const { return _aggregationWindow; }
    unsigned long long getTrainingSamples() const { return _trainingSamples; }
    unsigned long long getTargetDistance() const { return _targetDistance; }

    /// Logs the regressor-specific settings at level `ll`.
    void printConfig(LOG_LEVEL ll) override;

protected:
    /// Runs one computation step (feature extraction, optional training, prediction) on `unit`.
    virtual void compute(U_Ptr unit) override;
    /// Refreshes `_currentfVector` (and `_currentTarget`) from the inputs of `unit`.
    void computeFeatureVector(U_Ptr unit);
    /// Trains the model on the collected training and response sets.
    void trainRandomForest();

    // Every member below has a default initializer, so that no constructor can leave a
    // pointer deleted by the destructor in an indeterminate state.

    // Path the trained model is saved to; empty means "do not save"
    std::string _modelOut;
    // Path a model is loaded from in init(); empty means "do not load"
    std::string _modelIn;
    // Window passed to the query engine when fetching sensor readings
    unsigned long long _aggregationWindow = 0;
    // Number of samples to collect before training is performed
    unsigned long long _trainingSamples = 256;
    // Row offset between a feature vector and the response it is paired with
    unsigned long long _targetDistance = 1;
    // True while the model still has to be (re-)trained
    bool _trainingPending = true;
    // Owned buffer of sensor readings, reused across queries
    vector<reading_t> *_buffer = nullptr;
    // Random forest model
    cv::Ptr<cv::ml::RTrees> _rForest;
    // Owned matrices collecting feature vectors and their responses while training is pending
    cv::Mat *_trainingSet = nullptr;
    cv::Mat *_responseSet = nullptr;
    // Owned feature vector of the latest computation
    cv::Mat *_currentfVector = nullptr;
    // Latest reading of the training-target sensor
    float _currentTarget = 0;
    // Misc buffers
    int64_t _mean = 0;
    int64_t _std = 0;
    int64_t _diffsum = 0;
    int64_t _qtl25 = 0;
    int64_t _qtl75 = 0;
};
#endif //PROJECT_REGRESSORANALYZER_H
//================================================================================
// Name : RegressorConfigurator.cpp
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "RegressorConfigurator.h"
// Registers the names used by this configurator: "regressor" as analyzer name
// and "sensor" as base name.
RegressorConfigurator::RegressorConfigurator() : AnalyzerConfiguratorTemplate() {
    _baseName = "sensor";
    _analyzerName = "regressor";
}
// No resources are owned at this level.
RegressorConfigurator::~RegressorConfigurator() = default;
/**
 * Reads the regressor-specific sensor attributes from `config`.
 *
 * The only recognized key is "target" (matched case-insensitively): its value
 * is converted with to_bool and stored as the sensor's training-target flag.
 * All other keys are ignored here.
 */
void RegressorConfigurator::sensorBase(RegressorSensorBase& s, CFG_VAL config) {
    BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
    {
        if (boost::iequals(val.first, "target"))
            s.setTrainingTarget(to_bool(val.second.data()));
    }
}
/**
 * Reads the regressor-specific analyzer attributes from `config`.
 *
 * Recognized keys (matched case-insensitively): trainingSamples, window,
 * targetDistance, inputPath and outputPath. Numeric values are parsed with
 * stoull; all other keys are ignored here.
 */
void RegressorConfigurator::analyzer(RegressorAnalyzer& a, CFG_VAL config) {
    BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
    {
        const std::string& key = val.first;
        const std::string& value = val.second.data();

        if (boost::iequals(key, "trainingSamples")) {
            a.setTrainingSamples(stoull(value));
        } else if (boost::iequals(key, "window")) {
            // The configured window is scaled by 1000000 before being stored
            // NOTE(review): presumably milliseconds to nanoseconds - confirm
            a.setAggregationWindow(stoull(value) * 1000000);
        } else if (boost::iequals(key, "targetDistance")) {
            a.setTargetDistance(stoull(value));
        } else if (boost::iequals(key, "inputPath")) {
            a.setInputPath(value);
        } else if (boost::iequals(key, "outputPath")) {
            a.setOutputPath(value);
        }
    }
}
/**
 * Validates a unit of the regressor plugin.
 *
 * A unit is accepted only if exactly one of its inputs is flagged as
 * training target and it has exactly one output sensor. The reason for a
 * rejection is logged as an error.
 *
 * @return true if the unit is valid, false otherwise.
 */
bool RegressorConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
    unsigned int numTargets = 0;
    for (const auto& in : u.getInputs()) {
        if (!in->getTrainingTarget())
            continue;
        // Bail out as soon as a second target shows up
        if (++numTargets > 1) {
            LOG(error) << _analyzerName << ": Only one regression target can be specified!";
            return false;
        }
    }

    if (numTargets == 0) {
        LOG(error) << _analyzerName << ": No regression target was specified!";
        return false;
    }

    if (u.getOutputs().size() != 1) {
        LOG(error) << _analyzerName << ": Only one output sensor per unit is allowed!";
        return false;
    }

    return true;
}
//================================================================================
// Name : RegressorConfigurator.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_REGRESSORCONFIGURATOR_H
#define PROJECT_REGRESSORCONFIGURATOR_H
#include "../../includes/AnalyzerConfiguratorTemplate.h"
#include "RegressorAnalyzer.h"
/**
* @brief Configurator for the regressor plugin.
*
* @ingroup regressor
*/
class RegressorConfigurator : virtual public AnalyzerConfiguratorTemplate<RegressorAnalyzer, RegressorSensorBase> {
public:
// Sets the analyzer and base names used by this configurator
RegressorConfigurator();
~RegressorConfigurator();
private:
// Reads the regressor-specific attributes of a sensor (the "target" flag)
void sensorBase(RegressorSensorBase& s, CFG_VAL config) override;
// Reads the regressor-specific attributes of an analyzer (samples, window, distance, model paths)
void analyzer(RegressorAnalyzer& a, CFG_VAL config) override;
// Checks that a unit has exactly one training target and exactly one output
bool unit(UnitTemplate<RegressorSensorBase>& u) override;
};
// C-linkage factory: returns a newly allocated RegressorConfigurator.
// The returned object is expected to be handed back to destroy().
extern "C" AnalyzerConfiguratorInterface* create() {
    AnalyzerConfiguratorInterface* configurator = new RegressorConfigurator();
    return configurator;
}
// C-linkage counterpart of create(): deletes the configurator passed in.
extern "C" void destroy(AnalyzerConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_REGRESSORCONFIGURATOR_H
//================================================================================
// Name : RegressorSensorBase.h
// Author : Alessio Netti
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
/**
* @defgroup regressor Regressor analyzer plugin.
* @ingroup analyzer
*
* @brief Regressor analyzer plugin.
*/
#ifndef PROJECT_REGRESSORSENSORBASE_H
#define PROJECT_REGRESSORSENSORBASE_H
#include "sensorbase.h"
/**
* @brief Sensor base for regressor plugin
*
* @ingroup regressor
*/
class RegressorSensorBase : public SensorBase {
public:
// Constructor and destructor
RegressorSensorBase(const std::string& name) : SensorBase(name) {
_trainingTarget = false;
}
// Copy constructor
RegressorSensorBase(RegressorSensorBase& other) : SensorBase(other) {
_trainingTarget = other._trainingTarget;
}
virtual ~RegressorSensorBase() {}
void setTrainingTarget(bool t) { _trainingTarget=t; }