Commit eaac5b55 authored by Alessio Netti's avatar Alessio Netti

Analytics: classifier plugin

- Based on the regressor plugin
parent b6d659be
......@@ -51,6 +51,9 @@ libdcdboperator_smoothing.$(LIBEXT): operators/smoothing/SmoothingOperator.o ope
libdcdboperator_regressor.$(LIBEXT): operators/regressor/RegressorOperator.o operators/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_classifier.$(LIBEXT): operators/regressor/RegressorOperator.o operators/regressor/ClassifierOperator.o operators/regressor/ClassifierConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_clustering.$(LIBEXT): operators/clustering/ClusteringOperator.o operators/clustering/ClusteringConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
......
......@@ -22,8 +22,9 @@
1. [Aggregator Plugin](#averagePlugin)
2. [Job Aggregator Plugin](#jobaveragePlugin)
3. [Regressor Plugin](#regressorPlugin)
4. [Clustering Plugin](#clusteringPlugin)
5. [Tester Plugin](#testerPlugin)
4. [Classifier Plugin](#classifierPlugin)
5. [Clustering Plugin](#clusteringPlugin)
6. [Tester Plugin](#testerPlugin)
4. [Sink Plugins](#sinkplugins)
1. [File Sink Plugin](#filesinkPlugin)
2. [Writing Plugins](#writingPlugins)
......@@ -869,6 +870,14 @@ Finally, the Regressor plugin supports the following additional REST API actions
| train | Triggers a new training phase for the random forest model. Feature vectors are temporarily collected in-memory until _trainingSamples_ vectors are obtained. Until this moment, the old random forest model is still used to perform prediction.
| importances | Returns the sorted importance values for the input features, together with the respective labels, if available.
## Classifier Plugin <a name="classifierPlugin"></a>
The _Classifier_ plugin, as the name implies, performs machine learning classification. It is based on the Regressor plugin, and as such it also uses OpenCV random forest models. The plugin supplies the same options and has the same behavior as the Regressor plugin, with the following two exceptions:
* The _target_ parameter here indicates a sensor which stores the labels (as numerical integer identifiers) to be used for training and on which classification will be based. The mapping from the integer labels to their text equivalent is left to the users. Moreover, unlike in the
Regressor plugin, the target sensor is always excluded from the feature vectors.
* The _targetDistance_ parameter is not used here, as it is only meaningful for regression.
## Clustering Plugin <a name="clusteringPlugin"></a>
The _Clustering_ plugin implements a Gaussian mixture model for performance variation analysis and outlier detection. The plugin is based on the OpenCV library, similarly to the _Regressor_ plugin.
......
template_classifier def1 {
interval 1000
minValues 1
duplicate false
streaming true
window 10000
trainingSamples 3600
}
classifier clf1 {
default def1
window 20000
input {
sensor "<bottomup 1>ctxt"
sensor "<bottomup 1>Active"
sensor "<bottomup>col_user"
sensor "<bottomup>col_system"
sensor "<bottomup>branch_misses"
; This is a sensor containing the labels to be used for training
sensor "<bottomup 1>healthy" {
target true
}
}
output {
sensor "<bottomup 1>healthy_pred" {
mqttsuffix /healthyPred
}
}
}
\ No newline at end of file
//================================================================================
// Name : ClassifierConfigurator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "ClassifierConfigurator.h"
// Configurator for the Classifier plugin. Sets up the keywords under which
// this operator and its sensors appear in configuration files.
ClassifierConfigurator::ClassifierConfigurator() {
    // "classifier" is the block name in the config file (e.g. "classifier clf1 {...}"),
    // "sensor" is the keyword for individual input/output sensor entries.
    _operatorName = "classifier";
    _baseName = "sensor";
}
ClassifierConfigurator::~ClassifierConfigurator() {}
/**
 * @brief Parses the attributes of a single sensor block.
 *
 * The only classifier-specific sensor attribute is "target", which flags the
 * sensor whose readings supply the (integer) class labels used for training.
 *
 * @param s      Sensor object being configured.
 * @param config Property (sub)tree of the sensor block.
 */
void ClassifierConfigurator::sensorBase(RegressorSensorBase& s, CFG_VAL config) {
    BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
    {
        if (boost::iequals(val.first, "target")) {
            // Fix: removed the unused local "opName" that was assigned here
            // and never read.
            s.setTrainingTarget(to_bool(val.second.data()));
        }
    }
}
/**
 * @brief Parses the attributes of a classifier operator block.
 *
 * Recognized keys are forwarded to the matching ClassifierOperator setter;
 * unknown keys are silently ignored (they may be handled by the template).
 *
 * @param op     Operator object being configured.
 * @param config Property (sub)tree of the operator block.
 */
void ClassifierConfigurator::operatorAttributes(ClassifierOperator& op, CFG_VAL config) {
    for (auto &prop : config) {
        const std::string &key = prop.first;
        if (boost::iequals(key, "trainingSamples")) {
            // Number of feature vectors to collect before training the model.
            op.setTrainingSamples(stoull(prop.second.data()));
        } else if (boost::iequals(key, "window")) {
            // Aggregation window; scaled by 10^6 — presumably ms to ns, TODO confirm.
            op.setAggregationWindow(stoull(prop.second.data()) * 1000000);
        } else if (boost::iequals(key, "inputPath")) {
            op.setInputPath(prop.second.data());
        } else if (boost::iequals(key, "outputPath")) {
            op.setOutputPath(prop.second.data());
        } else if (boost::iequals(key, "getImportances")) {
            op.setComputeImportances(to_bool(prop.second.data()));
        }
    }
}
/**
 * @brief Validates a resolved unit before it is instantiated.
 *
 * A valid classifier unit must be flat (no hierarchy), must have exactly one
 * input sensor flagged as the classification target, and exactly one output
 * sensor to store predictions.
 *
 * @param u Unit to validate.
 * @return  True if the unit is well-formed, false otherwise.
 */
bool ClassifierConfigurator::unit(UnitTemplate<RegressorSensorBase>& u) {
    if(u.isTopUnit()) {
        LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
        return false;
    }
    // Exactly one input sensor may carry the training labels.
    bool targetSet = false;
    for(const auto& in : u.getInputs()) {
        if(in->getTrainingTarget()) {
            if(targetSet) {
                // Fix: added the leading " " for consistency with the other
                // error messages emitted by this method.
                LOG(error) << " " << _operatorName << ": Only one classification target can be specified!";
                return false;
            }
            targetSet = true;
        }
    }
    if(!targetSet) {
        LOG(error) << " " << _operatorName << ": No classification target was specified!";
        return false;
    }
    // One unit produces a single class prediction, hence a single output.
    if(u.getOutputs().size() != 1) {
        LOG(error) << " " << _operatorName << ": Only one output sensor per unit is allowed!";
        return false;
    }
    return true;
}
\ No newline at end of file
//================================================================================
// Name : ClassifierConfigurator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CLASSIFIERCONFIGURATOR_H
#define PROJECT_CLASSIFIERCONFIGURATOR_H
#include "../../includes/OperatorConfiguratorTemplate.h"
#include "ClassifierOperator.h"
/**
* @brief Configurator for the classifier plugin.
*
* @ingroup classifier
*/
class ClassifierConfigurator : virtual public OperatorConfiguratorTemplate<ClassifierOperator, RegressorSensorBase> {
public:
    ClassifierConfigurator();
    virtual ~ClassifierConfigurator();
private:
    // Parses classifier-specific sensor attributes (the "target" flag).
    void sensorBase(RegressorSensorBase& s, CFG_VAL config) override;
    // Parses classifier-specific operator attributes (window, training samples, model paths, importances).
    void operatorAttributes(ClassifierOperator& op, CFG_VAL config) override;
    // Validates a unit: flat, exactly one target input and one output sensor.
    bool unit(UnitTemplate<RegressorSensorBase>& u) override;
};
// Plugin entry points: resolved via dlsym() when the shared library is
// loaded, hence the C linkage. The framework owns the returned object and
// releases it through destroy().
extern "C" OperatorConfiguratorInterface* create() {
    return new ClassifierConfigurator;
}
extern "C" void destroy(OperatorConfiguratorInterface* c) {
    delete c;
}
#endif //PROJECT_CLASSIFIERCONFIGURATOR_H
//================================================================================
// Name : ClassifierOperator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "ClassifierOperator.h"
// Classification has no notion of predicting "into the future", so the
// regressor's target distance is forced to 0; the label sensor is also
// always excluded from the feature vectors (_includeTarget = false).
ClassifierOperator::ClassifierOperator(const std::string& name) : OperatorTemplate(name), RegressorOperator(name) {
    _targetDistance = 0;
    _includeTarget = false;
}
// Copy constructor deliberately re-forces the classifier invariants instead
// of copying them from "other".
ClassifierOperator::ClassifierOperator(const ClassifierOperator& other) : OperatorTemplate(other), RegressorOperator(other) {
    _targetDistance = 0;
    _includeTarget = false;
}
ClassifierOperator::~ClassifierOperator() {}
// Logs the operator configuration at the given log level, then delegates to
// the template for the common fields. Note: unlike the regressor, no target
// distance is printed, as it is meaningless for classification.
void ClassifierOperator::printConfig(LOG_LEVEL ll) {
    LOG_VAR(ll) << "            Window:          " << _aggregationWindow;
    LOG_VAR(ll) << "            Training Sample: " << _trainingSamples;
    LOG_VAR(ll) << "            Input Path:      " << (_modelIn!="" ? _modelIn : std::string("none"));
    LOG_VAR(ll) << "            Output Path:     " << (_modelOut!="" ? _modelOut : std::string("none"));
    LOG_VAR(ll) << "            Importances:     " << (_importances ? "enabled" : "disabled");
    OperatorTemplate<RegressorSensorBase>::printConfig(ll);
}
// Performs one classification pass for a unit: builds the feature vector,
// optionally accumulates it as a training sample, trains the random forest
// once enough samples are collected, and finally emits a prediction.
void ClassifierOperator::compute(U_Ptr unit) {
    // Fills _currentfVector from the unit's sensor buffers and refreshes
    // _currentTarget with the latest value of the target (label) sensor.
    computeFeatureVector(unit);
    if (_trainingPending && _streaming) {
        // Lazily allocate the in-memory training matrices on first use.
        if (!_trainingSet)
            _trainingSet = new cv::Mat();
        if (!_responseSet)
            _responseSet = new cv::Mat();
        _trainingSet->push_back(*_currentfVector);
        // Using an int instead of a float for the responses makes OpenCV interpret the variable as categorical
        _currentClass = (int)_currentTarget;
        _responseSet->push_back(_currentClass);
        // _targetDistance is 0 for classifiers (see constructor), so training
        // triggers exactly at _trainingSamples collected rows.
        if ((uint64_t)_trainingSet->size().height >= _trainingSamples + _targetDistance)
            trainRandomForest();
    }
    // No usable model and no training in progress: prediction is impossible.
    if(_rForest.empty() || !(_rForest->isTrained() || (_trainingPending && _streaming)))
        throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
    if(_rForest->isTrained()) {
        reading_t predict;
        // Predicted class label is stored as an integer reading on the
        // single output sensor of the unit.
        predict.value = (int64_t) _rForest->predict(*_currentfVector);
        predict.timestamp = getTimestamp();
        unit->getOutputs()[0]->storeReading(predict);
    }
}
//================================================================================
// Name : ClassifierOperator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CLASSIFIEROPERATOR_H
#define PROJECT_CLASSIFIEROPERATOR_H
#include "RegressorOperator.h"
/**
* @brief Classifier operator plugin.
*
* @ingroup classifier
*/
class ClassifierOperator : virtual public RegressorOperator {
public:
    ClassifierOperator(const std::string& name);
    ClassifierOperator(const ClassifierOperator& other);
    virtual ~ClassifierOperator();
    void printConfig(LOG_LEVEL ll) override;
protected:
    // Builds feature vectors, manages training-sample collection and emits
    // class predictions (overrides the regressor's compute logic).
    void compute(U_Ptr unit) override;
    // Latest training label cast to int; kept as a member so push_back into
    // the OpenCV response matrix stores a categorical (integer) column.
    int _currentClass;
};
#endif //PROJECT_CLASSIFIEROPERATOR_H
\ No newline at end of file
......@@ -35,6 +35,7 @@ RegressorOperator::RegressorOperator(const std::string& name) : OperatorTemplate
_trainingSamples = 256;
_trainingPending = true;
_importances = false;
_includeTarget = true;
_trainingSet = nullptr;
_responseSet = nullptr;
_currentfVector = nullptr;
......@@ -47,6 +48,7 @@ RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorT
_targetDistance = other._targetDistance;
_trainingSamples = other._trainingSamples;
_importances = other._importances;
_includeTarget = true;
_trainingPending = true;
_trainingSet = nullptr;
_responseSet = nullptr;
......@@ -194,42 +196,54 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
if (inputs[idx]->getTrainingTarget())
_currentTarget = (float)_buffer.back().value;
// Computing MEAN and SUM OF DIFFERENCES
val = _buffer.front().value;
for(const auto& v : _buffer) {
_mean += v.value;
_diffsum += v.value - val;
val = v.value;
}
_mean /= _buffer.size();
// Computing STD
for(const auto& v : _buffer) {
val = v.value - _mean;
_std += val*val;
if(!inputs[idx]->getTrainingTarget() || _includeTarget) {
// Computing MEAN and SUM OF DIFFERENCES
val = _buffer.front().value;
for (const auto &v : _buffer) {
_mean += v.value;
_diffsum += v.value - val;
val = v.value;
}
_mean /= _buffer.size();
// Computing STD
for (const auto &v : _buffer) {
val = v.value - _mean;
_std += val * val;
}
_std = sqrt(_std / _buffer.size());
// I know, sorting is costly; here, we assume that the aggregation window of sensor data is going to be relatively
// small, in which case the O(N*log(N)) cost of std::sort stays close to linear in practice
std::sort(_buffer.begin(), _buffer.end(),
[](const reading_t &lhs, const reading_t &rhs) { return lhs.value < rhs.value; });
// Computing 25th PERCENTILE
qId = (_buffer.size() * 25) / 100;
qMod = (_buffer.size() * 25) % 100;
_qtl25 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
(_buffer[qId].value + _buffer[qId + 1].value) / 2;
// Computing 75th PERCENTILE
qId = (_buffer.size() * 75) / 100;
qMod = (_buffer.size() * 75) % 100;
_qtl75 = (qMod == 0 || qId == _buffer.size() - 1) ? _buffer[qId].value :
(_buffer[qId].value + _buffer[qId + 1].value) / 2;
fIdx = idx * REG_NUMFEATURES;
// Casting and storing the statistical features
_currentfVector->at<float>(fIdx) = (float) _mean;
_currentfVector->at<float>(fIdx + 1) = (float) _std;
_currentfVector->at<float>(fIdx + 2) = (float) _diffsum;
_currentfVector->at<float>(fIdx + 3) = (float) _qtl25;
_currentfVector->at<float>(fIdx + 4) = (float) _qtl75;
_currentfVector->at<float>(fIdx + 5) = (float) _buffer[_buffer.size() - 1].value;
} else {
_currentfVector->at<float>(fIdx) = 0.0f;
_currentfVector->at<float>(fIdx + 1) = 0.0f;
_currentfVector->at<float>(fIdx + 2) = 0.0f;
_currentfVector->at<float>(fIdx + 3) = 0.0f;
_currentfVector->at<float>(fIdx + 4) = 0.0f;
_currentfVector->at<float>(fIdx + 5) = 0.0f;
}
_std = sqrt(_std/_buffer.size());
// I know, sorting is costly; here, we assume that the aggregation window of sensor data is going to be relatively
// small, in which case the O(N*log(N)) cost of std::sort stays close to linear in practice
std::sort(_buffer.begin(), _buffer.end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
// Computing 25th PERCENTILE
qId = (_buffer.size() * 25) / 100;
qMod = (_buffer.size() * 25) % 100;
_qtl25 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
// Computing 75th PERCENTILE
qId = (_buffer.size() * 75) / 100;
qMod = (_buffer.size() * 75) % 100;
_qtl75 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
fIdx = idx * REG_NUMFEATURES;
// Casting and storing the statistical features
_currentfVector->at<float>(fIdx) = (float)_mean;
_currentfVector->at<float>(fIdx+1) = (float)_std;
_currentfVector->at<float>(fIdx+2) = (float)_diffsum;
_currentfVector->at<float>(fIdx+3) = (float)_qtl25;
_currentfVector->at<float>(fIdx+4) = (float)_qtl75;
_currentfVector->at<float>(fIdx+5) = (float)_buffer[_buffer.size()-1].value;
}
//LOG(error) << "Target: " << _currentTarget;
//LOG(error) << "Vector: ";
......
......@@ -88,6 +88,7 @@ protected:
unsigned long long _targetDistance;
bool _trainingPending;
bool _importances;
bool _includeTarget;
vector<reading_t> _buffer;
cv::Ptr<cv::ml::RTrees> _rForest;
......
......@@ -6,7 +6,7 @@ DCDBDEPLOYPATH ?= $(DCDBBASEPATH)/install
PLUGINS = sysfs ipmi pdu bacnet snmp procfs tester gpfsmon msr
# data analytics plugins to be built
OPERATORS = aggregator smoothing regressor clustering job_aggregator testeroperator filesink smucngperf persystsql
OPERATORS = aggregator smoothing regressor classifier clustering job_aggregator testeroperator filesink smucngperf persystsql
DEFAULT_VERSION = 0.4
GIT_VERSION = $(shell git describe --tags 2>/dev/null|sed 's/-\([0-9]*\)/.\1/')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment