Commit 75197e31 authored by Alessio Netti's avatar Alessio Netti
Browse files

MQTT topic checks

- Checks on MQTT topics have been made generic in the MQTTChecker singleton
class under "common"
- MQTT topic checks are now performed also when plugins are reloaded; if
the checks fail, the plugin is cleared and left uninitialized
- All sensors in a plugin not satisfying the checks are logged now, not
only the first one
- Fixed a minor bug that resulted in msgrates of 0 under certain
compilers, due to type promotion in C++
parent 81aa7087
......@@ -108,15 +108,9 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
// Returning an empty vector may indicate problems with the config file
if(dynLib.configurator->getAnalyzers().size() == 0) {
LOG(warning) << "Plugin \"" << dynLib.id << "\" created no analyzers!";
} else {
for(const auto& a : dynLib.configurator->getAnalyzers())
if( a->getStreaming() )
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs())
if(!mqttCheck.check(o->getMqtt())) {
LOG(error) << "Problematic MQTT-Topics, please check your config files!";
return false;
}
} else if(!checkTopics(dynLib)) {
LOG(error) << "Problematic MQTT-Topics, please check your config files!";
return false;
}
//save dl-struct
_plugins.push_back(dynLib);
......@@ -158,14 +152,44 @@ bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Reload \"" << p.id << "\" data analytics plugin";
out = true;
if( !p.configurator->reReadConfig() )
//Removing obsolete MQTT topics
removeTopics(p);
//Reloading plugin
if(!p.configurator->reReadConfig())
return false;
for (const auto &a : p.configurator->getAnalyzers())
a->init(io);
//Checking new MQTT topics
if(!checkTopics(p)) {
removeTopics(p);
p.configurator->clearConfig();
return false;
} else
for (const auto &a : p.configurator->getAnalyzers())
a->init(io);
}
return out;
}
void AnalyticsManager::removeTopics(an_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& a : p.configurator->getAnalyzers())
if( a->getStreaming() )
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs())
mqttCheck.removeTopic(o->getMqtt());
}
bool AnalyticsManager::checkTopics(an_dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& a : p.configurator->getAnalyzers())
if( a->getStreaming() )
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs())
if(!mqttCheck.checkTopic(o->getMqtt()))
validTopics = false;
return validTopics;
}
bool AnalyticsManager::start(const string& plugin, const string& analyzer) {
if(_status != LOADED) {
LOG(error) << "Cannot start, AnalyticsManager is not loaded!";
......@@ -368,7 +392,7 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
throw domain_error("Plugin or analyzer not found!");
} else if (action == "reload") {
if(!reload(io, plugin))
throw domain_error("Plugin not found or reload failed!");
throw domain_error("Plugin not found or reload failed, please check the config files and MQTT topics!");
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
reply.response = "Plugin " + plugin + ": Sensors reloaded";
......
......@@ -176,6 +176,11 @@ public:
protected:
// Utility method to drop all topics associated to a certain plugin
void removeTopics(an_dl_t p);
// Utility method to check all MQTT topics within a certain plugin
bool checkTopics(an_dl_t p);
// Vector of plugins represented as an_dl_t structures
std::vector<an_dl_t> _plugins;
// Path used to load config files
......
......@@ -54,6 +54,15 @@ public:
*/
virtual bool reReadConfig() = 0;
/**
* @brief Clears the plugin configuration
*
* This method will stop and clear all analyzers that were created, returning the plugin to its
* uninitialized state.
*
*/
virtual void clearConfig() = 0;
/**
* @brief Sets a structure containing global settings to be used during analyzer creation This
* method must be implemented in derived classes.
......
......@@ -175,14 +175,13 @@ public:
}
/**
* @brief Clear all instantiated analyzers and read the configuration again
* @brief Clears the plugin configuration
*
* This will stop any analyzers that have been created, destoy them and finally create new ones
* from a new configuration read pass.
* This will stop any analyzers that have been created, destroy them and return the plugin to
* its uninitialized state.
*
* @return True if successful, false otherwise
*/
bool reReadConfig() final {
void clearConfig() final {
// Stop all analyzers
for(auto a : _analyzers)
a->stop();
......@@ -199,6 +198,18 @@ public:
_analyzerInterfaces.clear();
_analyzers.clear();
_templateAnalyzers.clear();
}
/**
* @brief Clear all instantiated analyzers and read the configuration again
*
* This will stop any analyzers that have been created, destroy them and finally create new ones
* from a new configuration read pass.
*
* @return True if successful, false otherwise
*/
bool reReadConfig() final {
clearConfig();
// Reading the configuration once again
return readConfig(_cfgPath);
......
......@@ -34,9 +34,22 @@ public:
/**
* @brief Resets the internal topics set
*/
void reset() { _topics.clear(); }
void reset() { _topics.clear(); _groups.clear(); }
/**
* @brief Removes a topic from the internal set
*
* This method should be used to remove obsolete topics from the set of active topics. This
* usually happens when plugins are reloaded in dcdbpusher, and all old sensors are destroyed.
*
* @param topic The topic to be removed
*/
void removeTopic(const std::string& topic) {
std::string str(topic);
str.erase(std::remove(str.begin(), str.end(), '/'), str.end());
_topics.erase(str);
}
//TODO: when textual MQTT topics are in place, implement check method that ensures all sensors, groups, analyzers and entities have distinct names
/**
* @brief Performs a check on a certain MQTT topic
*
......@@ -46,7 +59,7 @@ public:
* @param topic An arbitrary MQTT topic to check
* @return True if the topic is valid, False otherwise
*/
bool check(const std::string& topic) {
bool checkTopic(const std::string& topic) {
//MQTT topic must have 112 bit = 14 bytes = 28 hex chars
//but can have more with some extra '/', therefore remove all '/'
std::string str(topic);
......@@ -65,6 +78,35 @@ public:
return true;
}
/**
* @brief Removes a name from the internal set of entities
*
* This method should be used to remove obsolete sensor groups, entities or analyzers once they
* are destroyed, e.g. on plugin reload actions.
*
* @param topic The name (string) to be removed
*/
void removeName(const std::string& name) {
_groups.erase(name);
}
/**
* @brief Performs a check on a certain name
*
* The check is passed if the name for the group, analyzer or entity is not used already.
*
* @param topic An arbitrary name (string) to check
* @return True if the name is valid, False otherwise
*/
bool checkName(const std::string& name) {
auto returnIt = _groups.insert(name);
if (!returnIt.second) {
LOG(error) << "Name \"" << name << "\" used twice!";
return false;
}
return true;
}
private:
/**
......@@ -84,6 +126,9 @@ private:
// Set used to keep track of topics
std::set<std::string> _topics;
// Set used to keep track of groups, analyzers and other entities
std::set<std::string> _groups;
// Logger object to notify MQTT check outcome
logger_t lg;
};
......
......@@ -296,14 +296,15 @@ bool Configuration::readPlugins() {
}
//check if an MQTT-suffix was assigned twice
for(const auto& g : dynLib.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
bool ok = mqttCheck.check(s->getMqtt());
if(!ok) {
LOG(error) << "Problematic MQTT-Topics, please check your config files!";
return false;
}
}
bool validTopics=true;
for(const auto& g : dynLib.configurator->getSensorGroups())
for(const auto& s : g->getSensors())
if(!mqttCheck.checkTopic(s->getMqtt()))
validTopics=false;
if(!validTopics) {
LOG(error) << "Problematic MQTT-Topics, please check your config files!";
return false;
}
//save dl-struct
......
......@@ -346,20 +346,28 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
while (!_httpsServer._mqttPusher->isHalted()) {
sleep(1);
}
// Removing obsolete MQTT topics
_httpsServer.removeTopics(p);
if (p.configurator->reReadConfig()) {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
// Perform checks on MQTT topics
if(!_httpsServer.checkTopics(p)) {
response = "Plugin " + pathStrs[0] + ": problematic MQTT-Topics, please check your config files!";
connection->set_status(server::connection::internal_server_error);
_httpsServer.removeTopics(p);
p.configurator->clearConfig();
} else {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
for (const auto &g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
}
} else {
response = "Plugin " + pathStrs[0] + ": Could not reload configuration";
connection->set_status(server::connection::internal_server_error);
}
for (const auto &g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
//continue MQTTPusher
_httpsServer._mqttPusher->cont();
break;
......@@ -449,6 +457,23 @@ bool HttpsServer::check_authkey(const std::string& authkey, permission requiredP
return false;
}
void HttpsServer::removeTopics(dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors())
mqttCheck.removeTopic(s->getMqtt());
}
bool HttpsServer::checkTopics(dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
bool validTopics=true;
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors())
if(!mqttCheck.checkTopic(s->getMqtt()))
validTopics = false;
return validTopics;
}
/*
std::string HttpsServer::password_callback(std::size_t max_length, asio::ssl::context_base::password_purpose purpose) {
return std::string("pwd");
......
......@@ -22,6 +22,7 @@
#include <boost/asio.hpp>
#include "mqttchecker.h"
#include "logging.h"
#include "includes/PluginDefinitions.h"
#include "MQTTPusher.h"
......@@ -106,6 +107,12 @@ private:
*/
bool check_authkey(const std::string& authkey, permission requiredPerm);
// Utility method to remove all MQTT topics associated to a plugin from the used set
void removeTopics(dl_t p);
// Utility method to check for the validity of all MQTT topics in a plugin
bool checkTopics(dl_t p);
pluginVector_t& _plugins;
authkeyMap_t _authkeys;
MQTTPusher* _mqttPusher;
......
......@@ -244,11 +244,11 @@ void MQTTPusher::computeMsgRate() {
float msgRate = 0;
for(auto& p : _plugins)
for(const auto& g : p.configurator->getSensorGroups())
msgRate += g->getSensors().size() * ( 1000 / g->getInterval() ) / g->getMinValues();
msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers())
for(const auto& u : a->getUnits())
msgRate += u->getBaseOutputs().size() * ( 1000 / a->getInterval() ) / a->getMinValues();
msgRate += (float)u->getBaseOutputs().size() * ( 1000.0f / (float)a->getInterval() ) / (float)a->getMinValues();
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
......
......@@ -25,6 +25,7 @@ public:
virtual std::string getVersion() = 0;
virtual bool readConfig(std::string cfgPath) = 0;
virtual bool reReadConfig() = 0;
virtual void clearConfig() = 0;
virtual void printConfig(LOG_LEVEL ll) = 0;
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) = 0;
virtual std::vector<SGroupPtr>& getSensorGroups() = 0;
......
......@@ -264,11 +264,9 @@ public:
}
/**
* Clear internal storage and read in the configuration again.
*
* @return True on success, false otherwise
* Clear internal storage and returns the plugin to the uninitialized state.
*/
bool reReadConfig() final {
void clearConfig() final {
//bring everything to a halt
for(auto g : _sensorGroups) {
g->stop();
......@@ -298,6 +296,15 @@ public:
_templateSensorBases.clear();
_templateSensorGroups.clear();
_templateSensorEntitys.clear();
}
/**
* Clear internal storage and read in the configuration again.
*
* @return True on success, false otherwise
*/
bool reReadConfig() final {
clearConfig();
//back to the very beginning
return readConfig(_cfgPath);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment