sensorbase.h 12.1 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

31
#include <fstream>
32
#include <memory>
33
#include <string>
34
#include <limits.h>
35
#include <boost/lockfree/spsc_queue.hpp>
36
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
37
#include "cacheentry.h"
38
#include "metadatastore.h"
39

40
41
42
43
44
/**
 * @brief General sensor base class.
 *
 * @ingroup common
 */
45
46
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
47
48
	static const size_t QUEUE_MAXLIMIT=1024;

49
50
	SensorBase(const std::string& name) :
		_name(name),
51
		_mqtt(""),
52
		_skipConstVal(false),
53
		_cacheInterval(900000),
54
55
		_subsamplingFactor(1),
		_subsamplingIndex(0),
56
		_cache(nullptr),
57
		_delta(false),
58
		_firstReading(true),
59
60
		_readingQueue(nullptr),
		_metadata(nullptr) {
61

Alessio Netti's avatar
Alessio Netti committed
62
63
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
64
65
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
66
67
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
68
69
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
70
71
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
72
73
	}

74
75
76
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
77
		_skipConstVal(other._skipConstVal),
78
		_cacheInterval(other._cacheInterval),
79
80
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
81
		_cache(nullptr),
82
		_delta(other._delta),
83
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
84
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
85
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
86
		_latestValue(other._latestValue),
87
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
88
        _accumulator(other._accumulator),
89
90
91
92
		_readingQueue(nullptr) {
	    
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
	}
93

94
	virtual ~SensorBase() {}
95
96
97
98

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
99
		_skipConstVal = other._skipConstVal;
100
		_cacheInterval = other._cacheInterval;
101
102
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
103
		_cache.reset(nullptr);
104
		_delta = other._delta;
105
		_firstReading = true;
106
107
108
109
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
110
111
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
112
113
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
114
115
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
116
		_readingQueue.reset(nullptr);
117
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
118
119

		return *this;
120
121
	}

122
	const bool 				isDelta()			const 	{ return _delta;}
123
124
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
125
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
Alessio Netti's avatar
Alessio Netti committed
126
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
127
	int 					getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
128
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
129
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
130
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
131

132
133
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
134
135
136
137
	SensorMetadata* getMetadata()						      { return _metadata.get(); }
	
	void	clearMetadata()									{ _metadata.reset(nullptr); }
	void	setMetadata(SensorMetadata& s)					{ _metadata.reset(new SensorMetadata(s)); }
138
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
139
	void	setDelta(const bool delta) 						{ _delta = delta; }
140
	void	setName(const std::string& name)				{ _name = name; }
141
142
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
143
	void	setSubsampling(int factor)						{ _subsamplingFactor = factor; }
144
145
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
146
147
148

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
149
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
150
151
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

152
	void initSensor(unsigned interval) {
Alessio Netti's avatar
Alessio Netti committed
153
		uint64_t cacheSize = _cacheInterval / interval + 1;
154
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
155
156
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
157
			_cache->updateBatchSize(1, true);
158
159
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
160
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
161
162
163
		}
	}

164
165
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
166
	 * Also this method takes care of other optional reading post-processing,
167
168
169
170
171
172
173
174
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
Micha Müller's avatar
Micha Müller committed
175
	 * @param storeGlobal Store reading in reading queue, so that it can get pushed.
176
	 */
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
194

195
196
197
198
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
199
200
	}

201
202
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
203
     * Also this method takes care of other optional reading post-processing,
204
205
206
207
208
209
210
211
212
213
214
215
216
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
Micha Müller's avatar
Micha Müller committed
217
     * @param storeGlobal Store reading in reading queue, so that it can get pushed.
218
     */
219
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
220
221
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
222
223
        if( _delta ) {
            if (!_firstReading) {
224
225
226
227
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
228
            } else {
229
230
231
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
232
233
234
235
236
237
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

238
239
240
241
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
242
243
    }

244
245
246
247
248
249
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
250
251
        _cache->store(reading);
        _latestValue = reading;
252
    }
253

254
255
256
257
258
259
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
260
261
262
263
264
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

Alessio Netti's avatar
Alessio Netti committed
265
        if (_subsamplingFactor>0 && _subsamplingIndex++%_subsamplingFactor==0) {
266
267
268
269
270
271
272
273
274
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
275
    }
276
    
277
278
279
280
281
    virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	std::string leading(leadingSpaces, ' ');
	LOG_VAR(ll) << leading << _name;
	if (getSubsampling() != 1) {
	    LOG_VAR(ll) << leading << "    SubSampling:      " << getSubsampling();
282
	}
283
284
285
	LOG_VAR(ll) << leading << "    Skip const values: " << (_skipConstVal ? "true" : "false");
	LOG_VAR(ll) << leading << "    Store delta only:  " << (_delta ? "true" : "false");
    }
286

287
288
protected:

289
290
	std::string _name;
	std::string _mqtt;
291
	bool _skipConstVal;
292
	unsigned int _cacheInterval;
Alessio Netti's avatar
Alessio Netti committed
293
	int _subsamplingFactor;
294
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
295
	std::unique_ptr<CacheEntry> _cache;
296
	bool _delta;
297
	bool _firstReading;
298
	ureading_t _lastRawUValue;
299
	reading_t _lastRawValue;
300
	reading_t _latestValue;
301
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
302
	reading_t _accumulator;
303
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
304
	std::unique_ptr<SensorMetadata> _metadata;
305
306
};

307
308
309
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

310
#endif /* SRC_SENSORBASE_H_ */