Commit 8b9bbaf7 authored by Michael Ott's avatar Michael Ott
Browse files

Open MQTT connection first to work around limitations in mosquitto with file descriptors > 1024.

Switch to mosquitto_loop instead of mosquitto_loop_start to avoid creating an additional thread.
parent b26cd7f3
......@@ -43,6 +43,15 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
perror(NULL);
exit(EXIT_FAILURE);
}
mosquitto_threaded_set(_mosq, true);
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
MQTTPusher::~MQTTPusher() {
......@@ -63,14 +72,7 @@ void MQTTPusher::push() {
LOGM(info) << "Connection established!";
}
}
if (mosquitto_loop_start(_mosq) != MOSQ_ERR_SUCCESS) {
LOGM(fatal) << "Setup failed";
_keepRunning = false;
mosquitto_disconnect(_mosq);
return;
}
//collect sensor-data
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
......@@ -95,10 +97,12 @@ void MQTTPusher::push() {
}
}
}
sleep(1);
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
}
mosquitto_disconnect(_mosq);
mosquitto_loop_stop(_mosq, false);
}
void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& totalCount) {
......
......@@ -260,6 +260,11 @@ int main(int argc, char** argv) {
LOG(info) << " Certificate, private key and DH-param file not printed.";
#endif
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, _configuration->getPlugins());
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, io);
_configuration->readAuthkeys(_httpsServer);
//Init all sensors
LOG(info) << "Init sensors...";
for(auto& p : _configuration->getPlugins()) {
......@@ -305,12 +310,7 @@ int main(int argc, char** argv) {
for(size_t i = 0; i < globalSettings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, _configuration->getPlugins());
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, io);
_configuration->readAuthkeys(_httpsServer);
boost::thread mqttThread(bind(&MQTTPusher::push, _mqttPusher));
boost::thread restThread(bind(&HttpsServer::run, _httpsServer));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment