MQTTPusher.cpp 3.24 KB
Newer Older
1
2
3
4
/*
 * MQTTPusher.cpp
 *
 *  Created on: 13.12.2017
5
 *      Author: Michael Ott (original), Micha Mueller
6
7
8
9
10
11
12
 */

#include "MQTTPusher.h"
#include <iostream>
#include <string>
#include <unistd.h>

13
14
#define LOGM(sev) LOG(sev) << "Mosquitto: "

15
16
17
18
19
extern volatile int keepRunning;

MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
		const std::string& mqttPrefix, sensorVector_t& sensors) :
		_brokerPort(brokerPort), _brokerHost(brokerHost),
20
		_sensors(sensors),_connected(false) {
Micha Mueller's avatar
Micha Mueller committed
21
22

	//first print some info
23
24
	int mosqMajor, mosqMinor, mosqRevision;
	mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
25
	LOGM(info) << mosqMajor << "." << mosqMinor << "." << mosqRevision;
26
27
	char hostname[256];
	if (gethostname(hostname, 255) != 0) {
28
		LOG(fatal) << "Cannot get hostname";
29
30
31
		exit(EXIT_FAILURE);
	}
	hostname[255] = '\0';
32
	LOG(info) << "Hostname: " << hostname;
Micha Mueller's avatar
Micha Mueller committed
33
	//enough information
34

Micha Mueller's avatar
Micha Mueller committed
35
	//init mosquitto-struct
36
	mosquitto_lib_init();
37
38
	std::string clientID(mqttPrefix);
	_mosq = mosquitto_new(mqttPrefix.c_str(), false, NULL);
39
40
41
42
43
44
45
46
47
48
49
50
51
	if (!_mosq) {
		perror(NULL);
		exit(EXIT_FAILURE);
	}
}

MQTTPusher::~MQTTPusher() {
	if(_connected) {
		mosquitto_disconnect(_mosq);
	}
}

void MQTTPusher::push() {
Micha Mueller's avatar
Micha Mueller committed
52
53

	//connect to broker (if necessary)
54
55
	while (keepRunning && !_connected) {
		if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
56
			LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
57
58
59
			sleep(1);
		} else {
			_connected = true;
60
			LOGM(info) << "Connection established!";
61
62
63
		}
	}

64
65
66
67
68
69
70
	if (mosquitto_loop_start(_mosq) != MOSQ_ERR_SUCCESS) {
		LOGM(fatal) << "Setup failed";
		keepRunning = 0;
		mosquitto_disconnect(_mosq);
		return;
	}

Micha Mueller's avatar
Micha Mueller committed
71
	//collect sensor-data
72
73
74
75
	reading_t* reads = new reading_t[1024];
	std::size_t totalCount = 0;
	while (keepRunning || totalCount) {
		totalCount = 0;
76
77
		for(auto s : _sensors) {
			if (s->getSizeOfReadingQueue() >= s->getMinValues()) {
Micha Mueller's avatar
Micha Mueller committed
78
79

				//there was a (unintended) disconnect in the meantime --> reconnect
80
				if (!_connected) {
81
					LOGM(error) << "Lost connection. Reconnecting...";
82
					if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
83
						LOGM(error) << "Could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
84
						sleep(5);
85
86
					} else {
						_connected = true;
87
						LOGM(info) << "Connection established!";
88
89
90
91
					}
				}

				if (_connected) {
Micha Mueller's avatar
Micha Mueller committed
92
93

					//get all sensor values out of its queue
94
					std::size_t count = s->popReadingQueue(reads, 1024);
95
96
					totalCount+= count;
#ifdef DEBUG
97
					LOGM(debug) << "Sending " << count << " values from " << s->getName();
98
99
#endif

100
#if DEBUG
101
					for (std::size_t i=0; i<count; i++) {
102
						LOG(debug) << "  " << reads[i].timestamp << " " << reads[i].value;
103
104
					}
#endif
Micha Mueller's avatar
Micha Mueller committed
105
					//try to send them to the broker
106
					if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 1, false) != MOSQ_ERR_SUCCESS) {
Micha Mueller's avatar
Micha Mueller committed
107
108

						//could not send them --> push the sensor values back into the queue
109
						LOGM(error) << "Could not send message! Trying again later";
110
						_connected = false;
111
						s->pushReadingQueue(reads, count);
112
113
114
115
116
117
118
119
120
						totalCount -= count;
						sleep(5);
						break;
					}
				}
			}
		}
		sleep(1);
	}
121
122
	mosquitto_disconnect(_mosq);
	mosquitto_loop_stop(_mosq, false);
123
}