Commit 02fcff78 authored by Alessio Netti

Analytics: GMM clustering plugin

- A plugin based on gaussian mixture models to enable clustering for
performance variation analysis or outlier detection
parent 3d9aed4f
......@@ -51,6 +51,9 @@ libdcdboperator_smoothing.$(LIBEXT): operators/smoothing/SmoothingOperator.o ope
libdcdboperator_regressor.$(LIBEXT): operators/regressor/RegressorOperator.o operators/regressor/RegressorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_clustering.$(LIBEXT): operators/clustering/ClusteringOperator.o operators/clustering/ClusteringConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_job_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/JobAggregatorOperator.o operators/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......
......@@ -22,7 +22,8 @@
1. [Aggregator Plugin](#averagePlugin)
2. [Job Aggregator Plugin](#jobaveragePlugin)
3. [Regressor Plugin](#regressorPlugin)
4. [Tester Plugin](#testerPlugin)
4. [Clustering Plugin](#clusteringPlugin)
5. [Tester Plugin](#testerPlugin)
4. [Sink Plugins](#sinkplugins)
1. [File Sink Plugin](#filesinkPlugin)
2. [Writing Plugins](#writingPlugins)
......@@ -853,13 +854,37 @@ Additionally, input sensors in operators of the Regressor plugin accept the foll
|:----- |:----------- |
| target | Boolean value. If true, this sensor represents the target for regression. Every unit in operators of the regressor plugin must have exactly one target sensor.
Finally, the Regressor plugin supports the following additional REST API action:
Finally, the Regressor plugin supports the following additional REST API actions:
| Action | Explanation |
|:----- |:----------- |
| train | Triggers a new training phase for the random forest model. Feature vectors are temporarily collected in-memory until _trainingSamples_ vectors are obtained. Until this moment, the old random forest model is still used to perform prediction.
| importances | Returns the sorted importance values for the input features, together with the respective labels, if available.
## Clustering Plugin <a name="clusteringPlugin"></a>
The _Clustering_ plugin implements a Gaussian mixture model (GMM) for performance variation analysis and outlier detection. Like the _Regressor_ plugin, it is based on the OpenCV library.
The input sensors of an operator define the axes of the clustering space, and hence the number of dimensions of the associated Gaussian components. The units of the operator, in turn, define the points in the clustering space. For this reason, the Clustering plugin employs hierarchical units, so that clustering is performed once over all sub-units at each computation interval. When prediction is performed (after the GMM has been trained, which depends on the _reuseModel_ attribute), the label of the Gaussian component to which each sub-unit belongs is stored in the single output sensor of that sub-unit. Operators of the Clustering plugin support the following configuration parameters:
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one.
| numComponents | Number of Gaussian components in the mixture model. Default is 3.
| reuseModel | Boolean value. If false, the GMM is trained at each computation, otherwise only once or when the "train" REST API action is used. Default is false.
| outlierCut | Threshold on the Mahalanobis distance that is used to identify outliers. Default is 2.0.
| inputPath | Path of a file from which a pre-trained Gaussian mixture model must be loaded.
| outputPath | Path of a file to which the Gaussian mixture model trained at runtime must be saved.
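
To make the mapping from sensors to clustering points more concrete, the following is a minimal sketch and not the plugin's actual code. It assumes that each sub-unit contributes one point whose coordinates are the means of its input sensors' readings within the configured window, as the operator source in this commit appears to do. The `Reading` struct, the `SensorWindows` alias and both function names are hypothetical stand-ins for the DCDB types, and the sketch averages in floating point, which is a choice made here for simplicity.

```cpp
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical stand-in for a DCDB sensor reading.
struct Reading {
    uint64_t timestamp;
    int64_t  value;
};

// One window of recent readings per input sensor of a sub-unit.
using SensorWindows = std::vector<std::vector<Reading>>;

// Builds the point of one sub-unit: one coordinate per input sensor,
// each being the mean of that sensor's readings in the window.
std::vector<float> featureVector(const SensorWindows& inputs) {
    std::vector<float> point;
    point.reserve(inputs.size());
    for (const auto& window : inputs) {
        if (window.empty())
            throw std::runtime_error("no readings available for an input sensor");
        double sum = 0.0;
        for (const auto& r : window)
            sum += static_cast<double>(r.value);
        point.push_back(static_cast<float>(sum / window.size()));
    }
    return point;
}

// Stacks the points of all sub-units: rows are points, columns are dimensions.
std::vector<std::vector<float>> clusteringSet(const std::vector<SensorWindows>& subUnits) {
    std::vector<std::vector<float>> rows;
    rows.reserve(subUnits.size());
    for (const auto& su : subUnits)
        rows.push_back(featureVector(su));
    return rows;
}
```

With three input sensors and N sub-units, this yields an N x 3 matrix; under the assumptions above, such a matrix is what the mixture model would be trained on, and each row then receives one component label.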
The Clustering plugin does not provide any additional configuration parameters for its input and output sensors.
However, it supports the following additional REST API actions:
| Action | Explanation |
|:----- |:----------- |
| train | Triggers a new training phase for the gaussian mixture model. At the next computation interval, the feature vectors of all units of the operator are combined to perform training, after which predicted labels are given as output.
| means | Returns the means of the generated gaussian components in the trained mixture model.
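
The interaction between the _reuseModel_ parameter and the _train_ action can be summarized with a small state sketch. This is an illustration rather than the plugin's code: the `TrainingPolicy` class and its method names are hypothetical, and the logic is an assumption derived from the parameter descriptions above (train at every computation when _reuseModel_ is false, otherwise only at the first computation or after a _train_ request). Pre-trained models loaded through _inputPath_ are not modeled here.

```cpp
// Hypothetical helper mirroring the documented retraining rules.
class TrainingPolicy {
public:
    explicit TrainingPolicy(bool reuseModel) : _reuseModel(reuseModel) {}

    // Corresponds to receiving the "train" REST API action.
    void requestTraining() { _trainingPending = true; }

    // Called once per computation interval; returns true if the model
    // should be (re-)trained before labels are predicted.
    bool shouldTrain() {
        bool train = _trainingPending || !_reuseModel;
        _trainingPending = false;  // a pending request is consumed here
        return train;
    }

private:
    bool _reuseModel;
    bool _trainingPending = true;  // the first computation always trains
};
```

In this sketch, an operator with _reuseModel_ enabled trains once, keeps predicting with the same model, and trains again only at the computation interval that follows a _train_ request.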
## Tester Plugin <a name="testerPlugin"></a>
The _Tester_ plugin can be used to test the functionality and performance of the query engine, as well as of the Unit System. It will perform a specified number of queries over the set of input sensors for each unit, and then output as a sensor the total number of retrieved readings. The following are the configuration parameters for operators in the _Tester_ plugin:
......
template_clustering def1 {
interval 3600000
minValues 1
streaming true
window 3600000
}
clustering cl1 {
default def1
window 5000
input {
; The input sensors define the dimensions of the clustering space
sensor "<bottomup 1>power"
sensor "<bottomup 1>temp"
sensor "<bottomup 1>col_idle"
}
output {
; The only output is the gaussian component label for each point
sensor "<bottomup 1>gmmLabel" {
mqttsuffix /gmmLabel
}
}
}
......@@ -271,7 +271,7 @@ public:
const list<std::string>& subNames, string mqttPrefix="", bool enforceTopics=false, bool relaxed=false) {
if (tUnit->isTopUnit()) {
if (tUnit->getSubUnits().size() != 1 || tUnit->getBaseOutputs().empty())
if (tUnit->getSubUnits().size() != 1)
throw invalid_argument("UnitGenerator: hierarchical template unit is malformed!");
if(subNames.empty() && u!=SensorNavigator::rootKey)
throw invalid_argument("UnitGenerator: only root unit is supported for this template type!");
......
//================================================================================
// Name : ClusteringConfigurator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "ClusteringConfigurator.h"
ClusteringConfigurator::ClusteringConfigurator() : OperatorConfiguratorTemplate() {
_operatorName = "clustering";
_baseName = "sensor";
}
ClusteringConfigurator::~ClusteringConfigurator() {}
void ClusteringConfigurator::sensorBase(ClusteringSensorBase& s, CFG_VAL config) {}
void ClusteringConfigurator::operatorAttributes(ClusteringOperator& op, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if(boost::iequals(val.first, "window"))
op.setAggregationWindow(stoull(val.second.data()) * 1000000);
else if(boost::iequals(val.first, "inputPath"))
op.setInputPath(val.second.data());
else if(boost::iequals(val.first, "outputPath"))
op.setOutputPath(val.second.data());
else if(boost::iequals(val.first, "numComponents"))
op.setNumComponents(stoull(val.second.data()));
else if(boost::iequals(val.first, "outlierCut"))
op.setOutlierCut(stod(val.second.data()));
else if(boost::iequals(val.first, "reuseModel"))
op.setReuseModel(to_bool(val.second.data()));
}
}
bool ClusteringConfigurator::unit(UnitTemplate<ClusteringSensorBase>& u) {
if(!u.isTopUnit()) {
LOG(error) << " " << _operatorName << ": This operator type only supports hierarchical units!";
return false;
}
if(u.getSubUnits().empty()) {
LOG(error) << " " << _operatorName << ": No sub-units were instantiated!";
return false;
}
for(const auto& su : u.getSubUnits())
if(su->getOutputs().size()!=1) {
LOG(error) << " " << _operatorName << ": Only one output sensor per unit is allowed!";
return false;
}
return true;
}
bool ClusteringConfigurator::readUnits(ClusteringOperator& op, std::vector<ClusteringSBPtr>& protoInputs, std::vector<ClusteringSBPtr>& protoOutputs,
std::vector<ClusteringSBPtr>& protoGlobalOutputs, inputMode_t inputMode) {
if(op.getDuplicate()) {
LOG(warning) << this->_operatorName << " " << op.getName() << ": The units of this operator cannot be duplicated.";
op.setDuplicate(false);
}
shared_ptr<UnitTemplate<ClusteringSensorBase>> un = nullptr;
try {
un = _unitGen.generateHierarchicalUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
return false;
}
if (op.getStreaming()) {
if(!constructSensorTopics(*un, op)) {
op.clearUnits();
return false;
}
if (!unit(*un)) {
LOG(error) << " Unit " << un->getName() << " did not pass the final check!";
op.clearUnits();
return false;
} else {
LOG(debug) << " Unit " << un->getName() << " generated.";
op.addUnit(un);
}
} else {
if (unit(*un)) {
op.addToUnitCache(un);
LOG(debug) << " Template unit for on-demand operation " + un->getName() + " generated.";
} else {
LOG(error) << " Template unit " << un->getName() << " did not pass the final check!";
op.clearUnits();
return false;
}
}
return true;
}
//================================================================================
// Name : ClusteringConfigurator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CLUSTERINGCONFIGURATOR_H
#define PROJECT_CLUSTERINGCONFIGURATOR_H
#include "../../includes/OperatorConfiguratorTemplate.h"
#include "ClusteringOperator.h"
/**
* @brief Configurator for the clustering plugin.
*
* @ingroup clustering
*/
class ClusteringConfigurator : virtual public OperatorConfiguratorTemplate<ClusteringOperator, ClusteringSensorBase> {
public:
ClusteringConfigurator();
~ClusteringConfigurator();
private:
void sensorBase(ClusteringSensorBase& s, CFG_VAL config) override;
void operatorAttributes(ClusteringOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<ClusteringSensorBase>& u) override;
bool readUnits(ClusteringOperator& op, std::vector<ClusteringSBPtr>& protoInputs, std::vector<ClusteringSBPtr>& protoOutputs,
std::vector<ClusteringSBPtr>& protoGlobalOutputs, inputMode_t inputMode) override;
};
extern "C" OperatorConfiguratorInterface* create() {
return new ClusteringConfigurator;
}
extern "C" void destroy(OperatorConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_CLUSTERINGCONFIGURATOR_H
//================================================================================
// Name : ClusteringOperator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "ClusteringOperator.h"
ClusteringOperator::ClusteringOperator(const std::string& name) : OperatorTemplate(name) {
_modelIn = "";
_modelOut = "";
_aggregationWindow = 0;
_numComponents = 3;
_outlierCut = 2.0f;
_reuseModel = false;
_trainingPending = true;
_trainingSet = cv::Mat();
_currentfVector = cv::Mat();
}
ClusteringOperator::ClusteringOperator(const ClusteringOperator& other) : OperatorTemplate(other) {
_modelIn = other._modelIn;
_modelOut = "";
_aggregationWindow = other._aggregationWindow;
_numComponents = other._numComponents;
_outlierCut = other._outlierCut;
_reuseModel = other._reuseModel;
_trainingPending = true;
_trainingSet = cv::Mat();
_currentfVector = cv::Mat();
}
ClusteringOperator::~ClusteringOperator() {
_gmm.release();
}
restResponse_t ClusteringOperator::REST(const string& action, const unordered_map<string, string>& queries) {
restResponse_t resp;
if(action=="train") {
resp.response = "Re-training triggered for gaussian mixture model " + this->_name + "!\n";
this->_trainingPending = true;
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
return resp;
}
void ClusteringOperator::execOnInit() {
bool useDefault=true;
if(_modelIn!="") {
try {
_gmm = cv::ml::EM::load(_modelIn);
if(!_gmm->isTrained() || _units.empty() || _units[0]->getSubUnits().empty() || _units[0]->getSubUnits()[0]->getInputs().size()!=(uint64_t)_gmm->getMeans().size().width)
LOG(error) << "Operator " + _name + ": incompatible model, falling back to default!";
else {
_trainingPending = false;
useDefault = false;
}
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot load model from file, falling back to default!"; }
}
if(useDefault) {
_gmm = cv::ml::EM::create();
_gmm->setClustersNumber(_numComponents);
}
}
void ClusteringOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _aggregationWindow;
LOG_VAR(ll) << " Input Path: " << (_modelIn!="" ? _modelIn : std::string("none"));
LOG_VAR(ll) << " Output Path: " << (_modelOut!="" ? _modelOut : std::string("none"));
LOG_VAR(ll) << " Clusters: " << _numComponents;
LOG_VAR(ll) << " Outlier Cut: " << _outlierCut;
LOG_VAR(ll) << " Reuse Model: " << (_reuseModel ? "enabled" : "disabled");
OperatorTemplate<ClusteringSensorBase>::printConfig(ll);
}
void ClusteringOperator::compute(U_Ptr unit) {
_trainingSet = cv::Mat();
for(const auto& su : unit->getSubUnits()) {
computeFeatureVector(su);
_trainingSet.push_back(_currentfVector);
}
if(!_trainingSet.empty()) {
if (_trainingPending || !_reuseModel) {
if(_gmm.empty())
throw std::runtime_error("Operator " + _name + ": cannot perform training, missing model!");
if(!_gmm->trainEM(_trainingSet))
throw std::runtime_error("Operator " + _name + ": model training failed!");
_trainingPending = false;
LOG(debug) << "Operator " + _name + ": model training performed.";
if(_modelOut!="") {
try {
_gmm->save(_modelOut);
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot save the model to a file!"; }
}
}
if(_gmm.empty() || !_gmm->isTrained())
throw std::runtime_error("Operator " + _name + ": cannot perform prediction, the model is untrained!");
std::vector<std::shared_ptr<UnitTemplate<ClusteringSensorBase>>> subUnits = unit->getSubUnits();
cv::Vec2d res;
reading_t predict;
predict.timestamp = getTimestamp();
for(unsigned int idx=0; idx<subUnits.size(); idx++) {
// Each sub-unit is classified using its own feature vector (row idx of the training set)
res = _gmm->predict2(_trainingSet.row(idx), cv::noArray());
predict.value = (int64_t)res[1];
subUnits[idx]->getOutputs()[0]->storeReading(predict);
}
}
}
void ClusteringOperator::computeFeatureVector(U_Ptr unit) {
_currentfVector = cv::Mat(1, unit->getInputs().size(), CV_32F);
std::vector<ClusteringSBPtr>& inputs = unit->getInputs();
for(size_t idx=0; idx<inputs.size(); idx++) {
_mean=0;
_buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer) || _buffer.empty())
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
// Computing MEAN
for(const auto& v : _buffer)
_mean += v.value;
_mean /= _buffer.size();
// Casting and storing the statistical features
_currentfVector.at<float>(idx) = (float)_mean;
}
}
//================================================================================
// Name : ClusteringOperator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CLUSTERINGOPERATOR_H
#define PROJECT_CLUSTERINGOPERATOR_H
#include "../../includes/OperatorTemplate.h"
#include "ClusteringSensorBase.h"
#include "opencv4/opencv2/core/mat.hpp"
#include "opencv4/opencv2/core/cvstd.hpp"
#include "opencv4/opencv2/ml.hpp"
#include <math.h>
#include <random>
#define OUTLIER_ID 1000
/**
* @brief Clustering operator plugin.
*
* @ingroup clustering
*/
class ClusteringOperator : virtual public OperatorTemplate<ClusteringSensorBase> {
public:
ClusteringOperator(const std::string& name);
ClusteringOperator(const ClusteringOperator& other);
virtual ~ClusteringOperator();
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override;
virtual void execOnInit() override;
void setInputPath(std::string in) { _modelIn = in; }
void setOutputPath(std::string out) { _modelOut = out; }
void setAggregationWindow(unsigned long long a) { _aggregationWindow = a; }
void setNumComponents(unsigned long long n) { _numComponents = n; }
void setOutlierCut(float s) { _outlierCut = s; }
void setReuseModel(bool r) { _reuseModel = r; }
void triggerTraining() { _trainingPending = true; }
std::string getInputPath() { return _modelIn;}
std::string getOutputPath() { return _modelOut; }
unsigned long long getAggregationWindow() { return _aggregationWindow; }
unsigned long long getNumComponents() { return _numComponents; }
float getOutlierCut() { return _outlierCut; }
bool getReuseModel() { return _reuseModel; }
void printConfig(LOG_LEVEL ll) override;
protected:
virtual void compute(U_Ptr unit) override;
void computeFeatureVector(U_Ptr unit);
std::string _modelOut;
std::string _modelIn;
unsigned long long _aggregationWindow;
unsigned long long _numComponents;
float _outlierCut;
bool _reuseModel;
bool _trainingPending;
vector<reading_t> _buffer;
cv::Ptr<cv::ml::EM> _gmm;
cv::Mat _trainingSet;
cv::Mat _currentfVector;
// Misc buffers
int64_t _mean;
};
#endif //PROJECT_CLUSTERINGOPERATOR_H
//================================================================================
// Name : ClusteringSensorBase.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
/**
* @defgroup clustering Clustering operator plugin.
* @ingroup operator
*
* @brief Clustering operator plugin.
*/
#ifndef PROJECT_CLUSTERINGSENSORBASE_H
#define PROJECT_CLUSTERINGSENSORBASE_H
#include "sensorbase.h"
/**
* @brief Sensor base for clustering plugin
*
* @ingroup clustering
*/
class ClusteringSensorBase : public SensorBase {
public:
// Constructor and destructor
ClusteringSensorBase(const std::string& name) : SensorBase(name) {}
// Copy constructor
ClusteringSensorBase(ClusteringSensorBase& other) : SensorBase(other) {}
virtual ~ClusteringSensorBase() {}
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
SensorBase::printConfig(ll, lg, leadingSpaces);
std::string leading(leadingSpaces, ' ');
}
};
using ClusteringSBPtr = std::shared_ptr<ClusteringSensorBase>;
#endif //PROJECT_CLUSTERINGSENSORBASE_H
......@@ -6,7 +6,7 @@ DCDBDEPLOYPATH ?= $(DCDBBASEPATH)/install
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon msr tester
# data analytics plugins to be built
OPERATORS = aggregator smoothing regressor job_aggregator testeroperator filesink smucngperf persystsql
OPERATORS = aggregator smoothing regressor clustering job_aggregator testeroperator filesink smucngperf persystsql
DEFAULT_VERSION = 0.3
GIT_VERSION = $(shell git describe --tags 2>/dev/null|sed 's/-\([0-9]*\)/.\1/')
......