Commit 755c388c authored by Alessio Netti's avatar Alessio Netti
Browse files

Checks on maxNumberOfMessages

- At starting time, the actual MQTT message rate across all plugins is
computed and compared with the maxNumberOfMessages parameter
- If the latter is less than the former, the setting is ignored and no
message cap is imposed
- This prevents situations in which a low cap prevents readings
for some sensors from being sent, leading to missing data and potential
memory leaks
- The check is repeated any time the sensor config changes
parent 84b51cbe
......@@ -20,6 +20,7 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
_plugins(plugins),
_connected(false),
_keepRunning(true),
_overrideMsgCap(true),
_doHalt(false),
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages){
......@@ -72,6 +73,7 @@ void MQTTPusher::push() {
return;
}
computeMsgRate();
//collect sensor-data
reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
std::size_t totalCount = 0; //number of messages
......@@ -91,7 +93,7 @@ void MQTTPusher::push() {
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if(totalCount < _maxNumberOfMessages){
if(_overrideMsgCap || totalCount < _maxNumberOfMessages){
sendReadings(*s, reads, totalCount);
} else {
break; //ultimately we will go to sleep 1 second
......@@ -147,3 +149,15 @@ void MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& tota
}
}
}
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
for(auto& p : _plugins)
for(const auto& g : p.configurator->getSensorGroups())
msgRate += g->getSensors().size() * ( 1000 / g->getInterval() ) / g->getMinValues();
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
_overrideMsgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages;
if( _overrideMsgCap && _maxNumberOfMessages != 0 )
LOGM(error) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
}
......@@ -35,6 +35,7 @@ public:
}
void cont() {
computeMsgRate();
_doHalt = false;
}
......@@ -44,6 +45,7 @@ public:
private:
void sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
void computeMsgRate();
int _brokerPort;
std::string _brokerHost;
......@@ -51,6 +53,7 @@ private:
struct mosquitto* _mosq;
bool _connected;
bool _keepRunning;
bool _overrideMsgCap;
bool _doHalt;
bool _halted;
unsigned int _maxNumberOfMessages;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment