Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

sensorbase.h 12.3 KB
Newer Older
1
2
3
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
27
28
29
30

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

31
#include <fstream>
32
#include <memory>
33
#include <string>
34
#include <limits.h>
35
#include <boost/lockfree/spsc_queue.hpp>
36
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
37
#include "cacheentry.h"
38
#include "metadatastore.h"
39

40
41
42
43
44
/**
 * @brief General sensor base class.
 *
 * @ingroup common
 */
45
46
47
48
class SensorBase {
public:
	SensorBase(const std::string& name) :
		_name(name),
49
		_mqtt(""),
50
		_skipConstVal(false),
51
		_publish(true),
52
		_cacheInterval(900000),
53
54
		_subsamplingFactor(1),
		_subsamplingIndex(0),
55
		_cache(nullptr),
56
		_delta(false),
57
		_firstReading(true),
58
59
		_readingQueue(nullptr),
		_metadata(nullptr) {
60

Alessio Netti's avatar
Alessio Netti committed
61
62
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
63
64
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
65
66
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
67
68
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
69
70
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
71
72
	}

73
74
75
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
76
		_skipConstVal(other._skipConstVal),
77
		_publish(other._publish),
78
		_cacheInterval(other._cacheInterval),
79
80
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
81
		_cache(nullptr),
82
		_delta(other._delta),
83
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
84
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
85
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
86
		_latestValue(other._latestValue),
87
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
88
        _accumulator(other._accumulator),
89
90
91
92
		_readingQueue(nullptr) {
	    
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
	}
93

94
	virtual ~SensorBase() {}
95
96
97
98

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
99
		_skipConstVal = other._skipConstVal;
100
		_publish = other._publish;
101
		_cacheInterval = other._cacheInterval;
102
103
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
104
		_cache.reset(nullptr);
105
		_delta = other._delta;
106
		_firstReading = true;
107
108
109
110
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
111
112
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
113
114
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
115
116
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
117
		_readingQueue.reset(nullptr);
118
        _metadata.reset(other._metadata.get() ? new SensorMetadata(*other._metadata) : nullptr);
119
120

		return *this;
121
122
	}

123
	const bool 				isDelta()			const 	{ return _delta;}
124
125
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
126
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
127
	bool					getPublish()		const 	{ return _publish; }
Alessio Netti's avatar
Alessio Netti committed
128
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
129
	int 					getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
130
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
131
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
132
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
133

134
135
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }
136
137
138
139
	SensorMetadata* getMetadata()						      { return _metadata.get(); }
	
	void	clearMetadata()									{ _metadata.reset(nullptr); }
	void	setMetadata(SensorMetadata& s)					{ _metadata.reset(new SensorMetadata(s)); }
140
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
141
	void	setPublish(bool pub)							{ _publish = pub; }
142
	void	setDelta(const bool delta) 						{ _delta = delta; }
143
	void	setName(const std::string& name)				{ _name = name; }
144
145
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
Alessio Netti's avatar
Alessio Netti committed
146
	void	setSubsampling(int factor)						{ _subsamplingFactor = factor; }
147
148
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
149
150
151

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
152
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
153
154
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

155
	virtual void initSensor(unsigned interval, unsigned queueLen) {
Alessio Netti's avatar
Alessio Netti committed
156
		uint64_t cacheSize = _cacheInterval / interval + 1;
157
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
158
159
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
160
			_cache->updateBatchSize(1, true);
161
162
		}
		if(!_readingQueue) {
163
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(queueLen));
164
165
166
		}
	}

167
168
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
169
	 * Also this method takes care of other optional reading post-processing,
170
171
172
173
174
175
176
177
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
Micha Müller's avatar
Micha Müller committed
178
	 * @param storeGlobal Store reading in reading queue, so that it can get pushed.
179
	 */
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
197

198
199
200
201
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
202
203
	}

204
205
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
206
     * Also this method takes care of other optional reading post-processing,
207
208
209
210
211
212
213
214
215
216
217
218
219
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
Micha Müller's avatar
Micha Müller committed
220
     * @param storeGlobal Store reading in reading queue, so that it can get pushed.
221
     */
222
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
223
224
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
225
226
        if( _delta ) {
            if (!_firstReading) {
227
228
229
230
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
231
            } else {
232
233
234
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
235
236
237
238
239
240
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

241
242
243
244
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
245
246
    }

247
248
249
250
251
252
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
253
254
        _cache->store(reading);
        _latestValue = reading;
255
    }
256

257
258
259
260
261
262
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
263
264
265
266
267
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

Alessio Netti's avatar
Alessio Netti committed
268
        if (_subsamplingFactor>0 && _subsamplingIndex++%_subsamplingFactor==0) {
269
270
271
272
273
274
275
276
277
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
278
    }
279
    
280
281
282
283
284
    virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	std::string leading(leadingSpaces, ' ');
	LOG_VAR(ll) << leading << _name;
	if (getSubsampling() != 1) {
	    LOG_VAR(ll) << leading << "    SubSampling:      " << getSubsampling();
285
	}
286
287
	LOG_VAR(ll) << leading << "    Skip const values: " << (_skipConstVal ? "true" : "false");
	LOG_VAR(ll) << leading << "    Store delta only:  " << (_delta ? "true" : "false");
288
	LOG_VAR(ll) << leading << "    Publish:           " << (_publish ? "true" : "false");
289
    }
290

291
292
protected:

293
294
	std::string _name;
	std::string _mqtt;
295
	bool _skipConstVal;
296
	bool _publish;
297
	unsigned int _cacheInterval;
Alessio Netti's avatar
Alessio Netti committed
298
	int _subsamplingFactor;
299
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
300
	std::unique_ptr<CacheEntry> _cache;
301
	bool _delta;
302
	bool _firstReading;
303
	ureading_t _lastRawUValue;
304
	reading_t _lastRawValue;
305
	reading_t _latestValue;
306
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
307
	reading_t _accumulator;
308
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
309
	std::unique_ptr<SensorMetadata> _metadata;
310
311
};

312
313
314
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

315
#endif /* SRC_SENSORBASE_H_ */