Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

MQTTPusher.h 4.16 KB
Newer Older
1
2
3
//================================================================================
// Name        : MQTTPusher.h
// Author      : Michael Ott, Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Collects values from the sensors and pushes them to the database.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2017-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_

31
#define DCDB_MAP "/DCDB_MAP/"
32
#define DCDB_MET "/DCDB_MAP/METADATA/"
33
#define PUSHER_IDLETIME 1000000000
34

35
#include "../analytics/OperatorManager.h"
36
#include "PluginManager.h"
37
#include "sensorbase.h"
38
39
#include <map>
#include <mosquitto.h>
40

41
42
43
enum msgCap_t { DISABLED = 1,
		ENABLED = 2,
		MINIMUM = 3 };
Alessio Netti's avatar
Alessio Netti committed
44

Micha Mueller's avatar
Micha Mueller committed
45
/**
46
47
48
 * @brief Collects values from the sensors and pushes them to the database.
 *
 * @ingroup pusher
Micha Mueller's avatar
Micha Mueller committed
49
 */
50
class MQTTPusher {
51
52
      public:
	MQTTPusher(int brokerPort, const std::string &brokerHost, const bool autoPublish, int qosLevel,
Michael Ott's avatar
Michael Ott committed
53
		   pusherPluginStorage_t &plugins, op_pluginVector_t &oPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum, unsigned int statisticsInterval);
54
55
	virtual ~MQTTPusher();

56
57
58
59
60
61
62
	/**
	 * @brief MQTTPusher's main execution loop.
	 *
	 * @details If MQTTPusher is started this function runs indefinitely until
	 *          a call to stop(). Execution of the main loop can be halted and
	 *          continued with halt() and cont() respectively.
	 */
63
	void push();
64
65
66
67
68
69

	/**
	 * @brief
	 *
	 * @return
	 */
70
	bool sendMappings();
71

72
73
74
	/**
	 * @brief Start MQTTPusher's push loop.
	 */
75
76
77
78
	void start() {
		_keepRunning = true;
	}

79
80
81
	/**
	 * @brief Stop MQTTPusher's push loop and terminate its execution.
	 */
82
83
84
85
	void stop() {
		_keepRunning = false;
	}

86
87
88
89
90
91
92
93
94
95
96
97
98
99
	/**
	 * @brief Blocking call to pause MQTTPusher's push loop.
	 *
	 * @details Instructs MQTTPusher to pause its push loop and blocks until
	 *          MQTTPusher is actually paused or a timeout occurred. On a
	 *          timeout execution of MQTTPusher is continued.
	 *
	 * @param timeout Time in seconds to wait for MQTTPusher to finish its
	 *                current push cycle.
	 *
	 * @return True if MQTTPusher was succesfully paused, false if a timeout
	 *         occurred and MQTTPusher still runs.
	 */
	bool halt(unsigned short timeout = 5);
100

101
102
103
	/**
	 * @brief Continue MQTTPusher's push loop.
	 */
104
	void cont() {
Alessio Netti's avatar
Alessio Netti committed
105
		computeMsgRate();
106
107
108
		_doHalt = false;
	}

109
110
      private:
	int  sendReadings(SensorBase &s, reading_t *reads, std::size_t &totalCount);
Alessio Netti's avatar
Alessio Netti committed
111
	void computeMsgRate();
112

113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
	int                    _qosLevel;
	int                    _brokerPort;
	std::string            _brokerHost;
	bool                   _autoPublish;
	pusherPluginStorage_t &_plugins;
	op_pluginVector_t &    _operatorPlugins;
	struct mosquitto *     _mosq;
	bool                   _connected;
	bool                   _keepRunning;
	msgCap_t               _msgCap;
	bool                   _doHalt;
	bool                   _halted;
	int                    _maxNumberOfMessages;
	unsigned int           _maxInflightMsgNum;
	unsigned int           _maxQueuedMsgNum;
Michael Ott's avatar
Michael Ott committed
128
	unsigned int           _statisticsInterval;
129
130

	boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
131
132
133
};

#endif /* MQTTPUSHER_H_ */