Commit f2ff1eee authored by Alessio Netti's avatar Alessio Netti

Fixing performance issues on QoS 1

- Implemented a check in MQTTPusher that prevents the main publish
loop from being invoked anytime mosquitto_loop returns, but only once
every second at most
parent 875a77c8
......@@ -63,6 +63,8 @@ MQTTPusher::~MQTTPusher() {
}
void MQTTPusher::push() {
int mosqErr;
uint64_t idleTime = 0;
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
......@@ -100,51 +102,54 @@ void MQTTPusher::push() {
LOGM(info) << "Connection established!";
}
}
if (_connected) {
totalCount = 0;
// Push sensor data
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
break;
}
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if(_overrideMsgCap || totalCount < _maxNumberOfMessages){
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break; //ultimately we will go to sleep 1 second
}
}
}
}
}
// Push output analytics sensors
for(auto& p : _analyticsPlugins) {
if(_doHalt) {
break;
}
for(const auto& a : p.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits()) {
for(const auto& s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break;
}
}
}
}
}
}
int mosqErr;
if(getTimestamp() - idleTime >= PUSHER_IDLETIME) {
idleTime = getTimestamp();
totalCount = 0;
// Push sensor data
for (auto &p : _plugins) {
if (_doHalt) {
//for faster response
break;
}
for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break; //ultimately we will go to sleep 1 second
}
}
}
}
}
// Push output analytics sensors
for (auto &p : _analyticsPlugins) {
if (_doHalt) {
break;
}
for (const auto &a : p.configurator->getAnalyzers()) {
for (const auto &u : a->getUnits()) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break;
}
}
}
}
}
}
}
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
if (mosqErr == MOSQ_ERR_CONN_LOST) {
LOGM(info) << "Disconnected.";
......
......@@ -9,6 +9,7 @@
#define MQTTPUSHER_H_
#define DCDB_MAP "/DCDB_MAP/"
#define PUSHER_IDLETIME 1000000000
#include <mosquitto.h>
#include "includes/PluginDefinitions.h"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment