//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins themselves.
 */

36
37
38
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

39
40
#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <boost/asio.hpp>

#include "logging.h"
#include "sensorbase.h"
45

46
47
48
/**
 * @brief Abstract interface defining sensor group functionality.
 *
49
50
51
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
52
53
 * @ingroup pusherplugins
 */
54
class SensorGroupInterface {
55
56
57
58
59
60
61
62
63
      public:
	SensorGroupInterface(const std::string &groupName)
	    : _groupName(groupName),
	      _mqttPart(""),
	      _sync(true),
	      _keepRunning(false),
	      _disabled(false),
	      _minValues(1),
	      _interval(1000),
64
	      _queueSize(1024),
65
66
67
	      _pendingTasks(0),
	      _timer(nullptr) {
	}
68

69
70
71
72
73
74
75
76
	SensorGroupInterface(const SensorGroupInterface &other)
	    : _groupName(other._groupName),
	      _mqttPart(other._mqttPart),
	      _sync(other._sync),
	      _keepRunning(false),
	      _disabled(other._disabled),
	      _minValues(other._minValues),
	      _interval(other._interval),
77
	      _queueSize(other._queueSize),
78
79
80
	      _pendingTasks(0),
	      _timer(nullptr) {
	}
81

82
	virtual ~SensorGroupInterface() {}
83

84
85
86
87
88
89
90
91
	SensorGroupInterface &operator=(const SensorGroupInterface &other) {
		_groupName = other._groupName;
		_mqttPart = other._mqttPart;
		_sync = other._sync;
		_keepRunning = false;
		_disabled = other._disabled;
		_minValues = other._minValues;
		_interval = other._interval;
92
		_queueSize = other._queueSize;
93
94
		_pendingTasks.store(0);
		_timer = nullptr;
95

96
97
		return *this;
	}
98

99
100
101
102
103
	///@name Getters
	///@{
	const std::string &getGroupName() const { return _groupName; }
	const std::string &getMqttPart() const { return _mqttPart; }
	bool               getSync() const { return _sync; }
104
	virtual bool       isDisabled() const { return _disabled; }
105
106
	unsigned           getMinValues() const { return _minValues; }
	unsigned           getInterval() const { return _interval; }
107
	unsigned           getQueueSize() const { return _queueSize; }
108
	///@}
109

110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
	///@name Setters
	///@{
	void setGroupName(const std::string &groupName) { _groupName = groupName; }
	void setMqttPart(const std::string &mqttPart) {
		_mqttPart = mqttPart;
		//sanitize mqttPart into uniform /xxxx format
		if (_mqttPart.front() != '/') {
			_mqttPart.insert(0, "/");
		}
		if (_mqttPart.back() == '/') {
			_mqttPart.erase(_mqttPart.size() - 1);
		}
	}
	void setSync(bool sync) { _sync = sync; }
	void setDisabled(const bool disabled) { _disabled = disabled; }
	void setMinValues(unsigned minValues) { _minValues = minValues; }
	void setInterval(unsigned interval) { _interval = interval; }
127
	void setQueueSize(unsigned queueSize) { _queueSize = queueSize; }
128
	///@}
129

130
131
132
	///@name Publicly accessible interface methods (implemented in SensorGroupTemplate)
	///@{
	/**
133
134
135
136
137
138
139
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
140
141
142
143
144
	virtual void init(boost::asio::io_service &io) {
		_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
	}

	/**
145
146
     * @brief Waits for the termination of the sensor group.
     */
147
148
149
	virtual void wait() = 0;

	/**
150
151
     * @brief Start the sensor group (i.e. start collecting data).
     */
152
	virtual void start() = 0;
153

154
	/**
155
     * @brief Stop the sensor group (i.e. stop collecting data).
156
157
     * 
     * @details Must be followed by a call to the wait() method.
158
     */
159
160
161
	virtual void stop() = 0;

	/**
162
163
164
165
     * @brief Add a sensor to this group.
     *
     * @param s Shared pointer to the sensor.
     */
166
	virtual void pushBackSensor(SBasePtr s) = 0;
167

168
	/**
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
     * @brief Acquire access to all sensors of this group.
     *
     * @details Always release sensor access via releaseSensors() afterwards to
     *          ensure multi-threading safety!
     *          acquireSensors() and releaseSensors() allow for thread-safe
     *          (locked) access to _baseSensors. However, as most plugins do not
     *          require synchronized access (and we want to avoid the induced
     *          lock overhead for them) the actual locking mechanism is only
     *          implemented in plugins actually requiring thread-safe access.
     *          Every caller which may require thread safe access to
     *          _baseSensors should use acquireSensors() and releaseSensors().
     *          If a plugin does not implement a locking mechanism because it
     *          does not require synchronized access releaseSensors() can be
     *          omitted.
     *          See the Caliper plugin for a SensorGroup which implements locked
     *          access to _baseSensors.
185
186
187
     *
     * @return A std::vector with shared pointers to all associated sensors.
     */
188
	virtual std::vector<SBasePtr> &acquireSensors() { return _baseSensors; }
189

190
	/**
191
192
193
194
195
196
     * @brief Release previously acquired access to sensors of this group.
     *
     * @details Always acquire sensors via acquireSensors() beforehand!
     *          See description of acquireSensors() for an in-depth reasoning
     *          about this method.
     */
197
198
	virtual void releaseSensors() { /* do nothing in base implementation */
	}
199

200
201
202
203
204
205
206
207
	/**
	 * @brief Compute message rate of this group.
	 *
	 * @details Computes the message rate based on number of message, interval
	 *          and minValues
	 *
	 * @return The message rate in messages/s.
	 */
208
209
210
	virtual float getMsgRate() {
		float val = 0;
		for (const auto s: _baseSensors) {
211
212
			if(s->getSubsampling() > 0)
				val+= 1.0f / (float)s->getSubsampling();
213
214
215
		}
		return val * (1000.0f / (float)_interval) / (float)_minValues;
	}
216
 
217
	/**
218
219
220
221
222
223
224
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
225
226
227
228
229
230
231
	virtual void printConfig(LOG_LEVEL ll, unsigned leadingSpaces = 8) {
		std::string leading(leadingSpaces, ' ');
		LOG_VAR(ll) << leading << _groupName;
		LOG_VAR(ll) << leading << "    Disabled:     " << (_disabled ? std::string("true") : std::string("false"));
		if (_mqttPart != "") {
			LOG_VAR(ll) << leading << "    MQTT part:    " << _mqttPart;
		}
232

233
234
235
		LOG_VAR(ll) << leading << "    Synchronized: " << (_sync ? std::string("true") : std::string("false"));
		LOG_VAR(ll) << leading << "    minValues:    " << _minValues;
		LOG_VAR(ll) << leading << "    interval:     " << _interval;
236
		LOG_VAR(ll) << leading << "    queueSize:    " << _queueSize;
237
238
	}
	///@}
239

240
241
242
243
      protected:
	///@name Internal methods (implemented in a plugin's SensorGroup)
	///@{
	/**
244
245
     * @brief Read data for all sensors once.
     */
246
	virtual void read() = 0;
247

248
	/**
249
250
251
252
253
254
     * @brief Implement plugin specific actions to initialize a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions for initialization, this should be implemented here.
     *          %initGroup() is appropriately called during init().
     */
255
256
	virtual void execOnInit() { /* do nothing if not overwritten */
	}
257

258
	/**
259
260
261
262
263
264
     * @brief Implement plugin specific actions to start a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to start polling data (e.g. open a file descriptor),
     *          this should be implemented here. %startGroup() is appropriately
     *          called during start().
265
     *
266
     * @return True on success, false otherwise.
267
     */
268
	virtual bool execOnStart() { return true; }
269

270
	/**
271
272
273
274
275
276
277
     * @brief Implement plugin specific actions to stop a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to stop polling data (e.g. close a file descriptor),
     *          this should be implemented here. %stopGroup() is appropriately
     *          called during stop().
     */
278
279
	virtual void execOnStop() { /* do nothing if not overwritten */
	}
280

281
	/**
282
283
284
285
286
287
288
     * @brief Print information about plugin specific group attributes (or
     *        nothing if no such attributes are present).
     *
     * @details Only overwrite if necessary.
     *
     * @param ll Severity level to log with
     */
289
290
	virtual void printGroupConfig(LOG_LEVEL ll, unsigned int leadingSpaces) {
	}
291
	///@}
292

293
294
295
	///@name Utility methods
	///@{
	/**
296
297
298
299
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
	virtual uint64_t nextReadingTime() {
		uint64_t now = getTimestamp();
		uint64_t next;
		if (_sync) {
			uint64_t interval64 = static_cast<uint64_t>(_interval);
			uint64_t now_ms = now / 1000 / 1000;
			//synchronize all measurements with other sensors
			uint64_t waitToStart = interval64 - (now_ms % interval64);
			//less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
			if (!waitToStart) {
				return (now_ms + interval64) * 1000 * 1000;
			}
			return (now_ms + waitToStart) * 1000 * 1000;
		} else {
			return now + MS_TO_NS(_interval);
		}
	}
	///@}
318

319
320
321
322
323
324
325
326
	//TODO unify naming scheme: _groupName --> _name (also refactor getter+setter)
	std::string                                  _groupName;   ///< String name of this group
	std::string                                  _mqttPart;    ///< MQTT part identifying this group
	bool                                         _sync;        ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
	bool                                         _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
	bool                                         _disabled;
	unsigned int                                 _minValues;    ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
	unsigned int                                 _interval;     ///< Reading interval cycle in milliseconds
327
	unsigned int                                 _queueSize;    ///< Maximum number of queued readings
328
329
330
331
	std::atomic_uint                             _pendingTasks; ///< Number of currently outstanding read operations
	std::unique_ptr<boost::asio::deadline_timer> _timer;        ///< Time readings in a periodic interval
	std::vector<SBasePtr>                        _baseSensors;  ///< Vector with sensors associated to this group
	LOGGER                                       lg;            ///< Personal logging instance
332
333
};

334
335
336
//for better readability: shared pointer alias used to hand sensor groups around
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

337
#endif /* SENSORGROUPINTERFACE_H_ */