sensorbase.h 12.3 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26
27
28
29

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

30
#include <fstream>
31
#include <memory>
32
#include <string>
33
#include <limits.h>
34
#include <boost/lockfree/spsc_queue.hpp>
35
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
36
#include "cacheentry.h"
37

38
39
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
40
41
	static const size_t QUEUE_MAXLIMIT=1024;

42
43
	SensorBase(const std::string& name) :
		_name(name),
44
		_mqtt(""),
45
		_sinkPath(""),
46
		_skipConstVal(false),
47
		_cacheInterval(900000),
48
49
		_subsamplingFactor(1),
		_subsamplingIndex(0),
50
		_cache(nullptr),
51
		_delta(false),
52
		_firstReading(true),
53
54
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
55

Alessio Netti's avatar
Alessio Netti committed
56
57
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
58
59
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
60
61
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
62
63
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
64
65
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
66
67
	}

68
69
70
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
71
		_sinkPath(other._sinkPath),
72
		_skipConstVal(other._skipConstVal),
73
		_cacheInterval(other._cacheInterval),
74
75
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
76
		_cache(nullptr),
77
		_delta(other._delta),
78
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
79
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
80
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
81
		_latestValue(other._latestValue),
82
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
83
        _accumulator(other._accumulator),
84
85
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
86

87
	virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
88
89
90
91

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
92
		_sinkPath = other._sinkPath;
93
		_skipConstVal = other._skipConstVal;
94
		_cacheInterval = other._cacheInterval;
95
96
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
97
		_cache.reset(nullptr);
98
		_delta = other._delta;
99
		_firstReading = true;
100
101
102
103
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
104
105
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
106
107
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
108
109
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
110
		_readingQueue.reset(nullptr);
111
		_sinkFile.reset(nullptr);
112
113

		return *this;
114
115
	}

116
	const bool 				isDelta()			const 	{ return _delta;}
117
118
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
119
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
120
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
Alessio Netti's avatar
Alessio Netti committed
121
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
122
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
123
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
124
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
125
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
126

127
128
129
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }

130
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
131
	void	setDelta(const bool delta) 						{ _delta = delta; }
132
	void	setName(const std::string& name)				{ _name = name; }
133
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
134
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
135
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
136
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
137
138
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
139
140
141

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
142
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
143
144
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

145
	void initSensor(unsigned interval) {
Alessio Netti's avatar
Alessio Netti committed
146
		uint64_t cacheSize = _cacheInterval / interval + 1;
147
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
148
149
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
150
			_cache->updateBatchSize(1, true);
151
152
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
153
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
154
		}
155
156
157
158
159
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
160
161
	}

162
163
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
164
	 * Also this method takes care of other optional reading post-processing,
165
166
167
168
169
170
171
172
173
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
191

192
193
194
195
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
196
197
	}

198
199
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
200
     * Also this method takes care of other optional reading post-processing,
201
202
203
204
205
206
207
208
209
210
211
212
213
214
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
215
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
216
217
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
218
219
        if( _delta ) {
            if (!_firstReading) {
220
221
222
223
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
224
            } else {
225
226
227
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
228
229
230
231
232
233
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

234
235
236
237
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
238
239
    }

240
241
242
243
244
245
246
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
        if (_sinkFile) {
247
248
249
250
251
252
253
254
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache->store(reading);
        _latestValue = reading;
255
    }
256

257
258
259
260
261
262
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
278
    }
279
    
280
281
282
	virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	  std::string leading(leadingSpaces, ' ');
	  LOG_VAR(ll) << leading << "Sensor: " << _name;
Alessio Netti's avatar
Alessio Netti committed
283
	  //LOG_VAR(ll) << leading << "    MQTT Topic:  " << _mqtt;
284
285
286
287
	  LOG_VAR(ll) << leading << "    Sink:        " << (getSinkPath() != "" ? getSinkPath() : "none");
      LOG_VAR(ll) << leading << "    SubSampling: " << getSubsampling();
      LOG_VAR(ll) << leading << (_skipConstVal ? "    Skipping constant values" : "    No skipping of constant values");
      LOG_VAR(ll) << leading << (_delta ? "    Storing delta readings" : "    Storing absolute readings");
288
289
	}

290
291
protected:

292
293
	std::string _name;
	std::string _mqtt;
294
	std::string _sinkPath;
295
	bool _skipConstVal;
296
	unsigned int _cacheInterval;
297
298
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
299
	std::unique_ptr<CacheEntry> _cache;
300
	bool _delta;
301
	bool _firstReading;
302
	ureading_t _lastRawUValue;
303
	reading_t _lastRawValue;
304
	reading_t _latestValue;
305
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
306
	reading_t _accumulator;
307
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
308
	std::unique_ptr<std::ofstream> _sinkFile;
309
310
};

311
312
313
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

314
#endif /* SRC_SENSORBASE_H_ */