Commit a0874ea2 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'remotes/origin/mqttTopics'

# Conflicts:
#	dcdbpusher/Makefile
parents 561c39ac 3e683acf
......@@ -262,12 +262,14 @@ void AnalyticsManager::removeTopics(an_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& a : p.configurator->getAnalyzers()) {
mqttCheck.removeGroup(a->getName());
if (a->getStreaming())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs()) {
mqttCheck.removeTopic(o->getMqtt());
mqttCheck.removeName(o->getName());
}
a->releaseUnits();
}
}
}
......@@ -277,11 +279,13 @@ bool AnalyticsManager::checkTopics(an_dl_t p) {
for(const auto& a : p.configurator->getAnalyzers()) {
if (!mqttCheck.checkGroup(a->getName()))
validTopics = false;
if (a->getStreaming())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
if (!mqttCheck.checkTopic(o->getMqtt()) || !mqttCheck.checkName(o->getName()))
validTopics = false;
a->releaseUnits();
}
}
return validTopics;
}
......@@ -404,9 +408,10 @@ void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
for (const auto& s : u->getBaseOutputs()) {
// Explicitly adding nodes to the ptree as to prevent BOOST from performing
// parsing on the node names
group.push_back(boost::property_tree::ptree::value_type(s->getName(), boost::property_tree::ptree(s->getMqtt())));
group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getMqtt())));
}
}
a->releaseUnits();
sensors.add_child(a->getName(), group);
}
}
......@@ -418,9 +423,10 @@ void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
found = true;
for (const auto& u : a->getUnits()) {
for (const auto& s : u->getBaseOutputs()) {
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
data << a->getName() << "::" << s->getMqtt() << "\n";
}
}
a->releaseUnits();
}
}
}
......@@ -461,8 +467,9 @@ void AnalyticsManager::GET_analytics_units(endpointArgs) {
found = true;
boost::property_tree::ptree group;
for (const auto& u : a->getUnits()) {
group.push_back(boost::property_tree::ptree::value_type(u->getName(), boost::property_tree::ptree()));
group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(u->getName())));
}
a->releaseUnits();
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
......@@ -472,8 +479,9 @@ void AnalyticsManager::GET_analytics_units(endpointArgs) {
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (const auto& u : a->getUnits()) {
data << a->getName() << "." << u->getName() << "\n";
data << a->getName() << "::" << u->getName() << "\n";
}
a->releaseUnits();
}
}
}
......
......@@ -158,7 +158,9 @@ file. The following is instead a list of configuration parameters that are avail
|:----- |:----------- |
| default | Name of the template that must be used to configure this analyzer.
| interval | Specifies how often the analyzer will be invoked to perform computations, and thus the sampling interval of its output sensors. Only used for analyzers in _streaming_ mode.
| relaxed | If set to _true_, the units of this analyzer will be instantiated even if some of the respective input sensors do not exist.
| delay | Delay in milliseconds to be applied to the start of the analyzer. This parameter only applies to streaming analyzers. It can be used to allow for input sensor caches to be populated before the analyzer is started.
| unitCacheLimit | Defines the maximum size of the unit cache that is used in the on-demand and job modes. Default is 1000.
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for analyzers in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this analyzer. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
| sync | If set to _true_, computation will be performed at time intervals synchronized with sensor readings.
......@@ -182,7 +184,7 @@ streaming true
average avg1 {
default def1
mqttPart FF0
mqttPart /avg1
input {
sensor col_user
......@@ -191,15 +193,15 @@ mqttPart FF0
output {
sensor sum {
mqttsuffix 76
mqttsuffix /sum
}
sensor max {
mqttsuffix 77
mqttsuffix /max
}
sensor avg {
mqttsuffix 78
mqttsuffix /avg
}
}
}
......@@ -224,7 +226,7 @@ streaming true
average avg1 {
default def1
mqttPart FF0
mqttPart /avg1
input {
sensor "<bottomup>col_user"
......@@ -233,15 +235,15 @@ mqttPart FF0
output {
sensor "<bottomup, filter cpu01>sum" {
mqttsuffix 76
mqttsuffix /sum
}
sensor "<bottomup, filter cpu01>max" {
mqttsuffix 77
mqttsuffix /max
}
sensor "<bottomup, filter cpu01>avg" {
mqttsuffix 78
mqttsuffix /avg
}
}
}
......@@ -315,6 +317,14 @@ the MQTT prefix, analyzer part and sensor suffix that are defined;
by concatenating the MQTT prefix associated to the unit (which is defined as _the portion of the MQTT topic shared by all sensors
belonging to such unit_) and the sensor suffix. The middle part of the topic is padded accordingly to ensure a fixed length.
#### Pipelining Analyzers <a name="pipelining"></a>
The inputs and outputs of streaming analyzers can be chained so as to form a processing pipeline. To enable this, users
need to configure analyzers by enabling the _relaxed_ configuration parameter, and by selecting as input the output sensors
of other analyzers. This is necessary as the analyzers are instantiated sequentially at startup, and
the framework cannot infer the correct order of initialization so as to resolve all dependencies transparently.
> NOTE &ensp;&ensp;&ensp;&ensp;&ensp; This feature is not supported when using analyzers in _on demand_ mode.
## Rest API <a name="restApi"></a>
DCDBAnalytics provides a REST API that can be used to perform various management operations on the framework. The
......
......@@ -41,29 +41,29 @@ void AggregatorAnalyzer::printConfig(LOG_LEVEL ll) {
AnalyzerTemplate<SensorBase>::printConfig(ll);
}
void AggregatorAnalyzer::compute(int unitID) {
void AggregatorAnalyzer::compute(U_Ptr unit) {
switch(_op) {
case SUM:
computeSum(unitID);
computeSum(unit);
break;
case AVG:
computeAvg(unitID);
computeAvg(unit);
break;
case MIN:
computeMin(unitID);
computeMin(unit);
break;
case MAX:
computeMax(unitID);
computeMax(unit);
break;
default:
break;
}
}
void AggregatorAnalyzer::computeSum(int unitID) {
void AggregatorAnalyzer::computeSum(U_Ptr unit) {
int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
......@@ -78,13 +78,13 @@ void AggregatorAnalyzer::computeSum(int unitID) {
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
unit->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeAvg(int unitID) {
void AggregatorAnalyzer::computeAvg(U_Ptr unit) {
int64_t acc=0, ctr=0;
for(const auto& in : _units[unitID]->getInputs()) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
......@@ -103,13 +103,13 @@ void AggregatorAnalyzer::computeAvg(int unitID) {
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
unit->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeMax(int unitID) {
void AggregatorAnalyzer::computeMax(U_Ptr unit) {
int64_t acc=0;
for(const auto& in : _units[unitID]->getInputs()) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
......@@ -125,14 +125,14 @@ void AggregatorAnalyzer::computeMax(int unitID) {
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
unit->getOutputs()[0]->storeReading(out);
}
void AggregatorAnalyzer::computeMin(int unitID) {
void AggregatorAnalyzer::computeMin(U_Ptr unit) {
int64_t acc=0;
bool minInit=false;
for(const auto& in : _units[unitID]->getInputs()) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
if(!_buffer || _buffer->empty()) {
......@@ -150,6 +150,6 @@ void AggregatorAnalyzer::computeMin(int unitID) {
reading_t out;
out.timestamp = getTimestamp();
out.value = acc;
_units[unitID]->getOutputs()[0]->storeReading(out);
unit->getOutputs()[0]->storeReading(out);
}
......@@ -47,13 +47,13 @@ public:
private:
void compute(int unitID) override;
void compute(U_Ptr unit) override;
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
void computeSum(int unitID);
void computeAvg(int unitID);
void computeMax(int unitID);
void computeMin(int unitID);
void computeSum(U_Ptr unit);
void computeAvg(U_Ptr unit);
void computeMax(U_Ptr unit);
void computeMin(U_Ptr unit);
vector<reading_t> *_buffer = NULL;
unsigned long long _window;
......
global {
mqttPrefix /FF112233445566778899AAB
mqttPrefix /test
}
template_aggregator def1 {
interval 1000
minValues 3
mqttPart FF0
duplicate false
streaming true
}
aggregator avg1 {
default def1
mqttPart FF0
window 2000
operation sum
......@@ -27,7 +25,7 @@ operation sum
output {
sensor "<bottomup, filter cpu250>sum" {
mqttsuffix 76
mqttsuffix /sum
}
}
......@@ -37,7 +35,6 @@ operation sum
aggregator avg2 {
default def1
interval 1500
mqttPart FF1
operation average
input {
......@@ -51,7 +48,7 @@ operation average
output {
sensor "<bottomup 1>avg" {
mqttsuffix 78
mqttsuffix /avg
}
}
......@@ -61,7 +58,7 @@ operation average
aggregator avg3 {
default def1
interval 1500
mqttPart FF2
mqttPart /mypart
operation maximum
input {
......@@ -73,7 +70,7 @@ operation maximum
output {
sensor "<bottomup 1>maxall" {
mqttsuffix 81
mqttsuffix /maxall
}
}
......
......@@ -73,10 +73,7 @@ protected:
const string OUTPUT_BLOCK = "output";
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
const std::string SENSOR_PATTERN = "(?i)<sensor>";
const std::string GROUP_PATTERN = "(?i)<group>";
public:
/**
......@@ -88,7 +85,6 @@ public:
_baseName("INVALID"),
_cfgPath(""),
_mqttPrefix(""),
_sensorPattern(""),
_cacheInterval(900000) {}
/**
......@@ -127,7 +123,6 @@ public:
*/
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
_mqttPrefix = pluginSettings.mqttPrefix;
_sensorPattern = pluginSettings.sensorPattern;
_cacheInterval = pluginSettings.cacheInterval;
}
......@@ -139,8 +134,7 @@ public:
void printConfig(LOG_LEVEL ll) final {
LOG_VAR(ll) << " General: ";
LOG_VAR(ll) << " MQTT-Prefix: " << (_mqttPrefix != "" ? _mqttPrefix : std::string("DEFAULT"));
LOG_VAR(ll) << " Sensor Pattern: " << (_sensorPattern != "" ? _sensorPattern : std::string("DEFAULT"));
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
LOG_VAR(ll) << " CacheInterval: " << _cacheInterval/1000 << " [s]";
//prints plugin specific configurator attributes and entities if present
printConfiguratorConfig(ll);
......@@ -204,6 +198,7 @@ public:
// If the analyzer must be duplicated for each compute unit, we copy-construct identical
// instances that have different unit IDs
unsigned numUnits = an->getUnits().size();
an->releaseUnits();
if(an->getDuplicate() && numUnits>1) {
for(unsigned int i=0; i < numUnits; i++) {
A_Ptr anCopy = std::make_shared<Analyzer>(*an);
......@@ -390,6 +385,10 @@ protected:
an.setDelayInterval(stoull(val.second.data()) / 1000);
} else if (boost::iequals(val.first, "duplicate")) {
an.setDuplicate(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "relaxed")) {
an.setRelaxed(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "unitCacheLimit")) {
an.setUnitCacheLimit(stoull(val.second.data()));
} else if (boost::iequals(val.first, "streaming")) {
an.setStreaming(to_bool(val.second.data()));
} else if (boost::iequals(val.first, INPUT_BLOCK) || boost::iequals(val.first, OUTPUT_BLOCK)) {
......@@ -416,51 +415,12 @@ protected:
}
}
}
// Reading all derived attributes, if any
analyzer(an, config);
// Instantiating units and returning the result
// Instantiating units
if(!an.getTemplate()) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart(), !an.getStreaming());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if(!constructSensorNames(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (an.getStreaming()) {
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
an.addUnit(u);
}
} else {
if (unit(*u)) {
an.addToOndemandCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
}
}
}
delete units;
return readUnits(an, protoInputs, protoOutputs, inputMode);
} else {
// If the analyzer is a template, we add it to the related map
auto ret = _templateAnalyzers.insert(std::pair<std::string, Analyzer*>(an.getName(), &an));
......@@ -470,6 +430,7 @@ protected:
}
_templateProtoInputs.insert(std::pair<std::string, std::vector<shared_ptr<SBase>>>(an.getName(), protoInputs));
}
return true;
}
......@@ -526,6 +487,65 @@ protected:
return true;
}
/**
* @brief Instantiates all necessary units for a single analyzer
*
* This method will create and assign all unit objects for a single analyzer, given a set
* of prototype input sensors, prototype output sensors and an input mode. This method is
* virtual such as to allow for flexibility in case specific analyzers require different
* assignment policies (such as job analyzers).
*
* @param an The analyzer whose units must be created
* @param protoInputs The vector of prototype input sensors
* @param protoOutputs The vector of prototype output sensors
* @param inputMode Input mode to be used (selective, all or all_recursive)
* @return True if successful, false otherwise
*/
virtual bool readUnits(Analyzer& an, std::vector<shared_ptr<SBase>>& protoInputs, std::vector<shared_ptr<SBase>>& protoOutputs, inputMode_t inputMode) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode,
MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(an.getMqttPart()),
!an.getStreaming(), an.getRelaxed());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false;
}
for (auto &u: *units) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (an.getStreaming()) {
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else {
LOG(debug) << " Unit " << u->getName() << " generated.";
an.addUnit(u);
}
} else {
if (unit(*u)) {
an.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
}
}
}
delete units;
return true;
}
/**
* @brief Reads the global configuration block
......@@ -551,9 +571,7 @@ protected:
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else if (boost::iequals(global.first, "sensorpattern")) {
_sensorPattern = global.second.data();
}
}
}
global(config.get_child("global"));
}
......@@ -561,39 +579,17 @@ protected:
}
/**
* @brief Adjusts the names of the sensors
* @brief Adjusts the topics and names of the sensors
*
* Names are modified according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
* Names are set according to the corresponding topic.
*
* @return true if successful, false otherwise
*/
//TODO: switch to textual MQTT topics and use only MQTTPrefix
bool constructSensorNames(UnitTemplate<SBase>& u, Analyzer& an) {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
//TODO: move sensorpattern checks somewhere else
if(_sensorPattern == "")
return true;
else if (!boost::regex_search(_sensorPattern.c_str(), match, sensorReg)) {
LOG(error) << "Invalid sensor naming pattern " << _sensorPattern << ". You must at least include <sensor>!";
return false;
}
bool constructSensorTopics(UnitTemplate<SBase>& u, Analyzer& an) {
std::string name;
// Performing name construction
for(auto& s: u.getOutputs()) {
name = s->getName();
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u.getName() == SensorNavigator::rootKey) {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, an.getName());
}
s->setName(name);
}
for(auto& s: u.getOutputs())
s->setName(s->getMqtt());
return true;
}
......@@ -611,8 +607,6 @@ protected:
std::string _cfgPath;
// Default MQTT prefix to be used when creating output sensors
std::string _mqttPrefix;
// String used to construct sensor names
std::string _sensorPattern;
// Interval in seconds for the cache of each sensor
unsigned int _cacheInterval;
// The vector of analyzers, in the form of pointers to AnalyzerInterface objects
......
......@@ -67,14 +67,17 @@ public:
_name(name),
_mqttPart(""),
_isTemplate(false),
_relaxed(false),
_duplicate(false),
_streaming(true),
_sync(true),
_dynamic(false),
_unitID(-1),
_keepRunning(0),
_minValues(1),
_interval(1000),
_cacheInterval(900000),
_unitCacheLimit(1000),
_cacheSize(1),
_delayInterval(0),
_pendingTasks(0),
......@@ -88,14 +91,17 @@ public:
_name(other._name),
_mqttPart(other._mqttPart),
_isTemplate(other._isTemplate),
_relaxed(other._relaxed),
_duplicate(other._duplicate),
_streaming(other._streaming),
_sync(other._sync),