/*
 * SensorBase.h
 *
 *  Created on: 09.08.2018
 *      Author: Micha Mueller
 */

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

#include <limits.h>

#include <cstddef>
#include <cstdint>
#include <fstream>
#include <memory>
#include <string>

#include <boost/lockfree/spsc_queue.hpp>

/**
 * A single sensor reading carrying a signed 64-bit value.
 */
struct reading_t {
	int64_t  value;      ///< Reading value (signed).
	uint64_t timestamp;  ///< Time the reading was taken (unit is defined by the producer of the reading).
};

/**
 * A single sensor reading carrying an unsigned 64-bit value, for raw values
 * that may not fit into the signed value of a reading_t.
 */
struct ureading_t {
	uint64_t value;      ///< Reading value (unsigned).
	uint64_t timestamp;  ///< Time the reading was taken (unit is defined by the producer of the reading).
};

27
28
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
29
30
	static const size_t QUEUE_MAXLIMIT=1024;

31
32
	SensorBase(const std::string& name) :
		_name(name),
33
		_mqtt(""),
34
		_sinkPath(""),
35
		_skipConstVal(false),
36
		_cacheInterval(900000),
37
38
		_subsamplingFactor(1),
		_subsamplingIndex(0),
39
		_cacheSize(1),
40
		_cacheIndex(0),
41
		_cache(nullptr),
42
		_delta(false),
43
		_firstReading(true),
44
45
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
46
47
48

		_latestValue.timestamp	= 0;
		_latestValue.value		= 0;
49
50
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
51
52
		_lastRawValue.timestamp = 0;
		_lastRawValue.value		= 0;
Alessio Netti's avatar
Alessio Netti committed
53
54
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
55
56
	}

57
58
59
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
60
		_skipConstVal(other._skipConstVal),
61
		_cacheInterval(other._cacheInterval),
62
63
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
64
		_cacheSize(other._cacheSize),
65
		_cacheIndex(0),
66
		_cache(nullptr),
67
		_delta(other._delta),
68
		_firstReading(true),
69
		_latestValue(other._latestValue),
70
		_lastRawValue(other._lastRawValue),
71
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
72
        _accumulator(other._accumulator),
73
74
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
75
76
77
78
79
80

	virtual ~SensorBase() {}

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
81
		_skipConstVal = other._skipConstVal;
82
		_cacheInterval = other._cacheInterval;
83
84
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
85
		_cacheSize = other._cacheSize;
86
		_cacheIndex = 0;
87
		_cache.reset(nullptr);
88
		_delta = other._delta;
89
		_firstReading = true;
90
91
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
92
93
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
		_lastRawValue.value		= other._lastRawValue.value;
94
95
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
96
97
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
98
		_readingQueue.reset(nullptr);
99
		_sinkFile.reset(nullptr);
100
101

		return *this;
102
103
	}

104
	const bool 				isDelta()			const 	{ return _delta;}
105
106
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
107
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
108
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
109
	unsigned				getCacheSize()		const	{ return _cacheSize; }
110
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
111
	const reading_t * const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
112
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
113

114
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
115
116
117
	void	setDelta(const bool delta) 						{ _delta = delta; }
	void	setName(const std::string& name, int cpuID=-1)	{ _name = formatName(name, cpuID); }
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
118
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
119
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
120
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
121
122
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
123
124
125
126
127

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

128
129
	void initSensor(unsigned interval) {
		_cacheSize = _cacheInterval / interval + 1;
130
		if(!_cache) {
131
132
			_cache.reset(new reading_t[_cacheSize]);
			for(unsigned i = 0; i < _cacheSize; i++) {
133
134
135
136
				_cache[i] = _latestValue;	//_latestValue should equal (0,0) at this point
			}
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
137
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
138
		}
139
140
141
142
143
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
144
145
	}

146
147
148
149
150
151
152
153
154
155
156
157
158
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
	 * Also this methods takes care of other optional reading post-processing,
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
	void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=LLONG_MAX) {
Michael Ott's avatar
Michael Ott committed
159
		reading_t reading = rawReading;
160
		if( _delta ) {
161
162
163
164
165
166
167
168
169
170
		    if (!_firstReading) {
              if (rawReading.value < _lastRawValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawValue.value) * factor;
		    } else {
		      _firstReading = false;
		      _lastRawValue = rawReading;
		      return;
		    }
Michael Ott's avatar
Michael Ott committed
171
			_lastRawValue = rawReading;
172
173
		}
		else
Michael Ott's avatar
Michael Ott committed
174
			reading.value = rawReading.value * factor;
175

Alessio Netti's avatar
Alessio Netti committed
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
		if( _delta )
			// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
		    _accumulator.value += reading.value;
		else
		    _accumulator.value = reading.value;

		if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
			_accumulator.value = 0;
191
192
		}
		if (_sinkFile) {
Alessio Netti's avatar
Alessio Netti committed
193
194
195
196
		    try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
197
		}
198

Michael Ott's avatar
Michael Ott committed
199
		_cache[_cacheIndex] = reading;
200
		_cacheIndex = (_cacheIndex + 1) % _cacheSize;
Michael Ott's avatar
Michael Ott committed
201
		_latestValue = reading;
202
203
	}

204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
     * Also this methods takes care of other optional reading post-processing,
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     * FIXME: Avoid code duplication
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
              if (rawReading.value < _lastRawUValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawUValue.value) * factor;
            } else {
              _firstReading = false;
              _lastRawUValue = rawReading;
              return;
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

        if( _delta )
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
        if (_sinkFile) {
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache[_cacheIndex] = reading;
        _cacheIndex = (_cacheIndex + 1) % _cacheSize;
        _latestValue = reading;
    }

Alessio Netti's avatar
Alessio Netti committed
268
269
	static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}

270
271
272
273
protected:

	std::string _name;
	std::string _mqtt;
274
	std::string _sinkPath;
275
	bool _skipConstVal;
276
	unsigned int _cacheInterval;
277
278
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
279
	unsigned int _cacheSize;
280
	unsigned int _cacheIndex;
281
	std::unique_ptr<reading_t[]> _cache;
282
	bool _delta;
283
	bool _firstReading;
284
	ureading_t _lastRawUValue;
285
	reading_t _lastRawValue;
286
	reading_t _latestValue;
287
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
288
	reading_t _accumulator;
289
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
290
	std::unique_ptr<std::ofstream> _sinkFile;
291
292
};

293
294
295
// Shorthand for a shared pointer to a SensorBase, for better readability.
using SBasePtr = std::shared_ptr<SensorBase>;

#endif /* SRC_SENSORBASE_H_ */