Commit 3c197a34 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'remotes/origin/signatures-plugin' into development

parents ed699732 29a13d11
......@@ -57,6 +57,9 @@ libdcdboperator_classifier.$(LIBEXT): operators/regressor/RegressorOperator.o op
libdcdboperator_clustering.$(LIBEXT): operators/clustering/ClusteringOperator.o operators/clustering/ClusteringConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_cssignatures.$(LIBEXT): operators/cssignatures/CSOperator.o operators/cssignatures/CSConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core
libdcdboperator_job_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/JobAggregatorOperator.o operators/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......
......@@ -21,10 +21,12 @@
3. [Plugins](#plugins)
1. [Aggregator Plugin](#averagePlugin)
2. [Job Aggregator Plugin](#jobaveragePlugin)
3. [Regressor Plugin](#regressorPlugin)
4. [Classifier Plugin](#classifierPlugin)
5. [Clustering Plugin](#clusteringPlugin)
6. [Tester Plugin](#testerPlugin)
3. [Smoothing Plugin](#smoothingPlugin)
4. [Regressor Plugin](#regressorPlugin)
5. [Classifier Plugin](#classifierPlugin)
6. [Clustering Plugin](#clusteringPlugin)
6. [CS Signatures Plugin](#csPlugin)
7. [Tester Plugin](#testerPlugin)
4. [Sink Plugins](#sinkplugins)
1. [File Sink Plugin](#filesinkPlugin)
2. [Writing Plugins](#writingPlugins)
......@@ -904,6 +906,33 @@ However, it supports the following additional REST API actions:
| means | Returns the means of the generated gaussian components in the trained mixture model.
| covs | Returns the covariance matrices of the generated gaussian components in the trained mixture model.
## CS Signatures Plugin <a name="csPlugin"></a>
The _CS Signatures_ plugin computes signatures from sensor data as described in the paper _"Correlation-wise Smoothing: Lightweight Knowledge Extraction for HPC Monitoring Data"_ by Netti et al. The signatures aggregate data both in time and across sensors, and are composed by a specified number of complex-valued _blocks_.
Each of the blocks is then stored in two separate sensors, which contain respectively the real and imaginary part of the block. Like in the _Regressor_ and _Classifier_ plugins, the CS algorithm is trained using a specified number of samples, which are accumulated in memory, subsequently learning the correlations between sensors.
Operators in this plugin support the following configuration parameters:
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one, that are then aggregated in the signatures.
| trainingSamples | Number of samples for the sensors that are to be used to train the CS algorithm.
| numBlocks | Desired number of blocks in the signatures.
| inputPath | Path of a file in which the order of the sensors and their upper/lower bounds are stored.
| outputPath | Path of a file to which the order of the sensors and their upper/lower bounds must be saved.
Additionally, the output sensors of the CS Signatures plugin support the following parameters:
| Value | Explanation |
|:----- |:----------- |
| imag | Boolean value. Specifies whether the sensor should store the imaginary or real part of a block.
The output sensors are automatically duplicated according to the specified number of blocks, and a numerical identifier is appended to their MQTT topics. If no sensor with the _imag_ parameter set to true is specified, the signatures will contain only their real parts.
Finally, the plugin supports the following REST API actions:
| Action | Explanation |
|:----- |:----------- |
| train | Triggers a new training phase for the CS algorithm. For practical reasons, only the sensor data from the first unit of the operator is used for training.
## Tester Plugin <a name="testerPlugin"></a>
The _Tester_ plugin can be used to test the functionality and performance of the query engine, as well as of the Unit System. It will perform a specified number of queries over the set of input sensors for each unit, and then output as a sensor the total number of retrieved readings. The following are the configuration parameters for operators in the _Tester_ plugin:
......
template_signature def1 {
interval 1000
minValues 1
streaming true
window 30000
}
signature sig1 {
default def1
numBlocks 20
input {
; Supposing that we target a compute node, we pick all of its available sensors
all
}
output {
; There are at most two outputs, one for the real blocks and one for the imag blocks
; These will be duplicated automatically according to the desired number of blocks
sensor "<bottomup 1>cs-sig-real" {
mqttsuffix /cs-sig-real
}
sensor "<bottomup 1>cs-sig-imag" {
mqttsuffix /cs-sig-imag
imag true
}
}
}
//================================================================================
// Name : CSConfigurator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "CSConfigurator.h"
CSConfigurator::CSConfigurator() : OperatorConfiguratorTemplate() {
_operatorName = "signature";
_baseName = "sensor";
}
CSConfigurator::~CSConfigurator() {}
void CSConfigurator::sensorBase(CSSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "imag"))
s.setImag(to_bool(val.second.data()));
}
}
void CSConfigurator::operatorAttributes(CSOperator& op, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if(boost::iequals(val.first, "window"))
op.setAggregationWindow(stoull(val.second.data()) * 1000000);
else if(boost::iequals(val.first, "inputPath"))
op.setInputPath(val.second.data());
else if(boost::iequals(val.first, "outputPath"))
op.setOutputPath(val.second.data());
else if(boost::iequals(val.first, "reuseModel"))
op.setReuseModel(to_bool(val.second.data()));
else if(boost::iequals(val.first, "numBlocks"))
op.setNumBlocks(stoull(val.second.data()));
else if(boost::iequals(val.first, "trainingSamples"))
op.setTrainingSamples(stoull(val.second.data()));
}
}
bool CSConfigurator::unit(UnitTemplate<CSSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
}
bool CSConfigurator::readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs, std::vector<CSSBPtr>& protoOutputs,
std::vector<CSSBPtr>& protoGlobalOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<CSSensorBase>>> *units = NULL;
if(protoInputs.empty()) {
LOG(error) << this->_operatorName << " " << op.getName() << ": No input sensors specified!";
return false;
}
if(protoOutputs.empty() || !protoGlobalOutputs.empty() || protoOutputs.size() > 2) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Units must be flat with at most two output sensors!";
return false;
}
bool realDone=false, imagDone=false;
vector<CSSBPtr> trueOutputs;
// Duplicating sensors according to the number of blocks
for (const auto &s : protoOutputs) {
if(!s->getImag() && !realDone) {
realDone = true;
} else if(s->getImag() && !imagDone) {
imagDone = true;
} else {
continue;
}
for(size_t i=0; i<op.getNumBlocks(); i++) {
auto outS = std::make_shared<CSSensorBase>(*s);
outS->setMqtt(outS->getMqtt() + std::to_string(i));
outS->setName(outS->getName() + std::to_string(i));
trueOutputs.push_back(outS);
}
}
// Replacing sensors
protoOutputs.clear();
protoOutputs = trueOutputs;
trueOutputs.clear();
try {
units = _unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if (op.getStreaming()) {
if(!constructSensorTopics(*u, op)) {
op.clearUnits();
delete units;
return false;
}
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
op.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
op.addUnit(u);
}
} else {
if (unit(*u)) {
op.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
op.clearUnits();
delete units;
return false;
}
}
}
delete units;
return true;
}
//================================================================================
// Name : CSConfigurator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CSCONFIGURATOR_H
#define PROJECT_CSCONFIGURATOR_H
#include "../../includes/OperatorConfiguratorTemplate.h"
#include "CSOperator.h"
/**
* @brief Configurator for the CS Smoothing plugin.
*
* @ingroup cs-smoothing
*/
class CSConfigurator : virtual public OperatorConfiguratorTemplate<CSOperator, CSSensorBase> {
public:
CSConfigurator();
virtual ~CSConfigurator();
private:
void sensorBase(CSSensorBase& s, CFG_VAL config) override;
void operatorAttributes(CSOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<CSSensorBase>& u) override;
bool readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs, std::vector<CSSBPtr>& protoOutputs,
std::vector<CSSBPtr>& protoGlobalOutputs, inputMode_t inputMode) override;
};
extern "C" OperatorConfiguratorInterface* create() {
return new CSConfigurator;
}
extern "C" void destroy(OperatorConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_CSCONFIGURATOR_H
//================================================================================
// Name : CSOperator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "CSOperator.h"
CSOperator::CSOperator(const std::string& name) : OperatorTemplate(name) {
_modelIn = "";
_modelOut = "";
_aggregationWindow = 0;
_trainingSamples = 10000;
_numBlocks = 20;
_reuseModel = true;
_trainingPending = true;
}
CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
_modelIn = other._modelIn;
_modelOut = "";
_aggregationWindow = other._aggregationWindow;
_trainingSamples = other._trainingSamples;
_numBlocks = other._numBlocks;
_reuseModel = other._reuseModel;
_trainingPending = true;
}
CSOperator::~CSOperator() {}
restResponse_t CSOperator::REST(const string& action, const unordered_map<string, string>& queries) {
restResponse_t resp;
if(action=="train") {
resp.response = "Re-training triggered for CS Signatures operator " + this->_name + "!\n";
this->_trainingPending = true;
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
return resp;
}
void CSOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _aggregationWindow;
LOG_VAR(ll) << " Input Path: " << (_modelIn!="" ? _modelIn : std::string("none"));
LOG_VAR(ll) << " Output Path: " << (_modelOut!="" ? _modelOut : std::string("none"));
LOG_VAR(ll) << " Blocks: " << _numBlocks;
LOG_VAR(ll) << " Training Sample: " << _trainingSamples;
LOG_VAR(ll) << " Reuse Model: " << (_reuseModel ? "enabled" : "disabled");
OperatorTemplate<CSSensorBase>::printConfig(ll);
}
void CSOperator::execOnInit() {
bool useDefault=true;
if(_modelIn!="") {
try {
if(!readFromFile(_modelIn))
LOG(error) << "Operator " + _name + ": incompatible CS data, falling back to default!";
else {
_trainingPending = false;
useDefault = false;
}
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot load CS data from file, falling back to default!"; }
}
if(useDefault) {
_trainingPending = true;
_max.clear();
_min.clear();
_permVector.clear();
}
_trainingData.clear();
_trainingUnit = "";
}
void CSOperator::compute(U_Ptr unit) {
uint64_t nowTs = getTimestamp();
// Training-related tasks
if(_trainingPending && _streaming && (_trainingUnit==unit->getName() || _trainingUnit=="")) {
_trainingUnit = unit->getName();
// Fetching sensor data
if(_trainingData.empty())
_trainingData.resize(unit->getInputs().size());
for(size_t idx=0; idx<unit->getInputs().size(); idx++)
accumulateData(_trainingData, unit->getInputs()[idx], idx, nowTs);
// Performing training once enough samples are obtained
if(!_trainingData.empty() && _trainingData[0].size() >= _trainingSamples) {
computeMinMax(_trainingData);
computePermutation(_trainingData);
_actualBlocks = _trainingData.size() < _numBlocks ? _trainingData.size() : _numBlocks;
_blockLen = (float)_trainingData.size() / (float)_actualBlocks;
_trainingData.clear();
_trainingUnit = "";
_trainingPending = false;
}
}
// If the operator is in an invalid state
if(_permVector.empty() && !(_trainingPending && _streaming)) {
throw std::runtime_error("Operator " + _name + ": cannot compute signatures, no CS data available!");
// If an unit has an unexpected number of input sensors
} else if(!_permVector.empty() && _permVector.size()!=unit->getInputs().size()) {
throw std::runtime_error("Operator " + _name + ": unit " + unit->getName() + " has an anomalous number of inputs!");
}
if(!_permVector.empty()) {
computeSignature(unit, nowTs);
}
}
// -------------------------------------- INPUT / OUTPUT --------------------------------------
bool CSOperator::dumpToFile(std::string &path) {
boost::property_tree::ptree root, blocks;
std::ostringstream data;
if(_trainingPending || _permVector.empty())
return false;
// In JSON mode, sensors are arranged hierarchically by plugin->operator->sensor
for(size_t idx=0; idx<_numBlocks; idx++) {
boost::property_tree::ptree group;
group.push_back(boost::property_tree::ptree::value_type("idx", std::to_string(_permVector[idx])));
group.push_back(boost::property_tree::ptree::value_type("min", std::to_string(_min[idx])));
group.push_back(boost::property_tree::ptree::value_type("max", std::to_string(_max[idx])));
blocks.add_child(std::to_string(idx), group);
}
root.add_child(std::to_string(_numBlocks), blocks);
try {
boost::property_tree::write_json(data, root, true);
std::ofstream outFile(path);
if(!outFile.is_open())
return false;
else {
outFile << data.str();
outFile.close();
}
} catch(const std::exception &e) { return false; }
return true;
}
bool CSOperator::readFromFile(std::string &path) {
boost::property_tree::iptree config;
try {
boost::property_tree::read_json(path, config);
} catch(const std::exception &e) { return false; }
// The root JSON node encodes the number of blocks and has to match with that of the operator
std::string blockString = std::to_string(_numBlocks);
if(config.find(blockString) == config.not_found())
return false;
std::vector<size_t> newPermVector(_numBlocks);
std::vector<int64_t> newMin(_numBlocks);
std::vector<int64_t> newMax(_numBlocks);
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config.get_child(blockString)) {
size_t blockID = std::stoull(val.first);
boost::property_tree::iptree &blk = val.second;
if(blk.find("idx")==blk.not_found() || blk.find("min")==blk.not_found() || blk.find("max")==blk.not_found())
return false;
BOOST_FOREACH(boost::property_tree::iptree::value_type &val2, blk) {
if (boost::iequals(val2.first, "idx")) {
newPermVector[blockID] = std::stoull(val2.second.data());
} else if (boost::iequals(val2.first, "min")) {
newMin[blockID] = std::stoll(val2.second.data());
} else if (boost::iequals(val2.first, "max")) {
newMax[blockID] = std::stoll(val2.second.data());
}
}
}
// Replacing the operator's CS data
_permVector = newPermVector;
_min = newMin;
_max = newMax;
return true;
}
// -------------------------------------- MODEL TRAINING --------------------------------------
// Accumulates sensor data in-memory for later training
void CSOperator::accumulateData(std::vector<std::vector<reading_t>>& v, CSSBPtr s, size_t idx, uint64_t nowTs) {
// We query all new data for the sensor since the last one - we want a clean time series
uint64_t endTs = nowTs;
uint64_t startTs = v[idx].empty() ? endTs - _aggregationWindow : v[idx].back().timestamp;
_buffer.clear();
// This query might possibly fail very often, depending on the batching of sensors
if(!_queryEngine.querySensor(s->getName(), startTs, endTs, _buffer, false))
return;
// We add the queried values only if they are actually "new"
if(!_buffer.empty() && _buffer[0].timestamp>v[idx].back().timestamp)
v[idx].insert(v[idx].end(), _buffer.begin(), _buffer.end());
}
// Applies the sorting stage of the CS method and finds a permutation vector
void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
// Each column of the matrix will be an interpolated sensor
cv::Mat sensorMatrix = cv::Mat(_trainingSamples, v.size(), CV_64F);
// Evaluation parameters post-interpolation
double startEval=v[0].front().timestamp;
double stepEval=(v[0].back().timestamp - v[0].front().timestamp) / _trainingSamples;
double startInterp, stepInterp;
for(size_t idx=0; idx<v.size(); idx++) {
std::vector<reading_t>& vals = v[idx];
startInterp = startEval - vals.front().timestamp;
stepInterp = (vals.back().timestamp - vals.front().timestamp) / vals.size();
// Copying element by element into a temporary vector - ugly and slow
std::vector<double> sValues(vals.size());
for(size_t idx2=0; idx2<vals.size(); idx2++)
sValues[idx2] = (double)vals[idx2].value;
// Spline interpolation
boost::math::cubic_b_spline<double> spline(sValues.begin(), sValues.end(), startInterp, stepInterp);
// Evaluating in the interpolated points and storing in the matrix
for(size_t idx2=0; idx2<_trainingSamples; idx2++)
sensorMatrix.at<double>(idx, idx2) = spline(stepEval*idx2);
}
// Calculating covariance matrix
cv::Mat covMatrix, meanMatrix;
cv::calcCovarMatrix(sensorMatrix, covMatrix, meanMatrix, cv::COVAR_COLS + cv::COVAR_SCALE + cv::COVAR_NORMAL, CV_64F);
sensorMatrix.release();
// Transforming the matrix
convertToCorrelation(covMatrix);
// Initial set of available sensors
std::set<size_t> availSet;
for(size_t idx=0; idx<v.size(); idx++)
availSet.insert(idx);
_permVector.clear();
double corrMax = -1000.0;
double corrCoef = 0.0;
size_t corrIdx = 0;
for(size_t idx=0; idx<v.size(); idx++) {
if (covMatrix.at<double>(idx, idx) > corrMax) {
corrMax = covMatrix.at<double>(idx, idx);
corrIdx = idx;
}
}
_permVector.push_back(corrIdx);
availSet.erase(corrIdx);
// Correlation-based sorting
while(!availSet.empty()) {
corrMax = -1000;
corrIdx = 0;
for(const auto& avId : availSet) {
corrCoef = covMatrix.at<double>(avId, avId) * covMatrix.at<double>(_permVector.back(), avId);
if(corrCoef > corrMax) {