Commit 080fce90 authored by Micha Mueller's avatar Micha Mueller
Browse files

Rework MQTTPushers halt() method: now supports timeout

parent 058da43e
......@@ -242,6 +242,23 @@ bool MQTTPusher::sendMappings() {
return true;
}
bool MQTTPusher::halt(unsigned short timeout) {
_doHalt = true;
for (unsigned short i = 1; i <= timeout; i++) {
if (_halted) {
return true;
} else {
LOGM(info) << "Waiting for push cycle to pause... (" << i << "/" << timeout << ")";
sleep(1);
}
}
cont();
LOGM(info) << "Timeout: push cycle did not pause. Continuing...";
return false;
}
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
......
......@@ -28,30 +28,59 @@ public:
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
/**
* @brief MQTTPusher's main execution loop.
*
* @details If MQTTPusher is started this function runs indefinitely until
* a call to stop(). Execution of the main loop can be halted and
* continued with halt() and cont() respectively.
*/
void push();
/**
* @brief
*
* @return
*/
bool sendMappings();
/**
* @brief Start MQTTPusher's push loop.
*/
void start() {
_keepRunning = true;
}
/**
* @brief Stop MQTTPusher's push loop and terminate its execution.
*/
void stop() {
_keepRunning = false;
}
void halt() {
_doHalt = true;
}
/**
* @brief Blocking call to pause MQTTPusher's push loop.
*
* @details Instructs MQTTPusher to pause its push loop and blocks until
* MQTTPusher is actually paused or a timeout occurred. On a
* timeout execution of MQTTPusher is continued.
*
* @param timeout Time in seconds to wait for MQTTPusher to finish its
* current push cycle.
*
* @return True if MQTTPusher was succesfully paused, false if a timeout
* occurred and MQTTPusher still runs.
*/
bool halt(unsigned short timeout = 5);
/**
* @brief Continue MQTTPusher's push loop.
*/
void cont() {
computeMsgRate();
_doHalt = false;
}
bool isHalted() const {
return _halted;
}
private:
int sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
void computeMsgRate();
......
......@@ -228,11 +228,12 @@ void RestAPI::PUT_reload(endpointArgs) {
if (p.id == plugin) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
_mqttPusher->halt();
//wait until MQTTPusher is paused
while (!_mqttPusher->isHalted()) {
sleep(1);
if (!_mqttPusher->halt()) {
res.body() = "Could not reload plugin (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
return;
}
// Removing obsolete MQTT topics
removeTopics(p);
if (p.configurator->reReadConfig()) {
......@@ -313,35 +314,21 @@ void RestAPI::PUT_analytics_reload(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
//Pause mqttPusherin order to reload plugins
_mqttPusher->halt();
unsigned short retryWait = 0;
// Wait until MQTTPusher is paused in order to reload plugins
//do not wait longer than 10 seconds for mqttPusher to halt
//TODO implement timeout query
for (unsigned short i = 1; i < 6; i++) {
if (_mqttPusher->isHalted()) {
if (!_manager->reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!_manager->start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded\n";
res.result(http::status::ok);
}
_mqttPusher->cont();
return;
if (_mqttPusher->halt()) {
if (!_manager->reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!_manager->start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
ServerLOG(info) << "Waiting for MQTTPusher to halt ...(" << i << "/" << "5)";
sleep(1);
res.body() = "Plugin " + plugin + ": Sensors reloaded\n";
res.result(http::status::ok);
}
_mqttPusher->cont();
} else {
res.body() = "Could not reload plugins (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
}
_mqttPusher->cont();
ServerLOG(info) << "Not waiting any longer for MQTTPusher";
res.body() = "Could not reload plugins (Timeout while waiting).\n";
res.result(http::status::internal_server_error);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment