Commit 60ae6562 authored by Alessio Netti's avatar Alessio Netti

Changes to message cap feature

- Setting a maxMsgNum of -1 will result in the minimum possible value
(slightly above the predicted message rate) being picked as a cap
- Added some logging to make the feature more verbose
parent 35e2db49
......@@ -106,7 +106,7 @@ bool Configuration::readGlobal() {
}else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else if (boost::iequals(global.first, "maxMsgNum")) {
_global.maxMsgNum = stoull(global.second.data());
_global.maxMsgNum = stoi(global.second.data());
} else if (boost::iequals(global.first, "daemonize")) {
if (global.second.data() == "true") {
_global.daemonize = 1;
......
......@@ -25,7 +25,7 @@ typedef struct {
std::string brokerHost;
std::string hierarchy;
uint32_t threads;
unsigned int maxMsgNum;
int maxMsgNum;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
pluginSettings_t pluginSettings;
......
......@@ -14,7 +14,7 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
......@@ -23,7 +23,7 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std:
_analyticsPlugins(aPlugins),
_connected(false),
_keepRunning(true),
_overrideMsgCap(true),
_msgCap(DISABLED),
_doHalt(false),
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages),
......@@ -116,7 +116,7 @@ void MQTTPusher::push() {
for (const auto &g : p.configurator->getSensorGroups()) {
for (const auto &s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (_msgCap == DISABLED || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
......@@ -136,7 +136,7 @@ void MQTTPusher::push() {
for (const auto &u : a->getUnits()) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (_msgCap == DISABLED || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
......@@ -250,7 +250,17 @@ void MQTTPusher::computeMsgRate() {
for(const auto& u : a->getUnits())
msgRate += u->getBaseOutputs().size() * ( 1000 / a->getInterval() ) / a->getMinValues();
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
_overrideMsgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages;
if( _overrideMsgCap && _maxNumberOfMessages != 0 )
LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
if (_msgCap == DISABLED && _maxNumberOfMessages > 0)
LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
else if(_maxNumberOfMessages > 0)
LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
else
LOGM(info) << "No message cap enforced. Predicted message rate is " << msgRate << " msg/s.";
} else {
_msgCap = MINIMUM;
_maxNumberOfMessages = msgRate + 10;
LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
}
}
......@@ -17,13 +17,15 @@
#include "analytics/AnalyticsManager.h"
#include <map>
enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
/**
* Class responsible for collecting values from the sensors and pushing them to the database.
*/
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
void push();
......@@ -63,10 +65,10 @@ private:
struct mosquitto* _mosq;
bool _connected;
bool _keepRunning;
bool _overrideMsgCap;
msgCap_t _msgCap;
bool _doHalt;
bool _halted;
unsigned int _maxNumberOfMessages;
int _maxNumberOfMessages;
unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment