Commit bcec3a69 authored by Micha Müller's avatar Micha Müller
Browse files

Pusher: Caliper plugin preparations: introduce sensor mutex to allow for...

Pusher: Caliper plugin preparations: introduce sensor mutex to allow for runtime addition of sensors
parent d66b6893
......@@ -132,6 +132,10 @@ void MQTTPusher::push() {
//for faster response
break;
}
if (!p.configurator->trylockSensors()) {
//skip this plugin if it is currently locked and try again later
continue;
}
for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
......@@ -145,6 +149,7 @@ void MQTTPusher::push() {
}
}
}
p.configurator->unlockSensors();
}
// Push output analytics sensors
for (auto &p : _analyticsPlugins) {
......@@ -224,8 +229,9 @@ bool MQTTPusher::sendMappings() {
std::string topic, name;
unsigned int publishCtr=0;
// Performing auto-publish for sensors
for(auto& p: _plugins)
for(auto& g: p.configurator->getSensorGroups())
for(auto& p: _plugins) {
p.configurator->lockSensors();
for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
......@@ -239,6 +245,9 @@ bool MQTTPusher::sendMappings() {
else
publishCtr++;
}
}
p.configurator->unlockSensors();
}
// Performing auto-publish for analytics output sensors
for(auto& p: _analyticsPlugins)
......@@ -284,9 +293,13 @@ void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
bool dynWarning = false;
for(auto& p : _plugins)
for(const auto& g : p.configurator->getSensorGroups())
for(auto& p : _plugins) {
p.configurator->lockSensors();
for(const auto& g : p.configurator->getSensorGroups()) {
msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
}
p.configurator->unlockSensors();
}
for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers()) {
if (a->getStreaming() && !a->getDynamic()) {
......
......@@ -235,7 +235,9 @@ bool PluginManager::reloadPluginConfig(const string& id) {
for (const auto& p : _plugins) {
if (p.id == id) {
// Removing obsolete MQTT topics
p.configurator->lockSensors();
removeTopics(p);
p.configurator->unlockSensors();
if (p.configurator->reReadConfig()) {
// Perform checks on MQTT topics
if(!checkTopics(p)) {
......
......@@ -206,9 +206,8 @@ private:
*/
void removeTopics(const pusherPlugin_t& p);
pusherPluginStorage_t _plugins; /**< Storage to hold all loaded plugins */
pluginSettings_t _pluginSettings; /**< Default plugin settings to use when
loading a new plugin */
pusherPluginStorage_t _plugins; ///< Storage to hold all loaded plugins
pluginSettings_t _pluginSettings; ///< Default plugin settings to use when loading a new plugin
std::string _cfgFilePath;
logger_t lg;
};
......
......@@ -97,6 +97,7 @@ void RestAPI::GET_sensors(endpointArgs) {
for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) {
p.configurator->lockSensors();
std::ostringstream data;
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, sensors;
......@@ -117,6 +118,7 @@ void RestAPI::GET_sensors(endpointArgs) {
}
}
}
p.configurator->unlockSensors();
res.body() = data.str();
res.result(http::status::ok);
return;
......@@ -154,6 +156,7 @@ void RestAPI::GET_average(endpointArgs) {
for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) {
res.body() = "Sensor not found!\n";
p.configurator->lockSensors();
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor && s->isInit()) {
......@@ -173,6 +176,7 @@ void RestAPI::GET_average(endpointArgs) {
}
}
}
p.configurator->unlockSensors();
}
}
......@@ -427,12 +431,15 @@ void RestAPI::reloadQueryEngine(const bool force) {
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> topics;
for (const auto &p : _pluginManager->getPlugins())
for (const auto &p : _pluginManager->getPlugins()) {
p.configurator->lockSensors();
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
topics.push_back(s->getMqtt());
sensorMap->insert(std::make_pair(s->getName(), s));
}
p.configurator->unlockSensors();
}
// Adding data analytics sensors to the map
for(auto& p : _manager->getPlugins()) {
......
......@@ -27,6 +27,7 @@
#ifndef SRC_CONFIGURATORINTERFACE_H_
#define SRC_CONFIGURATORINTERFACE_H_
#include <mutex>
#include <string>
#include <vector>
......@@ -117,6 +118,36 @@ public:
derivedSetGlobalSettings(pluginSettings);
}
/**
* @brief Lock access to plugin sensors for exclusive access.
*
* @details Blocks if already occupied until the lock gets available again.
*/
void lockSensors() {
return _mutex.lock();
}
/**
* @brief Try to lock access to plugin sensors for exclusive access.
*
* @details Returns immediately.
*
* @details True if locked, false if failed to lock.
*/
bool trylockSensors() {
return _mutex.lock();
}
/**
* @brief Unlock the previously locked access to plugin sensors.
*
* @details Causes undefined behavior if sensors were not successfully
* locked before.
*/
void unlockSensors() {
return _mutex.lock();
}
/**
* @brief Get all sensor groups.
*
......@@ -289,6 +320,9 @@ protected:
std::string _cfgPath; ///< Path + name of config file to read from
std::string _mqttPrefix; ///< Global MQTT prefix
unsigned int _cacheInterval; ///< Time interval in ms all sensors should cache
//TODO lazy approach: only synchronized all source code locations which possible can interfere with caliper plugin.
// It may be necessary to sync other source code locations if this mutex is used for other purposes than for Caliper
std::mutex _mutex; ///< Allow for thread-safe modification of sensors during runtime (currently only used for Caliper plugin)
std::vector<SGroupPtr> _sensorGroupInterfaces; ///< Sensor group storage
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment