Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 29a13d11 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: added signatures plugin

parent 16a940c8
......@@ -57,6 +57,9 @@ libdcdboperator_classifier.$(LIBEXT): operators/regressor/RegressorOperator.o op
libdcdboperator_clustering.$(LIBEXT): operators/clustering/ClusteringOperator.o operators/clustering/ClusteringConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core -lopencv_ml
libdcdboperator_cssignatures.$(LIBEXT): operators/cssignatures/CSOperator.o operators/cssignatures/CSConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopencv_core
libdcdboperator_job_aggregator.$(LIBEXT): operators/aggregator/AggregatorOperator.o operators/aggregator/JobAggregatorOperator.o operators/aggregator/JobAggregatorConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......
......@@ -21,10 +21,12 @@
3. [Plugins](#plugins)
1. [Aggregator Plugin](#averagePlugin)
2. [Job Aggregator Plugin](#jobaveragePlugin)
3. [Regressor Plugin](#regressorPlugin)
4. [Classifier Plugin](#classifierPlugin)
5. [Clustering Plugin](#clusteringPlugin)
6. [Tester Plugin](#testerPlugin)
3. [Smoothing Plugin](#smoothingPlugin)
4. [Regressor Plugin](#regressorPlugin)
5. [Classifier Plugin](#classifierPlugin)
6. [Clustering Plugin](#clusteringPlugin)
6. [CS Signatures Plugin](#csPlugin)
7. [Tester Plugin](#testerPlugin)
4. [Sink Plugins](#sinkplugins)
1. [File Sink Plugin](#filesinkPlugin)
2. [Writing Plugins](#writingPlugins)
......@@ -904,6 +906,33 @@ However, it supports the following additional REST API actions:
| means | Returns the means of the generated gaussian components in the trained mixture model.
| covs | Returns the covariance matrices of the generated gaussian components in the trained mixture model.
## CS Signatures Plugin <a name="csPlugin"></a>
The _CS Signatures_ plugin computes signatures from sensor data as described in the paper _"Correlation-wise Smoothing: Lightweight Knowledge Extraction for HPC Monitoring Data"_ by Netti et al. The signatures aggregate data both in time and across sensors, and are composed by a specified number of complex-valued _blocks_.
Each of the blocks is then stored in two separate sensors, which contain respectively the real and imaginary part of the block. Like in the _Regressor_ and _Classifier_ plugins, the CS algorithm is trained using a specified number of samples, which are accumulated in memory, subsequently learning the correlations between sensors.
Operators in this plugin support the following configuration parameters:
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one, that are then aggregated in the signatures.
| trainingSamples | Number of samples for the sensors that are to be used to train the CS algorithm.
| numBlocks | Desired number of blocks in the signatures.
| inputPath | Path of a file in which the order of the sensors and their upper/lower bounds are stored.
| outputPath | Path of a file to which the order of the sensors and their upper/lower bounds must be saved.
Additionally, the output sensors of the CS Signatures plugin support the following parameters:
| Value | Explanation |
|:----- |:----------- |
| imag | Boolean value. Specifies whether the sensor should store the imaginary or real part of a block.
The output sensors are automatically duplicated according to the specified number of blocks, and a numerical identifier is appended to their MQTT topics. If no sensor with the _imag_ parameter set to true is specified, the signatures will contain only their real parts.
Finally, the plugin supports the following REST API actions:
| Action | Explanation |
|:----- |:----------- |
| train | Triggers a new training phase for the CS algorithm. For practical reasons, only the sensor data from the first unit of the operator is used for training.
## Tester Plugin <a name="testerPlugin"></a>
The _Tester_ plugin can be used to test the functionality and performance of the query engine, as well as of the Unit System. It will perform a specified number of queries over the set of input sensors for each unit, and then output as a sensor the total number of retrieved readings. The following are the configuration parameters for operators in the _Tester_ plugin:
......
template_signature def1 {
interval 1000
minValues 1
streaming true
window 30000
}
signature sig1 {
default def1
numBlocks 20
input {
; Supposing that we target a compute node, we pick all of its available sensors
all
}
output {
; There are at most two outputs, one for the real blocks and one for the imag blocks
; These will be duplicated automatically according to the desired number of blocks
sensor "<bottomup 1>cs-sig-real" {
mqttsuffix /cs-sig-real
}
sensor "<bottomup 1>cs-sig-imag" {
mqttsuffix /cs-sig-imag
imag true
}
}
}
//================================================================================
// Name : CSConfigurator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "CSConfigurator.h"
CSConfigurator::CSConfigurator() : OperatorConfiguratorTemplate() {
_operatorName = "signature";
_baseName = "sensor";
}
CSConfigurator::~CSConfigurator() {}
void CSConfigurator::sensorBase(CSSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "imag"))
s.setImag(to_bool(val.second.data()));
}
}
void CSConfigurator::operatorAttributes(CSOperator& op, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if(boost::iequals(val.first, "window"))
op.setAggregationWindow(stoull(val.second.data()) * 1000000);
else if(boost::iequals(val.first, "inputPath"))
op.setInputPath(val.second.data());
else if(boost::iequals(val.first, "outputPath"))
op.setOutputPath(val.second.data());
else if(boost::iequals(val.first, "reuseModel"))
op.setReuseModel(to_bool(val.second.data()));
else if(boost::iequals(val.first, "numBlocks"))
op.setNumBlocks(stoull(val.second.data()));
else if(boost::iequals(val.first, "trainingSamples"))
op.setTrainingSamples(stoull(val.second.data()));
}
}
bool CSConfigurator::unit(UnitTemplate<CSSensorBase>& u) {
if(u.isTopUnit()) {
LOG(error) << " " << _operatorName << ": This operator type only supports flat units!";
return false;
}
if(u.getOutputs().empty()) {
LOG(error) << " " << _operatorName << ": At least one output sensor per unit must be defined!";
return false;
}
return true;
}
bool CSConfigurator::readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs, std::vector<CSSBPtr>& protoOutputs,
std::vector<CSSBPtr>& protoGlobalOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<CSSensorBase>>> *units = NULL;
if(protoInputs.empty()) {
LOG(error) << this->_operatorName << " " << op.getName() << ": No input sensors specified!";
return false;
}
if(protoOutputs.empty() || !protoGlobalOutputs.empty() || protoOutputs.size() > 2) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Units must be flat with at most two output sensors!";
return false;
}
bool realDone=false, imagDone=false;
vector<CSSBPtr> trueOutputs;
// Duplicating sensors according to the number of blocks
for (const auto &s : protoOutputs) {
if(!s->getImag() && !realDone) {
realDone = true;
} else if(s->getImag() && !imagDone) {
imagDone = true;
} else {
continue;
}
for(size_t i=0; i<op.getNumBlocks(); i++) {
auto outS = std::make_shared<CSSensorBase>(*s);
outS->setMqtt(outS->getMqtt() + std::to_string(i));
outS->setName(outS->getName() + std::to_string(i));
trueOutputs.push_back(outS);
}
}
// Replacing sensors
protoOutputs.clear();
protoOutputs = trueOutputs;
trueOutputs.clear();
try {
units = _unitGen.generateAutoUnit(SensorNavigator::rootKey, std::list<std::string>(), protoGlobalOutputs, protoInputs,
protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getEnforceTopics(), op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if (op.getStreaming()) {
if(!constructSensorTopics(*u, op)) {
op.clearUnits();
delete units;
return false;
}
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
op.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
op.addUnit(u);
}
} else {
if (unit(*u)) {
op.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
op.clearUnits();
delete units;
return false;
}
}
}
delete units;
return true;
}
//================================================================================
// Name : CSConfigurator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_CSCONFIGURATOR_H
#define PROJECT_CSCONFIGURATOR_H
#include "../../includes/OperatorConfiguratorTemplate.h"
#include "CSOperator.h"
/**
* @brief Configurator for the CS Smoothing plugin.
*
* @ingroup cs-smoothing
*/
class CSConfigurator : virtual public OperatorConfiguratorTemplate<CSOperator, CSSensorBase> {
public:
CSConfigurator();
virtual ~CSConfigurator();
private:
void sensorBase(CSSensorBase& s, CFG_VAL config) override;
void operatorAttributes(CSOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<CSSensorBase>& u) override;
bool readUnits(CSOperator& op, std::vector<CSSBPtr>& protoInputs, std::vector<CSSBPtr>& protoOutputs,
std::vector<CSSBPtr>& protoGlobalOutputs, inputMode_t inputMode) override;
};
extern "C" OperatorConfiguratorInterface* create() {
return new CSConfigurator;
}
extern "C" void destroy(OperatorConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_CSCONFIGURATOR_H
//================================================================================
// Name : CSOperator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "CSOperator.h"
CSOperator::CSOperator(const std::string& name) : OperatorTemplate(name) {
_modelIn = "";
_modelOut = "";
_aggregationWindow = 0;
_trainingSamples = 10000;
_numBlocks = 20;
_reuseModel = true;
_trainingPending = true;
}
CSOperator::CSOperator(const CSOperator& other) : OperatorTemplate(other) {
_modelIn = other._modelIn;
_modelOut = "";
_aggregationWindow = other._aggregationWindow;
_trainingSamples = other._trainingSamples;
_numBlocks = other._numBlocks;
_reuseModel = other._reuseModel;
_trainingPending = true;
}
CSOperator::~CSOperator() {}
restResponse_t CSOperator::REST(const string& action, const unordered_map<string, string>& queries) {
restResponse_t resp;
if(action=="train") {
resp.response = "Re-training triggered for CS Signatures operator " + this->_name + "!\n";
this->_trainingPending = true;
} else
throw invalid_argument("Unknown plugin action " + action + " requested!");
return resp;
}
void CSOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _aggregationWindow;
LOG_VAR(ll) << " Input Path: " << (_modelIn!="" ? _modelIn : std::string("none"));
LOG_VAR(ll) << " Output Path: " << (_modelOut!="" ? _modelOut : std::string("none"));
LOG_VAR(ll) << " Blocks: " << _numBlocks;
LOG_VAR(ll) << " Training Sample: " << _trainingSamples;
LOG_VAR(ll) << " Reuse Model: " << (_reuseModel ? "enabled" : "disabled");
OperatorTemplate<CSSensorBase>::printConfig(ll);
}
void CSOperator::execOnInit() {
bool useDefault=true;
if(_modelIn!="") {
try {
if(!readFromFile(_modelIn))
LOG(error) << "Operator " + _name + ": incompatible CS data, falling back to default!";
else {
_trainingPending = false;
useDefault = false;
}
} catch(const std::exception& e) {
LOG(error) << "Operator " + _name + ": cannot load CS data from file, falling back to default!"; }
}
if(useDefault) {
_trainingPending = true;
_max.clear();
_min.clear();
_permVector.clear();
}
_trainingData.clear();
_trainingUnit = "";
}
void CSOperator::compute(U_Ptr unit) {
uint64_t nowTs = getTimestamp();
// Training-related tasks
if(_trainingPending && _streaming && (_trainingUnit==unit->getName() || _trainingUnit=="")) {
_trainingUnit = unit->getName();
// Fetching sensor data
if(_trainingData.empty())
_trainingData.resize(unit->getInputs().size());
for(size_t idx=0; idx<unit->getInputs().size(); idx++)
accumulateData(_trainingData, unit->getInputs()[idx], idx, nowTs);
// Performing training once enough samples are obtained
if(!_trainingData.empty() && _trainingData[0].size() >= _trainingSamples) {
computeMinMax(_trainingData);
computePermutation(_trainingData);
_actualBlocks = _trainingData.size() < _numBlocks ? _trainingData.size() : _numBlocks;
_blockLen = (float)_trainingData.size() / (float)_actualBlocks;
_trainingData.clear();
_trainingUnit = "";
_trainingPending = false;
}
}
// If the operator is in an invalid state
if(_permVector.empty() && !(_trainingPending && _streaming)) {
throw std::runtime_error("Operator " + _name + ": cannot compute signatures, no CS data available!");
// If an unit has an unexpected number of input sensors
} else if(!_permVector.empty() && _permVector.size()!=unit->getInputs().size()) {
throw std::runtime_error("Operator " + _name + ": unit " + unit->getName() + " has an anomalous number of inputs!");
}
if(!_permVector.empty()) {
computeSignature(unit, nowTs);
}
}
// -------------------------------------- INPUT / OUTPUT --------------------------------------
bool CSOperator::dumpToFile(std::string &path) {
boost::property_tree::ptree root, blocks;
std::ostringstream data;
if(_trainingPending || _permVector.empty())
return false;
// In JSON mode, sensors are arranged hierarchically by plugin->operator->sensor
for(size_t idx=0; idx<_numBlocks; idx++) {
boost::property_tree::ptree group;
group.push_back(boost::property_tree::ptree::value_type("idx", std::to_string(_permVector[idx])));
group.push_back(boost::property_tree::ptree::value_type("min", std::to_string(_min[idx])));
group.push_back(boost::property_tree::ptree::value_type("max", std::to_string(_max[idx])));
blocks.add_child(std::to_string(idx), group);
}
root.add_child(std::to_string(_numBlocks), blocks);
try {
boost::property_tree::write_json(data, root, true);
std::ofstream outFile(path);
if(!outFile.is_open())
return false;
else {
outFile << data.str();
outFile.close();
}
} catch(const std::exception &e) { return false; }
return true;
}
bool CSOperator::readFromFile(std::string &path) {
boost::property_tree::iptree config;
try {
boost::property_tree::read_json(path, config);
} catch(const std::exception &e) { return false; }
// The root JSON node encodes the number of blocks and has to match with that of the operator
std::string blockString = std::to_string(_numBlocks);
if(config.find(blockString) == config.not_found())
return false;
std::vector<size_t> newPermVector(_numBlocks);
std::vector<int64_t> newMin(_numBlocks);
std::vector<int64_t> newMax(_numBlocks);
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config.get_child(blockString)) {
size_t blockID = std::stoull(val.first);
boost::property_tree::iptree &blk = val.second;
if(blk.find("idx")==blk.not_found() || blk.find("min")==blk.not_found() || blk.find("max")==blk.not_found())
return false;
BOOST_FOREACH(boost::property_tree::iptree::value_type &val2, blk) {
if (boost::iequals(val2.first, "idx")) {
newPermVector[blockID] = std::stoull(val2.second.data());
} else if (boost::iequals(val2.first, "min")) {
newMin[blockID] = std::stoll(val2.second.data());
} else if (boost::iequals(val2.first, "max")) {
newMax[blockID] = std::stoll(val2.second.data());
}
}
}
// Replacing the operator's CS data
_permVector = newPermVector;
_min = newMin;
_max = newMax;
return true;
}
// -------------------------------------- MODEL TRAINING --------------------------------------
// Accumulates sensor data in-memory for later training
void CSOperator::accumulateData(std::vector<std::vector<reading_t>>& v, CSSBPtr s, size_t idx, uint64_t nowTs) {
// We query all new data for the sensor since the last one - we want a clean time series
uint64_t endTs = nowTs;
uint64_t startTs = v[idx].empty() ? endTs - _aggregationWindow : v[idx].back().timestamp;
_buffer.clear();
// This query might possibly fail very often, depending on the batching of sensors
if(!_queryEngine.querySensor(s->getName(), startTs, endTs, _buffer, false))
return;
// We add the queried values only if they are actually "new"
if(!_buffer.empty() && _buffer[0].timestamp>v[idx].back().timestamp)
v[idx].insert(v[idx].end(), _buffer.begin(), _buffer.end());
}
// Applies the sorting stage of the CS method and finds a permutation vector
void CSOperator::computePermutation(std::vector<std::vector<reading_t>>& v) {
// Each column of the matrix will be an interpolated sensor
cv::Mat sensorMatrix = cv::Mat(_trainingSamples, v.size(), CV_64F);
// Evaluation parameters post-interpolation
double startEval=v[0].front().timestamp;
double stepEval=(v[0].back().timestamp - v[0].front().timestamp) / _trainingSamples;
double startInterp, stepInterp;
for(size_t idx=0; idx<v.size(); idx++) {
std::vector<reading_t>& vals = v[idx];
startInterp = startEval - vals.front().timestamp;
stepInterp = (vals.back().timestamp - vals.front().timestamp) / vals.size();
// Copying element by element into a temporary vector - ugly and slow
std::vector<double> sValues(vals.size());
for(size_t idx2=0; idx2<vals.size(); idx2++)
sValues[idx2] = (double)vals[idx2].value;
// Spline interpolation
boost::math::cubic_b_spline<double> spline(sValues.begin(), sValues.end(), startInterp, stepInterp);
// Evaluating in the interpolated points and storing in the matrix
for(size_t idx2=0; idx2<_trainingSamples; idx2++)
sensorMatrix.at<double>(idx, idx2) = spline(stepEval*idx2);
}
// Calculating covariance matrix
cv::Mat covMatrix, meanMatrix;
cv::calcCovarMatrix(sensorMatrix, covMatrix, meanMatrix, cv::COVAR_COLS + cv::COVAR_SCALE + cv::COVAR_NORMAL, CV_64F);
sensorMatrix.release();
// Transforming the matrix
convertToCorrelation(covMatrix);
// Initial set of available sensors
std::set<size_t> availSet;
for(size_t idx=0; idx<v.size(); idx++)
availSet.insert(idx);
_permVector.clear();
double corrMax = -1000.0;
double corrCoef = 0.0;
size_t corrIdx = 0;
for(size_t idx=0; idx<v.size(); idx++) {
if (covMatrix.at<double>(idx, idx) > corrMax) {
corrMax = covMatrix.at<double>(idx, idx);
corrIdx = idx;
}
}
_permVector.push_back(corrIdx);
availSet.erase(corrIdx);
// Correlation-based sorting
while(!availSet.empty()) {
corrMax = -1000;
corrIdx = 0;
for(const auto& avId : availSet) {
corrCoef = covMatrix.at<double>(avId, avId) * covMatrix.at<double>(_permVector.back(), avId);
if(corrCoef > corrMax) {