SensorGroupInterface.h 8.03 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
29
30
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
31
32
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins itself.
33
34
 */

35
36
37
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

38
39
40
41
#include <atomic>
#include <memory>
#include <boost/asio.hpp>

42
43
#include "logging.h"
#include "sensorbase.h"
44

45
46
47
/**
 * @brief Abstract interface defining sensor group functionality.
 *
48
49
50
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
51
52
 * @ingroup pusherplugins
 */
53
class SensorGroupInterface {
54
public:
55

56
	SensorGroupInterface(const std::string& groupName) :
57
		_groupName(groupName),
58
		_mqttPart(""),
59
		_sync(true),
60
		_keepRunning(false),
61
62
63
64
65
66
67
		_minValues(1),
		_interval(1000),
		_pendingTasks(0),
		_timer(nullptr) {}

	SensorGroupInterface(const SensorGroupInterface& other) :
		_groupName(other._groupName),
68
		_mqttPart(other._mqttPart),
69
		_sync(other._sync),
70
71
72
73
74
75
		_keepRunning(other._keepRunning),
		_minValues(other._minValues),
		_interval(other._interval),
		_timer(nullptr) {
		_pendingTasks.store(other._pendingTasks.load());
	}
76
77
78

	virtual ~SensorGroupInterface() {}

79
80
	SensorGroupInterface& operator=(const SensorGroupInterface& other) {
		_groupName = other._groupName;
81
		_mqttPart = other._mqttPart;
82
		_sync = other._sync;
83
84
85
86
87
88
89
90
91
		_keepRunning = other._keepRunning;
		_minValues = other._minValues;
		_interval = other._interval;
		_pendingTasks.store(other._pendingTasks.load());
		_timer = nullptr;

		return *this;
	}

92
93
	///@name Getters
	///@{
94
	const std::string& 	getGroupName()	const	{ return _groupName; }
95
	const std::string& 	getMqttPart()	const	{ return _mqttPart; }
96
	bool				getSync()		const	{ return _sync; }
97
98
	unsigned			getMinValues()	const	{ return _minValues; }
	unsigned			getInterval()	const	{ return _interval; }
99
	///@}
100

101
102
	///@name Setters
	///@{
103
	void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
104
	void setMqttPart(const std::string& mqttPart)	{ _mqttPart = mqttPart; }
105
	void setSync(bool sync)							{ _sync = sync; }
106
107
	void setMinValues(unsigned minValues)			{ _minValues = minValues; }
	void setInterval(unsigned interval)				{ _interval = interval; }
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
	///@}


	///@name Interface methods
	///@{
    /**
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
    virtual void init(boost::asio::io_service& io) {
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }
124
125

	/**
126
	 * @brief Start the sensor group (i.e. start collecting data).
127
128
	 */
	virtual void start() = 0;
129
130
131
132

	/**
     * @brief Stop the sensor group (i.e. stop collecting data).
     */
133
	virtual void stop() = 0;
134

135
136
137
138
139
	/**
	 * @brief Add a sensor to this group.
	 *
	 * @param s Shared pointer to the sensor.
	 */
140
	virtual void pushBackSensor(SBasePtr s)		= 0;
141
142
143
144
145
146

	/**
	 * @brief Retrieve all sensors of this group.
	 *
	 * @return A std::vector with shared pointers to all associated sensors.
	 */
147
	virtual std::vector<SBasePtr>& getSensors()	= 0;
148

149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
    /**
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
    virtual void printConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "        Sensor Group: " << _groupName;
        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
        }

        if (_sync) {
            LOG_VAR(ll) << "            Synchronized readings enabled";
        } else {
            LOG_VAR(ll) << "            Synchronized readings disabled";
        }

        LOG_VAR(ll) << "            minValues: " << _minValues;
        LOG_VAR(ll) << "            interval:  " << _interval;
    }

173
protected:
174
175
176
177

    /**
     * @brief Read data for all sensors once.
     */
178
	virtual void read() = 0;
179
180
181
182
183
184

	/**
	 * @brief Asynchronous callback if _timer expires.
	 *
	 * @details Issues a read() and sets the timer again if _keepRunning is true.
	 */
185
	virtual void readAsync() = 0;
186
	///@}
187

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
	///@name Utility methods
    ///@{
public:
    /**
     * @brief Does a busy wait until all dispatched handlers are finished
     *        (_pendingTasks == 0).
     *
     * @details If the wait takes longer than a reasonable amount of time we
     *          return anyway, to not block termination of dcdbpusher.
     */
    void wait() {
        unsigned short retries = 3;

        for (unsigned short i = 1; i <= retries; i++) {
            if (_pendingTasks) {
                LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
                sleep((_interval/1000) + 1);
            } else {
                return;
            }
        }

        LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
    }

protected:
    /**
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            //synchronize all measurements with other sensors
            uint64_t waitToStart = interval64 - (now_ms%interval64);
            //less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
            if(!waitToStart ){
                    return (now_ms + interval64)*1000*1000;
            }
            return (now_ms + waitToStart)*1000*1000;
        } else {
            return now + MS_TO_NS(_interval);
        }
    }
    ///@}

	std::string _groupName; ///< String name of this group
	std::string	_mqttPart; ///< MQTT part identifying this group
	bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
	bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
	unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
	unsigned int _interval; ///< Reading interval cycle in milliseconds
	std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
	std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
	LOGGER lg; ///< Personal logging instance
247
248
};

249
250
251
//for better readability
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

252
#endif /* SENSORGROUPINTERFACE_H_ */