//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins themselves.
 */

#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

39
40
41
42
#include <unistd.h>

#include <atomic>
#include <cstdint>
#include <memory>
#include <string>
#include <vector>

#include <boost/asio.hpp>

#include "logging.h"
#include "sensorbase.h"
45

46
47
48
/**
 * @brief Abstract interface defining sensor group functionality.
 *
49
50
51
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
52
53
 * @ingroup pusherplugins
 */
54
class SensorGroupInterface {
55
public:
56

57
58
59
60
61
62
63
64
65
66
    SensorGroupInterface(const std::string& groupName) :
        _groupName(groupName),
        _mqttPart(""),
        _sync(true),
        _keepRunning(false),
        _minValues(1),
        _interval(1000),
        _pendingTasks(0),
        _timer(nullptr) {
    }
67

68
69
70
71
    SensorGroupInterface(const SensorGroupInterface& other) :
        _groupName(other._groupName),
        _mqttPart(other._mqttPart),
        _sync(other._sync),
72
        _keepRunning(false),
73
74
        _minValues(other._minValues),
        _interval(other._interval),
75
        _pendingTasks(0),
76
77
78
79
80
81
82
83
84
        _timer(nullptr) {
    }

    virtual ~SensorGroupInterface() {}

    SensorGroupInterface& operator=(const SensorGroupInterface& other) {
        _groupName = other._groupName;
        _mqttPart = other._mqttPart;
        _sync = other._sync;
85
        _keepRunning = false;
86
87
        _minValues = other._minValues;
        _interval = other._interval;
88
        _pendingTasks.store(0);
89
90
91
92
93
94
95
96
97
98
99
100
101
        _timer = nullptr;

        return *this;
    }

    ///@name Getters
    ///@{
    const std::string& 	getGroupName()	const	{ return _groupName; }
    const std::string& 	getMqttPart()	const	{ return _mqttPart; }
    bool				getSync()		const	{ return _sync; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    ///@}
102

103
104
105
    ///@name Setters
    ///@{
    void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
106
107
108
109
110
111
112
113
114
115
    void setMqttPart(const std::string& mqttPart)	{
        _mqttPart = mqttPart;
        //sanitize mqttPart into uniform /xxxx format
        if (_mqttPart.front() != '/') {
            _mqttPart.insert(0, "/");
        }
        if (_mqttPart.back() == '/') {
            _mqttPart.erase(_mqttPart.size()-1);
        }
    }
116
117
118
119
    void setSync(bool sync)							{ _sync = sync; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
    ///@}
120

121
    ///@name Publicly accessible interface methods (implemented in SensorGroupTemplate)
122
    ///@{
123
124
125
126
127
128
129
130
131
132
133
    /**
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
    virtual void init(boost::asio::io_service& io) {
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }
134

135
136
137
138
    /**
     * @brief Start the sensor group (i.e. start collecting data).
     */
    virtual void start() = 0;
139

140
    /**
141
142
     * @brief Stop the sensor group (i.e. stop collecting data).
     */
143
144
145
146
147
148
149
150
151
152
    virtual void stop() = 0;

    /**
     * @brief Add a sensor to this group.
     *
     * @param s Shared pointer to the sensor.
     */
    virtual void pushBackSensor(SBasePtr s)		= 0;

    /**
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
     * @brief Acquire access to all sensors of this group.
     *
     * @details Always release sensor access via releaseSensors() afterwards to
     *          ensure multi-threading safety!
     *          acquireSensors() and releaseSensors() allow for thread-safe
     *          (locked) access to _baseSensors. However, as most plugins do not
     *          require synchronized access (and we want to avoid the induced
     *          lock overhead for them) the actual locking mechanism is only
     *          implemented in plugins actually requiring thread-safe access.
     *          Every caller which may require thread safe access to
     *          _baseSensors should use acquireSensors() and releaseSensors().
     *          If a plugin does not implement a locking mechanism because it
     *          does not require synchronized access releaseSensors() can be
     *          omitted.
     *          See the Caliper plugin for a SensorGroup which implements locked
     *          access to _baseSensors.
169
170
171
     *
     * @return A std::vector with shared pointers to all associated sensors.
     */
172
173
174
175
176
177
178
179
180
181
    virtual std::vector<SBasePtr>& acquireSensors() { return _baseSensors; }

    /**
     * @brief Release previously acquired access to sensors of this group.
     *
     * @details Always acquire sensors via acquireSensors() beforehand!
     *          See description of acquireSensors() for an in-depth reasoning
     *          about this method.
     */
    virtual void releaseSensors() { /* do nothing in base implementation */ }
182

183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
    /**
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
    virtual void printConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "        Sensor Group: " << _groupName;
        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
        }

        if (_sync) {
            LOG_VAR(ll) << "            Synchronized readings enabled";
        } else {
            LOG_VAR(ll) << "            Synchronized readings disabled";
        }

        LOG_VAR(ll) << "            minValues: " << _minValues;
        LOG_VAR(ll) << "            interval:  " << _interval;
    }
206
    ///@}
207

208
protected:
209

210
211
    ///@name Internal methods (implemented in a plugin's SensorGroup)
    ///@{
212
213
214
    /**
     * @brief Read data for all sensors once.
     */
215
216
217
    virtual void read() = 0;

    /**
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
     * @brief Implement plugin specific actions to initialize a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions for initialization, this should be implemented here.
     *          %initGroup() is appropriately called during init().
     */
    virtual void execOnInit() { /* do nothing if not overwritten */ }

    /**
     * @brief Implement plugin specific actions to start a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to start polling data (e.g. open a file descriptor),
     *          this should be implemented here. %startGroup() is appropriately
     *          called during start().
233
     *
234
     * @return True on success, false otherwise.
235
     */
236
237
238
239
240
241
242
243
244
245
246
    virtual bool execOnStart() { return true; }

    /**
     * @brief Implement plugin specific actions to stop a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to stop polling data (e.g. close a file descriptor),
     *          this should be implemented here. %stopGroup() is appropriately
     *          called during stop().
     */
    virtual void execOnStop() { /* do nothing if not overwritten */ }
247
248
249
250
251
252
253
254
255
256
257
258

    /**
     * @brief Print information about plugin specific group attributes (or
     *        nothing if no such attributes are present).
     *
     * @details Only overwrite if necessary.
     *
     * @param ll Severity level to log with
     */
    virtual void printGroupConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "            No other plugin-specific group attributes defined";
    }
259
	///@}
260

261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
	///@name Utility methods
    ///@{
    /**
     * @brief Does a busy wait until all dispatched handlers are finished
     *        (_pendingTasks == 0).
     *
     * @details If the wait takes longer than a reasonable amount of time we
     *          return anyway, to not block termination of dcdbpusher.
     */
    void wait() {
        unsigned short retries = 3;

        for (unsigned short i = 1; i <= retries; i++) {
            if (_pendingTasks) {
                LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
                sleep((_interval/1000) + 1);
            } else {
                return;
            }
        }

        LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
    }

    /**
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            //synchronize all measurements with other sensors
            uint64_t waitToStart = interval64 - (now_ms%interval64);
            //less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
            if(!waitToStart ){
                    return (now_ms + interval64)*1000*1000;
            }
            return (now_ms + waitToStart)*1000*1000;
        } else {
            return now + MS_TO_NS(_interval);
        }
    }
    ///@}

309
    //TODO unify naming scheme: _groupName --> _name (also refactor getter+setter)
310
311
312
313
314
315
316
317
    std::string _groupName; ///< String name of this group
    std::string	_mqttPart; ///< MQTT part identifying this group
    bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
    bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
    unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
    unsigned int _interval; ///< Reading interval cycle in milliseconds
    std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
    std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
318
    std::vector<SBasePtr> _baseSensors; ///< Vector with sensors associated to this group
319
    LOGGER lg; ///< Personal logging instance
320
321
};

322
323
324
//Convenience alias for better readability: shared pointer to a sensor group, referenced via its abstract interface
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

325
#endif /* SENSORGROUPINTERFACE_H_ */