MQTTPusher.h 2.58 KB
Newer Older
1
2
3
4
/*
 * MQTTPusher.h
 *
 *  Created on: 13.12.2017
5
 *      Author: Michael Ott (original), Micha Mueller
6
7
8
9
10
 */

#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_

11
#define DCDB_MAP "/DCDB_MAP/"
12
#define PUSHER_IDLETIME 1000000000
13

14
#include <mosquitto.h>
15
#include <map>
16
#include "PluginManager.h"
17
18
#include "sensorbase.h"
#include "../analytics/AnalyticsManager.h"
19

Alessio Netti's avatar
Alessio Netti committed
20
21
enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};

Micha Mueller's avatar
Micha Mueller committed
22
/**
Micha Mueller's avatar
Micha Mueller committed
23
 * Class responsible for collecting values from the sensors and pushing them to the database.
Micha Mueller's avatar
Micha Mueller committed
24
 */
25
26
class MQTTPusher {
public:
27
	MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
28
			   pusherPluginStorage_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
29
30
	virtual ~MQTTPusher();

31
32
33
34
35
36
37
	/**
	 * @brief MQTTPusher's main execution loop.
	 *
	 * @details If MQTTPusher is started this function runs indefinitely until
	 *          a call to stop(). Execution of the main loop can be halted and
	 *          continued with halt() and cont() respectively.
	 */
38
	void push();
39
40
41
42
43
44

	/**
	 * @brief
	 *
	 * @return
	 */
45
	bool sendMappings();
46

47
48
49
	/**
	 * @brief Start MQTTPusher's push loop.
	 */
50
51
52
53
	void start() {
		_keepRunning = true;
	}

54
55
56
	/**
	 * @brief Stop MQTTPusher's push loop and terminate its execution.
	 */
57
58
59
60
	void stop() {
		_keepRunning = false;
	}

61
62
63
64
65
66
67
68
69
70
71
72
73
74
	/**
	 * @brief Blocking call to pause MQTTPusher's push loop.
	 *
	 * @details Instructs MQTTPusher to pause its push loop and blocks until
	 *          MQTTPusher is actually paused or a timeout occurred. On a
	 *          timeout execution of MQTTPusher is continued.
	 *
	 * @param timeout Time in seconds to wait for MQTTPusher to finish its
	 *                current push cycle.
	 *
	 * @return True if MQTTPusher was succesfully paused, false if a timeout
	 *         occurred and MQTTPusher still runs.
	 */
	bool halt(unsigned short timeout = 5);
75

76
77
78
	/**
	 * @brief Continue MQTTPusher's push loop.
	 */
79
	void cont() {
Alessio Netti's avatar
Alessio Netti committed
80
		computeMsgRate();
81
82
83
		_doHalt = false;
	}

84
private:
85
	int sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
Alessio Netti's avatar
Alessio Netti committed
86
	void computeMsgRate();
87

Alessio Netti's avatar
Alessio Netti committed
88
	int _qosLevel;
89
90
	int _brokerPort;
	std::string _brokerHost;
91
	std::string _sensorPattern;
92
	pusherPluginStorage_t& _plugins;
93
	an_pluginVector_t& _analyticsPlugins;
94
95
	struct mosquitto* _mosq;
	bool _connected;
96
	bool _keepRunning;
Alessio Netti's avatar
Alessio Netti committed
97
	msgCap_t _msgCap;
98
99
	bool _doHalt;
	bool _halted;
Alessio Netti's avatar
Alessio Netti committed
100
	int _maxNumberOfMessages;
101
102
	unsigned int _maxInflightMsgNum;
	unsigned int _maxQueuedMsgNum;
103
104

	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
105
106
107
};

#endif /* MQTTPUSHER_H_ */