2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

sensorbase.h 12.4 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

31
#include <fstream>
32
#include <memory>
33
#include <string>
34
#include <limits.h>
35
#include <boost/lockfree/spsc_queue.hpp>
36
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
37
#include "cacheentry.h"
38
#include "metadatastore.h"
39

40
41
42
43
44
/**
 * @brief General sensor base class.
 *
 * @ingroup common
 */
45
46
47
48
class SensorBase {
public:
	SensorBase(const std::string& name) :
		_name(name),
49
		_mqtt(""),
50
		_skipConstVal(false),
51
		_publish(true),
52
		_cacheInterval(900000),
53
54
		_subsamplingFactor(1),
		_subsamplingIndex(0),
55
		_cache(nullptr),
56
		_delta(false),
57
		_deltaMax(LLONG_MAX),
58
		_firstReading(true),
59
60
		_readingQueue(nullptr),
		_metadata(nullptr) {
61

Alessio Netti's avatar
Alessio Netti committed
62
63
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
64
65
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
66
67
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
68
69
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
70
71
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
72
73
	}

74
75
76
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
77
		_skipConstVal(other._skipConstVal),
78
		_publish(other._publish),
79
		_cacheInterval(other._cacheInterval),
80
81
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
82
		_cache(nullptr),
83
		_delta(other._delta),
84
		_deltaMax(other._deltaMax),
85
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
86
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
87
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
88
		_latestValue(other._latestValue),
89
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
90
        _accumulator(other._accumulator),
91
92
93
94
		_readingQueue(nullptr) {
	    
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
	}
95

96
	virtual ~SensorBase() {}
97
98
99
100

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
101
		_skipConstVal = other._skipConstVal;
102
		_publish = other._publish;
103
		_cacheInterval = other._cacheInterval;
104
105
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
106
		_cache.reset(nullptr);
107
		_delta = other._delta;
108
		_deltaMax = other._deltaMax;
109
		_firstReading = true;
110
111
112
113
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
114
115
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
116
117
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
118
119
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
120
		_readingQueue.reset(nullptr);
121
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
122
123

		return *this;
124
125
	}

126
	const bool 				isDelta()			const 	{ return _delta;}
127
	const uint64_t			getDeltaMaxValue()  const 	{ return _deltaMax; }
128
129
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
130
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
131
132
	bool					getPublish()		const	{ return _publish; }
        bool					getDelta()		const	{ return _delta; }
Alessio Netti's avatar
Alessio Netti committed
133
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
134
	int 					getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
135
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
136
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
137
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
138

139
140
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
141
142
143
144
	SensorMetadata* getMetadata()						      { return _metadata.get(); }
	
	void	clearMetadata()									{ _metadata.reset(nullptr); }
	void	setMetadata(SensorMetadata& s)					{ _metadata.reset(new SensorMetadata(s)); }
145
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
146
	void	setPublish(bool pub)							{ _publish = pub; }
147
	void	setDelta(const bool delta) 						{ _delta = delta; }
148
	void	setDeltaMaxValue(const uint64_t maxv)			{ _deltaMax = maxv; }
149
	void	setName(const std::string& name)				{ _name = name; }
150
151
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
152
	void	setSubsampling(int factor)						{ _subsamplingFactor = factor; }
153
154
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
155
156
157

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
158
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
159
160
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

161
	virtual void initSensor(unsigned interval, unsigned queueLen) {
Alessio Netti's avatar
Alessio Netti committed
162
		uint64_t cacheSize = _cacheInterval / interval + 1;
163
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
164
165
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
166
			_cache->updateBatchSize(1, true);
167
168
		}
		if(!_readingQueue) {
169
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(queueLen));
170
171
172
		}
	}

173
174
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
175
	 * Also this method takes care of other optional reading post-processing,
176
177
178
179
180
181
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
182
	 * @param storeGlobal Store reading in reading queue, so that it can get pushed.
183
	 */
184
	void storeReading(reading_t rawReading, double factor=1.0, bool storeGlobal=true) {
185
186
187
188
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
189
                    reading.value = (rawReading.value + ((int64_t)_deltaMax - _lastRawValue.value)) * factor;
190
191
192
193
194
195
196
197
198
199
200
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
201

202
203
204
205
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
206
207
	}

208
209
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
210
     * Also this method takes care of other optional reading post-processing,
211
212
213
214
215
216
217
218
219
220
221
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
222
     * @param storeGlobal Store reading in reading queue, so that it can get pushed.
223
     */
224
	void storeReading(ureading_t rawReading, double factor=1.0, bool storeGlobal=true) {
225
226
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
227
228
        if( _delta ) {
            if (!_firstReading) {
229
                if (rawReading.value < _lastRawUValue.value)
230
                    reading.value = (rawReading.value + (_deltaMax - _lastRawUValue.value)) * factor;
231
232
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
233
            } else {
234
235
236
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
237
238
239
240
241
242
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

243
244
245
246
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
247
248
    }

249
250
251
252
253
254
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
255
256
        _cache->store(reading);
        _latestValue = reading;
257
    }
258

259
260
261
262
263
264
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
265
266
267
268
269
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

Alessio Netti's avatar
Alessio Netti committed
270
        if (_subsamplingFactor>0 && _subsamplingIndex++%_subsamplingFactor==0) {
271
272
273
274
275
276
277
278
279
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
280
    }
281
    
282
    virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
283
284
285
286
287
288
289
290
291
292
293
		std::string leading(leadingSpaces, ' ');
		LOG_VAR(ll) << leading << _name;
		if (getSubsampling() != 1) {
			LOG_VAR(ll) << leading << "    SubSampling:      " << getSubsampling();
		}
		LOG_VAR(ll) << leading << "    Skip const values: " << (_skipConstVal ? "true" : "false");
		LOG_VAR(ll) << leading << "    Store delta only:  " << (_delta ? "true" : "false");
		if(_delta) {
			LOG_VAR(ll) << leading << "    Maximum value:     " << _deltaMax;
		}
		LOG_VAR(ll) << leading << "    Publish:           " << (_publish ? "true" : "false");
294
    }
295

296
297
protected:

298
299
	std::string _name;
	std::string _mqtt;
300
	bool _skipConstVal;
301
	bool _publish;
302
	unsigned int _cacheInterval;
Alessio Netti's avatar
Alessio Netti committed
303
	int _subsamplingFactor;
304
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
305
	std::unique_ptr<CacheEntry> _cache;
306
	bool _delta;
307
	uint64_t _deltaMax;
308
	bool _firstReading;
309
	ureading_t _lastRawUValue;
310
	reading_t _lastRawValue;
311
	reading_t _latestValue;
312
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
313
	reading_t _accumulator;
314
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
315
	std::unique_ptr<SensorMetadata> _metadata;
316
317
};

318
319
320
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

321
#endif /* SRC_SENSORBASE_H_ */