sensorbase.h 11.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * SensorBase.h
 *
 *  Created on: 09.08.2018
 *      Author: Micha Mueller
 */

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

11
#include <fstream>
12
#include <memory>
13
#include <string>
14
#include <limits.h>
15
#include <boost/lockfree/spsc_queue.hpp>
16
#include "logging.h"
17
18

typedef struct {
19
	int64_t  value;
20
21
22
	uint64_t timestamp;
} reading_t;

23
24
25
26
27
typedef struct {
  uint64_t value;
  uint64_t timestamp;
} ureading_t;

28
29
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
30
31
	static const size_t QUEUE_MAXLIMIT=1024;

32
33
	SensorBase(const std::string& name) :
		_name(name),
34
		_mqtt(""),
35
		_sinkPath(""),
36
		_skipConstVal(false),
37
		_cacheInterval(900000),
38
39
		_subsamplingFactor(1),
		_subsamplingIndex(0),
40
		_cacheSize(1),
41
		_cacheIndex(0),
42
		_cache(nullptr),
43
		_delta(false),
44
		_firstReading(true),
45
46
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
47

Alessio Netti's avatar
Alessio Netti committed
48
49
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
50
51
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
52
53
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
54
55
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
56
57
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
58
59
	}

60
61
62
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
63
		_skipConstVal(other._skipConstVal),
64
		_cacheInterval(other._cacheInterval),
65
66
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
67
		_cacheSize(other._cacheSize),
68
		_cacheIndex(0),
69
		_cache(nullptr),
70
		_delta(other._delta),
71
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
72
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
73
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
74
		_latestValue(other._latestValue),
75
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
76
        _accumulator(other._accumulator),
77
78
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
79
80
81
82
83
84

	virtual ~SensorBase() {}

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
85
		_skipConstVal = other._skipConstVal;
86
		_cacheInterval = other._cacheInterval;
87
88
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
89
		_cacheSize = other._cacheSize;
90
		_cacheIndex = 0;
91
		_cache.reset(nullptr);
92
		_delta = other._delta;
93
		_firstReading = true;
94
95
96
97
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
98
99
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
100
101
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
102
103
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
104
		_readingQueue.reset(nullptr);
105
		_sinkFile.reset(nullptr);
106
107

		return *this;
108
109
	}

110
	const bool 				isDelta()			const 	{ return _delta;}
111
112
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
113
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
114
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
115
	unsigned				getCacheSize()		const	{ return _cacheSize; }
Alessio Netti's avatar
Alessio Netti committed
116
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
117
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
118
	const reading_t * const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
119
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
120
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
121

122
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
123
124
125
	void	setDelta(const bool delta) 						{ _delta = delta; }
	void	setName(const std::string& name, int cpuID=-1)	{ _name = formatName(name, cpuID); }
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
126
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
127
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
128
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
129
130
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
131
132
133

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
134
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
135
136
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

137
138
	void initSensor(unsigned interval) {
		_cacheSize = _cacheInterval / interval + 1;
139
		if(!_cache) {
140
141
			_cache.reset(new reading_t[_cacheSize]);
			for(unsigned i = 0; i < _cacheSize; i++) {
142
143
144
145
				_cache[i] = _latestValue;	//_latestValue should equal (0,0) at this point
			}
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
146
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
147
		}
148
149
150
151
152
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
153
154
	}

155
156
157
158
159
160
161
162
163
164
165
166
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
	 * Also this methods takes care of other optional reading post-processing,
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
Micha Mueller's avatar
Micha Mueller committed
167
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX) {
Michael Ott's avatar
Michael Ott committed
168
		reading_t reading = rawReading;
169
		if( _delta ) {
170
171
172
173
174
175
176
177
178
179
		    if (!_firstReading) {
              if (rawReading.value < _lastRawValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawValue.value) * factor;
		    } else {
		      _firstReading = false;
		      _lastRawValue = rawReading;
		      return;
		    }
Michael Ott's avatar
Michael Ott committed
180
			_lastRawValue = rawReading;
181
182
		}
		else
Michael Ott's avatar
Michael Ott committed
183
			reading.value = rawReading.value * factor;
184

Alessio Netti's avatar
Alessio Netti committed
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
		if( _delta )
			// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
		    _accumulator.value += reading.value;
		else
		    _accumulator.value = reading.value;

		if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
			_accumulator.value = 0;
200
201
		}
		if (_sinkFile) {
Alessio Netti's avatar
Alessio Netti committed
202
203
204
205
		    try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
206
		}
207

Michael Ott's avatar
Michael Ott committed
208
		_cache[_cacheIndex] = reading;
209
		_cacheIndex = (_cacheIndex + 1) % _cacheSize;
Michael Ott's avatar
Michael Ott committed
210
		_latestValue = reading;
211
212
	}

213
214
215
216
	int64_t getCacheOffset(int64_t t) {
		if( t < 0)
			return -1;
		// Converting from milliseconds to nanoseconds
Alessio Netti's avatar
Alessio Netti committed
217
		int64_t offset = ( ( (int64_t)_cacheSize * t ) / ( (int64_t)_cacheInterval * 1000000 ) ) + 1;
218
219
220
221
222
		if(offset > _cacheSize)
			return -1;
		return ( _cacheSize + _cacheIndex - offset ) % _cacheSize;
	}

223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
     * Also this methods takes care of other optional reading post-processing,
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     * FIXME: Avoid code duplication
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
242
243
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
        if( _delta ) {
            if (!_firstReading) {
              if (rawReading.value < _lastRawUValue.value)
                  reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
              else
                  reading.value = (rawReading.value - _lastRawUValue.value) * factor;
            } else {
              _firstReading = false;
              _lastRawUValue = rawReading;
              return;
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

        if( _delta )
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
        if (_sinkFile) {
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache[_cacheIndex] = reading;
        _cacheIndex = (_cacheIndex + 1) % _cacheSize;
        _latestValue = reading;
    }

Alessio Netti's avatar
Alessio Netti committed
288
289
	static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}

290
	virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) {
Micha Mueller's avatar
Micha Mueller committed
291
292
293
294
295
296
297
298
299
	  LOG_VAR(ll) << "    Sensor: " << _name;
	  LOG_VAR(ll) << "     MQTT Topic:  " << _mqtt;
	  LOG_VAR(ll) << "     sink:        " << getSinkPath();
      LOG_VAR(ll) << "     subSampling: " << getSubsampling();
      if(_skipConstVal) {
        LOG_VAR(ll) << "     Skipping constant values";
      } else {
        LOG_VAR(ll) << "     No skipping of constant values";
      }
300

Micha Mueller's avatar
Micha Mueller committed
301
302
      if(_delta) {
        LOG_VAR(ll) << "     Storing delta readings";
303
      } else {
Micha Mueller's avatar
Micha Mueller committed
304
        LOG_VAR(ll) << "     Storing absolute readings";
305
306
307
      }
	}

308
309
310
311
protected:

	std::string _name;
	std::string _mqtt;
312
	std::string _sinkPath;
313
	bool _skipConstVal;
314
	unsigned int _cacheInterval;
315
316
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
317
	unsigned int _cacheSize;
318
	unsigned int _cacheIndex;
319
	std::unique_ptr<reading_t[]> _cache;
320
	bool _delta;
321
	bool _firstReading;
322
	ureading_t _lastRawUValue;
323
	reading_t _lastRawValue;
324
	reading_t _latestValue;
325
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
326
	reading_t _accumulator;
327
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
328
	std::unique_ptr<std::ofstream> _sinkFile;
329
330
};

331
332
333
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

334
#endif /* SRC_SENSORBASE_H_ */