SensorBase.h 10.8 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * SensorBase.h
 *
 *  Created on: 09.08.2018
 *      Author: Micha Mueller
 */

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

11
#include <fstream>
12
#include <memory>
13
#include <string>
14
#include <limits.h>
15
16
17
#include <boost/lockfree/spsc_queue.hpp>

typedef struct {
18
	int64_t  value;
19
20
21
	uint64_t timestamp;
} reading_t;

22
23
24
25
26
typedef struct {
  uint64_t value;
  uint64_t timestamp;
} ureading_t;

27
28
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
29
30
	static const size_t QUEUE_MAXLIMIT=1024;

31
32
	SensorBase(const std::string& name) :
		_name(name),
33
		_mqtt(""),
34
		_sinkPath(""),
35
		_skipConstVal(false),
36
		_cacheInterval(900000),
37
38
		_subsamplingFactor(1),
		_subsamplingIndex(0),
39
		_cacheSize(1),
40
		_cacheIndex(0),
41
		_cache(nullptr),
42
		_delta(false),
43
		_firstReading(true),
44
45
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
46

47
48
49
50
	    _lastRawUValue.timestamp = 0;
	    _lastRawUValue.value     = 0;
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
51
52
		_latestValue.timestamp	= 0;
		_latestValue.value		= 0;
53
54
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
55
56
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
57
58
	}

59
60
61
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
62
		_skipConstVal(other._skipConstVal),
63
		_cacheInterval(other._cacheInterval),
64
65
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
66
		_cacheSize(other._cacheSize),
67
		_cacheIndex(0),
68
		_cache(nullptr),
69
		_delta(other._delta),
70
		_firstReading(true),
71
		_lastRawUValue(other._lastRawUValue),
72
		_lastRawValue(other._lastRawValue),
73
		_latestValue(other._latestValue),
74
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
75
        _accumulator(other._accumulator),
76
77
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
78
79
80
81
82
83

	virtual ~SensorBase() {}

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
84
		_skipConstVal = other._skipConstVal;
85
		_cacheInterval = other._cacheInterval;
86
87
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
88
		_cacheSize = other._cacheSize;
89
		_cacheIndex = 0;
90
		_cache.reset(nullptr);
91
		_delta = other._delta;
92
		_firstReading = true;
93
94
95
96
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
97
98
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
99
100
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
101
102
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
103
		_readingQueue.reset(nullptr);
104
		_sinkFile.reset(nullptr);
105
106

		return *this;
107
108
	}

109
	const bool 				isDelta()			const 	{ return _delta;}
110
111
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
112
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
113
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
114
	unsigned				getCacheSize()		const	{ return _cacheSize; }
115
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
116
	const reading_t * const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
117
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
118

119
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
120
121
122
	void	setDelta(const bool delta) 						{ _delta = delta; }
	void	setName(const std::string& name, int cpuID=-1)	{ _name = formatName(name, cpuID); }
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
123
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
124
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
125
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
126
127
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
128
129
130
131
132

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

133
134
	void initSensor(unsigned interval) {
		_cacheSize = _cacheInterval / interval + 1;
135
		if(!_cache) {
136
137
			_cache.reset(new reading_t[_cacheSize]);
			for(unsigned i = 0; i < _cacheSize; i++) {
138
139
140
141
				_cache[i] = _latestValue;	//_latestValue should equal (0,0) at this point
			}
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
142
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
143
		}
144
145
146
147
148
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
149
150
	}

151
152
153
154
155
156
157
158
159
160
161
162
163
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
	 * Also this methods takes care of other optional reading post-processing,
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
	void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=LLONG_MAX) {
Michael Ott's avatar
Michael Ott committed
164
		reading_t reading = rawReading;
165
		if( _delta ) {
166
167
168
169
170
171
172
173
174
175
		    if (!_firstReading) {
              if (rawReading.value < _lastRawValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawValue.value) * factor;
		    } else {
		      _firstReading = false;
		      _lastRawValue = rawReading;
		      return;
		    }
Michael Ott's avatar
Michael Ott committed
176
			_lastRawValue = rawReading;
177
178
		}
		else
Michael Ott's avatar
Michael Ott committed
179
			reading.value = rawReading.value * factor;
180

Alessio Netti's avatar
Alessio Netti committed
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
		if( _delta )
			// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
		    _accumulator.value += reading.value;
		else
		    _accumulator.value = reading.value;

		if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
			_accumulator.value = 0;
196
197
		}
		if (_sinkFile) {
Alessio Netti's avatar
Alessio Netti committed
198
199
200
201
		    try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
202
		}
203

Michael Ott's avatar
Michael Ott committed
204
		_cache[_cacheIndex] = reading;
205
		_cacheIndex = (_cacheIndex + 1) % _cacheSize;
Michael Ott's avatar
Michael Ott committed
206
		_latestValue = reading;
207
208
	}

209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
     * Also this methods takes care of other optional reading post-processing,
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     * FIXME: Avoid code duplication
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
228
229
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
        if( _delta ) {
            if (!_firstReading) {
              if (rawReading.value < _lastRawUValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawUValue.value) * factor;
            } else {
              _firstReading = false;
              _lastRawUValue = rawReading;
              return;
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

        if( _delta )
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
        if (_sinkFile) {
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache[_cacheIndex] = reading;
        _cacheIndex = (_cacheIndex + 1) % _cacheSize;
        _latestValue = reading;
    }

Alessio Netti's avatar
Alessio Netti committed
274
275
	static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}

276
277
278
279
protected:

	std::string _name;
	std::string _mqtt;
280
	std::string _sinkPath;
281
	bool _skipConstVal;
282
	unsigned int _cacheInterval;
283
284
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
285
	unsigned int _cacheSize;
286
	unsigned int _cacheIndex;
287
	std::unique_ptr<reading_t[]> _cache;
288
	bool _delta;
289
	bool _firstReading;
290
	ureading_t _lastRawUValue;
291
	reading_t _lastRawValue;
292
	reading_t _latestValue;
293
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
294
	reading_t _accumulator;
295
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
296
	std::unique_ptr<std::ofstream> _sinkFile;
297
298
};

299
300
301
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

302
#endif /* SRC_SENSORBASE_H_ */