Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

sensorbase.h 12.4 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

31
#include <fstream>
32
#include <memory>
33
#include <string>
34
#include <limits.h>
35
#include <boost/lockfree/spsc_queue.hpp>
36
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
37
#include "cacheentry.h"
38
#include "metadatastore.h"
39

40
41
42
43
44
/**
 * @brief General sensor base class.
 *
 * @ingroup common
 */
45
46
47
48
class SensorBase {
public:
	SensorBase(const std::string& name) :
		_name(name),
49
		_mqtt(""),
50
		_skipConstVal(false),
51
		_publish(true),
52
		_cacheInterval(900000),
53
54
		_subsamplingFactor(1),
		_subsamplingIndex(0),
55
		_cache(nullptr),
56
		_delta(false),
57
		_deltaMax(LLONG_MAX),
58
		_firstReading(true),
59
60
		_readingQueue(nullptr),
		_metadata(nullptr) {
61

Alessio Netti's avatar
Alessio Netti committed
62
63
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
64
65
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
66
67
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
68
69
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
70
71
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
72
73
	}

74
75
76
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
77
		_skipConstVal(other._skipConstVal),
78
		_publish(other._publish),
79
		_cacheInterval(other._cacheInterval),
80
81
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
82
		_cache(nullptr),
83
		_delta(other._delta),
84
		_deltaMax(other._deltaMax),
85
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
86
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
87
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
88
		_latestValue(other._latestValue),
89
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
90
        _accumulator(other._accumulator),
91
92
93
94
		_readingQueue(nullptr) {
	    
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
	}
95

96
	virtual ~SensorBase() {}
97
98
99
100

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
101
		_skipConstVal = other._skipConstVal;
102
		_publish = other._publish;
103
		_cacheInterval = other._cacheInterval;
104
105
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
106
		_cache.reset(nullptr);
107
		_delta = other._delta;
108
		_deltaMax = other._deltaMax;
109
		_firstReading = true;
110
111
112
113
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
114
115
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
116
117
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
118
119
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
120
		_readingQueue.reset(nullptr);
121
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
122
123

		return *this;
124
125
	}

126
	const bool 				isDelta()			const 	{ return _delta;}
127
	const uint64_t			getDeltaMaxValue()  const 	{ return _deltaMax; }
128
129
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
130
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
131
132
	bool					getPublish()		const	{ return _publish; }
        bool					getDelta()		const	{ return _delta; }
Alessio Netti's avatar
Alessio Netti committed
133
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
134
	int 					getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
135
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
136
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
137
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
138

139
140
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
141
142
143
144
	SensorMetadata* getMetadata()						      { return _metadata.get(); }
	
	void	clearMetadata()									{ _metadata.reset(nullptr); }
	void	setMetadata(SensorMetadata& s)					{ _metadata.reset(new SensorMetadata(s)); }
145
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
146
	void	setPublish(bool pub)							{ _publish = pub; }
147
	void	setDelta(const bool delta) 						{ _delta = delta; }
148
	void	setDeltaMaxValue(const uint64_t maxv)			{ _deltaMax = maxv; }
149
	void	setName(const std::string& name)				{ _name = name; }
150
151
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
152
	void	setSubsampling(int factor)						{ _subsamplingFactor = factor; }
153
154
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
155
156
157

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
158
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
159
160
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

161
	virtual void initSensor(unsigned interval, unsigned queueLen) {
Alessio Netti's avatar
Alessio Netti committed
162
		uint64_t cacheSize = _cacheInterval / interval + 1;
163
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
164
165
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
166
			_cache->updateBatchSize(1, true);
167
168
		}
		if(!_readingQueue) {
169
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(queueLen));
170
171
172
		}
	}

173
174
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
175
	 * Also this method takes care of other optional reading post-processing,
176
177
178
179
180
181
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
182
	 * @param storeGlobal Store reading in reading queue, so that it can get pushed.
183
	 */
184
	void storeReading(reading_t rawReading, double factor=1.0, bool storeGlobal=true) {
185
186
187
188
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
189
                    reading.value = (rawReading.value + ((int64_t)_deltaMax - _lastRawValue.value)) * factor;
190
191
192
193
194
195
196
197
198
199
200
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
201

202
203
204
205
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
206
207
	}

208
209
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
210
     * Also this method takes care of other optional reading post-processing,
211
212
213
214
215
216
217
218
219
220
221
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
Micha Müller's avatar
Micha Müller committed
222
     * @param storeGlobal Store reading in reading queue, so that it can get pushed.
223
     */
224
	void storeReading(ureading_t rawReading, double factor=1.0, bool storeGlobal=true) {
225
226
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
227
228
        if( _delta ) {
            if (!_firstReading) {
229
                if (rawReading.value < _lastRawUValue.value)
230
                    reading.value = (rawReading.value + (_deltaMax - _lastRawUValue.value)) * factor;
231
232
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
233
            } else {
234
235
236
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
237
238
239
240
241
242
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

243
244
245
246
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
247
248
    }

249
250
251
252
253
254
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
255
256
        _cache->store(reading);
        _latestValue = reading;
257
    }
258

259
260
261
262
263
264
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
265
266
267
268
269
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

Alessio Netti's avatar
Alessio Netti committed
270
        if (_subsamplingFactor>0 && _subsamplingIndex++%_subsamplingFactor==0) {
271
272
273
274
275
276
277
278
279
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
280
    }
281
    
282
    virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
283
284
285
286
287
288
289
290
291
292
293
		std::string leading(leadingSpaces, ' ');
		LOG_VAR(ll) << leading << _name;
		if (getSubsampling() != 1) {
			LOG_VAR(ll) << leading << "    SubSampling:      " << getSubsampling();
		}
		LOG_VAR(ll) << leading << "    Skip const values: " << (_skipConstVal ? "true" : "false");
		LOG_VAR(ll) << leading << "    Store delta only:  " << (_delta ? "true" : "false");
		if(_delta) {
			LOG_VAR(ll) << leading << "    Maximum value:     " << _deltaMax;
		}
		LOG_VAR(ll) << leading << "    Publish:           " << (_publish ? "true" : "false");
294
    }
295

296
297
protected:

298
299
	std::string _name;
	std::string _mqtt;
300
	bool _skipConstVal;
301
	bool _publish;
302
	unsigned int _cacheInterval;
Alessio Netti's avatar
Alessio Netti committed
303
	int _subsamplingFactor;
304
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
305
	std::unique_ptr<CacheEntry> _cache;
306
	bool _delta;
307
	uint64_t _deltaMax;
308
	bool _firstReading;
309
	ureading_t _lastRawUValue;
310
	reading_t _lastRawValue;
311
	reading_t _latestValue;
312
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
313
	reading_t _accumulator;
314
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
315
	std::unique_ptr<SensorMetadata> _metadata;
316
317
};

318
319
320
// Shorthand for a shared-ownership pointer to a SensorBase, for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

321
#endif /* SRC_SENSORBASE_H_ */