SensorGroupInterface.h 9.06 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorGroupInterface.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Abstract interface defining sensor group functionality.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26

27
28
29
30
/**
 * @defgroup pusherplugins Pusher Plugins
 * @ingroup  pusher
 *
31
32
 * @brief Collection of plugin interfaces, plugin templates summarizing common
 *        logic and the plugins itself.
33
34
 */

35
36
37
#ifndef SENSORGROUPINTERFACE_H_
#define SENSORGROUPINTERFACE_H_

38
39
40
41
#include <atomic>
#include <memory>
#include <boost/asio.hpp>

42
43
#include "logging.h"
#include "sensorbase.h"
44

45
46
47
/**
 * @brief Abstract interface defining sensor group functionality.
 *
48
49
50
 * @details Sensor groups should not implement this interface themselves but
 *          inherit the SensorGroupTemplate instead.
 *
51
52
 * @ingroup pusherplugins
 */
53
class SensorGroupInterface {
54
public:
55

56
57
58
59
60
61
62
63
64
65
    SensorGroupInterface(const std::string& groupName) :
        _groupName(groupName),
        _mqttPart(""),
        _sync(true),
        _keepRunning(false),
        _minValues(1),
        _interval(1000),
        _pendingTasks(0),
        _timer(nullptr) {
    }
66

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
    SensorGroupInterface(const SensorGroupInterface& other) :
        _groupName(other._groupName),
        _mqttPart(other._mqttPart),
        _sync(other._sync),
        _keepRunning(other._keepRunning),
        _minValues(other._minValues),
        _interval(other._interval),
        _timer(nullptr) {
        _pendingTasks.store(other._pendingTasks.load());
    }

    virtual ~SensorGroupInterface() {}

    SensorGroupInterface& operator=(const SensorGroupInterface& other) {
        _groupName = other._groupName;
        _mqttPart = other._mqttPart;
        _sync = other._sync;
        _keepRunning = other._keepRunning;
        _minValues = other._minValues;
        _interval = other._interval;
        _pendingTasks.store(other._pendingTasks.load());
        _timer = nullptr;

        return *this;
    }

    ///@name Getters
    ///@{
    const std::string& 	getGroupName()	const	{ return _groupName; }
    const std::string& 	getMqttPart()	const	{ return _mqttPart; }
    bool				getSync()		const	{ return _sync; }
    unsigned			getMinValues()	const	{ return _minValues; }
    unsigned			getInterval()	const	{ return _interval; }
    ///@}
101

102
103
104
    ///@name Setters
    ///@{
    void setGroupName(const std::string& groupName)	{ _groupName = groupName; }
105
106
107
108
109
110
111
112
113
114
    void setMqttPart(const std::string& mqttPart)	{
        _mqttPart = mqttPart;
        //sanitize mqttPart into uniform /xxxx format
        if (_mqttPart.front() != '/') {
            _mqttPart.insert(0, "/");
        }
        if (_mqttPart.back() == '/') {
            _mqttPart.erase(_mqttPart.size()-1);
        }
    }
115
116
117
118
    void setSync(bool sync)							{ _sync = sync; }
    void setMinValues(unsigned minValues)			{ _minValues = minValues; }
    void setInterval(unsigned interval)				{ _interval = interval; }
    ///@}
119

120
121
    ///@name Interface methods
    ///@{
122
123
124
125
126
127
128
129
130
131
132
    /**
     * @brief Initialize the sensor group.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param io IO service to initialize the timer with.
     */
    virtual void init(boost::asio::io_service& io) {
        _timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
    }
133

134
135
136
137
    /**
     * @brief Start the sensor group (i.e. start collecting data).
     */
    virtual void start() = 0;
138

139
    /**
140
141
     * @brief Stop the sensor group (i.e. stop collecting data).
     */
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
    virtual void stop() = 0;

    /**
     * @brief Add a sensor to this group.
     *
     * @param s Shared pointer to the sensor.
     */
    virtual void pushBackSensor(SBasePtr s)		= 0;

    /**
     * @brief Retrieve all sensors of this group.
     *
     * @return A std::vector with shared pointers to all associated sensors.
     */
    virtual std::vector<SBasePtr>& getSensors()	= 0;
157

158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
    /**
     * @brief Print interface configuration.
     *
     * @details Derived classes overwriting this method must ensure to either
     *          call this method or reproduce its functionality.
     *
     * @param ll Log severity level to be used from logger.
     */
    virtual void printConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "        Sensor Group: " << _groupName;
        if (_mqttPart != "") {
            LOG_VAR(ll) << "            MQTT part: " << _mqttPart;
        }

        if (_sync) {
            LOG_VAR(ll) << "            Synchronized readings enabled";
        } else {
            LOG_VAR(ll) << "            Synchronized readings disabled";
        }

        LOG_VAR(ll) << "            minValues: " << _minValues;
        LOG_VAR(ll) << "            interval:  " << _interval;
    }

182
protected:
183
184
185
186

    /**
     * @brief Read data for all sensors once.
     */
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
    virtual void read() = 0;

    /**
     * @brief Asynchronous callback if _timer expires.
     *
     * @details Issues a read() and sets the timer again if _keepRunning is true.
     */
    virtual void readAsync() = 0;

    /**
     * @brief Print information about plugin specific group attributes (or
     *        nothing if no such attributes are present).
     *
     * @details Only overwrite if necessary.
     *
     * @param ll Severity level to log with
     */
    virtual void printGroupConfig(LOG_LEVEL ll) {
        LOG_VAR(ll) << "            No other plugin-specific group attributes defined";
    }
207
	///@}
208

209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
	///@name Utility methods
    ///@{
public:
    /**
     * @brief Does a busy wait until all dispatched handlers are finished
     *        (_pendingTasks == 0).
     *
     * @details If the wait takes longer than a reasonable amount of time we
     *          return anyway, to not block termination of dcdbpusher.
     */
    void wait() {
        unsigned short retries = 3;

        for (unsigned short i = 1; i <= retries; i++) {
            if (_pendingTasks) {
                LOG(info) << "Group " << _groupName << " not yet finished. Waiting... (" << i << "/" << retries << ")";
                sleep((_interval/1000) + 1);
            } else {
                return;
            }
        }

        LOG(warning) << "Group " << _groupName << " will not finish! Skipping it";
    }

protected:
    /**
     * @brief Calculate timestamp for the next reading.
     *
     * @return Timestamp in the future to wait for
     */
    uint64_t nextReadingTime() {
        uint64_t now = getTimestamp();
        uint64_t next;
        if (_sync) {
            uint64_t interval64 = static_cast<uint64_t>(_interval);
            uint64_t now_ms = now / 1000 / 1000;
            //synchronize all measurements with other sensors
            uint64_t waitToStart = interval64 - (now_ms%interval64);
            //less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
            if(!waitToStart ){
                    return (now_ms + interval64)*1000*1000;
            }
            return (now_ms + waitToStart)*1000*1000;
        } else {
            return now + MS_TO_NS(_interval);
        }
    }
    ///@}

259
    //TODO unify naming scheme: _groupName --> _name (also refactor getter+setter)
260
261
262
263
264
265
266
267
268
    std::string _groupName; ///< String name of this group
    std::string	_mqttPart; ///< MQTT part identifying this group
    bool _sync; ///< Should the timer (i.e. the read cycle of this groups) be synchronized with other groups?
    bool _keepRunning; ///< Continue with next reading cycle (i.e. set timer again after reading)?
    unsigned int _minValues; ///< Minimum number of values a sensor should gather before they get pushed (to reduce MQTT overhead)
    unsigned int _interval; ///< Reading interval cycle in milliseconds
    std::atomic_uint _pendingTasks; ///< Number of currently outstanding read operations
    std::unique_ptr<boost::asio::deadline_timer> _timer; ///< Time readings in a periodic interval
    LOGGER lg; ///< Personal logging instance
269
270
};

271
272
273
//for better readability
using SGroupPtr = std::shared_ptr<SensorGroupInterface>;

274
#endif /* SENSORGROUPINTERFACE_H_ */