Commit 3e683acf authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: re-worked sensor map update mechanism in DCDBPusher

- In the new implementation the sensor map used to speed up access to
sensor data is rebuilt synchronously when plugins are reloaded/loaded/unloaded
- Any threads accessing the sensor query callback will block until
the update has finished
parent ed311bb0
......@@ -79,6 +79,17 @@ public:
* @param navi Pointer to a SensorNavigator object
*/
void setNavigator(shared_ptr<SensorNavigator> navi) { _navigator = navi; }
/**
* @brief Set internal map of sensors
*
* In certain query callback implementations, a sensor map structure is used to make access to
* sensor data more efficient. In this case, this method is to be used to set and expose such
* structure correctly.
*
* @param sMap Pointer to a sensor map structure
*/
void setSensorMap(shared_ptr<map<string, SBasePtr>> sMap) { _sensorMap = sMap; }
/**
* @brief Set the current sensor hierarchy
......@@ -124,11 +135,18 @@ public:
void setJobQueryCallback(QueryEngineJobCallback jcb) { _jCallback = jcb; }
/**
* @brief Returns the internal SensorNavigator objects
* @brief Returns the internal SensorNavigator object
*
* @return pointer to a SensorNavigator object
* @return Pointer to a SensorNavigator object
*/
const shared_ptr<SensorNavigator> getNavigator() { return _navigator; }
/**
* @brief Returns the internal sensor map data structure
*
* @return Pointer to a sensor map
*/
const shared_ptr<map<string, SBasePtr>> getSensorMap() { return _sensorMap; }
/**
* @brief Returns the current sensor hierarchy
......@@ -207,14 +225,32 @@ public:
return _jCallback(jobId, startTs, endTs, buffer, rel, range);
}
void triggerUpdate() {
/**
* @brief Locks access to the QueryEngine
*
* Once this method returns, the invoking thread can safely update all internal data structures
* of the QueryEngine. the unlock() method must be called afterwards.
*/
void lock() {
// Locking out new threads from the QueryEngine callbacks
updating.store(true);
// Waiting until all previous threads have finished using the QueryEngine
while(access.load()>0) {}
}
/**
* @brief Unlocks access to the QueryEngine
*
* Must be called after a lock() call.
*/
void unlock() {
access.store(0);
updating.store(false);
updated.store(true);
}
//Internal boolean flags used for utility purposes
atomic<bool> updated;
//Internal atomic flags used for utility purposes
atomic<bool> updating;
atomic<int> access;
private:
......@@ -223,10 +259,11 @@ private:
*/
QueryEngine() {
_navigator = NULL;
_sensorMap = NULL;
_callback = NULL;
_jCallback = NULL;
updated.store(false);
updating.store(false);
access.store(0);
}
/**
......@@ -241,6 +278,8 @@ private:
// Internal pointer to a SensorNavigator
shared_ptr<SensorNavigator> _navigator;
// Internal pointer to a sensor map used in certain query callback implementations
shared_ptr<map<string, SBasePtr>> _sensorMap;
// Callback used to retrieve sensor data
QueryEngineCallback _callback;
// Callback used to retrieve job data
......
......@@ -78,7 +78,6 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
// Assigning the newly-built sensor navigator to the QueryEngine
_queryEngine.setNavigator(_navigator);
_queryEngine.triggerUpdate();
}
//TODO: find a better solution to disable the SensorBase default cache
......@@ -88,7 +87,7 @@ bool AnalyticsController::initialize(Configuration& settings, const string& conf
return false;
}
if(!_queryEngine.updated.is_lock_free())
if(!_queryEngine.updating.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Creating threads...";
......
......@@ -223,6 +223,8 @@ void RestAPI::PUT_load(endpointArgs) {
res.result(http::status::internal_server_error);
return;
}
unloadQueryEngine();
if(_pluginManager->loadPlugin(plugin, path, config)) {
res.body() = "Plugin " + plugin + " successfully loaded!\n";
......@@ -236,7 +238,7 @@ void RestAPI::PUT_load(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
reloadNavigator();
reloadQueryEngine();
}
void RestAPI::PUT_unload(endpointArgs) {
......@@ -251,6 +253,8 @@ void RestAPI::PUT_unload(endpointArgs) {
res.result(http::status::internal_server_error);
return;
}
unloadQueryEngine();
_pluginManager->unloadPlugin(plugin);
res.body() = "Plugin " + plugin + " unloaded.\n";
......@@ -258,7 +262,7 @@ void RestAPI::PUT_unload(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
reloadNavigator();
reloadQueryEngine();
}
void RestAPI::PUT_start(endpointArgs) {
......@@ -303,6 +307,8 @@ void RestAPI::PUT_reload(endpointArgs) {
res.result(http::status::internal_server_error);
return;
}
unloadQueryEngine();
if(_pluginManager->reloadPluginConfig(plugin)) {
res.body() = "Plugin " + plugin + ": Configuration reloaded.\n";
......@@ -317,7 +323,7 @@ void RestAPI::PUT_reload(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
reloadNavigator();
reloadQueryEngine();
}
void RestAPI::PUT_analytics_reload(endpointArgs) {
......@@ -331,6 +337,10 @@ void RestAPI::PUT_analytics_reload(endpointArgs) {
// Wait until MQTTPusher is paused in order to reload plugins
if (_mqttPusher->halt()) {
_manager->stop(plugin);
unloadQueryEngine();
if (!_manager->reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
......@@ -341,7 +351,9 @@ void RestAPI::PUT_analytics_reload(endpointArgs) {
res.body() = "Plugin " + plugin + ": Sensors reloaded\n";
res.result(http::status::ok);
}
_mqttPusher->cont();
reloadQueryEngine();
} else {
res.body() = "Could not reload plugins (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
......@@ -365,6 +377,8 @@ void RestAPI::PUT_analytics_load(endpointArgs) {
return;
}
unloadQueryEngine();
if(_manager->loadPlugin(plugin, path, config)) {
res.body() = "Analytics plugin " + plugin + " successfully loaded!\n";
res.result(http::status::ok);
......@@ -377,6 +391,7 @@ void RestAPI::PUT_analytics_load(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
reloadQueryEngine();
}
void RestAPI::PUT_analytics_unload(endpointArgs) {
......@@ -391,6 +406,9 @@ void RestAPI::PUT_analytics_unload(endpointArgs) {
res.result(http::status::internal_server_error);
return;
}
_manager->stop(plugin);
unloadQueryEngine();
_manager->unloadPlugin(plugin);
res.body() = "Analytics plugin " + plugin + " unloaded.\n";
......@@ -398,22 +416,55 @@ void RestAPI::PUT_analytics_unload(endpointArgs) {
//continue MQTTPusher
_mqttPusher->cont();
reloadQueryEngine();
}
void RestAPI::reloadNavigator() {
void RestAPI::reloadQueryEngine(const bool force) {
//Updating the SensorNavigator on plugin reloads, if analytics plugins are currently running
if (_manager!=NULL && _manager->getPlugins().size()>0) {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> names, topics;
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
if (!force && (_manager==NULL || _manager->getPlugins().empty()))
return;
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <std::map<std::string, SBasePtr>> sensorMap = std::make_shared<std::map<std::string, SBasePtr>>();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> topics;
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
topics.push_back(s->getMqtt());
sensorMap->insert(std::make_pair(s->getName(), s));
}
// Adding data analytics sensors to the map
for(auto& p : _manager->getPlugins()) {
for(const auto& a : p.configurator->getAnalyzers())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
sensorMap->insert(std::make_pair(o->getName(), o));
a->releaseUnits();
}
}
try {
navigator->buildTree(qEngine.getSensorHierarchy(), &topics);
} catch(const std::exception& e) {
qEngine.unlock();
return;
}
qEngine.setSensorMap(sensorMap);
qEngine.setNavigator(navigator);
// Unlocking the QueryEngine
qEngine.unlock();
}
void RestAPI::unloadQueryEngine() {
QueryEngine &qEngine = QueryEngine::getInstance();
// Locking access to the QueryEngine
qEngine.lock();
qEngine.setNavigator(nullptr);
qEngine.setSensorMap(nullptr);
}
......@@ -250,7 +250,8 @@ private:
/******************************************************************************/
// Utility method to reload the sensor navigator whenever needed
void reloadNavigator();
void reloadQueryEngine(const bool force=false);
void unloadQueryEngine();
PluginManager* _pluginManager;
MQTTPusher* _mqttPusher;
......
......@@ -69,47 +69,23 @@ MQTTPusher* _mqttPusher;
PluginManager* _pluginManager;
RestAPI* _httpsServer;
AnalyticsManager* _analyticsManager;
std::map<std::string, SBasePtr> _sensorMap;
QueryEngine& _queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
//TODO: fix sensormap rebuilding after plugin unloads
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
//Initializing the sensor map if necessary. Thread safe!
if(_queryEngine.updated.load()) {
if(!_queryEngine.updating.exchange(true)) {
_sensorMap.clear();
// Adding ordinary sensors to the map
for (auto &p : _pluginManager->getPlugins())
for (auto &g : p.configurator->getSensorGroups())
for (auto &s : g->getSensors())
_sensorMap.insert(std::make_pair(s->getName(), s));
// Adding data analytics sensors to the map
for(auto& p : _analyticsManager->getPlugins()) {
for(const auto& a : p.configurator->getAnalyzers())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
_sensorMap.insert(std::make_pair(o->getName(), o));
a->releaseUnits();
}
}
_queryEngine.updated.store(false);
_queryEngine.updating.store(false);
} else {
// Spinning while the sensormap is being built
while( _queryEngine.updating.load() ) {}
}
}
if(_sensorMap.count(name) > 0) {
SBasePtr sensor = _sensorMap[name];
if(!sensor->isInit())
return NULL;
else
return sensor->getCache()->getView(startTs, endTs, buffer, rel);
// Spinning on a lock if the query engine is being updated
while( _queryEngine.updating.load() ) {}
++_queryEngine.access;
std::vector<reading_t>* res = NULL;
shared_ptr<map<string, SBasePtr>> sensorMap = _queryEngine.getSensorMap();
if(sensorMap!=nullptr && sensorMap->count(name)>0) {
SBasePtr sensor = sensorMap->at(name);
res = sensor->isInit() ? sensor->getCache()->getView(startTs, endTs, buffer, rel) : NULL;
}
return NULL;
--_queryEngine.access;
return res;
}
void sigHandler(int sig) {
......@@ -296,7 +272,6 @@ int main(int argc, char** argv) {
topics.clear();
LOG(info) << "Built a sensor hierarchy tree of size " << navigator->getTreeSize() << " and depth " << navigator->getTreeDepth() << ".";
_queryEngine.setNavigator(navigator);
_queryEngine.triggerUpdate();
} catch (const std::invalid_argument &e) {
LOG(error) << e.what();
LOG(error) << "Failed to build sensor hierarchy tree!";
......@@ -307,9 +282,29 @@ int main(int argc, char** argv) {
_queryEngine.setFilter(analyticsSettings.filter);
_queryEngine.setSensorHierarchy(analyticsSettings.hierarchy);
_queryEngine.setQueryCallback(sensorQueryCallback);
if(!_analyticsManager->load(argv[argc-1], "dcdbpusher.conf", pluginSettings)) {
LOG(fatal) << "Failed to load data analytics manager!";
return 1;
} else if(!_analyticsManager->getPlugins().empty()) {
// Preparing the sensor map used for the QueryEngine
std::shared_ptr <std::map<std::string, SBasePtr>> sensorMap = std::make_shared<std::map<std::string, SBasePtr>>();
for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors())
sensorMap->insert(std::make_pair(s->getName(), s));
for(auto& p : _analyticsManager->getPlugins()) {
for (const auto &a : p.configurator->getAnalyzers())
if (a->getStreaming()) {
for (const auto &u : a->getUnits())
for (const auto &o: u->getBaseOutputs())
sensorMap->insert(std::make_pair(o->getName(), o));
a->releaseUnits();
}
}
_queryEngine.setSensorMap(sensorMap);
}
//print configuration to give some feedback
......@@ -383,7 +378,7 @@ int main(int argc, char** argv) {
LOG(info) << "Starting sensors...";
_pluginManager->startPlugin();
if(!_queryEngine.updated.is_lock_free())
if(!_queryEngine.updating.is_lock_free())
LOG(warning) << "This machine does not support lock-free atomics. Performance may be degraded.";
LOG(info) << "Init analyzers...";
......@@ -447,7 +442,8 @@ int main(int argc, char** argv) {
LOG(info) << "MQTTPusher stopped.";
LOG(info) << "Tearing down objects...";
_sensorMap.clear();
_queryEngine.setNavigator(nullptr);
_queryEngine.setSensorMap(nullptr);
delete _httpsServer;
delete _mqttPusher;
delete _analyticsManager;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment