16.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 135a79f7 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: dynamic loading and unloading of plugins

- Analytics plugins can be now loaded and unloaded dynamically
- DCDBPusher now looks into the current configuration directory if no
path is supplied when loading/reloading a plugin
parent 9af179f3
......@@ -48,6 +48,8 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
if (_configPath != "" && _configPath[_configPath.length()-1] != '/')
_configPath.append("/");
else if(_configPath.empty())
_configPath = "./";
try {
boost::property_tree::read_info(_configPath + globalFile, cfg);
......@@ -66,92 +68,130 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin")) {
if (!plugin.second.empty()) {
LOG(info) << "Loading analyzer plugin \"" << plugin.second.data() << "\"...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbanalyzer_" + plugin.second.data();
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
string pluginConfig="", pluginPath="";
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
std::string iPath = val.second.data();
// If path not specified we will look up in the default lib-directories (usr/lib and friends)
if (iPath != "") {
if (iPath[iPath.length()-1] != '/')
iPath.append("/");
pluginLib = iPath + pluginLib;
}
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
// If config-path not specified we will look for pluginName.conf in the global conf directory
if (pluginConfig == "")
pluginConfig = _configPath + plugin.second.data() + ".conf";
} else {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
}
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
an_dl_t dynLib;
dynLib.id = plugin.second.data();
dynLib.DL = NULL;
dynLib.configurator = NULL;
// If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
dlerror();
// Set dynLib an_dl_t struct, load create and destroy symbols
dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
if(!loadPlugin(plugin.second.data(), pluginPath, pluginConfig))
return false;
}
}
}
_status = LOADED;
return true;
}
dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
bool AnalyticsManager::loadPlugin(const string& name, const string& pluginPath, const string& config) {
LOG(info) << "Loading analyzer plugin \"" << name << "\"...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbanalyzer_" + name;
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
std::string iPath = pluginPath;
// If path not specified we will look up in the default lib-directories (usr/lib and friends)
if (iPath != "") {
if (iPath[iPath.length()-1] != '/')
iPath.append("/");
pluginLib = iPath + pluginLib;
}
pluginConfig = config;
// If config-path not specified we will look for pluginName.conf in the global conf directory
if (pluginConfig == "")
pluginConfig = _configPath + name + ".conf";
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
an_dl_t dynLib;
dynLib.id = name;
dynLib.DL = NULL;
dynLib.configurator = NULL;
// If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
dlerror();
// Set dynLib an_dl_t struct, load create and destroy symbols
dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.configurator = dynLib.create();
dynLib.configurator->setGlobalSettings(_pluginSettings);
// Read the analyzer plugin configuration
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
return false;
}
dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
} else if(!checkTopics(dynLib)) {
LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
}
}
dynLib.configurator = dynLib.create();
dynLib.configurator->setGlobalSettings(_pluginSettings);
// Read the analyzer plugin configuration
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
return false;
}
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
} else if(!checkTopics(dynLib)) {
LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
return false;
}
_status = LOADED;
return true;
}
void AnalyticsManager::unloadPlugin(const string& id) {
for (auto it = _plugins.begin(); it != _plugins.end(); ++it) {
if(it->id == id || id == "") {
for (const auto& a : it->configurator->getAnalyzers())
a->stop();
removeTopics(*it);
if (it->configurator)
it->destroy(it->configurator);
if (it->DL)
dlclose(it->DL);
if (id != "") {
_plugins.erase(it);
return;
}
}
}
if (id == "")
_plugins.clear();
}
bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot init, AnalyticsManager is not loaded!";
......@@ -280,7 +320,6 @@ void AnalyticsManager::addRestEndpoints(RESTHttpsServer* restServer) {
restServer->addEndpoint("/analytics/start", {http::verb::put, stdBind(PUT_analytics_start)});
restServer->addEndpoint("/analytics/stop", {http::verb::put, stdBind(PUT_analytics_stop)});
restServer->addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
restServer->addEndpoint("/analytics/compute", {http::verb::put, stdBind(PUT_analytics_compute)});
restServer->addEndpoint("/analytics/analyzer", {http::verb::put, stdBind(PUT_analytics_analyzer)});
}
......@@ -498,36 +537,6 @@ void AnalyticsManager::PUT_analytics_stop(endpointArgs) {
}
}
void AnalyticsManager::PUT_analytics_reload(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
/*
* This endpoint must either be overwritten (by adding a custom
* "analyzer/reload" endpoint) or must not be used. A reload requires
* an external io_service object and can therefore not be conducted by the
* AnalyticsManager itself.
*/
res.body() = "Sorry! It seems like this endpoint was not properly implemented.\n";
res.result(http::status::not_implemented);
/*
const std::string plugin = getQuery("plugin", queries);
if (!reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded";
res.result(http::status::ok);
}
*/
}
void AnalyticsManager::PUT_analytics_compute(endpointArgs) {
if (!managerLoaded(res)) {
return;
......
......@@ -137,6 +137,23 @@ public:
*/
bool stop(const string& plugin="", const string& analyzer="");
/**
* @brief Loads a plugin dynamically
*
* @param name Name of the plugin
* @param pluginPath Path to the plugin library
* @param config Path to the configuration file
* @return True if successful, false otherwise
*/
bool loadPlugin(const string& name, const string& pluginPath, const string& config);
/**
* @brief Unloads a currently loaded plugin
*
* @param id Name of the plugin to be unloaded
*/
void unloadPlugin(const string& id);
/**
* @brief Get the vector of currently loaded plugins
*
......@@ -182,6 +199,11 @@ public:
" Stop all or only a specific analytics plugin or\n"
" stop only a specific analyzer within a plugin.\n"
" /reload?[plugin] Reload all or only a specific analytics plugin.\n"
" /load?plugin;[path];[config]\n"
" Load a new plugin. Optionally specify path to the\n"
" shared library and/or the config file for the \n"
" plugin.\n"
" /unload?plugin Unload a plugin.\n"
" /compute?plugin;analyzer;[unit];[json]\n"
" Query the specified analyzer for a unit. Default\n"
" unit is the root.\n"
......
......@@ -24,8 +24,9 @@ CARestAPI::CARestAPI(serverSettings_t settings,
_analyticsController->getManager()->addRestEndpoints(this);
//Overwrite analytic's reload endpoint because we need to stop analyticscontroller beforehand
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
}
void CARestAPI::GET_help(endpointArgs) {
......@@ -58,9 +59,7 @@ void CARestAPI::GET_average(endpointArgs) {
//try getting the latest value
try {
//TODO: switch from SID input to sensor name input
int64_t val = _sensorCache->getSensor(sensor, (uint64_t) time * 1000000000);
res.body() = "collectagent::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(val);
res.result(http::status::ok);
......@@ -106,3 +105,45 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) {
_analyticsController->resume();
}
void CARestAPI::PUT_analytics_load(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
const std::string path = getQuery("path", queries);
const std::string config = getQuery("config", queries);
if (!hasPlugin(plugin, res)) {
return;
}
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
if(_analyticsController->getManager()->loadPlugin(plugin, path, config)) {
res.body() = "Analytics plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_analyticsController->getManager()->init(_analyticsController->getIoService(), plugin);
} else {
res.body() = "Failed to load analytics plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
}
_analyticsController->resume();
}
void CARestAPI::PUT_analytics_unload(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
_analyticsController->getManager()->unloadPlugin(plugin);
res.body() = "Analytics plugin " + plugin + " unloaded.\n";
res.result(http::status::ok);
_analyticsController->resume();
}
......@@ -78,6 +78,39 @@ private:
*/
void PUT_analytics_reload(endpointArgs);
/**
* PUT "/analytics/load"
*
* @brief Load and initialize an analytics plugin but do not start it yet.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | plugin | name of the new | specify the plugin
* | | plugin |
* Optional | path | file path | specify a file path where to
* | | | search for the shared lib.
* | | | Defaults to (usr/lib etc.)
* | config | file path + name | specify the config file for
* | | | the plugin. Defaults to
* | | | ./PLUGINNAME.conf
*
*/
void PUT_analytics_load(endpointArgs);
/**
* PUT "/analytics/unload"
*
* @brief Unload a specific analytics plugin.
*
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | plugin | all analyzer plugin | unload only the specified
* | | names | plugin
*/
void PUT_analytics_unload(endpointArgs);
SensorCache* _sensorCache;
AnalyticsController* _analyticsController;
};
......
......@@ -64,7 +64,7 @@ public:
_readingQueue(nullptr),
_sinkFile(nullptr) {}
virtual ~SensorBase() {}
virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
......
......@@ -44,6 +44,7 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
return false;
}
pluginManager.setCfgFilePath(_cfgFilePath);
//read plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
if (boost::iequals(plugin.first, "plugin")) {
......@@ -59,10 +60,6 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
if (pluginConfig == "") {
pluginConfig = _cfgFilePath + plugin.second.data() + ".conf";
}
} else {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
......
......@@ -14,7 +14,8 @@
using namespace std;
PluginManager::PluginManager(const pluginSettings_t& pluginSettings) :
_pluginSettings(pluginSettings) {}
_pluginSettings(pluginSettings),
_cfgFilePath("./") {}
PluginManager::~PluginManager() {
unloadPlugin();
......@@ -42,9 +43,9 @@ bool PluginManager::loadPlugin(const string& name,
pluginLib = pluginPath + pluginLib;
}
// build plugin config path
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
if (config == "") {
pluginConfig = "./" + name + ".conf";
pluginConfig = _cfgFilePath + name + ".conf";
} else {
pluginConfig = config;
}
......
......@@ -156,6 +156,13 @@ public:
pusherPluginStorage_t& getPlugins() {
return _plugins;
}
/**
* @brief Sets the internal path to be used when loading plugins by default
*
* @param p The string to be used as path
*/
void setCfgFilePath(const std::string& p) { _cfgFilePath = p; }
private:
......@@ -180,6 +187,7 @@ private:
pusherPluginStorage_t _plugins; /**< Storage to hold all loaded plugins */
pluginSettings_t _pluginSettings; /**< Default plugin settings to use when
loading a new plugin */
std::string _cfgFilePath;
logger_t lg;
};
......
......@@ -41,8 +41,9 @@ RestAPI::RestAPI(serverSettings_t settings,
_manager->addRestEndpoints(this);
//Overwrite analytic's reload endpoint because we need to stop MQTTPusher beforehand
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
}
void RestAPI::GET_help(endpointArgs) {
......@@ -198,7 +199,7 @@ void RestAPI::PUT_load(endpointArgs) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
if (!_mqttPusher->halt()) {
res.body() = "Could not reload plugin (Timeout while waiting).\n";
res.body() = "Could not load plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
......@@ -215,22 +216,7 @@ void RestAPI::PUT_load(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
//Updating the SensorNavigator on plugin reloads, if analytics plugins are currently running
if (_manager!=NULL && _manager->getPlugins().size()>0) {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> names, topics;
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
reloadNavigator();
}
void RestAPI::PUT_unload(endpointArgs) {
......@@ -240,10 +226,19 @@ void RestAPI::PUT_unload(endpointArgs) {
return;
}
if (!_mqttPusher->halt()) {
res.body() = "Could not unload plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
_pluginManager->unloadPlugin(plugin);
res.body() = "Plugin " + plugin + " unloaded.\n";
res.result(http::status::ok);
return;
//continue MQTTPusher
_mqttPusher->cont();
reloadNavigator();
}
void RestAPI::PUT_start(endpointArgs) {
......@@ -302,22 +297,7 @@ void RestAPI::PUT_reload(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
//Updating the SensorNavigator on plugin reloads, if analytics plugins are currently running
if (_manager!=NULL && _manager->getPlugins().size()>0) {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> names, topics;
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
reloadNavigator();
}
void RestAPI::PUT_analytics_reload(endpointArgs) {
......@@ -347,3 +327,73 @@ void RestAPI::PUT_analytics_reload(endpointArgs) {
res.result(http::status::internal_server_error);
}
}
void RestAPI::PUT_analytics_load(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
const std::string path = getQuery("path", queries);
const std::string config = getQuery("config", queries);
if (!hasPlugin(plugin, res)) {
return;
}
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
if (!_mqttPusher->halt()) {
res.body() = "Could not load analytics plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
if(_manager->loadPlugin(plugin, path, config)) {
res.body() = "Analytics plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_manager->init(_io, plugin);
} else {
res.body() = "Failed to load analytics plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
}
//continue MQTTPusher
_mqttPusher->cont();
}
void RestAPI::PUT_analytics_unload(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
if (!_mqttPusher->halt()) {
res.body() = "Could not unload analytics plugin (Timeout while waiting).\n";