Commit a1fc4e26 authored by Alessio Netti

WIP: Data Analytics Framework

- Basic framework structure to perform streaming data analytics
on dcdbpusher
- Code currently compiles, but is not tested and integration with
dcdbpusher is still in progress
- Do not attempt to use this code yet :)
parent 5c2c9a4b
//
// Created by Netti, Alessio on 10.12.18.
//
#include "AnalyticsManager.h"
/**
 * Releases every plugin that is currently stored in the manager.
 *
 * For each entry the configurator is handed back to the plugin's own "destroy"
 * routine; only afterwards the shared library is closed, since "destroy" is
 * code that lives inside that very library. The plugin list is then emptied.
 */
void AnalyticsManager::clear() {
    for(const auto& p : _plugins) {
        if(p.configurator && p.destroy)
            p.destroy(p.configurator);
        // Handles obtained via dlopen() in load() have to be given back, otherwise
        // they pile up every time the manager is cleared and loaded again
        if(p.DL)
            dlclose(p.DL);
    }
    _plugins.clear();
}
bool AnalyticsManager::load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings) {
//The load code is pretty much the same as in Configuration.cpp to load pusher plugins
_configPath = path;
_pluginSettings = pluginSettings;
boost::property_tree::iptree cfg;
try {
boost::property_tree::read_info(path + globalFile, cfg);
} catch (boost::property_tree::info_parser_error& e) {
LOG(error) << "Error when reading analyzer plugins from global.conf: " << e.what();
return false;
}
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin")) {
if (!plugin.second.empty()) {
LOG(info) << "Loading analyzer plugin \"" << plugin.second.data() << "\"...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbanalyzer_" + plugin.second.data();
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
std::string path = val.second.data();
// If path not specified we will look up in the default lib-directories (usr/lib and friends)
if (path != "") {
if (path[path.length()-1] != '/')
path.append("/");
pluginLib = path + pluginLib;
}
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
// If config-path not specified we will look for pluginName.conf in the global.conf directory
if (pluginConfig == "")
pluginConfig = _configPath + plugin.second.data() + ".conf";
} else {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
}
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
an_dl_t dynLib;
dynLib.id = plugin.second.data();
dynLib.DL = NULL;
dynLib.configurator = NULL;
// If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
dlerror();
// Set dynLib an_dl_t struct, load create and destroy symbols
dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.configurator = dynLib.create();
dynLib.configurator->setGlobalSettings(_pluginSettings);
// Read the analyzer plugin configuration
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
return false;
}
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
}
//check if an MQTT-suffix was assigned twice
for(const auto& a : dynLib.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs()) {
//TODO: make MQTT topic check generic
bool ok = true; //checkMqtt(o->getMqtt());
if(!ok) {
LOG(error) << "Problematic MQTT-Topics! Please check your config files";
return false;
}
}
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
}
}
}
}
return true;
}
/**
 * Calls init() on every analyzer of the selected plugin(s).
 *
 * @param io      Boost ASIO service forwarded to each analyzer
 * @param plugin  Id of the plugin to act on; an empty string selects all plugins
 * @return        Always true
 */
bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
    //Actions always affect either one or all plugins, and always all analyzers within said plugin
    const bool everyPlugin = plugin.empty();
    for (const auto& entry : _plugins) {
        if (!everyPlugin && entry.id != plugin)
            continue;
        for (const auto& analyzer : entry.configurator->getAnalyzers()) {
            analyzer->init(io);
        }
    }
    return true;
}
/**
 * Makes the selected plugin(s) re-read their configuration and initializes
 * the analyzers that the configurator exposes afterwards.
 *
 * @param io      Boost ASIO service forwarded to each analyzer
 * @param plugin  Id of the plugin to act on; an empty string selects all plugins
 * @return        false as soon as one configurator fails to re-read its
 *                configuration (remaining plugins are not touched), true otherwise
 */
bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin) {
    const bool everyPlugin = plugin.empty();
    for (const auto& entry : _plugins) {
        if (!everyPlugin && entry.id != plugin)
            continue;
        const bool configOk = entry.configurator->reReadConfig();
        if (!configOk) {
            return false;
        }
        for (const auto& analyzer : entry.configurator->getAnalyzers()) {
            analyzer->init(io);
        }
    }
    return true;
}
bool AnalyticsManager::start(const string& plugin) {
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id)
for (const auto &a : p.configurator->getAnalyzers())
a->start();
return true;
}
bool AnalyticsManager::stop(const string& plugin) {
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id)
for (const auto &a : p.configurator->getAnalyzers())
a->stop();
return true;
}
/**
 * Placeholder for the REST integration: the command is currently ignored.
 *
 * @param command  REST command (unused for now)
 * @return         Always an empty string
 */
string AnalyticsManager::forwardREST(const string& command) {
    //TODO: implement REST interface integration
    string response;
    return response;
}
//
// Created by Netti, Alessio on 10.12.18.
//
#ifndef PROJECT_ANALYTICSMANAGER_H
#define PROJECT_ANALYTICSMANAGER_H
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <dlfcn.h>
#include "includes/UnitInterface.h"
#include "includes/AnalyzerConfiguratorInterface.h"
#include "../includes/Logging.h"
using namespace std;
// Everything the manager keeps about one dynamically loaded analyzer plugin.
struct an_dl_t {
    // Plugin name, as given in the analyzerPlugin entry of the global config file
    std::string id;
    // Library handle, as returned by dlopen()
    void* DL;
    // Configurator object obtained by calling "create"
    AnalyzerConfiguratorInterface* configurator;
    // "create" symbol resolved from the library
    an_create_t* create;
    // "destroy" symbol resolved from the library; takes the configurator back
    an_destroy_t* destroy;
};
//TODO: manage states
//TODO: manage interaction with auto-publish
/**
 * Management class for the entire data analytics framework
 *
 * Allows to load, configure, start and generally manage plugins developed with the data analytics framework.
 *
 * An instance releases the plugins it has loaded when it is destroyed. Copying is
 * therefore disabled: two copies would both try to release the same plugins.
 */
class AnalyticsManager {
public:
    /**
     * @brief Class constructor
     */
    AnalyticsManager() {}

    /**
     * @brief Class destructor
     *
     * Releases all loaded plugins through clear().
     */
    ~AnalyticsManager() { clear(); }

    // Not copyable, see class description
    AnalyticsManager(const AnalyticsManager&) = delete;
    AnalyticsManager& operator=(const AnalyticsManager&) = delete;

    /**
     * @brief Resets the state of the data analytics framework
     *
     * The plugins currently stored are released through their own "destroy" routine,
     * and the list of plugins is emptied.
     */
    void clear();

    /**
     * @brief Loads plugins as specified in the input config file
     *
     * This method will load the shared libraries (.so, or .dylib on Apple systems) containing
     * data analytics plugins. It will then configure them, according to the respective
     * configuration files, and store the resulting plugin entries, ready to be initialized.
     *
     * @param path Path to the global and plugin configuration files
     * @param globalFile Name of the global file (usually global.conf or collectagent.conf)
     * @param pluginSettings Structure containing global plugin settings
     * @return true if successful, false otherwise
     */
    bool load(const string& path, const string& globalFile, const pluginSettings_t& pluginSettings);

    /**
     * @brief Initialize one or more stored plugins
     *
     * This method must be called after "load", and before "start". It calls init() on all
     * analyzers of the affected plugins.
     *
     * @param io Boost ASIO service to be used
     * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
     * @return true if successful, false otherwise
     */
    bool init(boost::asio::io_service& io, const string& plugin="");

    /**
     * @brief Reload one or more plugins
     *
     * The configurator of each affected plugin is asked to read its configuration once
     * again, and the analyzers it exposes afterwards are initialized.
     *
     * @param io Boost ASIO service to be used
     * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
     * @return true if successful, false if a plugin failed to re-read its configuration
     */
    bool reload(boost::asio::io_service& io, const string& plugin="");

    /**
     * @brief Start one or more stored plugins
     *
     * This method must be called after "init". It calls start() on all analyzers stored
     * in the affected plugins.
     *
     * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
     * @return true if successful, false otherwise
     */
    bool start(const string& plugin="");

    /**
     * @brief Stops one or more stored plugins
     *
     * This method must be called after "start". It calls stop() on all analyzers stored
     * in the affected plugins. The analyzers can be re-started again later with the
     * "start" method.
     *
     * @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
     * @return true if successful, false otherwise
     */
    bool stop(const string& plugin="");

    /**
     * @brief Supply a REST command to the manager
     *
     * Commands must be plugin-specific. Those will be forwarded to said plugins, if of GET type, and
     * the result will be collected. If PUT type, one or more actions will be performed on the plugin
     * (e.g. start or stop).
     *
     * @param command REST command to be forwarded
     * @return Result as a plain string
     */
    string forwardREST(const string& command);

    /**
     * @brief Get the vector of currently loaded plugins
     *
     * The vector can then be used to access single analyzers within plugins, the related units, and
     * finally all output sensors.
     *
     * @return Vector of plugins represented as an_dl_t structures
     */
    const std::vector<an_dl_t>& getPlugins() const { return _plugins; }

protected:
    // Vector of plugins represented as an_dl_t structures
    std::vector<an_dl_t> _plugins;
    // Path used to load config files
    string _configPath;
    // Structure containing global plugin settings
    pluginSettings_t _pluginSettings;

    //Logger object
    boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
#endif //PROJECT_ANALYTICSMANAGER_H
//
// Created by Netti, Alessio on 11.12.18.
//
#include "SensorNavigator.h"
// Frees the sensor tree and the hierarchy (whichever is allocated) and puts the
// depth and topic flags back to their "no tree" values.
void SensorNavigator::clearTree() {
    if(_sensorTree != NULL) {
        _sensorTree->clear();
        delete _sensorTree;
    }
    _sensorTree = NULL;

    if(_hierarchy != NULL) {
        _hierarchy->clear();
        delete _hierarchy;
    }
    _hierarchy = NULL;

    _usingTopics = false;
    _treeDepth = -1;
}
// True if the tree has an entry with this name and that entry is not a sensor.
bool SensorNavigator::nodeExists(const string& node) {
    if(!_sensorTree || _sensorTree->count(node) == 0)
        return false;
    return !isSensorNode(node);
}
// True if the tree has an entry with this name and that entry is a sensor.
bool SensorNavigator::sensorExists(const string& node) {
    if(!_sensorTree || _sensorTree->count(node) == 0)
        return false;
    return isSensorNode(node);
}
int SensorNavigator::parseNodeLevelString(const string& s) {
if(!_sensorTree)
throw runtime_error("SensorNavigator: sensor tree not initialized!");
if(boost::regex_search(s.c_str(), _match, _nodeRx)) {
string token = _match.str(0);
int lv = !boost::regex_search(s.c_str(), _match, _numRx) ? _treeDepth : _treeDepth - (int)std::stoi(_match.str(0));
return lv<-1 ? -1 : lv;
}
else
return -1;
}
/**
 * Turns a node-level expression into actual names, relative to a given node.
 *
 * The level encoded in s is obtained through parseNodeLevelString(). If there is
 * none (result -1), s is returned as it is. Otherwise the nodes found at that
 * level, starting from "node", replace the placeholder in s, one output string
 * per node.
 *
 * @param s     Expression to be resolved
 * @param node  Name of the tree node used as starting point
 * @return      Newly allocated set of resolved strings, owned by the caller
 * @throws      Whatever parseNodeLevelString(), getNodeDepth() and navigate() throw
 */
set<string> *SensorNavigator::resolveNodeLevelString(const string& s, const string& node) {
    const int level = parseNodeLevelString(s);

    // Results are gathered on the stack first, nothing is leaked if a call below throws
    set<string> resolved;
    if( level <= -1 )
        resolved.insert(s);
    else {
        // navigate() goes up for negative values and down for positive ones, and depth
        // grows going down the tree: the offset is therefore target level minus node depth
        const int offset = level - getNodeDepth(node);
        if( offset == 0 )
            // The expression refers to the level of the starting node itself
            resolved.insert(boost::regex_replace(s, _nodeRx, node));
        else {
            set<string> targets;
            set<string> *nodes = navigate(node, offset);
            targets.swap(*nodes);
            delete nodes;
            for(const auto& n : targets)
                resolved.insert(boost::regex_replace(s, _nodeRx, n));
        }
    }

    set<string> *sensors = new set<string>();
    sensors->swap(resolved);
    return sensors;
}
/**
 * Builds a topic of exactly "len" characters for a node of the tree.
 *
 * The result is the node's topic, followed by as many 'F' characters as needed
 * to reach the requested length, followed by the suffix.
 *
 * @param node    Name of a (non-sensor) node in the tree
 * @param suffix  Trailing part of the topic
 * @param len     Total length of the topic to be built
 * @return        The padded topic
 * @throws domain_error      if the node is not in the tree, or is a sensor
 * @throws invalid_argument  if len is negative, or too small for node topic and suffix
 */
string SensorNavigator::buildTopicForNode(const string& node, const string& suffix, int len) {
    if(!_sensorTree || !_sensorTree->count(node) || isSensorNode(node))
        throw domain_error("SensorNavigator: node not found in tree!");

    const string nodePrefix = getNodeTopic(node);
    // A negative length must be rejected before it is compared with unsigned sizes: it would
    // turn into a huge value, pass the check and ask for an enormous padding string
    if(len < 0 || nodePrefix.length() + suffix.length() > static_cast<size_t>(len))
        throw invalid_argument("SensorNavigator: cannot build topic, too many characters!");

    const size_t padding = static_cast<size_t>(len) - nodePrefix.length() - suffix.length();
    return nodePrefix + string(padding, 'F') + suffix;
}
// Tells whether the tree entry with the given name is a sensor.
// Throws domain_error if there is no tree, or no entry with that name.
bool SensorNavigator::isSensorNode(const string& node) {
    if(_sensorTree) {
        auto entry = _sensorTree->find(node);
        if(entry != _sensorTree->end())
            return _isSensorNode(entry->second);
    }
    throw domain_error("SensorNavigator: node not found in tree!");
}
// Returns the depth stored for the tree entry with the given name.
// Throws domain_error if there is no tree, or no entry with that name.
int SensorNavigator::getNodeDepth(const string& node) {
    if(_sensorTree) {
        auto entry = _sensorTree->find(node);
        if(entry != _sensorTree->end())
            return entry->second.depth;
    }
    throw domain_error("SensorNavigator: node not found in tree!");
}
// Returns the topic stored for a tree entry if the tree was built with topics,
// otherwise the entry name itself.
// Throws domain_error if there is no tree, or no entry with that name.
string SensorNavigator::getNodeTopic(const string& node) {
    if(_sensorTree) {
        auto entry = _sensorTree->find(node);
        if(entry != _sensorTree->end()) {
            if(_usingTopics)
                return entry->second.topic;
            return node;
        }
    }
    throw domain_error("SensorNavigator: node not found in tree!");
}
void SensorNavigator::buildTree(const string& hierarchy, const vector<string> *sensors, const vector<string> *topics, const string& delimiter) {
if( hierarchy == "" )
throw invalid_argument("SensorNavigator: cannot build sensor hierarchy!");
vector<string> hierarchyVec;
string hBuf = hierarchy;
size_t pos;
while( !hBuf.empty() ) {
pos = hBuf.find(delimiter);
hierarchyVec.push_back(hBuf.substr(0, pos));
hBuf.erase(0, pos + delimiter.length());
}
buildTree(&hierarchyVec, sensors, topics);
}
void SensorNavigator::buildTree(const vector<string> *hierarchy, const vector<string> *sensors, const vector<string> *topics) {
if(sensors->size() > 0 && hierarchy->size() > 0)
clearTree();
else
throw invalid_argument("SensorNavigator: cannot build sensor hierarchy!");
_hierarchy = new vector<boost::regex>();
string s="";
//Each level in the hierarchy includes the regular expressions at the upper levels
for(const auto& l: *hierarchy) {
s += l;
_hierarchy->push_back(boost::regex(s));
}
_usingTopics = topics != NULL;
_sensorTree = new map<string, Node>();
//We initialize the sensor tree by pushing the root supernode
Node rootNode = {-1, set<string>(), set<string>(), "", ""};
_sensorTree->insert(make_pair(rootKey, rootNode));
//We iterate over all sensor names that were supplied and build up the tree
for(int i=0; i<sensors->size(); i++)
_addSensor(sensors->at(i), topics ? topics->at(i) : "");
}
/**
 * Builds the sensor tree from a table mapping each entry to the list of its children.
 *
 * The children of "root" become the first level of the tree: those that have an
 * entry of their own in the table are treated as nodes and expanded through
 * _addCassandraSensor(), the others are treated as sensors of the root. Names
 * matching the "ignore" regular expression are skipped. Any tree built previously
 * is discarded; hierarchy and topics are not used in this mode.
 *
 * @param table   Map from an entry name to the names of its children
 * @param root    Key of the table entry to start from
 * @param ignore  Regular expression for the names to be left out
 * @throws invalid_argument if the table is empty or has no entry for root
 */
void SensorNavigator::buildCassandraTree(const map<string, vector<string> > *table, const string& root, const string& ignore) {
    if(table->size() == 0 || !table->count(root))
        throw invalid_argument("SensorNavigator: cannot build sensor hierarchy!");
    clearTree();

    _hierarchy = NULL;
    _usingTopics = false;
    _sensorTree = new map<string, Node>();
    boost::regex ignoreReg(ignore);

    //The root supernode goes in first
    Node rootNode = {-1, set<string>(), set<string>(), "", ""};
    _sensorTree->insert(make_pair(rootKey, rootNode));

    //The children of the Cassandra root are attached by hand to our own root key
    for(const auto& s : table->at(root)) {
        if(boost::regex_search(s.c_str(), _match, ignoreReg))
            continue;

        Node newNode = {0, set<string>(), set<string>(), rootKey, ""};
        _sensorTree->insert(make_pair(s, newNode));

        if(!table->count(s)) {
            //A sensor shares the depth of its father, hence the -1 decrease
            _sensorTree->at(s).depth -= 1;
            _sensorTree->at(rootKey).sensors.insert(s);
            continue;
        }

        _sensorTree->at(rootKey).children.insert(s);
        _addCassandraSensor(s, table, 1, ignoreReg);
    }
}
/**
 * Returns a copy of the part of the tree that starts at the given entry.
 *
 * The starting entry is copied here; the rest is collected by _getSubTree().
 *
 * @param node   Name of the starting entry
 * @param depth  Depth limit handed to _getSubTree(); values below -1 are rejected
 * @return       Newly allocated map, owned by the caller
 * @throws domain_error  if there is no tree, or no entry with that name
 * @throws out_of_range  if depth is lower than -1
 */
map<string, SensorNavigator::Node> *SensorNavigator::getSubTree(const string& node, int depth) {
    const bool known = _sensorTree && _sensorTree->count(node);
    if(!known)
        throw domain_error("SensorNavigator: node not found in tree!");
    if( depth < -1 )
        throw out_of_range("SensorNavigator: depth not valid for subtree query!");

    map<string, SensorNavigator::Node> *subTree = new map<string, Node>();
    subTree->insert(make_pair(node, _sensorTree->at(node)));
    _getSubTree(node, subTree, depth);
    return subTree;
}
/**
 * Returns the names of the (non-sensor) nodes found at a given depth.
 *
 * @param depth      Depth to look at, between -1 and the depth of the tree
 * @param recursive  If true, nodes at any greater depth are returned as well
 * @return           Newly allocated set of node names, owned by the caller
 * @throws out_of_range   if depth is outside of the valid range
 * @throws runtime_error  if the tree was not built yet
 */
set<string> *SensorNavigator::getNodes(int depth, bool recursive) {
    if( depth < -1 || depth > _treeDepth )
        throw out_of_range("SensorNavigator: depth not valid for node query!");
    if(!_sensorTree)
        throw runtime_error("SensorNavigator: sensor tree not initialized!");

    //A scan of the whole map is enough, every entry knows its own depth
    set<string> *found = new set<string>();
    for(const auto& entry : *_sensorTree) {
        if(_isSensorNode(entry.second))
            continue;
        const bool depthOk = recursive ? entry.second.depth >= depth : entry.second.depth == depth;
        if(depthOk)
            found->insert(entry.first);
    }
    return found;
}
/**
 * Returns the names of the nodes below a given node.
 *
 * @param node       Name of the starting node
 * @param recursive  If true the whole subtree is explored, otherwise only the children
 * @return           Newly allocated set of node names, owned by the caller
 * @throws domain_error      if there is no tree, or no entry with that name
 * @throws invalid_argument  if the entry is a sensor
 */
set<string> *SensorNavigator::getNodes(const string& node, bool recursive) {
    const bool known = _sensorTree && _sensorTree->count(node);
    if(!known)
        throw domain_error("SensorNavigator: node not found in tree!");
    if(isSensorNode(node))
        throw invalid_argument("SensorNavigator: input must be a node, not a sensor!");

    //For _getNodes, 1 means "children only" and 0 means "everything below"
    const int searchDepth = recursive ? 0 : 1;
    set<string> *found = new set<string>();
    _getNodes(node, found, searchDepth);
    return found;
}
/**
 * Returns the names of the sensors attached to the nodes found at a given depth.
 *
 * @param depth      Depth to look at, between -1 and the depth of the tree
 * @param recursive  If true, sensors of nodes at any greater depth are returned as well
 * @return           Newly allocated set of sensor names, owned by the caller
 * @throws out_of_range   if depth is outside of the valid range
 * @throws runtime_error  if the tree was not built yet
 */
set<string> *SensorNavigator::getSensors(int depth, bool recursive) {
    if( depth < -1 || depth > _treeDepth )
        throw out_of_range("SensorNavigator: depth not valid for sensor query!");
    if(!_sensorTree)
        throw runtime_error("SensorNavigator: sensor tree not initialized!");

    //Same scan as for nodes, but what is collected is the sensor list of each matching node
    set<string> *found = new set<string>();
    for(const auto& entry : *_sensorTree) {
        if(_isSensorNode(entry.second))
            continue;
        const bool depthOk = recursive ? entry.second.depth >= depth : entry.second.depth == depth;
        if(depthOk)
            found->insert(entry.second.sensors.begin(), entry.second.sensors.end());
    }
    return found;
}
/**
 * Returns the names of the sensors found below a given node.
 *
 * @param node       Name of the starting node
 * @param recursive  If true the whole subtree is explored, otherwise only the node itself
 * @return           Newly allocated set of sensor names, owned by the caller
 * @throws domain_error      if there is no tree, or no entry with that name
 * @throws invalid_argument  if the entry is a sensor
 */
set<string> *SensorNavigator::getSensors(const string& node, bool recursive) {
    const bool known = _sensorTree && _sensorTree->count(node);
    if(!known)
        throw domain_error("SensorNavigator: node not found in tree!");
    if(isSensorNode(node))
        throw invalid_argument("SensorNavigator: input must be a node, not a sensor!");

    //The actual collection is done by the recursive routine
    const int searchDepth = recursive ? 0 : 1;
    set<string> *found = new set<string>();
    _getSensors(node, found, searchDepth);
    return found;
}
/**
 * Moves up or down in the tree, starting from a given entry.
 *
 * With a negative direction the ancestor found |direction| levels above the
 * starting entry is returned; the climb stops at the root, which is returned if
 * there are fewer levels than requested. With a positive direction the nodes
 * found "direction" levels below the starting entry are returned.
 *
 * @param node       Name of the starting entry
 * @param direction  Number of levels to move by: negative goes up, positive goes down
 * @return           Newly allocated set of node names, owned by the caller
 * @throws invalid_argument  if direction is 0
 * @throws domain_error      if there is no tree, or no entry with that name
 */
set<string> *SensorNavigator::navigate(const string& node, int direction) {
    if( direction==0 )
        throw invalid_argument("SensorNavigator: direction not valid for node navigation query!");
    if(!_sensorTree || !_sensorTree->count(node))
        throw domain_error("SensorNavigator: node not found in tree!");

    //if direction is negative, we go up in the tree
    if(direction < 0) {
        string currNode = node;
        //We go up the sensor tree in iterative fashion. The counter runs from direction up to 0,
        //so that no negation is needed. The climb stops at the root: its parent field is an empty
        //string and not the name of a tree entry, hence it must never end up in the result
        for(int step = direction; step < 0 && currNode != rootKey; ++step)
            currNode = _sensorTree->at(currNode).parent;
        //The result is allocated only once the lookups above can no longer throw
        set<string> *vec = new set<string>();
        vec->insert(currNode);
        return vec;
    }

    //Else, we go down to the children
    set<string> *vec = new set<string>();
    _getNodes(node, vec, direction);
    return vec;
}
/**
 * Recursive routine collecting node names below a given entry.
 *
 * depth == 1: only the children of the entry are added, and the search stops.
 * depth  > 1: nothing is added at this level, the search goes on in the children.
 * depth  < 1: the children are added and the search goes on, covering the whole subtree.
 *
 * Nothing happens if there is no tree, or no entry with the given name.
 *
 * @param node   Name of the entry to start from
 * @param vec    Set that receives the node names
 * @param depth  Remaining number of levels, as described above
 */
void SensorNavigator::_getNodes(const string& node, set<string> *vec, int depth) {
    if(!_sensorTree)
        return;
    auto entry = _sensorTree->find(node);
    if(entry == _sensorTree->end())
        return;

    const auto& kids = entry->second.children;
    if(depth <= 1)
        vec->insert(kids.begin(), kids.end());

    //Last level of the search reached, no need to look any further
    if(depth == 1)
        return;

    for(const auto& child : kids)
        _getNodes(child, vec, depth - 1);
}
void SensorNavigator::_getSensors(const string& node, set<string> *vec, int depth) {
if(!_sensorTree || !_sensorTree->count(node))
return;
if(depth<=1)
vec->insert(_sensorTree->at(node).sensors.begin(), _sensorTree->at(node).sensors.end());
//We iterate over all children of the input node, and call the recursive routine over them