Commit c8442902 authored by Alessio Netti's avatar Alessio Netti
Browse files

Configurable QoS level

- Added a qosLevel parameter for MQTT communication in the config
- Can either be 0, 1 or 2
- Default is 1, but setting a value of 0 can help improving latency
in case of extremely high message rates
parent 17c0009f
......@@ -6,6 +6,7 @@ global {
daemonize false
tempdir .
cacheInterval 120
qosLevel 1
}
restAPI {
......
......@@ -24,6 +24,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
}
//set default values for global variables
_global.qosLevel = 1;
_global.daemonize = 0;
_global.brokerHost = "";
_global.brokerPort = 1883;
......@@ -87,6 +88,10 @@ bool Configuration::readGlobal() {
if (_global.pluginSettings.tempdir[_global.pluginSettings.tempdir.length()-1] != '/') {
_global.pluginSettings.tempdir.append("/");
}
} else if (boost::iequals(global.first, "qosLevel")) {
_global.qosLevel = stoi(global.second.data());
if(_global.qosLevel < 0 || _global.qosLevel > 2)
_global.qosLevel = 1;
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else if (boost::iequals(global.first, "daemonize")) {
......
......@@ -18,6 +18,7 @@
typedef struct {
int daemonize;
int brokerPort;
int qosLevel;
std::string brokerHost;
uint32_t threads;
boost::log::trivial::severity_level logLevelFile;
......
......@@ -14,9 +14,10 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, const std::string& sensorPattern, pluginVector_t& plugins) :
_brokerPort(brokerPort),
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& mqttPrefix,
const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_plugins(plugins),
......@@ -138,7 +139,7 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
LOGM(error) << "Could not send message! Trying again later";
......@@ -172,7 +173,7 @@ bool MQTTPusher::sendMappings() {
name = boost::regex_replace(name, pluginReg, p.id);
//try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
......
......@@ -21,8 +21,8 @@
*/
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& _brokerHost,
const std::string& mqttPrefix, const std::string& sensorPattern, pluginVector_t& plugins);
MQTTPusher(int brokerPort, const std::string& _brokerHost, const std::string& mqttPrefix,
const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins);
virtual ~MQTTPusher();
void push();
......@@ -51,6 +51,7 @@ private:
void sendReadings(SensorBase* s, reading_t* reads, std::size_t& totalCount);
bool sendMappings();
int _qosLevel;
int _brokerPort;
std::string _brokerHost;
std::string _sensorPattern;
......
......@@ -265,7 +265,8 @@ int main(int argc, char** argv) {
#endif
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, pluginSettings.sensorPattern, _configuration->getPlugins());
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix,
pluginSettings.sensorPattern, globalSettings.qosLevel, _configuration->getPlugins());
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, io);
_configuration->readAuthkeys(_httpsServer);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment