Commit 83b2ac44 authored by Alessio Netti's avatar Alessio Netti

Analytics: added file sink plugin

- This plugin allows to write arbitrary sensor data to the local file system
- Removed the obsolete sink functionality in SensorBase
- Also fixed a minor bug concerning MQTT topic assignment in template analyzers
parent c9249c22
......@@ -4,7 +4,7 @@ include ../config.mk
CXXFLAGS += -DBOOST_NETWORK_ENABLE_HTTPS -I../common/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPLOYPATH)/include/opencv4
LIBS = -L../lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -rdynamic
ANALYZERS = aggregator regressor job_aggregator testeranalyzer smucngperfanalyzer
ANALYZERS = aggregator regressor job_aggregator testeranalyzer filesink smucngperfanalyzer
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -54,6 +54,9 @@ libdcdbanalyzer_job_aggregator.$(LIBEXT): analyzers/aggregator/AggregatorAnalyze
libdcdbanalyzer_testeranalyzer.$(LIBEXT): analyzers/testeranalyzer/TesterAnalyzer.o analyzers/testeranalyzer/TesterAnalyzerConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbanalyzer_filesink.$(LIBEXT): analyzers/filesink/FilesinkAnalyzer.o analyzers/filesink/FilesinkConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbanalyzer_smucngperfanalyzer.$(LIBEXT): analyzers/smucngperfanalyzer/SMUCNGPerfAnalyzer.o analyzers/smucngperfanalyzer/SMUCNGPerfConfigurator.o ../common/src/sensornavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......@@ -21,8 +21,9 @@
1. [Aggregator Plugin](#averagePlugin)
2. [Job Aggregator Plugin](#jobaveragePlugin)
3. [Regressor Plugin](#regressorPlugin)
4. [Tester Plugin](#testerPlugin)
5. [Writing Plugins](#writingPlugins)
4. [File Sink Plugin](#filesinkPlugin)
5. [Tester Plugin](#testerPlugin)
6. [Writing Plugins](#writingPlugins)
# Introduction <a name="introduction"></a>
In this Readme we describe the DCDB Data Analytics framework, and all data abstractions that are associated with it.
......@@ -670,6 +671,20 @@ Finally, the Regressor plugin supports the following additional REST API action:
| train | Triggers a new training phase for the random forest model. Feature vectors are temporarily collected in-memory until _trainingSamples_ vectors are obtained. Until this moment, the old random forest model is still used to perform prediction.
| importances | Returns the sorted importance values for the input features, together with the respective labels, if available.
## File Sink Plugin <a name="filesinkPlugin"></a>
The _File Sink_ plugin allows to write the output of any other sensor to the local file system. As such, it does not produce output sensors by itself, and only reads from input sensors.
The input sensors can either be fully qualified, or can be described through the unit system. In this case, multiple input sensors can be generated automatically, and the respective output paths need to be adjusted by enabling the _autoName_ attribute described below, to prevent multiple sensors from being written to the same file. The file sink analyzers (named sinks) support the following attributes:
| Value | Explanation |
|:----- |:----------- |
| autoName | Boolean. If false, the output paths associated to sensors are interpreted literally, and a file is opened for them. If true, only the part in the path describing the current directory is used, while the file itself is named accordingly to the MQTT topic of the specific sensor.
Additionally, input sensors in sinks accept the following parameters:
| Value | Explanation |
|:----- |:----------- |
| path | The path to which the sensors's readings should be written. It is interpreted as described above for the _autoName_ attribute.
## Tester Plugin <a name="testerPlugin"></a>
The _Tester_ plugin can be used to test the functionality and performance of the query engine, as well as of the unit system. It will perform a specified number of queries over the set of input sensors for each unit, and then output as a sensor the total number of retrieved readings. The following are the configuration parameters for analyzers in the _Tester_ plugin:
......
//================================================================================
// Name : FilesinkAnalyzer.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "FilesinkAnalyzer.h"
FilesinkAnalyzer::FilesinkAnalyzer(const std::string& name) : AnalyzerTemplate(name) {
_autoName = false;
_buffer = nullptr;
}
FilesinkAnalyzer::FilesinkAnalyzer(const FilesinkAnalyzer& other) : AnalyzerTemplate(other) {
_autoName = other._autoName;
_buffer = nullptr;
}
FilesinkAnalyzer::~FilesinkAnalyzer() {
if(_buffer)
delete _buffer;
}
void FilesinkAnalyzer::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Auto naming: " << (_autoName ? "enabled" : "disabled");
AnalyzerTemplate<FilesinkSensorBase>::printConfig(ll);
}
void FilesinkAnalyzer::stop() {
for(const auto& u : _units)
for(const auto& in : u->getInputs())
in->closeFile();
AnalyzerTemplate<FilesinkSensorBase>::stop();
}
void FilesinkAnalyzer::compute(U_Ptr unit) {
for(const auto& in : unit->getInputs()) {
if(!in->isOpen()) {
in->setPath(adjustPath(in));
if(!in->openFile()) {
LOG(error) << "Analyzer " + _name + ": failed to open file for sensor " << in->getName() << "!";
continue;
}
}
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
_buffer = _queryEngine.querySensor(in->getName(), 0, 0, _buffer);
if(!_buffer || _buffer->empty())
LOG(error) << "Analyzer " + _name + ": cannot read from sensor " + in->getName() + "!";
if(!in->writeFile(_buffer->at(_buffer->size()-1)))
LOG(error) << "Analyzer " + _name + ": failed file write for sensor " << in->getName() << "!";
}
}
std::string FilesinkAnalyzer::adjustPath(FilesinkSBPtr s) {
std::string adjPath = s->getPath();
if(this->_autoName || adjPath.empty()) {
// If no path is specified, we fall back to the current directory
if(adjPath.empty())
adjPath = "./";
size_t lastSep = adjPath.find_last_of('/');
// If no separator can be found in the path, we append one
if(lastSep == std::string::npos)
adjPath += '/';
// Else, we pick the path up to the directory and remove the last segment
else
adjPath = adjPath.substr(0, lastSep+1);
adjPath += MQTTChecker::topicToName(s->getMqtt());
}
return adjPath;
}
\ No newline at end of file
//================================================================================
// Name : FilesinkAnalyzer.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_FILESINKANALYZER_H
#define PROJECT_FILESINKANALYZER_H
#include "../../includes/AnalyzerTemplate.h"
#include "FilesinkSensorBase.h"
#include <math.h>
#include <algorithm>
/**
* @brief Filesink analyzer plugin.
*
* @ingroup filesink
*/
class FilesinkAnalyzer : virtual public AnalyzerTemplate<FilesinkSensorBase> {
public:
FilesinkAnalyzer(const std::string& name);
FilesinkAnalyzer(const FilesinkAnalyzer& other);
virtual ~FilesinkAnalyzer();
void setAutoName(bool a) { _autoName = a; }
bool getAutoName() { return _autoName; }
void printConfig(LOG_LEVEL ll) override;
void stop() override;
protected:
virtual void compute(U_Ptr unit) override;
std::string adjustPath(FilesinkSBPtr s);
bool _autoName;
vector<reading_t> *_buffer;
};
#endif //PROJECT_FILESINKANALYZER_H
//================================================================================
// Name : FilesinkConfigurator.cpp
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include "FilesinkConfigurator.h"
FilesinkConfigurator::FilesinkConfigurator() : AnalyzerConfiguratorTemplate() {
_analyzerName = "sink";
_baseName = "sensor";
}
FilesinkConfigurator::~FilesinkConfigurator() {}
void FilesinkConfigurator::sensorBase(FilesinkSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "path")) {
s.setPath(val.second.data());
}
}
}
void FilesinkConfigurator::analyzer(FilesinkAnalyzer& a, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "autoName"))
a.setAutoName(to_bool(val.second.data()));
}
}
bool FilesinkConfigurator::unit(UnitTemplate<FilesinkSensorBase>& u) {
if(!u.getOutputs().empty()) {
LOG(error) << _analyzerName << ": This is a file sink, no output sensors can be defined!";
return false;
}
return true;
}
//================================================================================
// Name : FilesinkConfigurator.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#ifndef PROJECT_FILESINKCONFIGURATOR_H
#define PROJECT_FILESINKCONFIGURATOR_H
#include "../../includes/AnalyzerConfiguratorTemplate.h"
#include "FilesinkAnalyzer.h"
/**
* @brief Configurator for the filesink plugin.
*
* @ingroup filesink
*/
class FilesinkConfigurator : virtual public AnalyzerConfiguratorTemplate<FilesinkAnalyzer, FilesinkSensorBase> {
public:
FilesinkConfigurator();
~FilesinkConfigurator();
private:
void sensorBase(FilesinkSensorBase& s, CFG_VAL config) override;
void analyzer(FilesinkAnalyzer& a, CFG_VAL config) override;
bool unit(UnitTemplate<FilesinkSensorBase>& u) override;
};
extern "C" AnalyzerConfiguratorInterface* create() {
return new FilesinkConfigurator;
}
extern "C" void destroy(AnalyzerConfiguratorInterface* c) {
delete c;
}
#endif //PROJECT_FILESINKCONFIGURATOR_H
//================================================================================
// Name : FilesinkSensorBase.h
// Author : Alessio Netti
// Contact : info@dcdb.it
// Copyright : Leibniz Supercomputing Centre
// Description :
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
/**
* @defgroup filesink Filesink analyzer plugin.
* @ingroup analyzer
*
* @brief Filesink analyzer plugin.
*/
#ifndef PROJECT_FILESINKSENSORBASE_H
#define PROJECT_FILESINKSENSORBASE_H
#include "sensorbase.h"
#include <fstream>
/**
* @brief Sensor base for filesink plugin
*
* @ingroup filesink
*/
class FilesinkSensorBase : public SensorBase {
public:
// Constructor and destructor
FilesinkSensorBase(const std::string& name) : SensorBase(name) {
_file.reset(nullptr);
_adjusted = false;
_path = "";
}
// Copy constructor
FilesinkSensorBase(FilesinkSensorBase& other) : SensorBase(other) {
_file.reset(nullptr);
_adjusted = false;
_path = other._path;
}
FilesinkSensorBase& operator=(const FilesinkSensorBase& other) {
SensorBase::operator=(other);
_file.reset(nullptr);
_adjusted = false;
_path = other._path;
return *this;
}
virtual ~FilesinkSensorBase() {}
void setPath(const std::string& path) { _path = path; }
void setAdjusted(bool a) { _adjusted = a; }
const std::string& getPath() { return _path; }
bool getAdjusted() { return _adjusted; }
bool isOpen() { return _file && _file->is_open(); }
bool openFile() {
closeFile();
_file.reset(new std::ofstream(_path));
if (!_file->is_open()) {
_file.reset(nullptr);
return false;
}
return true;
}
bool closeFile() {
if(_file && _file->is_open())
_file->close();
_file.reset(nullptr);
return true;
}
bool writeFile(reading_t val) {
if(_file && _file->is_open()) {
try {
_file->seekp(0, std::ios::beg);
*_file << val.value << std::endl;
return true;
} catch(const std::exception &e) {
_file->close();
_file.reset(nullptr);
}
}
return false;
}
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
SensorBase::printConfig(ll, lg, leadingSpaces);
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Path: " << _path;
}
protected:
bool _adjusted;
std::string _path;
std::unique_ptr<std::ofstream> _file;
};
using FilesinkSBPtr = std::shared_ptr<FilesinkSensorBase>;
#endif //PROJECT_FILESINKSENSORBASE_H
......@@ -158,7 +158,7 @@ void RegressorAnalyzer::trainRandomForest() {
try {
_rForest->save(_modelOut);
} catch(const std::exception& e) {
LOG(error) << "Analyzer " + _name + ": cannot save model to file!"; }
LOG(error) << "Analyzer " + _name + ": cannot save the model to a file!"; }
}
}
......
template_sink def1 {
interval 1000
streaming true
}
; In this first example we explicitly pick the input sensors and the paths they should be written to
sink s1 {
default def1
autoName false
input {
sensor "/test/cpu0/col_user" {
path /home/col_user_cpu0.log
}
sensor "/test/MemFree" {
path /home/memfree.log
}
}
}
; In this case we enable automatic naming, and use the unit system to automatically pick the col_idle sensors
; associated to all CPUs; these will be written into the specified path, each with a filename corresponding to the
; MQTT topic
sink s2 {
default def1
autoName true
input {
sensor "<bottomup, filter cpu>col_idle" {
path /home/cpudata/
}
}
}
......@@ -421,9 +421,9 @@ protected:
// Reading all derived attributes, if any
analyzer(an, config);
an.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()));
// Instantiating units
if(!an.getTemplate()) {
an.setMqttPart(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()));
return readUnits(an, protoInputs, protoOutputs, inputMode);
} else {
// If the analyzer is a template, we add it to the related map
......@@ -474,8 +474,6 @@ protected:
sBase.setSkipConstVal(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "delta")) {
sBase.setDelta(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "sink")) {
sBase.setSinkPath(val.second.data());
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling(std::stoul(val.second.data()));
}
......
......@@ -313,9 +313,6 @@ public:
shared_ptr<SBase> uOut = make_shared<SBase>(*out);
uOut->setMqtt(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(name) + MQTTChecker::formatTopic(uOut->getMqtt()));
uOut->setName(uOut->getMqtt());
// Duplicating the file sink adding the name of each unit to the path
if(uOut->getSinkPath()!="")
uOut->setSinkPath(MQTTChecker::topicToFile(uOut->getMqtt(), uOut->getSinkPath()));
topUnit->addOutput(uOut);
}
return topUnit;
......@@ -437,9 +434,6 @@ protected:
uOut.setMqtt(MQTTChecker::formatTopic(mqttPrefix) + MQTTChecker::formatTopic(uOut.getMqtt()));
// Setting the name back to the MQTT topic
uOut.setName(uOut.getMqtt());
// Duplicating the file sink adding the name of each unit to the path
if(uOut.getSinkPath()!="")
uOut.setSinkPath(MQTTChecker::topicToFile(uOut.getMqtt(), uOut.getSinkPath()));
unitOutputs.push_back(make_shared<SBase>(uOut));
}
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>(u, unitInputs, unitOutputs);
......
......@@ -48,7 +48,6 @@ public:
SensorBase(const std::string& name) :
_name(name),
_mqtt(""),
_sinkPath(""),
_skipConstVal(false),
_cacheInterval(900000),
_subsamplingFactor(1),
......@@ -56,8 +55,7 @@ public:
_cache(nullptr),
_delta(false),
_firstReading(true),
_readingQueue(nullptr),
_sinkFile(nullptr) {
_readingQueue(nullptr) {
_lastRawUValue.timestamp = 0;
_lastRawUValue.value = 0;
......@@ -74,7 +72,6 @@ public:
SensorBase(const SensorBase& other) :
_name(other._name),
_mqtt(other._mqtt),
_sinkPath(other._sinkPath),
_skipConstVal(other._skipConstVal),
_cacheInterval(other._cacheInterval),
_subsamplingFactor(other._subsamplingFactor),
......@@ -87,15 +84,13 @@ public:
_latestValue(other._latestValue),
_lastSentValue(other._lastSentValue),
_accumulator(other._accumulator),
_readingQueue(nullptr),
_sinkFile(nullptr) {}
_readingQueue(nullptr) {}
virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
virtual ~SensorBase() {}
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
_mqtt = other._mqtt;
_sinkPath = other._sinkPath;
_skipConstVal = other._skipConstVal;
_cacheInterval = other._cacheInterval;
_subsamplingFactor = other._subsamplingFactor;
......@@ -114,7 +109,6 @@ public:
_accumulator.timestamp = other._accumulator.timestamp;