Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

sensorbase.h 11.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * SensorBase.h
 *
 *  Created on: 09.08.2018
 *      Author: Micha Mueller
 */

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

11
#include <fstream>
12
#include <memory>
13
#include <string>
14
#include <limits.h>
15
#include <boost/lockfree/spsc_queue.hpp>
16
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
17
#include "cacheentry.h"
18

19
20
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
21
22
	static const size_t QUEUE_MAXLIMIT=1024;

23
24
	SensorBase(const std::string& name) :
		_name(name),
25
		_mqtt(""),
26
		_sinkPath(""),
27
		_skipConstVal(false),
28
		_cacheInterval(900000),
29
30
		_subsamplingFactor(1),
		_subsamplingIndex(0),
31
		_cache(nullptr),
32
		_delta(false),
33
		_firstReading(true),
34
35
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
36

Alessio Netti's avatar
Alessio Netti committed
37
38
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
39
40
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
41
42
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
43
44
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
45
46
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
47
48
	}

49
50
51
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
52
		_skipConstVal(other._skipConstVal),
53
		_cacheInterval(other._cacheInterval),
54
55
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
56
		_cache(nullptr),
57
		_delta(other._delta),
58
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
59
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
60
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
61
		_latestValue(other._latestValue),
62
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
63
        _accumulator(other._accumulator),
64
65
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
66
67
68
69
70
71

	virtual ~SensorBase() {}

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
72
		_skipConstVal = other._skipConstVal;
73
		_cacheInterval = other._cacheInterval;
74
75
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
76
		_cache.reset(nullptr);
77
		_delta = other._delta;
78
		_firstReading = true;
79
80
81
82
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
83
84
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
85
86
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
87
88
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
89
		_readingQueue.reset(nullptr);
90
		_sinkFile.reset(nullptr);
91
92

		return *this;
93
94
	}

95
	const bool 				isDelta()			const 	{ return _delta;}
96
97
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
98
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
99
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
Alessio Netti's avatar
Alessio Netti committed
100
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
101
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
102
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
103
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
104
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
105

106
107
108
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }

109
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
110
111
112
	void	setDelta(const bool delta) 						{ _delta = delta; }
	void	setName(const std::string& name, int cpuID=-1)	{ _name = formatName(name, cpuID); }
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
113
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
114
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
115
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
116
117
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
118
119
120

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
121
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
122
123
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

124
	void initSensor(unsigned interval) {
Alessio Netti's avatar
Alessio Netti committed
125
		uint64_t cacheSize = _cacheInterval / interval + 1;
126
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
127
128
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
129
			_cache->updateBatchSize(1, true);
130
131
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
132
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
133
		}
134
135
136
137
138
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
139
140
	}

141
142
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
143
	 * Also this method takes care of other optional reading post-processing,
144
145
146
147
148
149
150
151
152
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
170

171
172
173
174
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
175
176
	}

177
178
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
179
     * Also this method takes care of other optional reading post-processing,
180
181
182
183
184
185
186
187
188
189
190
191
192
193
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
194
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
195
196
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
197
198
        if( _delta ) {
            if (!_firstReading) {
199
200
201
202
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
203
            } else {
204
205
206
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
207
208
209
210
211
212
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

213
214
215
216
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
    }

	static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}

	virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	  std::string leading(leadingSpaces, ' ');
	  LOG_VAR(ll) << leading << "Sensor: " << _name;
	  LOG_VAR(ll) << leading << "    MQTT Topic:  " << _mqtt;
	  LOG_VAR(ll) << leading << "    Sink:        " << (getSinkPath() != "" ? getSinkPath() : "none");
      LOG_VAR(ll) << leading << "    SubSampling: " << getSubsampling();
      LOG_VAR(ll) << leading << (_skipConstVal ? "    Skipping constant values" : "    No skipping of constant values");
      LOG_VAR(ll) << leading << (_delta ? "    Storing delta readings" : "    Storing absolute readings");
	}

protected:

	/**
234
235
	 * Store reading within the sensor, but do not put it in the readingQueue
	 * so the reading does not get pushed but the caches are still updated.
236
237
	 */
	inline
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
	void storeReadingLocal(reading_t reading) {
	    if (_sinkFile) {
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache->store(reading);
        _latestValue = reading;
	}

	/**
	 * Store the reading in the readingQueue so it can get pushed.
	 */
	inline
	void storeReadingGlobal(reading_t reading) {
	    if( _delta )
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
271
272
	}

273
274
	std::string _name;
	std::string _mqtt;
275
	std::string _sinkPath;
276
	bool _skipConstVal;
277
	unsigned int _cacheInterval;
278
279
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
280
	std::unique_ptr<CacheEntry> _cache;
281
	bool _delta;
282
	bool _firstReading;
283
	ureading_t _lastRawUValue;
284
	reading_t _lastRawValue;
285
	reading_t _latestValue;
286
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
287
	reading_t _accumulator;
288
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
289
	std::unique_ptr<std::ofstream> _sinkFile;
290
291
};

292
293
294
// Shorthand for a shared-ownership pointer to a SensorBase, for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

295
#endif /* SRC_SENSORBASE_H_ */