Commit 56988601 authored by Micha Mueller's avatar Micha Mueller
Browse files

dcdbpusher: Integrate new PluginManager

parent 9f7e0524
......@@ -84,7 +84,7 @@ bool GlobalConfiguration::readConfig() {
restAPISettings.privateKey = global.second.data();
} else if (boost::iequals(global.first, "dhFile")) {
restAPISettings.dhFile = global.second.data();
} else if (boost::iequals(global.first, "authkey")) {
} else if (boost::iequals(global.first, "user")) {
//Avoids unnecessary "Value not recognized" message
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
......
......@@ -8,22 +8,9 @@
#include "Configuration.h"
#include <string>
#include <unistd.h>
#include <dlfcn.h>
using namespace std;
Configuration::~Configuration() {
//close plugins
for (auto p : _plugins) {
if (p.configurator) {
p.destroy(p.configurator);
}
if (p.DL) {
dlclose(p.DL);
}
}
}
bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_type &global) {
// ----- READING ADDITIONAL GLOBAL SETTINGS -----
if (boost::iequals(global.first, "mqttBroker")) {
......@@ -45,7 +32,7 @@ bool Configuration::readAdditionalValues(boost::property_tree::iptree::value_typ
return true;
}
bool Configuration::readPlugins() {
bool Configuration::readPlugins(PluginManager& pluginManager) {
std::string globalConfig = _cfgFilePath;
globalConfig.append(_cfgFileName);
......@@ -57,30 +44,19 @@ bool Configuration::readPlugins() {
return false;
}
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
//read plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
if (boost::iequals(plugin.first, "plugin")) {
if (!plugin.second.empty()) {
LOG(info) << "Loading plugin " << plugin.second.data() << "...";
std::string pluginConfig; //path to config file for plugin
std::string pluginLib = "libdcdbplugin_" + plugin.second.data(); //TODO add version information? //path to the plugin-lib
#if __APPLE__
pluginLib+= ".dylib";
#else
pluginLib+= ".so";
#endif
std::string pluginName = plugin.second.data();
std::string pluginConfig = ""; // path to config file for plugin
std::string pluginPath = ""; // path to plugin
LOG(info) << "Read plugin " << pluginName << "...";
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, plugin.second) {
if (boost::iequals(val.first, "path")) {
std::string path = val.second.data();
//if path not specified we will look up in the default lib-directories (usr/lib and friends)
if (path != "") {
if (path[path.length()-1] != '/') {
path.append("/");
}
pluginLib = path + pluginLib;
}
pluginPath = val.second.data();
} else if (boost::iequals(val.first, "config")) {
pluginConfig = val.second.data();
//if config-path not specified we will look for pluginName.conf in the dcdbpusher.conf directory
......@@ -91,75 +67,11 @@ bool Configuration::readPlugins() {
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting";
}
}
//open plugin
//dl-code based on http://tldp.org/HOWTO/C++-dlopen/thesolution.html
if (FILE *file = fopen(pluginConfig.c_str(), "r")) {
fclose(file);
dl_t dynLib;
dynLib.id = plugin.second.data();
dynLib.DL = NULL;
dynLib.configurator = NULL;
//plugin.conf exists --> open libdcdbplugin_pluginName.so and read config
LOG(info) << pluginConfig << " found";
dynLib.DL = dlopen(pluginLib.c_str(), RTLD_NOW);
if(!dynLib.DL) {
LOG(error) << "Cannot load " << dynLib.id << "-library: " << dlerror();
return false;
}
//reset errors
dlerror();
//set dynLib-struct
dynLib.create = (create_t*) dlsym(dynLib.DL, "create");
const char* dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol create for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.destroy = (destroy_t*) dlsym(dynLib.DL, "destroy");
dlsym_error = dlerror();
if (dlsym_error) {
LOG(error) << "Cannot load symbol destroy for " << dynLib.id << ": " << dlsym_error;
return false;
}
dynLib.configurator = dynLib.create();
//set prefix to global prefix (may be overwritten)
dynLib.configurator->setGlobalSettings(pluginSettings);
//read in config
if (!(dynLib.configurator->readConfig(pluginConfig))) {
LOG(error) << "Plugin " << dynLib.id << " could not read configuration!";
return false;
}
// returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getSensorGroups().size() == 0) {
LOG(warning) << "Plugin " << dynLib.id << " created no sensors!";
}
//check if an MQTT-suffix was assigned twice
bool validTopics=true;
for(const auto& g : dynLib.configurator->getSensorGroups()) {
if (!mqttCheck.checkGroup(g->getGroupName()))
validTopics = false;
for (const auto &s : g->getSensors())
if (!mqttCheck.checkTopic(s->getMqtt()) || !mqttCheck.checkName(s->getName()))
validTopics = false;
}
if(!validTopics) {
LOG(error) << "Problematic MQTT topics or sensor names, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
LOG(info) << "Plugin " << dynLib.id << " " << dynLib.configurator->getVersion() << " loaded!";
} else {
LOG(info) << pluginConfig << " not found. Omitting";
if(!pluginManager.loadPlugin(pluginSettings, pluginName, pluginPath, pluginConfig)) {
LOG(error) << "Could not load plugin " << pluginName;
pluginManager.unloadPlugin();
return false;
}
}
}
......
......@@ -8,11 +8,11 @@
#ifndef CONFIGURATION_H_
#define CONFIGURATION_H_
#include "globalconfiguration.h"
#include <set>
#include <boost/log/trivial.hpp>
#include "includes/PluginDefinitions.h"
#include "mqttchecker.h"
#include "PluginManager.h"
#define BROKERPORT 1883
#define BROKERHOST "127.0.0.1"
......@@ -32,21 +32,17 @@ public:
*/
Configuration(const std::string& cfgFilePath, const std::string& cfgFileName) : GlobalConfiguration(cfgFilePath, cfgFileName) {}
virtual ~Configuration();
virtual ~Configuration() {}
/**
* Reads the plugin configuration section from global.conf (located at _cfgFilePath).
* Detects which sensor types are required and dynamically opens required plugins.
* Invokes the plugin to read its configuration and generate specified sensors.
* Triggers the pluginManager to load plugins as required.
*
* @param pluginManager The pluginManager object to store plugins.
*
* @return true on success, false otherwise
*/
bool readPlugins();
/**
* Return all plugins (in form of their dl_t struct)
*/
pluginVector_t& getPlugins() { return _plugins; }
bool readPlugins(PluginManager& pluginManager);
// Additional configuration parameters to be parsed and stored in the global block
int qosLevel = 1;
......@@ -59,12 +55,6 @@ public:
protected:
bool readAdditionalValues(boost::property_tree::iptree::value_type &global) override;
/**
* Vector which holds data of the opened plugins.
* Only references to this member should be handed around, to keep the plugins everywhere in sync.
*/
pluginVector_t _plugins;
};
#endif /* CONFIGURATION_H_ */
......@@ -12,7 +12,7 @@
#include "timestamp.h"
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
pusherPluginStorage_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
......
......@@ -13,7 +13,7 @@
#include <mosquitto.h>
#include <map>
#include "includes/PluginDefinitions.h"
#include "PluginManager.h"
#include "sensorbase.h"
#include "../analytics/AnalyticsManager.h"
......@@ -25,7 +25,7 @@ enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
pusherPluginStorage_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
/**
......@@ -89,7 +89,7 @@ private:
int _brokerPort;
std::string _brokerHost;
std::string _sensorPattern;
pluginVector_t& _plugins;
pusherPluginStorage_t& _plugins;
an_pluginVector_t& _analyticsPlugins;
struct mosquitto* _mosq;
bool _connected;
......
......@@ -130,12 +130,21 @@ void PluginManager::unloadPlugin(const string& id) {
if (it->DL) {
dlclose(it->DL);
}
_plugins.erase(it);
if (id != "") {
//only erase if we immediately return or otherwise our iterator would be invalidated
_plugins.erase(it);
return;
}
}
}
if (id == "") {
_plugins.clear();
}
}
bool PluginManager::initPlugin(boost::asio::io_service io,
bool PluginManager::initPlugin(boost::asio::io_service& io,
const string& id) {
bool found = false;
......
......@@ -18,6 +18,7 @@
/*
* Bundles all attributes to hold a dcdb-pusher plugin aka dynamic library.
* //TODO extract common code with AnalyticsManager in common base class
*/
typedef struct {
std::string id;
......@@ -92,7 +93,7 @@ public:
*
* @return True on success, false if the plugin could not be found.
*/
bool initPlugin(boost::asio::io_service io,
bool initPlugin(boost::asio::io_service& io,
const std::string& id = "");
// Undocumented: if no plugin name is specified all plugins are started.
......
......@@ -18,12 +18,12 @@
std::placeholders::_2)
RestAPI::RestAPI(serverSettings_t settings,
pluginVector_t& plugins,
PluginManager* pluginManager,
MQTTPusher* mqttPusher,
AnalyticsManager* manager,
boost::asio::io_service& io) :
RESTHttpsServer(settings),
_plugins(plugins),
_pluginManager(pluginManager),
_mqttPusher(mqttPusher),
_manager(manager),
_io(io) {
......@@ -52,13 +52,13 @@ void RestAPI::GET_plugins(endpointArgs) {
std::ostringstream data;
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, plugins;
for (const auto& p : _plugins) {
for (const auto& p : _pluginManager->getPlugins()) {
plugins.put(p.id, "");
}
root.add_child("plugins", plugins);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& p : _plugins) {
for (const auto& p : _pluginManager->getPlugins()) {
data << p.id << "\n";
}
}
......@@ -73,7 +73,7 @@ void RestAPI::GET_sensors(endpointArgs) {
return;
}
for(const auto& p : _plugins) {
for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) {
std::ostringstream data;
if (getQuery("json", queries) == "true") {
......@@ -129,7 +129,7 @@ void RestAPI::GET_average(endpointArgs) {
res.body() = "Plugin not found!\n";
res.result(http::status::not_found);
for(const auto& p : _plugins) {
for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) {
res.body() = "Sensor not found!\n";
for(const auto& g : p.configurator->getSensorGroups()) {
......@@ -191,15 +191,10 @@ void RestAPI::PUT_start(endpointArgs) {
return;
}
for (const auto& p : _plugins) {
if (p.id == plugin) {
for (const auto& g : p.configurator->getSensorGroups()) {
g->start();
}
res.body() = "Plugin " + plugin + ": Sensors started\n";
res.result(http::status::ok);
return;
}
if(_pluginManager->startPlugin(plugin)) {
res.body() = "Plugin " + plugin + ": Sensors started\n";
res.result(http::status::ok);
return;
}
}
......@@ -210,15 +205,10 @@ void RestAPI::PUT_stop(endpointArgs) {
return;
}
for (const auto& p : _plugins) {
if (p.id == plugin) {
for (const auto& g : p.configurator->getSensorGroups()) {
g->stop();
}
res.body() = "Plugin " + plugin + ": Sensors stopped\n";
res.result(http::status::ok);
return;
}
if(_pluginManager->stopPlugin(plugin)) {
res.body() = "Plugin " + plugin + ": Sensors stopped\n";
res.result(http::status::ok);
return;
}
}
......@@ -229,50 +219,34 @@ void RestAPI::PUT_reload(endpointArgs) {
return;
}
for (const auto &p : _plugins) {
if (p.id == plugin) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
if (!_mqttPusher->halt()) {
res.body() = "Could not reload plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
if (!_mqttPusher->halt()) {
res.body() = "Could not reload plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
// Removing obsolete MQTT topics
removeTopics(p);
if (p.configurator->reReadConfig()) {
// Perform checks on MQTT topics
if(!checkTopics(p)) {
res.body() = "Plugin " + plugin + ": problematic MQTT topics or sensor names, please check your config files!\n";
res.result(http::status::internal_server_error);
removeTopics(p);
p.configurator->clearConfig();
} else {
res.body() = "Plugin " + plugin + ": Configuration reloaded.\n";
res.result(http::status::ok);
for (const auto& g : p.configurator->getSensorGroups()) {
g->init(_io);
g->start();
}
}
} else {
res.body() = "Plugin " + plugin + ": Could not reload configuration.\n";
res.result(http::status::internal_server_error);
}
if(_pluginManager->reloadPluginConfig(plugin)) {
res.body() = "Plugin " + plugin + ": Configuration reloaded.\n";
res.result(http::status::ok);
//continue MQTTPusher
_mqttPusher->cont();
break;
}
_pluginManager->initPlugin(_io, plugin);
_pluginManager->startPlugin(plugin);
} else {
res.body() = "Could not reload plugin (Plugin not found or invalid config file).\n";
res.result(http::status::internal_server_error);
}
//continue MQTTPusher
_mqttPusher->cont();
//Updating the SensorNavigator on plugin reloads, if analytics plugins are currently running
if (_manager!=NULL && _manager->getPlugins().size()>0) {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> names, topics;
for (const auto &p : _plugins)
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
......@@ -284,30 +258,6 @@ void RestAPI::PUT_reload(endpointArgs) {
}
}
void RestAPI::removeTopics(dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& g : p.configurator->getSensorGroups()) {
mqttCheck.removeGroup(g->getGroupName());
for (const auto &s : g->getSensors()) {
mqttCheck.removeTopic(s->getMqtt());
mqttCheck.removeName(s->getName());
}
}
}
bool RestAPI::checkTopics(dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& g : p.configurator->getSensorGroups()) {
if (!mqttCheck.checkGroup(g->getGroupName()))
validTopics = false;
for (const auto &s : g->getSensors())
if (!mqttCheck.checkTopic(s->getMqtt()) || !mqttCheck.checkName(s->getName()))
validTopics = false;
}
return validTopics;
}
void RestAPI::PUT_analytics_reload(endpointArgs) {
if (_manager->getStatus() != AnalyticsManager::LOADED) {
res.body() = "AnalyticsManager is not loaded!\n";
......
......@@ -12,15 +12,15 @@
#include <boost/asio.hpp>
#include "includes/PluginDefinitions.h"
#include "../analytics/AnalyticsManager.h"
#include "mqttchecker.h"
#include "MQTTPusher.h"
#include "PluginManager.h"
class RestAPI : public RESTHttpsServer {
public:
RestAPI(serverSettings_t settings,
pluginVector_t& plugins,
PluginManager* pluginManager,
MQTTPusher* mqttPusher,
AnalyticsManager* manager,
boost::asio::io_service& io);
......@@ -157,13 +157,7 @@ private:
/******************************************************************************/
// Utility method to remove all MQTT topics associated to a plugin from the used set
void removeTopics(dl_t p);
// Utility method to check for the validity of all MQTT topics in a plugin
bool checkTopics(dl_t p);
pluginVector_t& _plugins;
PluginManager* _pluginManager;
MQTTPusher* _mqttPusher;
AnalyticsManager* _manager;
boost::asio::io_service& _io;
......
......@@ -31,6 +31,7 @@
#include "Configuration.h"
#include "MQTTPusher.h"
#include "PluginManager.h"
#include "RestAPI.h"
#include "version.h"
......@@ -49,6 +50,7 @@ using namespace std;
Configuration* _configuration;
MQTTPusher* _mqttPusher;
PluginManager* _pluginManager;
RestAPI* _httpsServer;
AnalyticsManager* _analyticsManager;
std::map<std::string, SBasePtr> _sensorMap;
......@@ -62,7 +64,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
if(!_queryEngine.updating.exchange(true)) {
_sensorMap.clear();
// Adding ordinary sensors to the map
for (auto &p : _configuration->getPlugins())
for (auto &p : _pluginManager->getPlugins())
for (auto &g : p.configurator->getSensorGroups())
for (auto &s : g->getSensors())
_sensorMap.insert(std::make_pair(s->getName(), s));
......@@ -97,14 +99,11 @@ void sigHandler(int sig) {
LOG(fatal) << "Received SIGINT";
else if( sig == SIGTERM )
LOG(fatal) << "Received SIGTERM";
//Stop all sensors
LOG(info) << "Stopping sensors...";
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Stop \"" << p.id << "\" plugin";
for(const auto& g : p.configurator->getSensorGroups()) {
g->stop();
}
}
_pluginManager->stopPlugin();
//Stop data analytics plugins and analyzers
_analyticsManager->stop();
......@@ -256,8 +255,9 @@ int main(int argc, char** argv) {
LOG(info) << "Logging setup complete";
_pluginManager = new PluginManager();
//Read in rest of configuration. Also creates all sensors
if(!_configuration->readPlugins()) {
if(!_configuration->readPlugins(*_pluginManager)) {
LOG(fatal) << "Failed to read configuration!";
return 1;
}
......@@ -267,7 +267,7 @@ int main(int argc, char** argv) {
if(_analyticsManager->probe(argv[argc-1], "dcdbpusher.conf")) {
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
for (const auto &p : _configuration->getPlugins())
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
......@@ -331,7 +331,7 @@ int main(int argc, char** argv) {
#endif
LOG_VAR(vLogLevel) << "----- Sampling Configuration -----";
for(auto& p : _configuration->getPlugins()) {
for(auto& p : _pluginManager->getPlugins()) {
LOG_VAR(vLogLevel) << "Sampling Plugin \"" << p.id << "\"";
p.configurator->printConfig(vLogLevel);
}
......@@ -344,8 +344,8 @@ int main(int argc, char** argv) {
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
_configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_httpsServer = new RestAPI(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
_pluginManager->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_httpsServer = new RestAPI(restAPISettings, _pluginManager, _mqttPusher, _analyticsManager, io);
_configuration->readRestAPIUsers(_httpsServer);
LOG_VAR(vLogLevel) << "----- End Configuration -----";
......@@ -356,22 +356,11 @@ int main(int argc, char** argv) {
//Init all sensors
LOG(info) << "Init sensors...";
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Init \"" << p.id << "\" plugin";
for(const auto& g : p.configurator->getSensorGroups()) {
LOG(debug) << " -Group: " << g->getGroupName();
g->init(io);
}
}
_pluginManager->initPlugin(io);
//Start all sensors
LOG(info) << "Starting sensors...";
for(auto& p : _configuration->getPlugins()) {
LOG(info) << "Start \"" << p.id << "\" plugin";
for(const auto& g : p.configurator->getSensorGroups()) {
g->start();
}