Commit a5292b06 authored by Alessio Netti's avatar Alessio Netti
Browse files

Merge remote-tracking branch 'remotes/origin/master' into mqttTopics

parents 6516c02f 5468b5b4
......@@ -2,6 +2,9 @@
deps/
install/
## Ignore DCDB generated documentation
doc/html
## Ignore object files and other binaries
*.o
*.so
......
......@@ -61,7 +61,7 @@ PUBHEADERS = pusherpqueue.h dcdbdaemon.h
FULL_CC = $(shell which $(CC))
FULL_CXX = $(shell which $(CXX))
.PHONY : info all clean cleanall distclean check-cross-compile deps depsinstall $(SUB_DIRS)
.PHONY : info all clean cleanall cleandoc distclean check-cross-compile deps depsinstall $(SUB_DIRS) doc
info:
@echo ""
......@@ -97,8 +97,12 @@ clean:
cleandeps:
@$(foreach f,$(DISTFILESPATHS),echo "Cleaning $(f)..." && rm -rf $(DCDBDEPSPATH)/$(f) && echo;)
@$(foreach f,cpp-netlib,echo "Cleaning $(f)..." && rm -rf $(DCDBDEPSPATH)/$(f)_build && echo;)
cleandoc:
@echo "Cleaning doc/html..."
@rm -rf doc/html
cleanall: clean cleandeps
cleanall: clean cleandeps cleandoc
distclean: clean
@echo "Wiping dependencies..."
......@@ -110,10 +114,15 @@ mrproper: distclean
@echo ""
@echo "Wiping installation directory..."
@rm -rf $(DCDBDEPLOYPATH)
doc:
@echo "Generating doxygen HTML documentation..."
@cd doc/ && doxygen Doxyfile
@echo "Generated docs into doc/html"
all: check-cross-compile deps $(foreach s,$(SUB_DIRS),$(s)-build)
all: check-cross-compile deps $(foreach s,$(SUB_DIRS),$(s)-build) doc
install: depsinstall $(SUB_DIRS)
install: depsinstall $(SUB_DIRS) doc
@cd common/include && install $(PUBHEADERS) $(DCDBDEPLOYPATH)/include/dcdb && cd ..
@echo DONE
......
......@@ -48,6 +48,8 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
if (_configPath != "" && _configPath[_configPath.length()-1] != '/')
_configPath.append("/");
else if(_configPath.empty())
_configPath = "./";
try {
boost::property_tree::read_info(_configPath + globalFile, cfg);
......@@ -66,92 +68,130 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin")) {
if (!plugin.second.empty()) {
LOG(info) << "Loading analyzer plugin \"" << plugin.second.data() << "\"...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbanalyzer_" + plugin.second.data();
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
string pluginConfig="", pluginPath="";
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
std::string iPath = val.second.data();
// If path not specified we will look up in the default lib-directories (usr/lib and friends)
if (iPath != "") {
if (iPath[iPath.length()-1] != '/')
iPath.append("/");
pluginLib = iPath + pluginLib;
}
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
// If config-path not specified we will look for pluginName.conf in the global conf directory
if (pluginConfig == "")
pluginConfig = _configPath + plugin.second.data() + ".conf";
} else {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
}
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
an_dl_t dynLib;
dynLib.id = plugin.second.data();
dynLib.DL = NULL;
dynLib.configurator = NULL;
// If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
dlerror();
// Set dynLib an_dl_t struct, load create and destroy symbols
dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
if(!loadPlugin(plugin.second.data(), pluginPath, pluginConfig))
return false;
}
}
}
_status = LOADED;
return true;
}
dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
bool AnalyticsManager::loadPlugin(const string& name, const string& pluginPath, const string& config) {
LOG(info) << "Loading analyzer plugin \"" << name << "\"...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbanalyzer_" + name;
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
std::string iPath = pluginPath;
// If path not specified we will look up in the default lib-directories (usr/lib and friends)
if (iPath != "") {
if (iPath[iPath.length()-1] != '/')
iPath.append("/");
pluginLib = iPath + pluginLib;
}
pluginConfig = config;
// If config-path not specified we will look for pluginName.conf in the global conf directory
if (pluginConfig == "")
pluginConfig = _configPath + name + ".conf";
// Open dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
an_dl_t dynLib;
dynLib.id = name;
dynLib.DL = NULL;
dynLib.configurator = NULL;
// If plugin.conf exists, open libdcdbanalyzer_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
dlerror();
// Set dynLib an_dl_t struct, load create and destroy symbols
dynLib.create = (an_create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.configurator = dynLib.create();
dynLib.configurator->setGlobalSettings(_pluginSettings);
// Read the analyzer plugin configuration
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
return false;
}
dynLib.destroy = (an_destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
} else if(!checkTopics(dynLib)) {
LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
}
}
dynLib.configurator = dynLib.create();
dynLib.configurator->setGlobalSettings(_pluginSettings);
// Read the analyzer plugin configuration
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin \"" << dynLib.id << "\" could not read configuration!";
return false;
}
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
} else if(!checkTopics(dynLib)) {
LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin \"" << dynLib.id << "\" loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
return false;
}
_status = LOADED;
return true;
}
void AnalyticsManager::unloadPlugin(const string& id) {
for (auto it = _plugins.begin(); it != _plugins.end(); ++it) {
if(it->id == id || id == "") {
for (const auto& a : it->configurator->getAnalyzers())
a->stop();
removeTopics(*it);
if (it->configurator)
it->destroy(it->configurator);
if (it->DL)
dlclose(it->DL);
if (id != "") {
_plugins.erase(it);
return;
}
}
}
if (id == "")
_plugins.clear();
}
bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot init, AnalyticsManager is not loaded!";
......@@ -280,7 +320,6 @@ void AnalyticsManager::addRestEndpoints(RESTHttpsServer* restServer) {
restServer->addEndpoint("/analytics/start", {http::verb::put, stdBind(PUT_analytics_start)});
restServer->addEndpoint("/analytics/stop", {http::verb::put, stdBind(PUT_analytics_stop)});
restServer->addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
restServer->addEndpoint("/analytics/compute", {http::verb::put, stdBind(PUT_analytics_compute)});
restServer->addEndpoint("/analytics/analyzer", {http::verb::put, stdBind(PUT_analytics_analyzer)});
}
......@@ -498,36 +537,6 @@ void AnalyticsManager::PUT_analytics_stop(endpointArgs) {
}
}
void AnalyticsManager::PUT_analytics_reload(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
/*
* This endpoint must either be overwritten (by adding a custom
* "analyzer/reload" endpoint) or must not be used. A reload requires
* an external io_service object and can therefore not be conducted by the
* AnalyticsManager itself.
*/
res.body() = "Sorry! It seems like this endpoint was not properly implemented.\n";
res.result(http::status::not_implemented);
/*
const std::string plugin = getQuery("plugin", queries);
if (!reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded";
res.result(http::status::ok);
}
*/
}
void AnalyticsManager::PUT_analytics_compute(endpointArgs) {
if (!managerLoaded(res)) {
return;
......
......@@ -137,6 +137,23 @@ public:
*/
bool stop(const string& plugin="", const string& analyzer="");
/**
* @brief Loads a plugin dynamically
*
* @param name Name of the plugin
* @param pluginPath Path to the plugin library
* @param config Path to the configuration file
* @return True if successful, false otherwise
*/
bool loadPlugin(const string& name, const string& pluginPath, const string& config);
/**
* @brief Unloads a currently loaded plugin
*
* @param id Name of the plugin to be unloaded
*/
void unloadPlugin(const string& id);
/**
* @brief Get the vector of currently loaded plugins
*
......@@ -182,6 +199,11 @@ public:
" Stop all or only a specific analytics plugin or\n"
" stop only a specific analyzer within a plugin.\n"
" /reload?[plugin] Reload all or only a specific analytics plugin.\n"
" /load?plugin;[path];[config]\n"
" Load a new plugin. Optionally specify path to the\n"
" shared library and/or the config file for the \n"
" plugin.\n"
" /unload?plugin Unload a plugin.\n"
" /compute?plugin;analyzer;[unit];[json]\n"
" Query the specified analyzer for a unit. Default\n"
" unit is the root.\n"
......
......@@ -24,8 +24,9 @@ CARestAPI::CARestAPI(serverSettings_t settings,
_analyticsController->getManager()->addRestEndpoints(this);
//Overwrite analytic's reload endpoint because we need to stop analyticscontroller beforehand
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
}
void CARestAPI::GET_help(endpointArgs) {
......@@ -59,7 +60,6 @@ void CARestAPI::GET_average(endpointArgs) {
//try getting the latest value
try {
int64_t val = _sensorCache->getSensor(sensor, (uint64_t) time * 1000000000);
res.body() = "collectagent::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(val);
res.result(http::status::ok);
......@@ -105,3 +105,45 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) {
_analyticsController->resume();
}
void CARestAPI::PUT_analytics_load(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
const std::string path = getQuery("path", queries);
const std::string config = getQuery("config", queries);
if (!hasPlugin(plugin, res)) {
return;
}
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
if(_analyticsController->getManager()->loadPlugin(plugin, path, config)) {
res.body() = "Analytics plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_analyticsController->getManager()->init(_analyticsController->getIoService(), plugin);
} else {
res.body() = "Failed to load analytics plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
}
_analyticsController->resume();
}
void CARestAPI::PUT_analytics_unload(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
_analyticsController->getManager()->unloadPlugin(plugin);
res.body() = "Analytics plugin " + plugin + " unloaded.\n";
res.result(http::status::ok);
_analyticsController->resume();
}
......@@ -79,6 +79,39 @@ private:
*/
void PUT_analytics_reload(endpointArgs);
/**
* PUT "/analytics/load"
*
* @brief Load and initialize an analytics plugin but do not start it yet.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | plugin | name of the new | specify the plugin
* | | plugin |
* Optional | path | file path | specify a file path where to
* | | | search for the shared lib.
* | | | Defaults to (usr/lib etc.)
* | config | file path + name | specify the config file for
* | | | the plugin. Defaults to
* | | | ./PLUGINNAME.conf
*
*/
void PUT_analytics_load(endpointArgs);
/**
* PUT "/analytics/unload"
*
* @brief Unload a specific analytics plugin.
*
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | plugin | all analyzer plugin | unload only the specified
* | | names | plugin
*/
void PUT_analytics_unload(endpointArgs);
SensorCache* _sensorCache;
AnalyticsController* _analyticsController;
};
......
......@@ -71,9 +71,9 @@ public:
}
static inline uint16_t hostToBE(uint16_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return x;
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return swap(x);
#else
# error Endianess of host is undefined!
......@@ -81,9 +81,9 @@ public:
}
static inline uint32_t hostToBE(uint32_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return x;
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return swap(x);
#else
# error Endianess of host is undefined!
......@@ -91,9 +91,9 @@ public:
}
static inline uint64_t hostToBE(uint64_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return x;
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return swap(x);
#else
# error Endianess of host is undefined!
......@@ -101,9 +101,9 @@ public:
}
static inline uint16_t hostToLE(uint16_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return swap(x);
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return x;
#else
# error Endianess of host is undefined!
......@@ -111,9 +111,9 @@ public:
}
static inline uint32_t hostToLE(uint32_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return swap(x);
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return x;
#else
# error Endianess of host is undefined!
......@@ -121,9 +121,9 @@ public:
}
static inline uint64_t hostToLE(uint64_t x) {
#if defined BOOST_ENDIAN_BIG_BYTE
#if BOOST_ENDIAN_BIG_BYTE
return swap(x);
#elif defined BOOST_ENDIAN_LITTLE_BYTE
#elif BOOST_ENDIAN_LITTLE_BYTE
return x;
#else
# error Endianess of host is undefined!
......
......@@ -64,7 +64,7 @@ public:
_readingQueue(nullptr),
_sinkFile(nullptr) {}
virtual ~SensorBase() {}
virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
......
......@@ -44,6 +44,7 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
return false;
}
pluginManager.setCfgFilePath(_cfgFilePath);
//read plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
if (boost::iequals(plugin.first, "plugin")) {
......@@ -59,10 +60,6 @@ bool Configuration::readPlugins(PluginManager& pluginManager) {
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
if (pluginConfig == "") {
pluginConfig = _cfgFilePath + plugin.second.data() + ".conf";
}
} else {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
......
......@@ -14,7 +14,8 @@
using namespace std;
PluginManager::PluginManager(const pluginSettings_t& pluginSettings) :
_pluginSettings(pluginSettings) {}
_pluginSettings(pluginSettings),
_cfgFilePath("./") {}
PluginManager::~PluginManager() {
unloadPlugin();
......@@ -42,9 +43,9 @@ bool PluginManager::loadPlugin(const string& name,
pluginLib = pluginPath + pluginLib;
}
// build plugin config path
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
if (config == "") {
pluginConfig = "./" + name + ".conf";
pluginConfig = _cfgFilePath + name + ".conf";
} else {
pluginConfig = config;
}
......
......@@ -156,6 +156,13 @@ public:
pusherPluginStorage_t& getPlugins() {
return _plugins;
}
/**
* @brief Sets the internal path to be used when loading plugins by default
*
* @param p The string to be used as path
*/
void setCfgFilePath(const std::string& p) { _cfgFilePath = p; }
private:
......@@ -180,6 +187,7 @@ private:
pusherPluginStorage_t _plugins; /**< Storage to hold all loaded plugins */
pluginSettings_t _pluginSettings; /**< Default plugin settings to use when
loading a new plugin */
std::string _cfgFilePath;
logger_t lg;
};
......
......@@ -41,8 +41,9 @@ RestAPI::RestAPI(serverSettings_t settings,
_manager->addRestEndpoints(this);
//Overwrite analytic's reload endpoint because we need to stop MQTTPusher beforehand
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
}
void RestAPI::GET_help(endpointArgs) {
......@@ -198,7 +199,7 @@ void RestAPI::PUT_load(endpointArgs) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
if (!_mqttPusher->halt()) {
res.body() = "Could not reload plugin (Timeout while waiting).\n";
res.body() = "Could not load plugin (Timeout while waiting).\n";