Commit e40a2002 authored by Michael Ott's avatar Michael Ott
Browse files

Make use of mosquitto_max_inflight_messages_set() and the new...

Make use of mosquitto_max_inflight_messages_set() and the new mosquitto_max_queued_messages_set() functions to limit the amount of memory spent in mosquitto for queued messages.
Rework the handling of disconnects/reconnects
parent 1543c085
......@@ -69,6 +69,8 @@ Please have a look at the provided `config/global.conf` example to get familiar
| sensorpattern | pattern used to perform automatic sensor name publishing. See the corresponding [section](#autopublish) for more information.
| threads | Specify how many threads should be created to handle the sensors async. Default value of threads is 1. Note that the MQTTPusher always starts an extra thread. So the actual number of started threads is always one more than defined here. Specifying not enough threads can result in a delay for some sensors until they are read.
| maxMsgNum | To avoid publishing too many MQTT messages at once you can define here a maximum count of values that are published in one turn. After reaching this limit the MQTTPusher will be forced to sleep for a short time before continuing.
|maxInflightMsgNum|Maximum number of messages that can be "inflight". This is a MQTT term and should match the broker's setting. Set to 0 for unlimited.
|maxQueuedMsgNum|Maximum number of MQTT messages (including "inflight") that should be queued. This is to limit the amount of memory that is used for buffering. Set to 0 for unlimited.
| verbosity | Level of detail in the logfile (dcdb.log). Set to a value between 5 (all log-messages, default) and 0 (only fatal messages). NOTE: level of verbosity for the command-line log can be set via the -v flag independently when invoking dcdbpusher.
| daemonize | Set to 'true' if dcdbpusher should run detached as daemon. Default is false.
| tempdir | One can specify a writeable directory where dcdbpusher can write its temporary and logging files to. Default is the current (' ./ ' ) directory.
......
......@@ -32,6 +32,8 @@ Configuration::Configuration(const std::string& cfgFilePath) :
_global.brokerPort = 1883;
_global.threads = 1;
_global.maxMsgNum = 100;
_global.maxInflightMsgNum = 20;
_global.maxQueuedMsgNum = 0;
_global.logLevelFile = boost::log::trivial::trace;
_global.logLevelCmd = boost::log::trivial::info;
......@@ -97,7 +99,11 @@ bool Configuration::readGlobal() {
_global.qosLevel = stoi(global.second.data());
if(_global.qosLevel < 0 || _global.qosLevel > 2)
_global.qosLevel = 1;
} else if (boost::iequals(global.first, "threads")) {
} else if (boost::iequals(global.first, "maxInflightMsgNum")) {
_global.maxInflightMsgNum = stoull(global.second.data());
}else if (boost::iequals(global.first, "maxQueuedMsgNum")) {
_global.maxQueuedMsgNum = stoull(global.second.data());
}else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else if (boost::iequals(global.first, "maxMsgNum")) {
_global.maxMsgNum = stoull(global.second.data());
......
......@@ -20,6 +20,8 @@ typedef struct {
int daemonize;
int brokerPort;
int qosLevel;
unsigned int maxInflightMsgNum;
unsigned int maxQueuedMsgNum;
std::string brokerHost;
std::string hierarchy;
uint32_t threads;
......
......@@ -14,7 +14,7 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages) :
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
......@@ -26,7 +26,9 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std:
_overrideMsgCap(true),
_doHalt(false),
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages) {
_maxNumberOfMessages(maxNumberOfMessages),
_maxInflightMsgNum(maxInflightMsgNum),
_maxQueuedMsgNum(maxQueuedMsgNum) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -50,13 +52,8 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std:
}
mosquitto_threaded_set(_mosq, true);
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
mosquitto_max_inflight_messages_set(_mosq, _maxInflightMsgNum);
mosquitto_max_queued_messages_set(_mosq, _maxQueuedMsgNum);
}
MQTTPusher::~MQTTPusher() {
......@@ -66,7 +63,6 @@ MQTTPusher::~MQTTPusher() {
}
void MQTTPusher::push() {
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
......@@ -92,91 +88,106 @@ void MQTTPusher::push() {
continue;
}
_halted = false;
totalCount = 0;
// Push sensor data
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
break;
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if(_overrideMsgCap || totalCount < _maxNumberOfMessages){
sendReadings(*s, reads, totalCount);
} else {
break; //ultimately we will go to sleep 1 second
}
if (_connected) {
totalCount = 0;
// Push sensor data
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
break;
}
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
if(_overrideMsgCap || totalCount < _maxNumberOfMessages){
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break; //ultimately we will go to sleep 1 second
}
}
}
}
}
}
// Push output analytics sensors
for(auto& p : _analyticsPlugins) {
if(_doHalt) {
break;
}
for(const auto& a : p.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits()) {
for(const auto& s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
sendReadings(*s, reads, totalCount);
} else {
break;
// Push output analytics sensors
for(auto& p : _analyticsPlugins) {
if(_doHalt) {
break;
}
for(const auto& a : p.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits()) {
for(const auto& s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
if (sendReadings(*s, reads, totalCount) > 0) {
break;
}
} else {
break;
}
}
}
}
}
}
}
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
if (mosqErr == MOSQ_ERR_CONN_LOST) {
LOGM(info) << "Disconnected.";
_connected = false;
} else {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
}
}
}
mosquitto_disconnect(_mosq);
}
void MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount) {
//there was a (unintended) disconnect in the meantime --> reconnect
if (!_connected) {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
if (_connected) {
//get all sensor values out of its queue
std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
//totalCount+= count;
totalCount+= 1;
int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount) {
//get all sensor values out of its queue
std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
//totalCount+= count;
totalCount+= 1;
#ifdef DEBUG
LOGM(debug) << "Sending " << count << " values from " << s.getName();
LOGM(debug) << "Sending " << count << " values from " << s.getName();
#endif
#if DEBUG
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
for (std::size_t i=0; i<count; i++) {
LOG(debug) << " " << reads[i].timestamp << " " << reads[i].value;
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
//try to send them to the broker
int rc;
if ((rc = mosquitto_publish(_mosq, NULL, (s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
if (rc == MOSQ_ERR_NOMEM) {
LOGM(info) << "Can\'t queue additional messages";
} else {
LOGM(error) << "Could not send message! Trying again later";
_connected = false;
s.pushReadingQueue(reads, count);
//totalCount -= count;
totalCount -= 1;
sleep(5);
}
s.pushReadingQueue(reads, count);
//totalCount -= count;
totalCount -= 1;
return 1;
}
return 0;
}
bool MQTTPusher::sendMappings() {
......
......@@ -22,7 +22,7 @@
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages);
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
void push();
......@@ -50,7 +50,7 @@ public:
}
private:
void sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
int sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
void computeMsgRate();
int _qosLevel;
......@@ -66,6 +66,8 @@ private:
bool _doHalt;
bool _halted;
unsigned int _maxNumberOfMessages;
unsigned int _maxInflightMsgNum;
unsigned int _maxQueuedMsgNum;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -360,7 +360,7 @@ int main(int argc, char** argv) {
//MQTTPusher and Https server get their own threads
_analyticsManager = new AnalyticsManager();
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
_configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum);
_configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
_configuration->readAuthkeys(_httpsServer);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment