Commit c368888c authored by Michael Ott's avatar Michael Ott
Browse files

Move io_context creation out of AnalyticsController into main() and pass it to...

Move io_context creation out of AnalyticsController into main() and pass it to PluginManager and OperatorManager constructors instead of initPlugin() and startPlugin(). This avoids passing it to RestAPI.
parent d6ff5f6a
......@@ -220,7 +220,7 @@ void OperatorManager::unloadPlugin(const string& id) {
_plugins.clear();
}
bool OperatorManager::init(boost::asio::io_service& io, const string& plugin) {
bool OperatorManager::init(const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot init, OperatorManager is not loaded!";
return false;
......@@ -232,12 +232,12 @@ bool OperatorManager::init(boost::asio::io_service& io, const string& plugin) {
out = true;
LOG(info) << "Init " << p.id << " operator plugin";
for (const auto &op : p.configurator->getOperators())
op->init(io);
op->init(_io);
}
return out;
}
bool OperatorManager::reload(boost::asio::io_service& io, const string& plugin) {
bool OperatorManager::reload(const string& plugin) {
if(_status != LOADED) {
LOG(error) << "Cannot reload, OperatorManager is not loaded!";
return false;
......@@ -259,7 +259,7 @@ bool OperatorManager::reload(boost::asio::io_service& io, const string& plugin)
return false;
} else
for (const auto &op : p.configurator->getOperators())
op->init(io);
op->init(_io);
}
return out;
}
......
......@@ -82,8 +82,9 @@ public:
/**
* @brief Class constructor
* @param io Boost ASIO service to be used
*/
OperatorManager() { _status = CLEAR; }
OperatorManager(boost::asio::io_context& io) : _io(io) { _status = CLEAR; }
/**
* @brief Class destructor
......@@ -130,11 +131,10 @@ public:
* This method must be called after "load", and before "start". It will prepare operators for
* operation, and initialize the related sensors and caches.
*
* @param io Boost ASIO service to be used
* @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
* @return true if successful, false otherwise
*/
bool init(boost::asio::io_service& io, const string& plugin="");
bool init(const string& plugin="");
/**
* @brief Reload one or more plugins
......@@ -142,11 +142,10 @@ public:
* This method will cause all running operators of a plugin to be stopped and destroyed. The
* configuration file is then read once again, and new operators are created and initialized.
*
* @param io Boost ASIO service to be used
* @param plugin Name of the plugin on which the action must be performed. If none, all plugins will be affected
* @return true if successful, false otherwise
*/
bool reload(boost::asio::io_service& io, const string& plugin="");
bool reload(const string& plugin="");
/**
* @brief Start one or more stored plugins
......@@ -452,6 +451,7 @@ private:
*/
void PUT_analytics_operator(endpointArgs);
boost::asio::io_context& _io;
};
#endif //PROJECT_ANALYTICSMANAGER_H
......@@ -267,7 +267,7 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) {
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
if (!_analyticsController->getManager()->reload(_analyticsController->getIoService(), plugin)) {
if (!_analyticsController->getManager()->reload(plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!_analyticsController->getManager()->start(plugin)) {
......@@ -297,7 +297,7 @@ void CARestAPI::PUT_analytics_load(endpointArgs) {
res.body() = "Operator plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_analyticsController->getManager()->init(_analyticsController->getIoService(), plugin);
_analyticsController->getManager()->init(plugin);
} else {
res.body() = "Failed to load operator plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
......
......@@ -86,16 +86,6 @@ bool AnalyticsController::initialize(Configuration& settings) {
if(!_queryEngine.updating.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Creating threads...";
// Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
// Inherited from DCDB Pusher
_keepAliveWork = make_shared<boost::asio::io_service::work>(_io);
// Create pool of threads which handle the sensors
for(size_t i = 0; i < _settings.threads; i++) {
_threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &_io));
}
LOG(info) << "Threads created!";
_initialized = true;
return true;
}
......@@ -106,7 +96,7 @@ void AnalyticsController::run() {
return;
LOG(info) << "Init operators...";
_manager->init(_io);
_manager->init();
LOG(info) << "Starting operators...";
_manager->start();
LOG(info) << "Sensors started!";
......
......@@ -65,10 +65,11 @@ public:
* @param dcdbCfg SensorConfig object to be used to retrieve sensor meta-data from Cassandra
* @param dcdbStore SensorDataStore object to be used to insert sensor readings into Cassandra
*/
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore) {
_dcdbCfg = dcdbCfg;
_dcdbStore = dcdbStore;
_manager = make_shared<OperatorManager>();
AnalyticsController(DCDB::SensorConfig *dcdbCfg, DCDB::SensorDataStore *dcdbStore, boost::asio::io_context& io)
: _dcdbCfg(dcdbCfg),
_dcdbStore(dcdbStore),
_io(io) {
_manager = make_shared<OperatorManager>(io);
_navigator = nullptr;
_sensorCache = nullptr;
_metadataStore = nullptr;
......@@ -179,13 +180,6 @@ public:
*/
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
/**
* @brief Return the io_service used by the analytics controller.
*
* @return Reference to this object's boost::asio::io_service.
*/
boost::asio::io_service& getIoService() { return _io; }
/**
* @brief Rebuilds the internal sensor navigator.
*
......@@ -228,7 +222,7 @@ private:
// Main management thread for the analytics controller
boost::thread _mainThread;
// IO service for the operators
boost::asio::io_service _io;
boost::asio::io_context& _io;
// Underlying thread pool
boost::thread_group _threads;
// Dummy task to keep thread pool alive
......
......@@ -103,6 +103,7 @@ MetadataStore *metadataStore;
CARestAPI* httpsServer = nullptr;
DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_context::work> keepAliveWork;
logger_t lg;
bool jobQueryCallback(const string& jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range, const string& domainId) {
......@@ -271,6 +272,8 @@ void sigHandler(int sig)
LOG(fatal) << "Received SIGUSR1 via REST API";
retCode = !httpsServer ? EXIT_SUCCESS : httpsServer->getReturnCode();
}
keepAliveWork.reset();
keepRunning = 0;
}
......@@ -754,8 +757,11 @@ int main(int argc, char* const argv[]) {
metadataStore->store(*sBuf.getPattern(), sBuf);
}
publicSensors.clear();
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore);
boost::asio::io_context io;
boost::thread_group threads;
analyticsController = new AnalyticsController(mySensorConfig, mySensorDataStore, io);
analyticsController->setCache(&mySensorCache);
analyticsController->setMetadataStore(metadataStore);
queryEngine.setFilter(analyticsSettings.filter);
......@@ -779,6 +785,7 @@ int main(int argc, char* const argv[]) {
LOG(info) << " MQTT-listenAddress: " << settings.mqttListenHost << ":" << settings.mqttListenPort;
LOG(info) << " CacheInterval: " << int(pluginSettings.cacheInterval/1000) << " [s]";
LOG(info) << " CleaningInterval: " << settings.cleaningInterval << " [s]";
LOG(info) << " Threads: " << settings.threads;
LOG(info) << " MessageThreads: " << settings.messageThreads;
LOG(info) << " MessageSlots: " << settings.messageSlots;
LOG(info) << " Daemonize: " << (settings.daemonize ? "Enabled" : "Disabled");
......@@ -853,9 +860,20 @@ int main(int argc, char* const argv[]) {
if (settings.validateConfig)
return EXIT_SUCCESS;
else
analyticsController->start();
LOG(info) << "Creating threads...";
// Dummy to keep io service alive even if no tasks remain (e.g. because all sensors have been stopped over REST API)
// Inherited from DCDB Pusher
keepAliveWork = boost::make_shared<boost::asio::io_context::work>(io);
// Create pool of threads which handle the sensors
for(size_t i = 0; i < settings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_context::*) () >(&boost::asio::io_context::run), &io));
}
LOG(info) << "Threads created!";
analyticsController->start();
LOG(info) << "AnalyticsController running...";
/*
* Start the MQTT Message Server.
*/
......@@ -870,7 +888,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, analyticsController->getIoService());
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, io);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......
......@@ -33,8 +33,9 @@
using namespace std;
PluginManager::PluginManager(const pluginSettings_t &pluginSettings)
: _pluginSettings(pluginSettings),
PluginManager::PluginManager(boost::asio::io_context &io, const pluginSettings_t &pluginSettings)
: _io(io),
_pluginSettings(pluginSettings),
_cfgFilePath("./") {}
PluginManager::~PluginManager() {
......@@ -176,8 +177,7 @@ void PluginManager::unloadPlugin(const string &id) {
}
}
bool PluginManager::initPlugin(boost::asio::io_service &io,
const string & id) {
bool PluginManager::initPlugin(const string &id) {
bool found = false;
for (const auto &p : _plugins) {
......@@ -185,7 +185,7 @@ bool PluginManager::initPlugin(boost::asio::io_service &io,
LOG(info) << "Init " << p.id << " plugin";
for (const auto &g : p.configurator->getSensorGroups()) {
found = true;
g->init(io);
g->init(_io);
}
}
}
......
......@@ -63,10 +63,11 @@ class PluginManager {
/**
* @brief Constructor.
*
* @param io IO service to initialize the plugin's groups with.
* @param pluginSettings Use this plugin settings as default when loading
* a new plugin.
*/
PluginManager(const pluginSettings_t &pluginSettings);
PluginManager(boost::asio::io_context &io, const pluginSettings_t &pluginSettings);
/**
* @brief Destructor.
......@@ -117,13 +118,11 @@ class PluginManager {
* @details Initializes a plugin so it can be started. A plugin has to be
* be initialized only once after it was (re)loaded.
*
* @param io IO service to initialize the plugin's groups with.
* @param id Identifying name of the plugin.
*
* @return True on success, false if the plugin could not be found.
*/
bool initPlugin(boost::asio::io_service &io,
const std::string & id = "");
bool initPlugin(const std::string &id = "");
// Undocumented: if no plugin name is specified all plugins are started.
/**
......@@ -205,10 +204,11 @@ class PluginManager {
*/
void removeTopics(const pusherPlugin_t &p);
pusherPluginStorage_t _plugins; ///< Storage to hold all loaded plugins
pluginSettings_t _pluginSettings; ///< Default plugin settings to use when loading a new plugin
std::string _cfgFilePath;
logger_t lg;
pusherPluginStorage_t _plugins; ///< Storage to hold all loaded plugins
pluginSettings_t _pluginSettings; ///< Default plugin settings to use when loading a new plugin
std::string _cfgFilePath;
boost::asio::io_context& _io;
logger_t lg;
};
#endif /* DCDBPUSHER_PLUGINMANAGER_H_ */
......@@ -48,8 +48,7 @@ RestAPI::RestAPI(serverSettings_t settings,
: RESTHttpsServer(settings, io),
_pluginManager(pluginManager),
_mqttPusher(mqttPusher),
_manager(manager),
_io(io) {
_manager(manager) {
addEndpoint("/help", {http::verb::get, stdBind(GET_help)});
addEndpoint("/version", {http::verb::get, stdBind(GET_version)});
......@@ -258,7 +257,7 @@ void RestAPI::PUT_load(endpointArgs) {
res.body() = "Plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_pluginManager->initPlugin(_io, plugin);
_pluginManager->initPlugin(plugin);
} else {
res.body() = "Failed to load plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
......@@ -342,7 +341,7 @@ void RestAPI::PUT_reload(endpointArgs) {
res.body() = "Plugin " + plugin + ": Configuration reloaded.\n";
res.result(http::status::ok);
_pluginManager->initPlugin(_io, plugin);
_pluginManager->initPlugin(plugin);
_pluginManager->startPlugin(plugin);
} else {
res.body() = "Could not reload plugin (Plugin not found or invalid config file).\n";
......@@ -368,7 +367,7 @@ void RestAPI::PUT_analytics_reload(endpointArgs) {
unloadQueryEngine();
if (!_manager->reload(_io, plugin)) {
if (!_manager->reload(plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!_manager->start(plugin)) {
......@@ -410,7 +409,7 @@ void RestAPI::PUT_analytics_load(endpointArgs) {
res.body() = "Operator plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
_manager->init(_io, plugin);
_manager->init(plugin);
} else {
res.body() = "Failed to load operator plugin " + plugin + "!\n";
res.result(http::status::internal_server_error);
......
......@@ -48,7 +48,7 @@ class RestAPI : public RESTHttpsServer {
PluginManager * pluginManager,
MQTTPusher * mqttPusher,
OperatorManager * manager,
boost::asio::io_context &io);
boost::asio::io_context &io);
virtual ~RestAPI() {}
......@@ -293,7 +293,6 @@ class RestAPI : public RESTHttpsServer {
PluginManager * _pluginManager;
MQTTPusher * _mqttPusher;
OperatorManager * _manager;
boost::asio::io_context &_io;
};
#endif /* DCDBPUSHER_RESTAPI_H_ */
......@@ -291,14 +291,14 @@ int main(int argc, char **argv) {
LOG(info) << "Logging setup complete";
_pluginManager = new PluginManager(pluginSettings);
_pluginManager = new PluginManager(io, pluginSettings);
//Read in rest of configuration. Also creates all sensors
if (!_configuration->readPlugins(*_pluginManager)) {
LOG(fatal) << "Failed to read configuration!";
return 1;
}
_operatorManager = new OperatorManager();
_operatorManager = new OperatorManager(io);
// Preparing the SensorNavigator
if (_operatorManager->probe(globalSettings.cfgFilePath, globalSettings.cfgFileName)) {
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
......@@ -432,7 +432,7 @@ int main(int argc, char **argv) {
//Init all sensors
LOG(info) << "Init sensors...";
_pluginManager->initPlugin(io);
_pluginManager->initPlugin();
//Start all sensors
LOG(info) << "Starting sensors...";
......@@ -442,7 +442,7 @@ int main(int argc, char **argv) {
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Init operators...";
_operatorManager->init(io);
_operatorManager->init();
LOG(info) << "Starting operators...";
_operatorManager->start();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment