MQTTPusher.cpp 8.86 KB
Newer Older
1
2
3
4
/*
 * MQTTPusher.cpp
 *
 *  Created on: 13.12.2017
5
 *      Author: Michael Ott (original), Micha Mueller
6
7
8
9
10
11
 */

#include "MQTTPusher.h"
#include <iostream>
#include <string>
#include <unistd.h>
12
#include "timestamp.h"
13

14
15
#define LOGM(sev) LOG(sev) << "Mosquitto: "

16
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
Alessio Netti's avatar
Alessio Netti committed
17
					   pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
18
19
		_qosLevel(qosLevel),
		_brokerPort(brokerPort),
20
		_brokerHost(brokerHost),
21
		_sensorPattern(sensorPattern),
22
		_plugins(plugins),
23
		_analyticsPlugins(aPlugins),
24
		_connected(false),
25
		_keepRunning(true),
Alessio Netti's avatar
Alessio Netti committed
26
		_msgCap(DISABLED),
27
		_doHalt(false),
28
		_halted(false),
29
30
31
		_maxNumberOfMessages(maxNumberOfMessages),
		_maxInflightMsgNum(maxInflightMsgNum),
		_maxQueuedMsgNum(maxQueuedMsgNum) {
Micha Mueller's avatar
Micha Mueller committed
32
33

	//first print some info
34
35
	int mosqMajor, mosqMinor, mosqRevision;
	mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
36
	LOGM(info) << mosqMajor << "." << mosqMinor << "." << mosqRevision;
37
38
	char hostname[256];
	if (gethostname(hostname, 255) != 0) {
39
		LOG(fatal) << "Cannot get hostname";
40
41
42
		exit(EXIT_FAILURE);
	}
	hostname[255] = '\0';
43
	LOG(info) << "Hostname: " << hostname;
Micha Mueller's avatar
Micha Mueller committed
44
	//enough information
45

Micha Mueller's avatar
Micha Mueller committed
46
	//init mosquitto-struct
47
	mosquitto_lib_init();
48
	_mosq = mosquitto_new(hostname, false, NULL);
49
50
51
52
	if (!_mosq) {
		perror(NULL);
		exit(EXIT_FAILURE);
	}
53

54
	mosquitto_threaded_set(_mosq, true);
55
56
	mosquitto_max_inflight_messages_set(_mosq, _maxInflightMsgNum);
	mosquitto_max_queued_messages_set(_mosq, _maxQueuedMsgNum);
57
58
59
60
61
62
63
64
65
}

MQTTPusher::~MQTTPusher() {
	if(_connected) {
		mosquitto_disconnect(_mosq);
	}
}

void MQTTPusher::push() {
66
67
    int mosqErr;
    uint64_t idleTime = 0;
Micha Mueller's avatar
Micha Mueller committed
68
	//connect to broker (if necessary)
69
	while (_keepRunning && !_connected) {
70
		if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
71
			LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
72
73
74
			sleep(1);
		} else {
			_connected = true;
75
			LOGM(info) << "Connection established!";
76
77
		}
	}
78

Alessio Netti's avatar
Alessio Netti committed
79
80
81
	//Performing auto-publish if necessary
	sendMappings();

Alessio Netti's avatar
Alessio Netti committed
82
	computeMsgRate();
Micha Mueller's avatar
Micha Mueller committed
83
	//collect sensor-data
lu43jih's avatar
lu43jih committed
84
85
	reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
	std::size_t totalCount = 0; //number of messages
86
	while (_keepRunning || totalCount) {
87
88
89
90
91
92
		if (_doHalt) {
			_halted = true;
			sleep(2);
			continue;
		}
		_halted = false;
93
94
95
96
97
98
99
100
101
102
		
		//there was a (unintended) disconnect in the meantime --> reconnect
		if (!_connected) {
			LOGM(error) << "Lost connection. Reconnecting...";
			if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
				LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
				sleep(5);
			} else {
				_connected = true;
				LOGM(info) << "Connection established!";
103
			}
104
		}
105

106
		if (_connected) {
107
108
109
110
111
112
113
114
115
116
117
118
            if(getTimestamp() - idleTime >= PUSHER_IDLETIME) {
                idleTime = getTimestamp();
                totalCount = 0;
                // Push sensor data
                for (auto &p : _plugins) {
                    if (_doHalt) {
                        //for faster response
                        break;
                    }
                    for (const auto &g : p.configurator->getSensorGroups()) {
                        for (const auto &s : g->getSensors()) {
                            if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
Alessio Netti's avatar
Alessio Netti committed
119
                                if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
                                    if (sendReadings(*s, reads, totalCount) > 0) {
                                        break;
                                    }
                                } else {
                                    break; //ultimately we will go to sleep 1 second
                                }
                            }
                        }
                    }
                }
                // Push output analytics sensors
                for (auto &p : _analyticsPlugins) {
                    if (_doHalt) {
                        break;
                    }
                    for (const auto &a : p.configurator->getAnalyzers()) {
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
                    	if(a->getStreaming()) {
							for (const auto &u : a->getUnits()) {
								for (const auto &s : u->getBaseOutputs()) {
									if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
										if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
											if (sendReadings(*s, reads, totalCount) > 0) {
												break;
											}
										} else {
											break;
										}
									}
								}
							}
						}
151
152
153
154
                    }
                }
            }

155
156
157
158
159
160
161
162
			if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
				if (mosqErr == MOSQ_ERR_CONN_LOST) {
					LOGM(info) << "Disconnected.";
					_connected = false;
				} else {
					LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
				}
			}
Michael Ott's avatar
Michael Ott committed
163
		}
164
	}
165
	mosquitto_disconnect(_mosq);
166
}
167

168
169
170
171
172
int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount) {
	//get all sensor values out of its queue
	std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
	//totalCount+= count;
	totalCount+= 1;
173
#ifdef DEBUG
174
	LOGM(debug) << "Sending " << count << " values from " << s.getName();
175
#endif
176
	
177
#if DEBUG
178
179
180
	for (std::size_t i=0; i<count; i++) {
		LOG(debug) << "  " << reads[i].timestamp << " " << reads[i].value;
	}
181
#endif
182
183
184
185
186
187
188
	//try to send them to the broker
	int rc;
	if ((rc = mosquitto_publish(_mosq, NULL, (s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
		//could not send them --> push the sensor values back into the queue
		if (rc == MOSQ_ERR_NOMEM) {
			LOGM(info) << "Can\'t queue additional messages";
		} else {
189
190
191
			LOGM(error) << "Could not send message! Trying again later";
			_connected = false;
		}
192
193
194
195
		s.pushReadingQueue(reads, count);
		//totalCount -= count;
		totalCount -= 1;
		return 1;
196
	}
197
	return 0;
198
}
199
200
201
202
203

bool MQTTPusher::sendMappings() {
	if(_sensorPattern == "")
		return false;

204
205
206
207
208
209
210
	std::string topic, name;
	unsigned int publishCtr=0;
	// Performing auto-publish for sensors
	for(auto& p: _plugins)
		for(auto& g: p.configurator->getSensorGroups())
			for(auto& s: g->getSensors()) {
				topic = std::string(DCDB_MAP) + s->getMqtt();
Alessio Netti's avatar
Alessio Netti committed
211
				name  = s->getName();
212

Alessio Netti's avatar
Alessio Netti committed
213
				// Try to send mapping to the broker
214
215
216
217
218
219
220
221
222
223
224
225
				if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
					LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
					_connected = false;
					return true;
				}
				else
					publishCtr++;
			}

	// Performing auto-publish for analytics output sensors
	for(auto& p: _analyticsPlugins)
		for(auto& a: p.configurator->getAnalyzers())
226
227
228
229
230
231
232
233
234
235
236
237
238
239
			if(a->getStreaming())
				for(auto& u: a->getUnits())
					for(auto& s: u->getBaseOutputs()) {
						topic = std::string(DCDB_MAP) + s->getMqtt();
						name  = s->getName();
	
						// Try to send mapping to the broker
						if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
							LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
							_connected = false;
							return true;
						}
						else
							publishCtr++;
240
					}
Alessio Netti's avatar
Alessio Netti committed
241
	LOGM(info) << "Sensor name auto-publish performed for all " << publishCtr << " sensors!";
242
243
	return true;
}
244

Alessio Netti's avatar
Alessio Netti committed
245
246
247
248
249
void MQTTPusher::computeMsgRate() {
	// Computing number of sent MQTT messages per second
	float msgRate = 0;
	for(auto& p : _plugins)
		for(const auto& g : p.configurator->getSensorGroups())
Alessio Netti's avatar
Alessio Netti committed
250
			msgRate += (float)g->getSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
251
252
253
	for(auto& p : _analyticsPlugins)
		for(const auto& a : p.configurator->getAnalyzers())
			for(const auto& u : a->getUnits())
Alessio Netti's avatar
Alessio Netti committed
254
				msgRate += (float)u->getBaseOutputs().size() * ( 1000.0f / (float)a->getInterval() ) / (float)a->getMinValues();
Alessio Netti's avatar
Alessio Netti committed
255
	// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
Alessio Netti's avatar
Alessio Netti committed
256
257
258
259
260
261
262
263
264
265
266
267
268
	if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
		_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
		if (_msgCap == DISABLED && _maxNumberOfMessages > 0)
			LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
		else if(_maxNumberOfMessages > 0)
			LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
		else
			LOGM(info) << "No message cap enforced. Predicted message rate is " << msgRate << " msg/s.";
	} else {
		_msgCap = MINIMUM;
		_maxNumberOfMessages = msgRate + 10;
		LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
	}
Alessio Netti's avatar
Alessio Netti committed
269
}