Commit 2a1cb43b authored by Micha Müller's avatar Micha Müller
Browse files

Pusher: Replace senor mutex with plugin specific locking method

parent 27702504
...@@ -132,12 +132,8 @@ void MQTTPusher::push() { ...@@ -132,12 +132,8 @@ void MQTTPusher::push() {
//for faster response //for faster response
break; break;
} }
if (!p.configurator->trylockSensors()) {
//skip this plugin if it is currently locked and try again later
continue;
}
for (const auto &g : p.configurator->getSensorGroups()) { for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) { for (const auto &s : g->acquireSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) { if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) { if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) { if (sendReadings(*s, reads, totalCount) > 0) {
...@@ -148,8 +144,8 @@ void MQTTPusher::push() { ...@@ -148,8 +144,8 @@ void MQTTPusher::push() {
} }
} }
} }
g->releaseSensors();
} }
p.configurator->unlockSensors();
} }
// Push output analytics sensors // Push output analytics sensors
for (auto &p : _analyticsPlugins) { for (auto &p : _analyticsPlugins) {
...@@ -230,9 +226,8 @@ bool MQTTPusher::sendMappings() { ...@@ -230,9 +226,8 @@ bool MQTTPusher::sendMappings() {
unsigned int publishCtr=0; unsigned int publishCtr=0;
// Performing auto-publish for sensors // Performing auto-publish for sensors
for(auto& p: _plugins) { for(auto& p: _plugins) {
p.configurator->lockSensors();
for(auto& g: p.configurator->getSensorGroups()) { for(auto& g: p.configurator->getSensorGroups()) {
for(auto& s: g->getSensors()) { for(auto& s: g->acquireSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt(); topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName(); name = s->getName();
...@@ -245,8 +240,8 @@ bool MQTTPusher::sendMappings() { ...@@ -245,8 +240,8 @@ bool MQTTPusher::sendMappings() {
else else
publishCtr++; publishCtr++;
} }
g->releaseSensors();
} }
p.configurator->unlockSensors();
} }
// Performing auto-publish for analytics output sensors // Performing auto-publish for analytics output sensors
...@@ -294,11 +289,10 @@ void MQTTPusher::computeMsgRate() { ...@@ -294,11 +289,10 @@ void MQTTPusher::computeMsgRate() {
float msgRate = 0; float msgRate = 0;
bool dynWarning = false; bool dynWarning = false;
for(auto& p : _plugins) { for(auto& p : _plugins) {
p.configurator->lockSensors();
for(const auto& g : p.configurator->getSensorGroups()) { for(const auto& g : p.configurator->getSensorGroups()) {
msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues(); msgRate += (float)g->acquireSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
g->releaseSensors();
} }
p.configurator->unlockSensors();
} }
for(auto& p : _analyticsPlugins) for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers()) { for(const auto& a : p.configurator->getAnalyzers()) {
......
...@@ -235,9 +235,7 @@ bool PluginManager::reloadPluginConfig(const string& id) { ...@@ -235,9 +235,7 @@ bool PluginManager::reloadPluginConfig(const string& id) {
for (const auto& p : _plugins) { for (const auto& p : _plugins) {
if (p.id == id) { if (p.id == id) {
// Removing obsolete MQTT topics // Removing obsolete MQTT topics
p.configurator->lockSensors();
removeTopics(p); removeTopics(p);
p.configurator->unlockSensors();
if (p.configurator->reReadConfig()) { if (p.configurator->reReadConfig()) {
// Perform checks on MQTT topics // Perform checks on MQTT topics
if(!checkTopics(p)) { if(!checkTopics(p)) {
...@@ -266,11 +264,12 @@ bool PluginManager::checkTopics(const pusherPlugin_t& p) { ...@@ -266,11 +264,12 @@ bool PluginManager::checkTopics(const pusherPlugin_t& p) {
if (!mqttCheck.checkGroup(g->getGroupName())) { if (!mqttCheck.checkGroup(g->getGroupName())) {
validTopics = false; validTopics = false;
} }
for (const auto& s : g->getSensors()) { for (const auto& s : g->acquireSensors()) {
if (!mqttCheck.checkTopic(s->getMqtt()) || !mqttCheck.checkName(s->getName())) { if (!mqttCheck.checkTopic(s->getMqtt()) || !mqttCheck.checkName(s->getName())) {
validTopics = false; validTopics = false;
} }
} }
g->releaseSensors();
} }
return validTopics; return validTopics;
...@@ -280,9 +279,10 @@ void PluginManager::removeTopics(const pusherPlugin_t& p) { ...@@ -280,9 +279,10 @@ void PluginManager::removeTopics(const pusherPlugin_t& p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance(); MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& g : p.configurator->getSensorGroups()) { for(const auto& g : p.configurator->getSensorGroups()) {
mqttCheck.removeGroup(g->getGroupName()); mqttCheck.removeGroup(g->getGroupName());
for (const auto &s : g->getSensors()) { for (const auto &s : g->acquireSensors()) {
mqttCheck.removeTopic(s->getMqtt()); mqttCheck.removeTopic(s->getMqtt());
mqttCheck.removeName(s->getName()); mqttCheck.removeName(s->getName());
} }
g->releaseSensors();
} }
} }
...@@ -97,28 +97,28 @@ void RestAPI::GET_sensors(endpointArgs) { ...@@ -97,28 +97,28 @@ void RestAPI::GET_sensors(endpointArgs) {
for(const auto& p : _pluginManager->getPlugins()) { for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) { if (p.id == plugin) {
p.configurator->lockSensors();
std::ostringstream data; std::ostringstream data;
if (getQuery("json", queries) == "true") { if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, sensors; boost::property_tree::ptree root, sensors;
for(const auto& g : p.configurator->getSensorGroups()) { for(const auto& g : p.configurator->getSensorGroups()) {
boost::property_tree::ptree group; boost::property_tree::ptree group;
for(const auto& s : g->getSensors()) { for(const auto& s : g->acquireSensors()) {
group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getMqtt()))); group.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getMqtt())));
} }
g->releaseSensors();
sensors.add_child(g->getGroupName(), group); sensors.add_child(g->getGroupName(), group);
} }
root.add_child(p.id, sensors); root.add_child(p.id, sensors);
boost::property_tree::write_json(data, root, true); boost::property_tree::write_json(data, root, true);
} else { } else {
for(const auto& g : p.configurator->getSensorGroups()) { for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) { for(const auto& s : g->acquireSensors()) {
data << g->getGroupName() << "::" << s->getMqtt() << "\n"; data << g->getGroupName() << "::" << s->getMqtt() << "\n";
} }
g->releaseSensors();
} }
} }
p.configurator->unlockSensors();
res.body() = data.str(); res.body() = data.str();
res.result(http::status::ok); res.result(http::status::ok);
return; return;
...@@ -156,9 +156,8 @@ void RestAPI::GET_average(endpointArgs) { ...@@ -156,9 +156,8 @@ void RestAPI::GET_average(endpointArgs) {
for(const auto& p : _pluginManager->getPlugins()) { for(const auto& p : _pluginManager->getPlugins()) {
if (p.id == plugin) { if (p.id == plugin) {
res.body() = "Sensor not found!\n"; res.body() = "Sensor not found!\n";
p.configurator->lockSensors();
for(const auto& g : p.configurator->getSensorGroups()) { for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) { for(const auto& s : g->acquireSensors()) {
if (s->getName() == sensor && s->isInit()) { if (s->getName() == sensor && s->isInit()) {
uint64_t avg = 0; uint64_t avg = 0;
try { try {
...@@ -167,16 +166,18 @@ void RestAPI::GET_average(endpointArgs) { ...@@ -167,16 +166,18 @@ void RestAPI::GET_average(endpointArgs) {
res.body() = "Unable to compute average: "; res.body() = "Unable to compute average: ";
res.body() += e.what(); res.body() += e.what();
res.result(http::status::internal_server_error); res.result(http::status::internal_server_error);
g->releaseSensors();
return; return;
} }
res.body() = plugin + "::" + sensor + " Average of last " + res.body() = plugin + "::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(avg) + "\n"; std::to_string(time) + " seconds is " + std::to_string(avg) + "\n";
res.result(http::status::ok); res.result(http::status::ok);
g->releaseSensors();
return; return;
} }
} }
g->releaseSensors();
} }
p.configurator->unlockSensors();
} }
} }
...@@ -432,13 +433,13 @@ void RestAPI::reloadQueryEngine(const bool force) { ...@@ -432,13 +433,13 @@ void RestAPI::reloadQueryEngine(const bool force) {
std::vector<std::string> topics; std::vector<std::string> topics;
for (const auto &p : _pluginManager->getPlugins()) { for (const auto &p : _pluginManager->getPlugins()) {
p.configurator->lockSensors(); for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &g : p.configurator->getSensorGroups()) for (const auto &s : g->acquireSensors()) {
for (const auto &s : g->getSensors()) {
topics.push_back(s->getMqtt()); topics.push_back(s->getMqtt());
sensorMap->insert(std::make_pair(s->getName(), s)); sensorMap->insert(std::make_pair(s->getName(), s));
} }
p.configurator->unlockSensors(); g->releaseSensors();
}
} }
// Adding data analytics sensors to the map // Adding data analytics sensors to the map
......
...@@ -265,9 +265,11 @@ int main(int argc, char** argv) { ...@@ -265,9 +265,11 @@ int main(int argc, char** argv) {
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>(); std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> topics; vector <std::string> topics;
for (const auto &p : _pluginManager->getPlugins()) for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups()) for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) for (const auto &s : g->acquireSensors())
topics.push_back(s->getMqtt()); topics.push_back(s->getMqtt());
g->releaseSensors();
}
try { try {
navigator->setFilter(analyticsSettings.filter); navigator->setFilter(analyticsSettings.filter);
navigator->buildTree(analyticsSettings.hierarchy, &topics); navigator->buildTree(analyticsSettings.hierarchy, &topics);
...@@ -293,9 +295,11 @@ int main(int argc, char** argv) { ...@@ -293,9 +295,11 @@ int main(int argc, char** argv) {
std::shared_ptr <std::map<std::string, SBasePtr>> sensorMap = std::make_shared<std::map<std::string, SBasePtr>>(); std::shared_ptr <std::map<std::string, SBasePtr>> sensorMap = std::make_shared<std::map<std::string, SBasePtr>>();
for (const auto &p : _pluginManager->getPlugins()) for (const auto &p : _pluginManager->getPlugins())
for (const auto &g : p.configurator->getSensorGroups()) for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) for (const auto &s : g->acquireSensors())
sensorMap->insert(std::make_pair(s->getName(), s)); sensorMap->insert(std::make_pair(s->getName(), s));
g->releaseSensors();
}
for(auto& p : _analyticsManager->getPlugins()) { for(auto& p : _analyticsManager->getPlugins()) {
for (const auto &a : p.configurator->getAnalyzers()) for (const auto &a : p.configurator->getAnalyzers())
......
...@@ -27,7 +27,6 @@ ...@@ -27,7 +27,6 @@
#ifndef SRC_CONFIGURATORINTERFACE_H_ #ifndef SRC_CONFIGURATORINTERFACE_H_
#define SRC_CONFIGURATORINTERFACE_H_ #define SRC_CONFIGURATORINTERFACE_H_
#include <mutex>
#include <string> #include <string>
#include <vector> #include <vector>
...@@ -118,36 +117,6 @@ public: ...@@ -118,36 +117,6 @@ public:
derivedSetGlobalSettings(pluginSettings); derivedSetGlobalSettings(pluginSettings);
} }
/**
* @brief Lock access to plugin sensors for exclusive access.
*
* @details Blocks if already occupied until the lock gets available again.
*/
void lockSensors() {
return _mutex.lock();
}
/**
* @brief Try to lock access to plugin sensors for exclusive access.
*
* @details Returns immediately.
*
* @details True if locked, false if failed to lock.
*/
bool trylockSensors() {
return _mutex.try_lock();
}
/**
* @brief Unlock the previously locked access to plugin sensors.
*
* @details Causes undefined behavior if sensors were not successfully
* locked before.
*/
void unlockSensors() {
_mutex.lock();
}
/** /**
* @brief Get all sensor groups. * @brief Get all sensor groups.
* *
...@@ -320,9 +289,6 @@ protected: ...@@ -320,9 +289,6 @@ protected:
std::string _cfgPath; ///< Path + name of config file to read from std::string _cfgPath; ///< Path + name of config file to read from
std::string _mqttPrefix; ///< Global MQTT prefix std::string _mqttPrefix; ///< Global MQTT prefix
unsigned int _cacheInterval; ///< Time interval in ms all sensors should cache unsigned int _cacheInterval; ///< Time interval in ms all sensors should cache
//TODO lazy approach: only synchronized all source code locations which possible can interfere with caliper plugin.
// It may be necessary to sync other source code locations if this mutex is used for other purposes than for Caliper
std::mutex _mutex; ///< Allow for thread-safe modification of sensors during runtime (currently only used for Caliper plugin)
std::vector<SGroupPtr> _sensorGroupInterfaces; ///< Sensor group storage std::vector<SGroupPtr> _sensorGroupInterfaces; ///< Sensor group storage
......
...@@ -238,8 +238,10 @@ public: ...@@ -238,8 +238,10 @@ public:
//group which consists of only one sensor //group which consists of only one sensor
SB_Ptr sensor; SB_Ptr sensor;
//perhaps one sensor is already present because it was copied from the template group //perhaps one sensor is already present because it was copied from the template group
if (group->getSensors().size() != 0) { if (group->acquireSensors().size() != 0) {
sensor = std::dynamic_pointer_cast<SBase>(group->getSensors()[0]); group->releaseSensors();
sensor = std::dynamic_pointer_cast<SBase>(group->acquireSensors()[0]);
group->releaseSensors();
//check if cast was successful (sensor != nullptr) //check if cast was successful (sensor != nullptr)
if (sensor) { if (sensor) {
sensor->setName(val.second.data()); sensor->setName(val.second.data());
...@@ -254,6 +256,7 @@ public: ...@@ -254,6 +256,7 @@ public:
<< val.second.data() << " had a type mismatch when casting! Omitting"; << val.second.data() << " had a type mismatch when casting! Omitting";
} }
} else { } else {
group->releaseSensors();
sensor = std::make_shared<SBase>(val.second.data()); sensor = std::make_shared<SBase>(val.second.data());
if (readSensorBase(*sensor, val.second)) { if (readSensorBase(*sensor, val.second)) {
group->pushBackSensor(sensor); group->pushBackSensor(sensor);
...@@ -558,8 +561,10 @@ protected: ...@@ -558,8 +561,10 @@ protected:
group->setEntity(&sEntity); group->setEntity(&sEntity);
SB_Ptr sensor; SB_Ptr sensor;
//perhaps one sensor is already present because it was copied from the template group //perhaps one sensor is already present because it was copied from the template group
if (group->getSensors().size() != 0) { if (group->aquireSensors().size() != 0) {
sensor = std::dynamic_pointer_cast<SBase>(group->getSensors()[0]); group->releaseSensors();
sensor = std::dynamic_pointer_cast<SBase>(group->acquireSensors()[0]);
group->releaseSensors();
//check if cast was successful (sensor != nullptr) //check if cast was successful (sensor != nullptr)
if (sensor) { if (sensor) {
sensor->setName(val.second.data()); sensor->setName(val.second.data());
...@@ -642,8 +647,8 @@ protected: ...@@ -642,8 +647,8 @@ protected:
*/ */
bool constructSensorTopics() { bool constructSensorTopics() {
// Sensor names are adjusted according to the respective MQTT topics // Sensor names are adjusted according to the respective MQTT topics
for(auto& g: _sensorGroups) for(auto& g: _sensorGroups) {
for(auto& s: g->getSensors()) { for(auto& s: g->acquireSensors()) {
s->setMqtt(MQTTChecker::formatTopic(_mqttPrefix) + s->setMqtt(MQTTChecker::formatTopic(_mqttPrefix) +
MQTTChecker::formatTopic(g->getEntity()->getMqttPart()) + MQTTChecker::formatTopic(g->getEntity()->getMqttPart()) +
MQTTChecker::formatTopic(g->getMqttPart()) + MQTTChecker::formatTopic(g->getMqttPart()) +
...@@ -652,6 +657,8 @@ protected: ...@@ -652,6 +657,8 @@ protected:
if(s->getSinkPath()!="") if(s->getSinkPath()!="")
s->setSinkPath(MQTTChecker::topicToFile(s->getMqtt(), s->getSinkPath())); s->setSinkPath(MQTTChecker::topicToFile(s->getMqtt(), s->getSinkPath()));
} }
g->releaseSensors();
}
return true; return true;
} }
///@} ///@}
...@@ -822,8 +829,10 @@ public: ...@@ -822,8 +829,10 @@ public:
//group which consists of only one sensor //group which consists of only one sensor
SB_Ptr sensor; SB_Ptr sensor;
//perhaps one sensor is already present because it was copied from the template group //perhaps one sensor is already present because it was copied from the template group
if (group->getSensors().size() != 0) { if (group->acquireSensors().size() != 0) {
sensor = std::dynamic_pointer_cast<SBase>(group->getSensors()[0]); group->releaseSensors();
sensor = std::dynamic_pointer_cast<SBase>(group->acquireSensors()[0]);
group->releaseSensors();
//check if cast was successful (sensor != nullptr) //check if cast was successful (sensor != nullptr)
if (sensor) { if (sensor) {
sensor->setName(val.second.data()); sensor->setName(val.second.data());
...@@ -838,6 +847,7 @@ public: ...@@ -838,6 +847,7 @@ public:
<< val.second.data() << " had a type mismatch when casting! Omitting"; << val.second.data() << " had a type mismatch when casting! Omitting";
} }
} else { } else {
group->releaseSensors();
sensor = std::make_shared<SBase>(val.second.data()); sensor = std::make_shared<SBase>(val.second.data());
if (readSensorBase(*sensor, val.second)) { if (readSensorBase(*sensor, val.second)) {
group->pushBackSensor(sensor); group->pushBackSensor(sensor);
...@@ -1056,7 +1066,7 @@ protected: ...@@ -1056,7 +1066,7 @@ protected:
bool constructSensorTopics() { bool constructSensorTopics() {
// Sensor names are adjusted according to the respective MQTT topics // Sensor names are adjusted according to the respective MQTT topics
for(auto& g: _sensorGroups) for(auto& g: _sensorGroups)
for(auto& s: g->getSensors()) { for(auto& s: g->acquireSensors()) {
s->setMqtt(MQTTChecker::formatTopic(_mqttPrefix) + s->setMqtt(MQTTChecker::formatTopic(_mqttPrefix) +
MQTTChecker::formatTopic(g->getMqttPart()) + MQTTChecker::formatTopic(g->getMqttPart()) +
MQTTChecker::formatTopic(s->getMqtt())); MQTTChecker::formatTopic(s->getMqtt()));
......
...@@ -149,11 +149,35 @@ public: ...@@ -149,11 +149,35 @@ public:
virtual void pushBackSensor(SBasePtr s) = 0; virtual void pushBackSensor(SBasePtr s) = 0;
/** /**
* @brief Retrieve all sensors of this group. * @brief Acquire access to all sensors of this group.
*
* @details Always release sensor access via releaseSensors() afterwards to
* ensure multi-threading safety!
* acquireSensors() and releaseSensors() allow for thread-safe
* (locked) access to _baseSensors. However, as most plugins do not
* require synchronized access (and we want to avoid the induced
* lock overhead for them) the actual locking mechanism is only
* implemented in plugins actually requiring thread-safe access.
* Every caller which may require thread safe access to
* _baseSensors should use acquireSensors() and releaseSensors().
* If a plugin does not implement a locking mechanism because it
* does not require synchronized access releaseSensors() can be
* omitted.
* See the Caliper plugin for a SensorGroup which implements locked
* access to _baseSensors.
* *
* @return A std::vector with shared pointers to all associated sensors. * @return A std::vector with shared pointers to all associated sensors.
*/ */
virtual std::vector<SBasePtr>& getSensors() = 0; virtual std::vector<SBasePtr>& acquireSensors() { return _baseSensors; }
/**
* @brief Release previously acquired access to sensors of this group.
*
* @details Always acquire sensors via acquireSensors() beforehand!
* See description of acquireSensors() for an in-depth reasoning
* about this method.
*/
virtual void releaseSensors() { /* do nothing in base implementation */ }
/** /**
* @brief Print interface configuration. * @brief Print interface configuration.
...@@ -290,6 +314,7 @@ protected: ...@@ -290,6 +314,7 @@ protected:
unsigned int _interval; ///< Reading interval cycle in milliseconds unsigned int _interval; ///< Reading interval cycle in milliseconds
std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
std::vector<SBasePtr> _baseSensors; ///< Vector with sensors associated to this group
LOGGER lg; ///< Personal logging instance LOGGER lg; ///< Personal logging instance
}; };
......
...@@ -44,6 +44,10 @@ ...@@ -44,6 +44,10 @@
* the SensorGroupTemplate<S, nullptr_t>. However, having two templates * the SensorGroupTemplate<S, nullptr_t>. However, having two templates
* for cases with and without entities greatly reduces code duplication * for cases with and without entities greatly reduces code duplication
* in derived plugin sensor groups. * in derived plugin sensor groups.
* Internal usage of _baseSensors is not synchronized via
* acquireSensors()/releaseSensors() as it is only accessed (e.g. in
* (de-)constructor) when no other caller is expected to access
* _baseSensors.
* *
* @ingroup pusherplugins * @ingroup pusherplugins
*/ */
...@@ -192,15 +196,6 @@ public: ...@@ -192,15 +196,6 @@ public:
} }
} }
/**
* @brief Retrieve all sensors of this group.
*
* @details Not intended to be overwritten.
*
* @return A std::vector with shared pointers to all associated sensors.
*/
virtual std::vector<SBasePtr>& getSensors() final override { return _baseSensors; }
/** /**
* @brief Print SensorGroup configuration. * @brief Print SensorGroup configuration.
* *
...@@ -249,8 +244,7 @@ protected: ...@@ -249,8 +244,7 @@ protected:
_pendingTasks--; _pendingTasks--;
} }
std::vector<S_Ptr> _sensors; ///< Sensors associated with this group std::vector<S_Ptr> _sensors; ///< Store pointers to actual sensor objects in addition to general sensor base pointers