Commit 94cf7193 authored by Alessio Netti's avatar Alessio Netti
Browse files

Changes and Fixes

- Unit instantiation logic moved to UnitGenerator
- Regex-based filtering system to configure units was added
- Thread safety in QueryEngine access addressed
- Tons of bugfixes
parent c6498a8b
......@@ -2,7 +2,7 @@ global {
mqttPrefix /FF112233445566778899AAB
}
template_analyzer def1 {
template_average def1 {
interval 1000
minValues 3
mqttPart FF0
......@@ -10,27 +10,93 @@ duplicate false
streaming true
}
analyzer average1 {
average avg1 {
default def1
mqttPart FF0
mqttStart 00
input {
sensor <unit>col_user
sensor "<unit>col_user"
sensor <unit-1>MemFree
sensor "<unit-1>MemFree"
}
output {
sensor <unit>sum {
mqttsuffix 36
sensor "<unit, filter cpu250>sum" {
mqttsuffix 76
}
sensor <unit>max {
mqttsuffix 37
sensor "<unit, filter cpu250>max" {
mqttsuffix 77
}
sensor "<unit, filter cpu250>avg" {
mqttsuffix 78
}
}
}
average avg2 {
default def1
interval 1500
mqttPart FF1
mqttStart 00
input {
sensor "<unit>col_user"
sensor "<unit - 1>MemFree"
}
output {
sensor "<unit - 1>sum" {
mqttsuffix 76
}
sensor "<unit - 1>max" {
mqttsuffix 77
}
sensor "<unit - 1>avg" {
mqttsuffix 78
}
}
}
average avg3 {
default def1
interval 1500
mqttPart FF2
mqttStart 00
input {
all-recursive
}
output {
sensor "<unit - 1>sumall" {
mqttsuffix 80
}
sensor "<unit - 1>maxall" {
mqttsuffix 81
}
sensor "<unit - 1>avgall" {
mqttsuffix 82
}
}
......
......@@ -343,8 +343,7 @@ bool Configuration::checkMqtt(const std::string& mqtt) {
return false;
}
//TODO: fix bug here
auto returnIt = _mqttTopics.insert(mqtt);
auto returnIt = _mqttTopics.insert(str);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << mqtt << "\" used twice!";
return false;
......
......@@ -316,7 +316,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.updated = true;
qEngine.triggerUpdate();
}
......
......@@ -8,6 +8,7 @@ void AnalyticsManager::clear() {
for(const auto& p : _plugins)
p.destroy(p.configurator);
_plugins.clear();
_status = CLEAR;
}
bool AnalyticsManager::load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings) {
......@@ -108,10 +109,16 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
}
}
}
_status = LOADED;
return true;
}
bool AnalyticsManager::mqttCheck(pluginVector_t& pushers) {
if(_status != LOADED) {
LOG(error) << "Cannot perform MQTT check, AnalyticsManager is not loaded!";
return false;
}
std::set<std::string> _mqttTopics;
// Initializing set with topics from pusher sensors
......@@ -144,6 +151,10 @@ bool AnalyticsManager::mqttCheck(pluginVector_t& pushers) {
}
bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot init, AnalyticsManager is not loaded!";
return false;
}
for (const auto &p : _plugins)
//Actions always affect either one or all plugins, and always all analyzers within said plugin
if(plugin=="" || plugin==p.id) {
......@@ -155,6 +166,10 @@ bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
}
bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot reload, AnalyticsManager is not loaded!";
return false;
}
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Reload \"" << p.id << "\" data analytics plugin";
......@@ -167,6 +182,10 @@ bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin)
}
bool AnalyticsManager::start(const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot start, AnalyticsManager is not loaded!";
return false;
}
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Start \"" << p.id << "\" data analytics plugin";
......@@ -177,6 +196,10 @@ bool AnalyticsManager::start(const string& plugin) {
}
bool AnalyticsManager::stop(const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot stop, AnalyticsManager is not loaded!";
return false;
}
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Stop \"" << p.id << "\" data analytics plugin";
......@@ -187,6 +210,10 @@ bool AnalyticsManager::stop(const string& plugin) {
}
string AnalyticsManager::forwardREST(const string& command) {
if(_status != LOADED) {
LOG(error) << "Cannot forward REST command, AnalyticsManager is not loaded!";
return "";
}
//TODO: implement REST interface integration
return "";
}
......@@ -29,7 +29,6 @@ typedef struct {
typedef std::vector<an_dl_t> an_pluginVector_t;
//TODO: manage states (maybe?)
/**
* Management class for the entire data analytics framework
*
......@@ -39,10 +38,12 @@ class AnalyticsManager {
public:
enum managerState_t { CLEAR = 1, LOADED = 2};
/**
* @brief Class constructor
*/
AnalyticsManager() {}
AnalyticsManager() { _status = CLEAR; }
/**
* @brief Class destructor
......@@ -158,6 +159,8 @@ protected:
string _configPath;
// Structure containing global plugin settings
pluginSettings_t _pluginSettings;
// Keeps track of the manager's state
managerState_t _status;
//Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -27,33 +27,6 @@ bool SensorNavigator::sensorExists(const string& node) {
return _sensorTree && _sensorTree->count(node) && isSensorNode(node);
}
int SensorNavigator::parseNodeLevelString(const string& s) {
if(!_sensorTree)
throw runtime_error("SensorNavigator: sensor tree not initialized!");
if(boost::regex_search(s.c_str(), _match, _nodeRx)) {
string token = _match.str(0);
int lv = !boost::regex_search(s.c_str(), _match, _numRx) ? _treeDepth : _treeDepth - (int)std::stoi(_match.str(0));
return lv<-1 ? -1 : lv;
}
else
return -1;
}
set<string> *SensorNavigator::resolveNodeLevelString(const string& s, const string& node) {
int level = parseNodeLevelString(s);
set<string> *sensors = new set<string>();
if( level <= -1 )
sensors->insert(s);
else {
set<string> *nodes = navigate(node, level - getNodeDepth(node));
for(const auto& n : *nodes)
sensors->insert(boost::regex_replace(s, _nodeRx, n));
delete nodes;
}
return sensors;
}
string SensorNavigator::buildTopicForNode(const string& node, const string& suffix, int len) {
if(!_sensorTree || !_sensorTree->count(node) || isSensorNode(node))
throw domain_error("SensorNavigator: node not found in tree!");
......
......@@ -59,6 +59,11 @@ public:
clearTree();
}
/**
* @brief Returns true if a sensor tree has been generated, false otherwise
**/
bool treeExists() { return _sensorTree; }
/**
* @brief Creates a sensor tree.
*
......@@ -109,7 +114,7 @@ public:
/**
* Clears the internal sensor tree (if any) and releases all related memory
* @brief Clears the internal sensor tree (if any) and releases all related memory
**/
void clearTree();
......@@ -262,35 +267,6 @@ public:
*/
set<string> *navigate(const string& node="root", int direction=1);
/**
* @brief Parses a string encoding a tree level
*
* This method serves to parse strings that are used to express hierarchy levels in config files
* of the data analytics framework. These strings are in the format "<node-X>.*", and signify
* "sensors that are in nodes X levels up from the deepest level in the sensor tree". As such,
* the method returns the depth level of sensors represented by the input string. Note that
* "<node+X>.*" is not supported, because the system relates to the deepest level of the current
* sensor tree.
*
* @param s String to be parsed
* @return Absolute depth level in the tree that is encoded in the string
*/
int parseNodeLevelString(const string& s);
/**
* @brief Resolves a string encoding a tree level starting from a given node
*
* This method takes as input strings in the format specified for parseNodeLevelString(). It then
* takes as input also the name of a node in the sensor tree. The method will then return the set
* of sensors expressed by "s", that belong to nodes encoded in its hierarchy level and that are
* related to "node", either as ancestors or descendants.
*
* @param s String to be parsed
* @param node Name of the target node
* @return Set of sensors encoded in "s" that are associated with "node"
*/
set<string> *resolveNodeLevelString(const string& s, const string& node);
/**
* @brief Builds a MQTT topic for a new sensors associated with a given node
*
......@@ -327,10 +303,6 @@ protected:
vector<boost::regex> *_hierarchy;
boost::cmatch _match;
//Regular expressions used in the parseNodeLevelString and resolveNodeLevelString methods
const boost::regex _nodeRx = boost::regex("<[ \\t]*unit[ \\t]*(-[ \\t]*[0-9]+[ \\t]*)?>");
const boost::regex _numRx = boost::regex("[0-9]+");
};
#endif //PROJECT_SENSORNAVIGATOR_H
......@@ -12,7 +12,7 @@ AverageAnalyzer::~AverageAnalyzer() {
}
void AverageAnalyzer::compute(int unitID) {
int max=0, sum=0;
unsigned long long max=0, sum=0, avg=0;
for(const auto& in : _units[unitID]->getInputs()) {
// Getting only the most recent value
......@@ -36,22 +36,7 @@ void AverageAnalyzer::compute(int unitID) {
out.value = max;
_units[unitID]->getOutputs()[1]->storeReading(out);
}
void AverageAnalyzer::start() {
if(_keepRunning) {
LOG(info) << "Analyzer " << _name << " already running.";
return;
}
out.value = sum > 0 ? sum / _units[unitID]->getInputs().size() : 0;
_units[unitID]->getOutputs()[2]->storeReading(out);
for(const auto& u : _units)
if(u->getOutputs().size() > _outputs) {
LOG(error) << "Analyzer" << _name << "Supports only 2 outputs per-unit!";
return;
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&AverageAnalyzer::computeAsync, this));
LOG(info) << "Analyzer " << _name << " started.";
}
\ No newline at end of file
}
......@@ -13,13 +13,10 @@ public:
AverageAnalyzer(const std::string& name);
virtual ~AverageAnalyzer();
void start() override;
private:
void compute(int unitID) override;
unsigned _outputs = 2;
vector<reading_t> *_buffer = NULL;
};
......
......@@ -6,7 +6,7 @@
AverageConfigurator::AverageConfigurator() : AnalyzerConfiguratorTemplate() {
_analyzerName = "average";
_baseName = "sensor";
_baseName = "sensor";
}
AverageConfigurator::~AverageConfigurator() {}
......@@ -19,6 +19,11 @@ void AverageConfigurator::analyzer(AverageAnalyzer& a, CFG_VAL config) {
}
void AverageConfigurator::unit(UnitTemplate<SensorBase>& u) {
bool AverageConfigurator::unit(UnitTemplate<SensorBase>& u) {
if(u.getOutputs().size() != _outputs) {
LOG(error) << "AverageAnalyzer Supports only 2 outputs per unit!";
return false;
}
else
return true;
}
......@@ -18,7 +18,9 @@ private:
void sensorBase(SensorBase& s, CFG_VAL config) override;
void analyzer(AverageAnalyzer& a, CFG_VAL config) override;
void unit(UnitTemplate<SensorBase>& u) override;
bool unit(UnitTemplate<SensorBase>& u) override;
const unsigned _outputs = 3;
};
extern "C" AnalyzerConfiguratorInterface* create() {
......
......@@ -13,6 +13,7 @@
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include "AnalyzerTemplate.h"
#include "UnitGenerator.h"
#include "AnalyzerConfiguratorInterface.h"
#include "../../includes/SensorBase.h"
......@@ -52,7 +53,6 @@ protected:
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
enum inputMode_t { SELECTIVE = 1, ALL = 2, ALL_RECURSIVE = 3 };
public:
......@@ -114,7 +114,7 @@ public:
*/
bool readConfig(std::string cfgPath) {
_cfgPath = cfgPath;
_navigator = _queryEngine.getNavigator();
_unitGen.setNavigator(_queryEngine.getNavigator());
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
......@@ -237,8 +237,9 @@ protected:
* Pure virtual interface method, responsible for performing user-specified checks on units.
*
* @param u The unit that has been created
* @return True if the unit is valid, False otherwise
*/
virtual void unit(UnitTemplate<SBase>& u) = 0;
virtual bool unit(UnitTemplate<SBase>& u) = 0;
/**
* @brief Reads additional global attributes on top of the default ones
......@@ -335,139 +336,35 @@ protected:
// Reading all derived attributes, if any
analyzer(an, config);
// Instantiating units and returning the result
return an.getTemplate() ? true : readUnits(an, protoInputs, protoOutputs, inputMode);
}
/**
* @brief Computes and instantiates units associated to the input analyzer
*
* Non-virtual interface method for class-internal use only. This will compute the list of
* units that must be instantiated, starting from the outputs of the analyzer. The inputs
* for each unit are then computed, and the units finalized.
*
* @param an The analyzer for which units must be computed
* @param inputs The vector of "prototype" sensor objects for inputs
* @param outputs The vector of "prototype" sensor objects for outputs
* @param inputMode Defines the method with which input sensors are instantiated for each unit
* @return True if successful, false otherwise
*/
bool readUnits(Analyzer& an, vector<SBase>& inputs, vector<SBase>& outputs, inputMode_t inputMode) {
// If no outputs are defined, no units can be instantiated
if((inputs.size()==0 && inputMode==SELECTIVE) || outputs.size() == 0)
return false;
// We iterate over the outputs, and compute their depth level in the current sensor tree. From such depth level,
// the list of units is defined, consisting in all nodes in the sensor tree at the level of the outputs
int unitLevel = _navigator->parseNodeLevelString(outputs[0].getName());
for(const auto& out : outputs)
if(unitLevel != _navigator->parseNodeLevelString(out.getName())) {
LOG(error) << _analyzerName << " " << an.getName() << " has incoherent output levels! Omitting";
return false;
}
std::set<std::string>* units = NULL;
if(unitLevel > -1)
units = _navigator->getNodes(unitLevel, false);
// If no depth level was found (output sensor names do not contain any <unit-X> keyword) we assume that
// everything relates to root
else if(unitLevel == -1) {
units = new std::set<std::string>();
units->insert("root");
}
if(!units) {
LOG(error) << _analyzerName << " " << an.getName() << " has a invalid output level! Omitting";
return false;
}
// We iterate over the units, and resolve their inputs and outputs starting from the prototype definitions
std::vector< std::shared_ptr<SBase> > unitInputs, unitOutputs;
std::set<std::string>* sensors;
for(const auto& u : *units) {
LOG(debug) << " Unit \"" << u << "\"";
unitInputs.clear();
unitOutputs.clear();
if(inputMode == SELECTIVE)
// Mapping inputs
for(const auto& in : inputs) {
// Depending on the relationship of an input prototype sensor to the output level, it could be
// mapped to one sensor or more: for example, if output has level <unit-1>, and an input sensor
// has level <unit-2>, than the input will be unique, and the sensor associated to the father of the
// unit. If the other way around, the input will consist of multiple sensors, one for each child of
// the unit
sensors = _navigator->resolveNodeLevelString(in.getName(), u);
if( sensors->empty() ) {
LOG(error) << _analyzerName << " " << an.getName() << " has " << in.getName()
<< " sensor which cannot be resolved! Omitting";
delete units;
delete sensors;
return false;
} else
for(const auto& s : *sensors) {
SBase uIn(in);
uIn.setName(s);
if (!_navigator->sensorExists(uIn.getName())) {
LOG(error) << _analyzerName << " " << an.getName() << " has invalid " << uIn.getName()
<< " sensor! Omitting";
delete units;
delete sensors;
return false;
}
LOG(debug) << " Input " << uIn.getName();
unitInputs.push_back(std::make_shared<SBase>(uIn));
}
delete sensors;
}
// If no input sensors were specified, we pick all sensors related to the specific unit
// This means that when output unit is "root" (or not specified) we get only sensors at the highest level
else {
sensors = _navigator->getSensors(u, inputMode == ALL_RECURSIVE);
for(const auto& s : *sensors) {
SBase uIn(s);
unitInputs.push_back(std::make_shared<SBase>(uIn));
}
delete sensors;
}
// Mapping outputs
for(const auto& out : outputs) {
SBase uOut(out);
sensors = _navigator->resolveNodeLevelString(uOut.getName(), u);
uOut.setName(*sensors->begin());
delete sensors;
// If we are instantiating output sensors by unit, we generate mqtt topics by using the prefix
// associated to the respective node in the sensor tree, and the sensor suffix itself
if(u != "root") {
try {
uOut.setMqtt(_navigator->buildTopicForNode(u, uOut.getMqtt()));
} catch(const std::exception& e) {
LOG(error) << _analyzerName << " " << an.getName() << " has invalid MQTT topic for output sensor "
<< uOut.getName() << "!";
delete units;
return false; }
// Duplicating the file sink adding the name of each unit to the path
std::string sPath = uOut.getSinkPath();
if(sPath != "") {
size_t idx = sPath.find_last_of("/\\");
if( idx != std::string::npos)
uOut.setSinkPath(sPath.substr(0, idx+1) + u + "_" + sPath.substr(idx+1, std::string::npos));
else
uOut.setSinkPath(u + "_" + sPath);
}
}
// If we are not using units (only unit is root, out of the hierarchy) we build sensors like in samplers
// Instantiating units and returning the result
if(!an.getTemplate()) {
vector< shared_ptr<UnitTemplate<SBase>>> *units=NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart()); }
catch(const std::exception& e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false; }
for(auto& u: *units) {
LOG(debug) << " Unit \"" << u->getName() << "\"";
for(const auto& i : u->getInputs())
LOG(debug) << " Input " << i->getName();
for(const auto& o : u->getOutputs())
LOG(debug) << " Output " << o->getName() << " using MQTT-topic \"" << o->getMqtt() << "\"";
if(!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;