MQTTPusher.cpp 12.4 KB
Newer Older
1
2
3
//================================================================================
// Name        : MQTTPusher.cpp
// Author      : Michael Ott, Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Collects values from the sensors and pushes them to the database.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2017-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30
31

#include "MQTTPusher.h"
#include <iostream>
#include <string>
#include <unistd.h>
32
#include "timestamp.h"
33

34
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const bool autoPublish, int qosLevel,
35
					   pusherPluginStorage_t& plugins, op_pluginVector_t& oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
36
37
		_qosLevel(qosLevel),
		_brokerPort(brokerPort),
38
		_brokerHost(brokerHost),
39
		_autoPublish(autoPublish),
40
		_plugins(plugins),
41
		_operatorPlugins(oPlugins),
42
		_connected(false),
43
		_keepRunning(true),
Alessio Netti's avatar
Alessio Netti committed
44
		_msgCap(DISABLED),
45
		_doHalt(false),
46
		_halted(false),
47
48
49
		_maxNumberOfMessages(maxNumberOfMessages),
		_maxInflightMsgNum(maxInflightMsgNum),
		_maxQueuedMsgNum(maxQueuedMsgNum) {
Micha Mueller's avatar
Micha Mueller committed
50
51

	//first print some info
52
53
	int mosqMajor, mosqMinor, mosqRevision;
	mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
54
	LOGM(info) << mosqMajor << "." << mosqMinor << "." << mosqRevision;
55
56
	char hostname[256];
	if (gethostname(hostname, 255) != 0) {
57
		LOG(fatal) << "Cannot get hostname";
58
59
60
		exit(EXIT_FAILURE);
	}
	hostname[255] = '\0';
61
	LOG(info) << "Hostname: " << hostname;
Micha Mueller's avatar
Micha Mueller committed
62
	//enough information
63

Micha Mueller's avatar
Micha Mueller committed
64
	//init mosquitto-struct
65
	mosquitto_lib_init();
66
	_mosq = mosquitto_new(hostname, false, NULL);
67
68
69
70
	if (!_mosq) {
		perror(NULL);
		exit(EXIT_FAILURE);
	}
71

72
	mosquitto_threaded_set(_mosq, true);
73
74
	mosquitto_max_inflight_messages_set(_mosq, _maxInflightMsgNum);
	mosquitto_max_queued_messages_set(_mosq, _maxQueuedMsgNum);
75
76
77
78
79
80
}

MQTTPusher::~MQTTPusher() {
	if(_connected) {
		mosquitto_disconnect(_mosq);
	}
Micha Mueller's avatar
Micha Mueller committed
81
82
	mosquitto_destroy(_mosq);
	mosquitto_lib_cleanup();
83
84
85
}

void MQTTPusher::push() {
86
87
    int mosqErr;
    uint64_t idleTime = 0;
Micha Mueller's avatar
Micha Mueller committed
88
	//connect to broker (if necessary)
89
	while (_keepRunning && !_connected) {
90
		if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
91
			LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
92
93
94
			sleep(1);
		} else {
			_connected = true;
95
			LOGM(info) << "Connection established!";
96
97
		}
	}
98

Alessio Netti's avatar
Alessio Netti committed
99
100
101
	//Performing auto-publish if necessary
	sendMappings();

102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
	// De-allocating metadata for all sensors - after auto-publish it is not needed anymore
	for(auto& p: _plugins) {
		for (auto &g: p.configurator->getSensorGroups()) {
			for (auto &s: g->acquireSensors()) {
				s->clearMetadata();
			}
			g->releaseSensors();
		}
	}
	for(auto& p: _operatorPlugins)
		for(auto& op: p.configurator->getOperators()) {
			for (auto &u: op->getUnits()) {
				for (auto &s: u->getBaseOutputs())
					s->clearMetadata();
				for (auto &s: u->getBaseInputs())
					s->clearMetadata();
			}
			op->releaseUnits();
		}

Alessio Netti's avatar
Alessio Netti committed
122
	computeMsgRate();
Micha Mueller's avatar
Micha Mueller committed
123
	//collect sensor-data
lu43jih's avatar
lu43jih committed
124
125
	reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
	std::size_t totalCount = 0; //number of messages
126
	while (_keepRunning || totalCount) {
127
128
129
130
131
132
		if (_doHalt) {
			_halted = true;
			sleep(2);
			continue;
		}
		_halted = false;
133
134
135
136
137
138
139
140
141
142
		
		//there was a (unintended) disconnect in the meantime --> reconnect
		if (!_connected) {
			LOGM(error) << "Lost connection. Reconnecting...";
			if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
				LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
				sleep(5);
			} else {
				_connected = true;
				LOGM(info) << "Connection established!";
143
			}
144
		}
145

146
		if (_connected) {
147
148
149
150
151
152
153
154
155
156
            if(getTimestamp() - idleTime >= PUSHER_IDLETIME) {
                idleTime = getTimestamp();
                totalCount = 0;
                // Push sensor data
                for (auto &p : _plugins) {
                    if (_doHalt) {
                        //for faster response
                        break;
                    }
                    for (const auto &g : p.configurator->getSensorGroups()) {
157
                        for (const auto &s : g->acquireSensors()) {
158
                            if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
Alessio Netti's avatar
Alessio Netti committed
159
                                if (_msgCap == DISABLED || totalCount < (unsigned)_maxNumberOfMessages) {
160
161
162
163
164
165
166
167
                                    if (sendReadings(*s, reads, totalCount) > 0) {
                                        break;
                                    }
                                } else {
                                    break; //ultimately we will go to sleep 1 second
                                }
                            }
                        }
168
                        g->releaseSensors();
169
170
171
                    }
                }
                // Push output analytics sensors
172
                for (auto &p : _operatorPlugins) {
173
174
175
                    if (_doHalt) {
                        break;
                    }
176
177
178
                    for (const auto &op : p.configurator->getOperators()) {
                    	if(op->getStreaming()) {
							for (const auto &u : op->getUnits()) {
179
								for (const auto &s : u->getBaseOutputs()) {
180
									if (s->getSizeOfReadingQueue() >= op->getMinValues()) {
181
182
183
184
185
186
187
188
189
190
										if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
											if (sendReadings(*s, reads, totalCount) > 0) {
												break;
											}
										} else {
											break;
										}
									}
								}
							}
191
							op->releaseUnits();
192
						}
193
194
195
196
                    }
                }
            }

197
198
199
200
201
202
203
204
			if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
				if (mosqErr == MOSQ_ERR_CONN_LOST) {
					LOGM(info) << "Disconnected.";
					_connected = false;
				} else {
					LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
				}
			}
Michael Ott's avatar
Michael Ott committed
205
		}
206
	}
Micha Mueller's avatar
Micha Mueller committed
207
	delete[] reads;
208
	mosquitto_disconnect(_mosq);
209
}
210

211
212
213
214
215
int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount) {
	//get all sensor values out of its queue
	std::size_t count = s.popReadingQueue(reads, SensorBase::QUEUE_MAXLIMIT);
	//totalCount+= count;
	totalCount+= 1;
216
#ifdef DEBUG
217
	LOGM(debug) << "Sending " << count << " values from " << s.getName();
218
#endif
219
	
220
#if DEBUG
221
222
223
	for (std::size_t i=0; i<count; i++) {
		LOG(debug) << "  " << reads[i].timestamp << " " << reads[i].value;
	}
224
#endif
225
226
227
228
229
230
231
	//try to send them to the broker
	int rc;
	if ((rc = mosquitto_publish(_mosq, NULL, (s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, _qosLevel, false)) != MOSQ_ERR_SUCCESS) {
		//could not send them --> push the sensor values back into the queue
		if (rc == MOSQ_ERR_NOMEM) {
			LOGM(info) << "Can\'t queue additional messages";
		} else {
232
233
234
			LOGM(error) << "Could not send message! Trying again later";
			_connected = false;
		}
235
236
237
238
		s.pushReadingQueue(reads, count);
		//totalCount -= count;
		totalCount -= 1;
		return 1;
239
	}
240
	return 0;
241
}
242
243

bool MQTTPusher::sendMappings() {
244
	if(!_autoPublish) return false;
245

246
	std::string topic, payload;
247
248
	unsigned int publishCtr=0;
	// Performing auto-publish for sensors
249
250
	for(auto& p: _plugins) {
		for(auto& g: p.configurator->getSensorGroups()) {
251
			for(auto& s: g->acquireSensors()) {
252
253
254
255
256
257
258
259
260
261
262
263
264
265
				if(s->getPublish()) {
					topic = std::string(DCDB_MAP) + s->getMqtt();
					// If metadata is missing, we use a dummy temporary structure
					if (!s->getMetadata()) {
						SensorMetadata sm;
						sm.publicName = s->getName();
						s->setMetadata(sm);
					}
					try {
						payload = s->getMetadata()->getJSON();
					} catch (const std::exception &e) {
						LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
						continue;
					}
266

267
268
269
270
271
272
273
					// Try to send mapping to the broker
					if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
						LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
						_connected = false;
						return true;
					} else
						publishCtr++;
274
275
				}
			}
276
			g->releaseSensors();
277
278
		}
	}
279
280

	// Performing auto-publish for analytics output sensors
281
282
283
284
	for(auto& p: _operatorPlugins)
		for(auto& op: p.configurator->getOperators())
			if(op->getStreaming() && !op->getDynamic()) {
				for (auto &u: op->getUnits())
285
					for (auto &s: u->getBaseOutputs()) {
286
287
288
289
290
291
292
293
294
295
296
297
298
						if (s->getPublish()) {
							topic = std::string(DCDB_MAP) + s->getMqtt();
							if (!s->getMetadata()) {
								SensorMetadata sm;
								sm.publicName = s->getName();
								s->setMetadata(sm);
							}
							try {
								payload = s->getMetadata()->getJSON();
							} catch (const std::exception &e) {
								LOGM(error) << "Malformed metadata for sensor " << s->getName() << "!";
								continue;
							}
299

300
301
302
303
304
305
306
307
							// Try to send mapping to the broker
							if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
								LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
								_connected = false;
								return true;
							} else
								publishCtr++;
						}
308
					}
309
				op->releaseUnits();
310
			}
311
	LOGM(info) << "Sensor name auto-publish performed for " << publishCtr << " sensors!";
312
313
	return true;
}
314

315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
bool MQTTPusher::halt(unsigned short timeout) {
    _doHalt = true;

    for (unsigned short i = 1; i <= timeout; i++) {
        if (_halted) {
            return true;
        } else {
            LOGM(info) << "Waiting for push cycle to pause... (" << i << "/" << timeout << ")";
            sleep(1);
        }
    }

    cont();
    LOGM(info) << "Timeout: push cycle did not pause. Continuing...";
    return false;
}

Alessio Netti's avatar
Alessio Netti committed
332
333
334
void MQTTPusher::computeMsgRate() {
	// Computing number of sent MQTT messages per second
	float msgRate = 0;
335
	bool dynWarning = false;
336
337
	for(auto& p : _plugins) {
		for(const auto& g : p.configurator->getSensorGroups()) {
338
339
			msgRate += (float)g->acquireSensors().size() * ( 1000.0f / (float)g->getInterval() ) / (float)g->getMinValues();
			g->releaseSensors();
340
341
		}
	}
342
343
344
345
346
347
348
	for(auto& p : _operatorPlugins)
		for(const auto& op : p.configurator->getOperators()) {
            if (op->getStreaming() && !op->getDynamic()) {
                for (const auto &u : op->getUnits())
                    msgRate += (float) u->getBaseOutputs().size() * (1000.0f / (float) op->getInterval()) / (float) op->getMinValues();
                op->releaseUnits();
            } else if (op->getDynamic())
349
350
                dynWarning = true;
        }
Alessio Netti's avatar
Alessio Netti committed
351
	// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
Alessio Netti's avatar
Alessio Netti committed
352
353
354
355
356
357
358
359
360
361
362
363
364
	if(_maxNumberOfMessages >= 0 && _msgCap != MINIMUM) {
		_msgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages ? DISABLED : ENABLED;
		if (_msgCap == DISABLED && _maxNumberOfMessages > 0)
			LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
		else if(_maxNumberOfMessages > 0)
			LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
		else
			LOGM(info) << "No message cap enforced. Predicted message rate is " << msgRate << " msg/s.";
	} else {
		_msgCap = MINIMUM;
		_maxNumberOfMessages = msgRate + 10;
		LOGM(info) << "Enforcing message cap of " << _maxNumberOfMessages << " msg/s against actual " << msgRate << " msg/s.";
	}
365
366
	if(_msgCap!=DISABLED && dynWarning)
	    LOGM(warning) << "Attention! The computed message rate does not account for analyzers with dynamically-generated sensors.";
Alessio Netti's avatar
Alessio Netti committed
367
}