2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

sensorbase.h 12.6 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

31
#include <fstream>
32
#include <memory>
33
#include <string>
34
#include <limits.h>
35
#include <boost/lockfree/spsc_queue.hpp>
36
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
37
#include "cacheentry.h"
38
#include "metadatastore.h"
39

40
41
42
43
44
/**
 * @brief General sensor base class.
 *
 * @ingroup common
 */
45
46
47
48
class SensorBase {
public:
	SensorBase(const std::string& name) :
		_name(name),
49
		_mqtt(""),
50
		_skipConstVal(false),
51
		_publish(true),
52
		_cacheInterval(900000),
53
54
		_subsamplingFactor(1),
		_subsamplingIndex(0),
55
		_factor(1),
56
		_cache(nullptr),
57
		_delta(false),
58
		_deltaMax(LLONG_MAX),
59
		_firstReading(true),
60
61
		_readingQueue(nullptr),
		_metadata(nullptr) {
62

Alessio Netti's avatar
Alessio Netti committed
63
64
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
65
66
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
67
68
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
69
70
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
71
72
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
73
74
	}

75
76
77
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
78
		_skipConstVal(other._skipConstVal),
79
		_publish(other._publish),
80
		_cacheInterval(other._cacheInterval),
81
82
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
83
		_factor(other._factor),
84
		_cache(nullptr),
85
		_delta(other._delta),
86
		_deltaMax(other._deltaMax),
87
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
88
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
89
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
90
		_latestValue(other._latestValue),
91
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
92
        _accumulator(other._accumulator),
93
94
95
96
		_readingQueue(nullptr) {
	    
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
	}
97

98
	virtual ~SensorBase() {}
99
100
101
102

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
103
		_skipConstVal = other._skipConstVal;
104
		_publish = other._publish;
105
		_cacheInterval = other._cacheInterval;
106
107
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
108
		_factor = other._factor;
109
		_cache.reset(nullptr);
110
		_delta = other._delta;
111
		_deltaMax = other._deltaMax;
112
		_firstReading = true;
113
114
115
116
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
117
118
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
119
120
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
121
122
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
123
		_readingQueue.reset(nullptr);
124
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
125
126

		return *this;
127
128
	}

129
	const bool 				isDelta()			const 	{ return _delta;}
130
	const uint64_t			getDeltaMaxValue()  const 	{ return _deltaMax; }
131
132
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
133
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
134
135
	bool					getPublish()		const	{ return _publish; }
        bool					getDelta()		const	{ return _delta; }
Alessio Netti's avatar
Alessio Netti committed
136
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
137
	int 					getSubsampling()	const   { return _subsamplingFactor; }
138
139
	double					getFactor()		const	{ return _factor; }

Alessio Netti's avatar
Alessio Netti committed
140
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
141
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
142
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
143

144
145
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
146
147
148
149
	SensorMetadata* getMetadata()						      { return _metadata.get(); }
	
	void	clearMetadata()									{ _metadata.reset(nullptr); }
	void	setMetadata(SensorMetadata& s)					{ _metadata.reset(new SensorMetadata(s)); }
150
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
151
	void	setPublish(bool pub)							{ _publish = pub; }
152
	void	setDelta(const bool delta) 						{ _delta = delta; }
153
	void	setDeltaMaxValue(const uint64_t maxv)			{ _deltaMax = maxv; }
154
	void	setName(const std::string& name)				{ _name = name; }
155
156
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
157
	void	setSubsampling(int factor)						{ _subsamplingFactor = factor; }
158
	void	setFactor(const double &factor)			{ _factor = factor; }
159
160
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
161
162
163

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
164
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
165
166
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

167
	virtual void initSensor(unsigned interval, unsigned queueLen) {
Alessio Netti's avatar
Alessio Netti committed
168
		uint64_t cacheSize = _cacheInterval / interval + 1;
169
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
170
171
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
172
			_cache->updateBatchSize(1, true);
173
174
		}
		if(!_readingQueue) {
175
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(queueLen));
176
177
178
		}
	}

179
180
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
181
	 * Also this method takes care of other optional reading post-processing,
182
183
184
185
186
187
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
188
	 * @param storeGlobal Store reading in reading queue, so that it can get pushed.
189
	 */
190
	void storeReading(reading_t rawReading, double factor=1.0, bool storeGlobal=true) {
191
192
193
194
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
195
                    reading.value = (rawReading.value + ((int64_t)_deltaMax - _lastRawValue.value)) * factor * _factor;
196
                else
197
                    reading.value = (rawReading.value - _lastRawValue.value) * factor * _factor;
198
199
200
201
202
203
204
205
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
206
            reading.value = rawReading.value * factor * _factor;
207

208
209
210
211
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
212
213
	}

214
215
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
216
     * Also this method takes care of other optional reading post-processing,
217
218
219
220
221
222
223
224
225
226
227
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
228
     * @param storeGlobal Store reading in reading queue, so that it can get pushed.
229
     */
230
	void storeReading(ureading_t rawReading, double factor=1.0, bool storeGlobal=true) {
231
232
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
233
234
        if( _delta ) {
            if (!_firstReading) {
235
                if (rawReading.value < _lastRawUValue.value)
236
                    reading.value = (rawReading.value + (_deltaMax - _lastRawUValue.value)) * factor;
237
238
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
239
            } else {
240
241
242
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
243
244
245
246
247
248
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

249
250
251
252
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
253
254
    }

255
256
257
258
259
260
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
261
262
        _cache->store(reading);
        _latestValue = reading;
263
    }
264

265
266
267
268
269
270
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
271
272
273
274
275
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

Alessio Netti's avatar
Alessio Netti committed
276
        if (_subsamplingFactor>0 && _subsamplingIndex++%_subsamplingFactor==0) {
277
278
279
280
281
282
283
284
285
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
286
    }
287
    
288
    virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
289
290
291
292
293
		std::string leading(leadingSpaces, ' ');
		LOG_VAR(ll) << leading << _name;
		if (getSubsampling() != 1) {
			LOG_VAR(ll) << leading << "    SubSampling:      " << getSubsampling();
		}
294
		LOG_VAR(ll) << leading << "    Factor:            " << _factor;
295
296
297
298
299
300
		LOG_VAR(ll) << leading << "    Skip const values: " << (_skipConstVal ? "true" : "false");
		LOG_VAR(ll) << leading << "    Store delta only:  " << (_delta ? "true" : "false");
		if(_delta) {
			LOG_VAR(ll) << leading << "    Maximum value:     " << _deltaMax;
		}
		LOG_VAR(ll) << leading << "    Publish:           " << (_publish ? "true" : "false");
301
    }
302

303
304
protected:

305
306
	std::string _name;
	std::string _mqtt;
307
	bool _skipConstVal;
308
	bool _publish;
309
	unsigned int _cacheInterval;
Alessio Netti's avatar
Alessio Netti committed
310
	int _subsamplingFactor;
311
	unsigned int _subsamplingIndex;
312
	double _factor;
Alessio Netti's avatar
Alessio Netti committed
313
	std::unique_ptr<CacheEntry> _cache;
314
	bool _delta;
315
	uint64_t _deltaMax;
316
	bool _firstReading;
317
	ureading_t _lastRawUValue;
318
	reading_t _lastRawValue;
319
	reading_t _latestValue;
320
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
321
	reading_t _accumulator;
322
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
323
	std::unique_ptr<SensorMetadata> _metadata;
324
325
};

326
327
328
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

329
#endif /* SRC_SENSORBASE_H_ */