2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

MQTTPusher.h 4.23 KB
Newer Older
1
2
3
//================================================================================
// Name        : MQTTPusher.h
// Author      : Michael Ott, Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Collects values from the sensors and pushes them to the database.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2017-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_

31
#define DCDB_MAP "/DCDB_MAP/"
32
#define DCDB_MET "/DCDB_MAP/METADATA/"
33
#define PUSHER_IDLETIME 1000000000
34

35
#include "../analytics/OperatorManager.h"
36
#include "PluginManager.h"
37
#include "sensorbase.h"
38
39
#include <map>
#include <mosquitto.h>
40

41
42
43
enum msgCap_t { DISABLED = 1,
		ENABLED = 2,
		MINIMUM = 3 };
Alessio Netti's avatar
Alessio Netti committed
44

Micha Mueller's avatar
Micha Mueller committed
45
/**
46
47
48
 * @brief Collects values from the sensors and pushes them to the database.
 *
 * @ingroup pusher
Micha Mueller's avatar
Micha Mueller committed
49
 */
50
class MQTTPusher {
51
52
      public:
	MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
53
		   pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval, std::string statisticsMqttTopic);
54
55
	virtual ~MQTTPusher();

56
57
58
59
60
61
62
	/**
	 * @brief MQTTPusher's main execution loop.
	 *
	 * @details If MQTTPusher is started this function runs indefinitely until
	 *          a call to stop(). Execution of the main loop can be halted and
	 *          continued with halt() and cont() respectively.
	 */
63
	void push();
64
65
66
67
68
69

	/**
	 * @brief
	 *
	 * @return
	 */
70
	bool sendMappings();
71

72
73
74
	/**
	 * @brief Start MQTTPusher's push loop.
	 */
75
76
77
78
	void start() {
		_keepRunning = true;
	}

79
80
81
	/**
	 * @brief Stop MQTTPusher's push loop and terminate its execution.
	 */
82
83
84
85
	void stop() {
		_keepRunning = false;
	}

86
87
88
89
90
91
92
93
94
95
96
97
98
99
	/**
	 * @brief Blocking call to pause MQTTPusher's push loop.
	 *
	 * @details Instructs MQTTPusher to pause its push loop and blocks until
	 *          MQTTPusher is actually paused or a timeout occurred. On a
	 *          timeout execution of MQTTPusher is continued.
	 *
	 * @param timeout Time in seconds to wait for MQTTPusher to finish its
	 *                current push cycle.
	 *
	 * @return True if MQTTPusher was succesfully paused, false if a timeout
	 *         occurred and MQTTPusher still runs.
	 */
	bool halt(unsigned short timeout = 5);
100

101
102
103
	/**
	 * @brief Continue MQTTPusher's push loop.
	 */
104
	void cont() {
Alessio Netti's avatar
Alessio Netti committed
105
		computeMsgRate();
106
107
108
		_doHalt = false;
	}

109
110
      private:
	int  sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount);
Alessio Netti's avatar
Alessio Netti committed
111
	void computeMsgRate();
112

113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
	int                    _qosLevel;
	int                    _brokerPort;
	std::string            _brokerHost;
	bool                   _autoPublish;
	pusherPluginStorage_t &_plugins;
	op_pluginVector_t &    _operatorPlugins;
	struct mosquitto *     _mosq;
	bool                   _connected;
	bool                   _keepRunning;
	msgCap_t               _msgCap;
	bool                   _doHalt;
	bool                   _halted;
	int                    _maxNumberOfMessages;
	unsigned int           _maxInflightMsgNum;
	unsigned int           _maxQueuedMsgNum;
Michael Ott's avatar
Michael Ott committed
128
	unsigned int           _statisticsInterval;
129
	std::string            _statisticsMqttTopic;
130
131

	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
132
133
134
};

#endif /* MQTTPUSHER_H_ */