Commit 6f7a286b authored by Micha Mueller's avatar Micha Mueller
Browse files

MQTTPusher: Add call to mosquitto_loop to ensure all messages are sent

parent 9b672383
......@@ -61,6 +61,13 @@ void MQTTPusher::push() {
}
}
if (mosquitto_loop_start(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(fatal) << "Setup failed";
keepRunning = 0;
mosquitto_disconnect(_mosq);
return;
}
//collect sensor-data
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
......@@ -74,7 +81,7 @@ void MQTTPusher::push() {
LOGM(error) << "Lost connection. Reconnecting...";
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(1);
sleep(5);
} else {
_connected = true;
LOGM(info) << "Connection established!";
......@@ -96,11 +103,10 @@ void MQTTPusher::push() {
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
mosquitto_disconnect(_mosq);
_connected = false;
s->pushReadingQueue(reads, count);
totalCount -= count;
......@@ -112,4 +118,6 @@ void MQTTPusher::push() {
}
sleep(1);
}
mosquitto_disconnect(_mosq);
mosquitto_loop_stop(_mosq, false);
}
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment