MQTTPusher.h 3.82 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : MQTTPusher.h
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Collects values from the sensors and pushes them to the database.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2017-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26
27
28
29

#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_

30
// NOTE(review): presumably the MQTT topic prefix under which sensor mappings
// are published (see MQTTPusher::sendMappings()) - confirm against MQTTPusher.cpp.
#define DCDB_MAP "/DCDB_MAP/"
31
// NOTE(review): presumably the idle/sleep time of the push loop; the value looks
// like nanoseconds (i.e. 1 s), but the unit is not visible in this header - confirm.
#define PUSHER_IDLETIME 1000000000
32

33
#include <mosquitto.h>
34
#include <map>
35
#include "PluginManager.h"
36
37
#include "sensorbase.h"
#include "../analytics/AnalyticsManager.h"
38

Alessio Netti's avatar
Alessio Netti committed
39
40
/**
 * @brief States of MQTTPusher's message cap (type of its _msgCap member).
 *
 * NOTE(review): the enumerator names suggest modes for limiting the number of
 * messages sent - the exact semantics are not visible in this header, confirm
 * against MQTTPusher.cpp. Numbering deliberately starts at 1.
 */
enum msgCap_t {
	DISABLED = 1,
	ENABLED  = 2,
	MINIMUM  = 3
};

41
42
43
44
45
/**
 * @brief Collects values from the sensors and pushes them to the database.
 *
 * @ingroup pusher
 */
46
47
class MQTTPusher {
public:
48
	MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
49
			   pusherPluginStorage_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
50
51
	virtual ~MQTTPusher();

52
53
54
55
56
57
58
	/**
	 * @brief MQTTPusher's main execution loop.
	 *
	 * @details If MQTTPusher is started this function runs indefinitely until
	 *          a call to stop(). Execution of the main loop can be halted and
	 *          continued with halt() and cont() respectively.
	 */
59
	void push();
60
61
62
63
64
65

	/**
	 * @brief
	 *
	 * @return
	 */
66
	bool sendMappings();
67

68
69
70
	/**
	 * @brief Start MQTTPusher's push loop.
	 */
71
72
73
74
	void start() {
		_keepRunning = true;
	}

75
76
77
	/**
	 * @brief Stop MQTTPusher's push loop and terminate its execution.
	 */
78
79
80
81
	void stop() {
		_keepRunning = false;
	}

82
83
84
85
86
87
88
89
90
91
92
93
94
95
	/**
	 * @brief Blocking call to pause MQTTPusher's push loop.
	 *
	 * @details Instructs MQTTPusher to pause its push loop and blocks until
	 *          MQTTPusher is actually paused or a timeout occurred. On a
	 *          timeout execution of MQTTPusher is continued.
	 *
	 * @param timeout Time in seconds to wait for MQTTPusher to finish its
	 *                current push cycle.
	 *
	 * @return True if MQTTPusher was succesfully paused, false if a timeout
	 *         occurred and MQTTPusher still runs.
	 */
	bool halt(unsigned short timeout = 5);
96

97
98
99
	/**
	 * @brief Continue MQTTPusher's push loop.
	 */
100
	void cont() {
Alessio Netti's avatar
Alessio Netti committed
101
		computeMsgRate();
102
103
104
		_doHalt = false;
	}

105
private:
106
	int sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
Alessio Netti's avatar
Alessio Netti committed
107
	void computeMsgRate();
108

Alessio Netti's avatar
Alessio Netti committed
109
	int _qosLevel;
110
111
	int _brokerPort;
	std::string _brokerHost;
112
	std::string _sensorPattern;
113
	pusherPluginStorage_t& _plugins;
114
	an_pluginVector_t& _analyticsPlugins;
115
116
	struct mosquitto* _mosq;
	bool _connected;
117
	bool _keepRunning;
Alessio Netti's avatar
Alessio Netti committed
118
	msgCap_t _msgCap;
119
120
	bool _doHalt;
	bool _halted;
Alessio Netti's avatar
Alessio Netti committed
121
	int _maxNumberOfMessages;
122
123
	unsigned int _maxInflightMsgNum;
	unsigned int _maxQueuedMsgNum;
124
125

	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
126
127
128
};

#endif /* MQTTPUSHER_H_ */