Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

sensorbase.h 12.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26
27
28
29

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

30
#include <fstream>
31
#include <memory>
32
#include <string>
33
#include <limits.h>
34
#include <boost/lockfree/spsc_queue.hpp>
35
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
36
#include "cacheentry.h"
37

38
39
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
40
41
	static const size_t QUEUE_MAXLIMIT=1024;

42
43
	SensorBase(const std::string& name) :
		_name(name),
44
		_mqtt(""),
45
		_sinkPath(""),
46
		_skipConstVal(false),
47
		_cacheInterval(900000),
48
49
		_subsamplingFactor(1),
		_subsamplingIndex(0),
50
		_cache(nullptr),
51
		_delta(false),
52
		_firstReading(true),
53
54
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
55

Alessio Netti's avatar
Alessio Netti committed
56
57
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
58
59
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
60
61
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
62
63
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
64
65
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
66
67
	}

68
69
70
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
71
		_sinkPath(other._sinkPath),
72
		_skipConstVal(other._skipConstVal),
73
		_cacheInterval(other._cacheInterval),
74
75
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
76
		_cache(nullptr),
77
		_delta(other._delta),
78
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
79
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
80
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
81
		_latestValue(other._latestValue),
82
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
83
        _accumulator(other._accumulator),
84
85
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
86

87
	virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
88
89
90
91

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
92
		_sinkPath = other._sinkPath;
93
		_skipConstVal = other._skipConstVal;
94
		_cacheInterval = other._cacheInterval;
95
96
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
97
		_cache.reset(nullptr);
98
		_delta = other._delta;
99
		_firstReading = true;
100
101
102
103
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
104
105
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
106
107
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
108
109
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
110
		_readingQueue.reset(nullptr);
111
		_sinkFile.reset(nullptr);
112
113

		return *this;
114
115
	}

116
	const bool 				isDelta()			const 	{ return _delta;}
117
118
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
119
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
120
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
Alessio Netti's avatar
Alessio Netti committed
121
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
122
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
123
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
124
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
125
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
126

127
128
129
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }

130
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
131
	void	setDelta(const bool delta) 						{ _delta = delta; }
132
	void	setName(const std::string& name)				{ _name = name; }
133
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
134
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
135
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
136
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
137
138
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
139
140
141

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
142
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
143
144
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

145
	void initSensor(unsigned interval) {
Alessio Netti's avatar
Alessio Netti committed
146
		uint64_t cacheSize = _cacheInterval / interval + 1;
147
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
148
149
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
150
			_cache->updateBatchSize(1, true);
151
152
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
153
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
154
		}
155
156
157
158
159
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
160
161
	}

162
163
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
164
	 * Also this method takes care of other optional reading post-processing,
165
166
167
168
169
170
171
172
173
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
191

192
193
194
195
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
196
197
	}

198
199
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
200
     * Also this method takes care of other optional reading post-processing,
201
202
203
204
205
206
207
208
209
210
211
212
213
214
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
215
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
216
217
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
218
219
        if( _delta ) {
            if (!_firstReading) {
220
221
222
223
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
224
            } else {
225
226
227
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
228
229
230
231
232
233
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

234
235
236
237
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
238
239
    }

240
241
242
243
244
245
246
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
        if (_sinkFile) {
247
248
249
250
251
252
253
254
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache->store(reading);
        _latestValue = reading;
255
    }
256

257
258
259
260
261
262
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
278
    }
279
    
280
281
282
	virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	  std::string leading(leadingSpaces, ' ');
	  LOG_VAR(ll) << leading << "Sensor: " << _name;
Alessio Netti's avatar
Alessio Netti committed
283
	  //LOG_VAR(ll) << leading << "    MQTT Topic:  " << _mqtt;
284
285
286
287
	  LOG_VAR(ll) << leading << "    Sink:        " << (getSinkPath() != "" ? getSinkPath() : "none");
      LOG_VAR(ll) << leading << "    SubSampling: " << getSubsampling();
      LOG_VAR(ll) << leading << (_skipConstVal ? "    Skipping constant values" : "    No skipping of constant values");
      LOG_VAR(ll) << leading << (_delta ? "    Storing delta readings" : "    Storing absolute readings");
288
289
	}

290
291
protected:

292
293
	std::string _name;
	std::string _mqtt;
294
	std::string _sinkPath;
295
	bool _skipConstVal;
296
	unsigned int _cacheInterval;
297
298
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
299
	std::unique_ptr<CacheEntry> _cache;
300
	bool _delta;
301
	bool _firstReading;
302
	ureading_t _lastRawUValue;
303
	reading_t _lastRawValue;
304
	reading_t _latestValue;
305
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
306
	reading_t _accumulator;
307
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
308
	std::unique_ptr<std::ofstream> _sinkFile;
309
310
};

311
312
313
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

314
#endif /* SRC_SENSORBASE_H_ */