Commit 9ed2230e authored by Alessio Netti's avatar Alessio Netti

Analytics: refactored UnitGenerator (2)

- Added generateFromTemplate method
- Fixed handling of special cases (on-demand and root units)
- Added enforceTopics option to force a MQTT prefix for unit outputs
- printConfig in UnitTemplate now accounts for hierarchical units
- Updated README
parent b08c1680
......@@ -283,7 +283,7 @@ bool OperatorManager::checkTopics(op_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& op : p.configurator->getOperators()) {
if (!mqttCheck.checkGroup(op->getName()) && !(op->getStreaming() && op->getDuplicate()))
if (!(op->getStreaming() && op->getDuplicate()) && !mqttCheck.checkGroup(op->getName()))
validTopics = false;
if (op->getStreaming()) {
for (const auto &u : op->getUnits())
......
# DCDB Data Analytics Framework
# Wintermute, the DCDB Data Analytics Framework
### Table of contents
1. [Introduction](#introduction)
2. [DCDBAnalytics](#dcdbanalytics)
2. [DCDB Wintermute](#dcdbanalytics)
1. [Global Configuration](#globalConfiguration)
2. [Operators](#operators)
1. [The Sensor Tree](#sensorTree)
......@@ -27,29 +27,29 @@
2. [Writing Plugins](#writingPlugins)
# Introduction <a name="introduction"></a>
In this Readme we describe the DCDB Data Analytics framework, and all data abstractions that are associated with it.
In this Readme we describe Wintermute, the DCDB Data Analytics framework, and all data abstractions that are associated with it.
# DCDBAnalytics <a name="dcdbanalytics"></a>
The DCDBAnalytics framework is built on top of DCDB, and allows to perform data analytics based on sensor data
in a variety of ways. DCDBAnalytics can be deployed both in DCDBPusher and in DCDBCollectAgent, with some minor
# DCDB Wintermute <a name="dcdbanalytics"></a>
The Wintermute framework is built on top of DCDB and enables data analytics based on sensor data
in a variety of ways. Wintermute can be deployed both in the DCDB Pusher and in the Collect Agent, with some minor
differences:
* **DCDBPusher**: only sensor data that is sampled locally and that is contained within the sensor cache can be used for
* **DCDB Pusher**: only sensor data that is sampled locally and that is contained within the sensor cache can be used for
data analytics. However, this is the preferable way to deploy simple models at large scale, as all computation is
performed within compute nodes, dramatically increasing scalability;
* **DCDBCollectAgent**: all available sensor data, in the local cache and in the Cassandra database, can be used for data
* **DCDB Collect Agent**: all available sensor data, in the local cache and in the Cassandra database, can be used for data
analytics. This approach is preferable for models that require data from multiple sources at once
(e.g., clustering-based anomaly detection), or for models that are deployed in [on-demand](#operatorConfiguration) mode.
## Global Configuration <a name="globalConfiguration"></a>
DCDBAnalytics shares the same configuration structure as DCDBPusher and DCDBCollectAgent, using a global.conf configuration file.
Wintermute shares the same configuration structure as DCDB Pusher and Collect Agent, using a global.conf configuration file.
All output sensors of the frameworks are therefore affected by configuration parameters described in the global Readme.
Additional parameters specific to this framework are the following:
| Value | Explanation |
|:----- |:----------- |
| **analytics** | Wrapper structure for the data analytics-specific values.
| hierarchy | Space-separated sequence of regular expressions used to infer the local (DCDBPusher) or global (DCDBCollectAgent) sensor hierarchy. This parameter should be wrapped in quotes to ensure proper parsing. See the Sensor Tree [section](#sensorTree) for more details.
| hierarchy | Space-separated sequence of regular expressions used to infer the local (DCDB Pusher) or global (DCDB Collect Agent) sensor hierarchy. This parameter should be wrapped in quotes to ensure proper parsing. See the Sensor Tree [section](#sensorTree) for more details.
| filter | Regular expression used to filter the set of sensors in the sensor tree. Everything that matches is included, the rest is discarded.
| jobFilter | Regular expression used to filter the jobs processed by job operators. The expression is applied to the first node of the job's nodelist. If a match is found the job is processed, otherwise it is discarded. This behavior can be changed at the plugin level.
| **operatorPlugins** | Block containing the specification of all data analytics plugins to be instantiated.
......@@ -59,9 +59,9 @@ Additional parameters specific to this framework are the following:
| | |
## Operators <a name="operators"></a>
Operators are the basic building block in DCDBAnalytics. A Operator is instantiated within a plugin, performs a specific
Operators are the basic building block in Wintermute. An Operator is instantiated within a plugin, performs a specific
task and acts on sets of inputs and outputs called _units_. Operators are functionally equivalent to _sensor groups_
in DCDBPusher, but instead of sampling data, they process such data and output new sensors. Some high-level examples
in DCDB Pusher, but instead of sampling data, they process such data and output new sensors. Some high-level examples
of operators are the following:
* An operator that performs time series regression on a particular input sensor, and outputs its prediction;
......@@ -74,11 +74,11 @@ compute nodes;
### The Sensor Tree <a name="sensorTree"></a>
Before diving into the configuration and instantiation of operators, we introduce the concept of _sensor tree_. A sensor
tree is simply a data structure expressing the hierarchy of sensors that are being sampled; internal nodes express
hierarchical entities (e.g. clusters, racks, nodes, cpus), whereas leaf nodes express actual sensors. In DCDBPusher,
hierarchical entities (e.g. clusters, racks, nodes, cpus), whereas leaf nodes express actual sensors. In DCDB Pusher,
a sensor tree refers only to the local hierarchy, while in the Collect Agent it can capture the hierarchy of the entire
system being sampled.
A sensor tree is built at initialization time of DCDBAnalytics, and is implemented in the _SensorNavigator_ class.
A sensor tree is built at initialization time of DCDB Wintermute, and is implemented in the _SensorNavigator_ class.
Its construction is regulated by the _hierarchy_ global configuration parameter, which can be for example the following:
```
......@@ -125,7 +125,7 @@ dot-separated part of the sensor name expresses a level in the hierarchy. The to
at runtime as well.
> NOTE 2 &ensp;&ensp;&ensp; Sensor trees are always built from the names of sensors _as they are published_. Therefore,
please make sure to use the _-a_ option in DCDBPusher appropriately, to build sensor names that express the desired hierarchy.
please make sure to use the _-a_ option in DCDB Pusher appropriately, to build sensor names that express the desired hierarchy.
### The Unit System <a name="unitSystem"></a>
......@@ -146,7 +146,7 @@ Operators can operate in two different modes:
* **Streaming**: streaming operators perform data analytics online and autonomously, processing incoming sensor data at regular intervals.
The units of streaming operators are completely resolved and instantiated at configuration time. The type of output of streaming
operators is identical to that of _sensors_ in DCDBPusher, which are pushed to DCDBCollectAgent and finally to the Cassandra database,
operators is identical to that of _sensors_ in DCDB Pusher, which are pushed to the Collect Agent and finally to the Cassandra database,
resulting in a time series representation;
* **On-demand**: on-demand operators do not perform data analytics autonomously, but only when queried by users. Unlike
for streaming operators, the units of on-demand operators are not instantiated at configuration, but only when a query is performed. When
......@@ -159,8 +159,8 @@ when data is required at specific times and for specific purposes, and when the
operators unfeasible.
### Operator Configuration <a name="operatorConfiguration"></a>
Here we describe how to configure and instantiate operators in DCDBAnalytics. The configuration scheme is very similar
to that of _sensor groups_ in DCDBPusher, and a _global_ configuration block can be defined in each plugin configuration
Here we describe how to configure and instantiate operators in Wintermute. The configuration scheme is very similar
to that of _sensor groups_ in DCDB Pusher, and a _global_ configuration block can be defined in each plugin configuration
file. The following is a list of the configuration parameters available for the operators themselves:
| Value | Explanation |
......@@ -172,11 +172,12 @@ file. The following is instead a list of configuration parameters that are avail
| unitCacheLimit | Defines the maximum size of the unit cache that is used in the on-demand and job modes. Default is 1000.
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for operators in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this operator. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
| enforceTopics | If set to _true_, mqttPart will be forcibly prepended to the MQTT topics of all output sensors in the operator (see this [section](#mqttTopics)).
| sync | If set to _true_, computation will be performed at time intervals synchronized with sensor readings.
| disabled | If set to _true_, the operator will be instantiated but will not be started and will not be available for queries.
| duplicate | If set to _false_, only one operator object will be instantiated. Such an operator will perform computation over all instantiated units, sequentially, at every interval. If set to _true_, the operator object will be duplicated so that each copy has one unit associated to it. This allows exploiting parallelism between units, but results in separate models to avoid race conditions.
| streaming | If set to _true_, the operator will operate in _streaming_ mode, pushing output sensors regularly. If set to _false_, the operator will instead operate in _on-demand_ mode.
| input | Block of input sensors that must be used to instantiate the units of this operator. These can both be a list of strings, or fully-qualified _Sensor_ blocks containing specific attributes (see DCDBPusher Readme).
| input | Block of input sensors that must be used to instantiate the units of this operator. These can both be a list of strings, or fully-qualified _Sensor_ blocks containing specific attributes (see DCDB Pusher Readme).
| output | Block of output sensors that will be associated to this operator. These must be _Sensor_ blocks containing valid MQTT suffixes. Note that the number of output sensors is usually fixed depending on the type of operator.
| | |
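To tie the parameters above together, a configuration block for a hypothetical operator of the _Aggregator_ plugin might look as follows. This is an illustrative sketch only: the operator name, sensor names, and values are invented, and the exact syntax (including the `<bottomup>` unit-system blocks and the _mqttsuffix_ attribute) should be checked against the plugin documentation:

```
average avgOperator1 {
    interval      1000
    minValues     3
    streaming     true
    duplicate     false
    mqttPart      /avg
    enforceTopics true

    input {
        sensor "<bottomup>temp"
    }

    output {
        sensor "<bottomup, filter cpu>avgTemp" {
            mqttsuffix  /avgTemp
        }
    }
}
```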
......@@ -324,7 +325,7 @@ The MQTT topics associated to output sensors of a certain operator are construct
on the unit they belong to:
* **Root unit**: if the output sensors belong to the _root_ unit, that is, they do not belong to any level in the sensor
hierarchy and are uniquely defined, the respective topics are constructed like in DCDBPusher sensors, by concatenating
hierarchy and are uniquely defined, the respective topics are constructed as for DCDB Pusher sensors, by concatenating
the MQTT prefix, operator part and sensor suffix that are defined;
* **Job unit**: if the output sensors belong to a _job_ unit in a job operator (see below), the MQTT topic is constructed
by concatenating the MQTT prefix, the operator part, a job suffix (e.g., /job1334) and finally the sensor suffix;
......@@ -363,8 +364,8 @@ from the last computation to the present; it will then build one job unit for ea
> NOTE &ensp;&ensp;&ensp;&ensp;&ensp; The _duplicate_ setting does not affect job operators.
## Rest API <a name="restApi"></a>
DCDBAnalytics provides a REST API that can be used to perform various management operations on the framework. The
API is functionally identical to that of DCDBPusher, and is hosted at the same address. All requests that are targeted
Wintermute provides a REST API that can be used to perform various management operations on the framework. The
API is functionally identical to that of DCDB Pusher, and is hosted at the same address. All requests that are targeted
at the data analytics framework must have a resource path starting with _/analytics_.
### List of resources <a name="listOfRessources"></a>
......@@ -588,7 +589,7 @@ Prefix `/analytics` left out!
> NOTE &ensp;&ensp;&ensp;&ensp;&ensp; Opt. = Optional
> NOTE 2 &ensp;&ensp;&ensp;&ensp;&ensp; The value of operator output sensors can be retrieved with the _compute_ resource, or with the [plugin]/[sensor]/avg resource defined in the DCDBPusher REST API.
> NOTE 2 &ensp;&ensp;&ensp;&ensp;&ensp; The value of operator output sensors can be retrieved with the _compute_ resource, or with the [plugin]/[sensor]/avg resource defined in the DCDB Pusher REST API.
> NOTE 3 &ensp;&ensp;&ensp;&ensp;&ensp; Developers can integrate their custom REST API resources that are plugin-specific, by implementing the _REST_ method in _OperatorTemplate_. To know more about plugin-specific resources, please refer to the respective documentation.
......@@ -619,7 +620,7 @@ PUT https://localhost:8000/analytics/compute?plugin=average;operator=avgOperator
> NOTE &ensp;&ensp;&ensp;&ensp;&ensp; The analytics REST API requires authentication credentials as well.
# Plugins <a name="plugins"></a>
Here we describe available plugins in DCDBAnalytics, and how to configure them.
Here we describe available plugins in Wintermute, and how to configure them.
## Aggregator Plugin <a name="averagePlugin"></a>
The _Aggregator_ plugin implements simple data processing algorithms. Specifically, this plugin allows performing basic
......@@ -691,7 +692,7 @@ The _Tester_ plugin can be used to test the functionality and performance of the
| relative | If true, the _relative_ query mode will be used. Otherwise the _absolute_ mode is used.
# Sink Plugins <a name="sinkplugins"></a>
Here we describe available plugins in DCDBAnalytics that are devoted to the output of sensor data (_sinks_), and that do not perform any analysis.
Here we describe available plugins in Wintermute that are devoted to the output of sensor data (_sinks_), and that do not perform any analysis.
## File Sink Plugin <a name="filesinkPlugin"></a>
The _File Sink_ plugin allows writing the output of any other sensor to the local file system. As such, it does not produce output sensors by itself, and only reads from input sensors.
......@@ -708,7 +709,7 @@ Additionally, input sensors in sinks accept the following parameters:
| path | The path to which the sensor's readings should be written. It is interpreted as described above for the _autoName_ attribute.
## Writing DCDB Analytics Plugins <a name="writingPlugins"></a>
Generating a DCDBAnalytics plugin requires implementing a _Operator_ and _Configurator_ class which contain all logic
Writing a DCDB Wintermute plugin requires implementing an _Operator_ and a _Configurator_ class, which contain all logic
tied to the specific plugin. Such classes should be derived from _OperatorTemplate_ and _OperatorConfiguratorTemplate_
respectively, which contain all plugin-agnostic configuration and runtime features. Please refer to the documentation
of the _Aggregator_ plugin for an overview of how a basic plugin can be implemented.
......@@ -95,7 +95,7 @@ protected:
op.setDuplicate(false);
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = this->_unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, op.getRelaxed());
units = this->_unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), true, false, op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << this->_operatorName << " " << op.getName() << ": Error when creating template job unit: " << e.what();
......
......@@ -238,7 +238,7 @@ protected:
// The job unit is generated as a hierarchical unit with the top level unit and the sub-units having
// the same set of output sensors
jobUnit = unitGen.generateHierarchicalUnit(jobTopic, jobData.nodes, uTemplate->getOutputs(), uTemplate->getInputs(),
uTemplate->getOutputs(), uTemplate->getInputMode(), this->_mqttPart, this->_relaxed);
uTemplate->getOutputs(), uTemplate->getInputMode(), this->_mqttPart, false, false, this->_relaxed);
// Initializing sensors if necessary
jobUnit->init(this->_cacheSize, this->_flatten);
this->addToUnitCache(jobUnit);
......
......@@ -514,7 +514,7 @@ protected:
LOG(debug) << " No output specified, generating sink unit.";
try {
units = _unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), op.getRelaxed());
units = _unitGen.generateUnits(std::list<std::string>(), protoInputs, protoOutputs, inputMode, op.getMqttPart(), !op.getStreaming(), false, op.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _operatorName << " " << op.getName() << ": Error when creating units: " << e.what();
......
......@@ -80,6 +80,7 @@ public:
_mqttPart(""),
_isTemplate(false),
_relaxed(false),
_enforceTopics(false),
_duplicate(false),
_streaming(true),
_sync(true),
......@@ -106,6 +107,7 @@ public:
_mqttPart(other._mqttPart),
_isTemplate(other._isTemplate),
_relaxed(other._relaxed),
_enforceTopics(other._enforceTopics),
_duplicate(other._duplicate),
_streaming(other._streaming),
_sync(other._sync),
......@@ -137,6 +139,7 @@ public:
_mqttPart = other._mqttPart;
_isTemplate = other._isTemplate;
_relaxed = other._relaxed;
_enforceTopics = other._enforceTopics;
_unitID = other._unitID;
_duplicate = other._duplicate;
_streaming = other._streaming;
......@@ -251,6 +254,7 @@ public:
const string& getMqttPart() const { return _mqttPart; }
bool getTemplate() const { return _isTemplate; }
bool getRelaxed() const { return _relaxed; }
bool getEnforceTopics() const { return _enforceTopics; }
bool getSync() const { return _sync; }
bool getDuplicate() const { return _duplicate; }
bool getStreaming() const { return _streaming; }
......@@ -269,6 +273,7 @@ public:
void setMqttPart(const string& mqttPart) { _mqttPart = mqttPart; }
void setTemplate(bool t) { _isTemplate = t; }
void setRelaxed(bool r) { _relaxed = r; }
void setEnforceTopics(bool e) { _enforceTopics = e; }
void setSync(bool sync) { _sync = sync; }
void setUnitID(int u) { _unitID = u; }
void setStreaming(bool streaming) { _streaming = streaming; }
......@@ -329,6 +334,8 @@ protected:
bool _isTemplate;
// If the operator's units must be built in relaxed mode
bool _relaxed;
// If true, when building the units of this operator all output sensors will have _mqttPart prepended to them
bool _enforceTopics;
// If true, the operator is a duplicate of another
bool _duplicate;
// If true, the operator performs computation in streaming
......
......@@ -147,8 +147,11 @@ public:
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
for (auto u : _units)
u->printConfig(ll, lg);
if(_unitID<0)
for (auto u : _units)
u->printConfig(ll, lg);
else
_units[_unitID]->printConfig(ll, lg);
} else
LOG_VAR(ll) << " Units: none";
}
......@@ -323,7 +326,7 @@ public:
throw std::runtime_error("No template unit in operator " + _name + "!");
LOG(debug) << "Operator " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, _relaxed);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), _mqttPart, false, false, _relaxed);
addToUnitCache(tempUnit);
}
......
......@@ -113,10 +113,11 @@ public:
/**
* @brief Prints the current unit configuration
*
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param leadingSpaces Number of leading spaces to prepend
*/
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) = 0;
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) = 0;
};
......
......@@ -337,21 +337,24 @@ public:
/**
* @brief Prints the current unit configuration
*
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param ll Logging level at which the configuration is printed
* @param lg Logger object to be used
* @param leadingSpaces Number of leading spaces to prepend
*/
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) override {
LOG_VAR(ll) << " Unit: " << _name;
LOG_VAR(ll) << " Inputs: ";
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) override {
if(leadingSpaces>30) return;
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << "Unit: " << _name;
LOG_VAR(ll) << leading << "Inputs: ";
for (const auto &i : _inputs)
LOG_VAR(ll) << " " << i->getName();
LOG_VAR(ll) << " Outputs: ";
LOG_VAR(ll) << leading << " " << i->getName();
LOG_VAR(ll) << leading << "Outputs: ";
for (const auto &o : _outputs)
o->printConfig(ll, lg, 20);
o->printConfig(ll, lg, leadingSpaces+4);
if(_subUnits.size()>0) {
LOG_VAR(ll) << " Sub-units: ";
LOG_VAR(ll) << leading << "Sub-units: ";
for (const auto &u : _subUnits)
LOG_VAR(ll) << " " << u->getName();
u->printConfig(ll, lg, leadingSpaces+4);
}
}
......