SensorGroupInterface.h 11.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
29
30
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
31
32
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins themselves.
33
34
 */

35
36
37
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

38
39
40
41
#include <atomic>
#include <memory>
#include <boost/asio.hpp>

42
43
#include "logging.h"
#include "sensorbase.h"
44

45
46
47
/**
 * @brief Abstract interface defining sensor group functionality.
 *
48
49
50
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
51
52
 * @ingroup pusherplugins
 */
53
class SensorGroupInterface {
54
public:
55

56
57
58
59
60
61
62
63
64
65
    SensorGroupInterface(const std::string& groupName) :
        _groupName(groupName),
        _mqttPart(""),
        _sync(true),
        _keepRunning(false),
        _minValues(1),
        _interval(1000),
        _pendingTasks(0),
        _timer(nullptr) {
    }
66

67
68
69
70
    SensorGroupInterface(const SensorGroupInterface& other) :
        _groupName(other._groupName),
        _mqttPart(other._mqttPart),
        _sync(other._sync),
71
        _keepRunning(false),
72
73
        _minValues(other._minValues),
        _interval(other._interval),
74
        _pendingTasks(0),
75
76
77
78
79
80
81
82
83
        _timer(nullptr) {
    }

    virtual ~SensorGroupInterface() {}

    SensorGroupInterface& operator=(const SensorGroupInterface& other) {
        _groupName = other._groupName;
        _mqttPart = other._mqttPart;
        _sync = other._sync;
84
        _keepRunning = false;
85
86
        _minValues = other._minValues;
        _interval = other._interval;
87
        _pendingTasks.store(0);
88
89
90
91
92
93
94
95
96
97
98
99
100
        _timer = nullptr;

        return *this;
    }

    ///@name Getters
    ///@{
    const std::string& 	getGroupName()	const	{ return _groupName; }
    const std::string& 	getMqttPart()	const	{ return _mqttPart; }
    bool				getSync()		const	{ return _sync; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    ///@}
101

102
103
104
    ///@name Setters
    ///@{
    void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
105
106
107
108
109
110
111
112
113
114
    void setMqttPart(const std::string& mqttPart)	{
        _mqttPart = mqttPart;
        //sanitize mqttPart into uniform /xxxx format
        if (_mqttPart.front() != '/') {
            _mqttPart.insert(0, "/");
        }
        if (_mqttPart.back() == '/') {
            _mqttPart.erase(_mqttPart.size()-1);
        }
    }
115
116
117
118
    void setSync(bool sync)							{ _sync = sync; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
    ///@}
119

120
    ///@name Publicly accessible interface methods (implemented in SensorGroupTemplate)
121
    ///@{
122
123
124
125
126
127
128
129
130
131
132
    /**
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
    virtual void init(boost::asio::io_service& io) {
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }
133

134
135
136
137
    /**
     * @brief Start the sensor group (i.e. start collecting data).
     */
    virtual void start() = 0;
138

139
    /**
140
141
     * @brief Stop the sensor group (i.e. stop collecting data).
     */
142
143
144
145
146
147
148
149
150
151
    virtual void stop() = 0;

    /**
     * @brief Add a sensor to this group.
     *
     * @param s Shared pointer to the sensor.
     */
    virtual void pushBackSensor(SBasePtr s)		= 0;

    /**
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
     * @brief Acquire access to all sensors of this group.
     *
     * @details Always release sensor access via releaseSensors() afterwards to
     *          ensure multi-threading safety!
     *          acquireSensors() and releaseSensors() allow for thread-safe
     *          (locked) access to _baseSensors. However, as most plugins do not
     *          require synchronized access (and we want to avoid the induced
     *          lock overhead for them) the actual locking mechanism is only
     *          implemented in plugins actually requiring thread-safe access.
     *          Every caller which may require thread safe access to
     *          _baseSensors should use acquireSensors() and releaseSensors().
     *          If a plugin does not implement a locking mechanism because it
     *          does not require synchronized access releaseSensors() can be
     *          omitted.
     *          See the Caliper plugin for a SensorGroup which implements locked
     *          access to _baseSensors.
168
169
170
     *
     * @return A std::vector with shared pointers to all associated sensors.
     */
171
172
173
174
175
176
177
178
179
180
    virtual std::vector<SBasePtr>& acquireSensors() { return _baseSensors; }

    /**
     * @brief Release previously acquired access to sensors of this group.
     *
     * @details Always acquire sensors via acquireSensors() beforehand!
     *          See description of acquireSensors() for an in-depth reasoning
     *          about this method.
     */
    virtual void releaseSensors() { /* do nothing in base implementation */ }
181

182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
    /**
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
    virtual void printConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "        Sensor Group: " << _groupName;
        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
        }

        if (_sync) {
            LOG_VAR(ll) << "            Synchronized readings enabled";
        } else {
            LOG_VAR(ll) << "            Synchronized readings disabled";
        }

        LOG_VAR(ll) << "            minValues: " << _minValues;
        LOG_VAR(ll) << "            interval:  " << _interval;
    }
205
    ///@}
206

207
protected:
208

209
210
    ///@name Internal methods (implemented in a plugin's SensorGroup)
    ///@{
211
212
213
    /**
     * @brief Read data for all sensors once.
     */
214
215
216
    virtual void read() = 0;

    /**
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
     * @brief Implement plugin specific actions to initialize a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions for initialization, this should be implemented here.
     *          %initGroup() is appropriately called during init().
     */
    virtual void execOnInit() { /* do nothing if not overwritten */ }

    /**
     * @brief Implement plugin specific actions to start a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to start polling data (e.g. open a file descriptor),
     *          this should be implemented here. %startGroup() is appropriately
     *          called during start().
232
     *
233
     * @return True on success, false otherwise.
234
     */
235
236
237
238
239
240
241
242
243
244
245
    virtual bool execOnStart() { return true; }

    /**
     * @brief Implement plugin specific actions to stop a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to stop polling data (e.g. close a file descriptor),
     *          this should be implemented here. %stopGroup() is appropriately
     *          called during stop().
     */
    virtual void execOnStop() { /* do nothing if not overwritten */ }
246
247
248
249
250
251
252
253
254
255
256
257

    /**
     * @brief Print information about plugin specific group attributes (or
     *        nothing if no such attributes are present).
     *
     * @details Only overwrite if necessary.
     *
     * @param ll Severity level to log with
     */
    virtual void printGroupConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "            No other plugin-specific group attributes defined";
    }
258
	///@}
259

260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
	///@name Utility methods
    ///@{
    /**
     * @brief Does a busy wait until all dispatched handlers are finished
     *        (_pendingTasks == 0).
     *
     * @details If the wait takes longer than a reasonable amount of time we
     *          return anyway, to not block termination of dcdbpusher.
     */
    void wait() {
        unsigned short retries = 3;

        for (unsigned short i = 1; i <= retries; i++) {
            if (_pendingTasks) {
                LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
                sleep((_interval/1000) + 1);
            } else {
                return;
            }
        }

        LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
    }

    /**
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            //synchronize all measurements with other sensors
            uint64_t waitToStart = interval64 - (now_ms%interval64);
            //less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
            if(!waitToStart ){
                    return (now_ms + interval64)*1000*1000;
            }
            return (now_ms + waitToStart)*1000*1000;
        } else {
            return now + MS_TO_NS(_interval);
        }
    }
    ///@}

308
    //TODO unify naming scheme: _groupName --> _name (also refactor getter+setter)
309
310
311
312
313
314
315
316
    std::string _groupName; ///< String name of this group
    std::string	_mqttPart; ///< MQTT part identifying this group
    bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
    bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
    unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
    unsigned int _interval; ///< Reading interval cycle in milliseconds
    std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
    std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
317
    std::vector<SBasePtr> _baseSensors; ///< Vector with sensors associated to this group
318
    LOGGER lg; ///< Personal logging instance
319
320
};

321
322
323
// Convenience alias for better readability: shared-ownership pointer to a sensor group.
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

324
#endif /* SENSORGROUPINTERFACE_H_ */