SensorGroupInterface.h 11.1 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27

28
29
30
31
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
32
33
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins itself.
34
35
 */

36
37
38
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

39
40
41
42
#include <atomic>
#include <memory>
#include <boost/asio.hpp>

43
44
#include "logging.h"
#include "sensorbase.h"
45

46
47
48
/**
 * @brief Abstract interface defining sensor group functionality.
 *
49
50
51
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
52
53
 * @ingroup pusherplugins
 */
54
class SensorGroupInterface {
55
public:
56

57
58
59
60
61
62
63
64
65
66
    SensorGroupInterface(const std::string& groupName) :
        _groupName(groupName),
        _mqttPart(""),
        _sync(true),
        _keepRunning(false),
        _minValues(1),
        _interval(1000),
        _pendingTasks(0),
        _timer(nullptr) {
    }
67

68
69
70
71
    SensorGroupInterface(const SensorGroupInterface& other) :
        _groupName(other._groupName),
        _mqttPart(other._mqttPart),
        _sync(other._sync),
72
        _keepRunning(false),
73
74
        _minValues(other._minValues),
        _interval(other._interval),
75
        _pendingTasks(0),
76
77
78
79
80
81
82
83
84
        _timer(nullptr) {
    }

    virtual ~SensorGroupInterface() {}

    SensorGroupInterface& operator=(const SensorGroupInterface& other) {
        _groupName = other._groupName;
        _mqttPart = other._mqttPart;
        _sync = other._sync;
85
        _keepRunning = false;
86
87
        _minValues = other._minValues;
        _interval = other._interval;
88
        _pendingTasks.store(0);
89
90
91
92
93
94
95
96
97
98
99
100
101
        _timer = nullptr;

        return *this;
    }

    ///@name Getters
    ///@{
    const std::string& 	getGroupName()	const	{ return _groupName; }
    const std::string& 	getMqttPart()	const	{ return _mqttPart; }
    bool				getSync()		const	{ return _sync; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    ///@}
102

103
104
105
    ///@name Setters
    ///@{
    void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
106
107
108
109
110
111
112
113
114
115
    void setMqttPart(const std::string& mqttPart)	{
        _mqttPart = mqttPart;
        //sanitize mqttPart into uniform /xxxx format
        if (_mqttPart.front() != '/') {
            _mqttPart.insert(0, "/");
        }
        if (_mqttPart.back() == '/') {
            _mqttPart.erase(_mqttPart.size()-1);
        }
    }
116
117
118
119
    void setSync(bool sync)							{ _sync = sync; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
    ///@}
120

121
    ///@name Publicly accessible interface methods (implemented in SensorGroupTemplate)
122
    ///@{
123
124
125
126
127
128
129
130
131
132
133
    /**
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
    virtual void init(boost::asio::io_service& io) {
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }
134
135
136
137
138
139
    
    /**
     * @brief Waits for the termination of the sensor group.
     */
    virtual void wait() = 0;
    
140
141
142
143
    /**
     * @brief Start the sensor group (i.e. start collecting data).
     */
    virtual void start() = 0;
144

145
    /**
146
     * @brief Stop the sensor group (i.e. stop collecting data).
147
148
     * 
     * @details Must be followed by a call to the wait() method.
149
     */
150
    virtual void stop() = 0;
151
    
152
153
154
155
156
157
158
159
    /**
     * @brief Add a sensor to this group.
     *
     * @param s Shared pointer to the sensor.
     */
    virtual void pushBackSensor(SBasePtr s)		= 0;

    /**
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
     * @brief Acquire access to all sensors of this group.
     *
     * @details Always release sensor access via releaseSensors() afterwards to
     *          ensure multi-threading safety!
     *          acquireSensors() and releaseSensors() allow for thread-safe
     *          (locked) access to _baseSensors. However, as most plugins do not
     *          require synchronized access (and we want to avoid the induced
     *          lock overhead for them) the actual locking mechanism is only
     *          implemented in plugins actually requiring thread-safe access.
     *          Every caller which may require thread safe access to
     *          _baseSensors should use acquireSensors() and releaseSensors().
     *          If a plugin does not implement a locking mechanism because it
     *          does not require synchronized access releaseSensors() can be
     *          omitted.
     *          See the Caliper plugin for a SensorGroup which implements locked
     *          access to _baseSensors.
176
177
178
     *
     * @return A std::vector with shared pointers to all associated sensors.
     */
179
180
181
182
183
184
185
186
187
188
    virtual std::vector<SBasePtr>& acquireSensors() { return _baseSensors; }

    /**
     * @brief Release previously acquired access to sensors of this group.
     *
     * @details Always acquire sensors via acquireSensors() beforehand!
     *          See description of acquireSensors() for an in-depth reasoning
     *          about this method.
     */
    virtual void releaseSensors() { /* do nothing in base implementation */ }
189

190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
    /**
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
    virtual void printConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "        Sensor Group: " << _groupName;
        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
        }

        if (_sync) {
            LOG_VAR(ll) << "            Synchronized readings enabled";
        } else {
            LOG_VAR(ll) << "            Synchronized readings disabled";
        }

        LOG_VAR(ll) << "            minValues: " << _minValues;
        LOG_VAR(ll) << "            interval:  " << _interval;
    }
213
    ///@}
214

215
protected:
216

217
218
    ///@name Internal methods (implemented in a plugin's SensorGroup)
    ///@{
219
220
221
    /**
     * @brief Read data for all sensors once.
     */
222
223
224
    virtual void read() = 0;

    /**
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
     * @brief Implement plugin specific actions to initialize a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions for initialization, this should be implemented here.
     *          %initGroup() is appropriately called during init().
     */
    virtual void execOnInit() { /* do nothing if not overwritten */ }

    /**
     * @brief Implement plugin specific actions to start a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to start polling data (e.g. open a file descriptor),
     *          this should be implemented here. %startGroup() is appropriately
     *          called during start().
240
     *
241
     * @return True on success, false otherwise.
242
     */
243
244
245
246
247
248
249
250
251
252
253
    virtual bool execOnStart() { return true; }

    /**
     * @brief Implement plugin specific actions to stop a group here.
     *
     * @details If a derived class (i.e. a plugin group) requires further custom
     *          actions to stop polling data (e.g. close a file descriptor),
     *          this should be implemented here. %stopGroup() is appropriately
     *          called during stop().
     */
    virtual void execOnStop() { /* do nothing if not overwritten */ }
254
255
256
257
258
259
260
261
262
263
264
265

    /**
     * @brief Print information about plugin specific group attributes (or
     *        nothing if no such attributes are present).
     *
     * @details Only overwrite if necessary.
     *
     * @param ll Severity level to log with
     */
    virtual void printGroupConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "            No other plugin-specific group attributes defined";
    }
266
	///@}
267

268
    ///@name Utility methods
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
    ///@{
    /**
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            //synchronize all measurements with other sensors
            uint64_t waitToStart = interval64 - (now_ms%interval64);
            //less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
            if(!waitToStart ){
                    return (now_ms + interval64)*1000*1000;
            }
            return (now_ms + waitToStart)*1000*1000;
        } else {
            return now + MS_TO_NS(_interval);
        }
    }
    ///@}

294
    //TODO unify naming scheme: _groupName --> _name (also refactor getter+setter)
295
296
297
298
299
300
301
302
    std::string _groupName; ///< String name of this group
    std::string	_mqttPart; ///< MQTT part identifying this group
    bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
    bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
    unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
    unsigned int _interval; ///< Reading interval cycle in milliseconds
    std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
    std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
303
    std::vector<SBasePtr> _baseSensors; ///< Vector with sensors associated to this group
304
    LOGGER lg; ///< Personal logging instance
305
306
};

307
308
309
//for better readability
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

310
#endif /* SENSORGROUPINTERFACE_H_ */