sensorbase.h 12.4 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : SensorBase.h
// Author      : Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : General sensor base class.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26
27
28
29

#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_

30
#include <fstream>
31
#include <memory>
32
#include <string>
33
#include <limits.h>
34
#include <boost/lockfree/spsc_queue.hpp>
35
#include "logging.h"
Alessio Netti's avatar
Alessio Netti committed
36
#include "cacheentry.h"
37

38
39
class SensorBase {
public:
lu43jih's avatar
lu43jih committed
40
41
	static const size_t QUEUE_MAXLIMIT=1024;

42
43
	SensorBase(const std::string& name) :
		_name(name),
44
		_mqtt(""),
45
		_sinkPath(""),
46
		_skipConstVal(false),
47
		_cacheInterval(900000),
48
49
		_subsamplingFactor(1),
		_subsamplingIndex(0),
50
		_cache(nullptr),
51
		_delta(false),
52
		_firstReading(true),
53
54
		_readingQueue(nullptr),
		_sinkFile(nullptr) {
55

Alessio Netti's avatar
Alessio Netti committed
56
57
        _lastRawUValue.timestamp = 0;
        _lastRawUValue.value     = 0;
58
59
        _lastRawValue.timestamp = 0;
        _lastRawValue.value     = 0;
Alessio Netti's avatar
Alessio Netti committed
60
61
        _latestValue.timestamp	= 0;
        _latestValue.value		= 0;
62
63
        _lastSentValue.timestamp= 0;
        _lastSentValue.value	= 0;
Alessio Netti's avatar
Alessio Netti committed
64
65
        _accumulator.timestamp  = 0;
        _accumulator.value		= 0;
66
67
	}

68
69
70
	SensorBase(const SensorBase& other) :
		_name(other._name),
		_mqtt(other._mqtt),
71
		_skipConstVal(other._skipConstVal),
72
		_cacheInterval(other._cacheInterval),
73
74
		_subsamplingFactor(other._subsamplingFactor),
		_subsamplingIndex(0),
75
		_cache(nullptr),
76
		_delta(other._delta),
77
		_firstReading(true),
Alessio Netti's avatar
Alessio Netti committed
78
		_lastRawUValue(other._lastRawUValue),
Alessio Netti's avatar
Alessio Netti committed
79
		_lastRawValue(other._lastRawValue),
Alessio Netti's avatar
Alessio Netti committed
80
		_latestValue(other._latestValue),
81
        _lastSentValue(other._lastSentValue),
Alessio Netti's avatar
Alessio Netti committed
82
        _accumulator(other._accumulator),
83
84
		_readingQueue(nullptr),
		_sinkFile(nullptr) {}
85

86
	virtual ~SensorBase() { if(_sinkFile) _sinkFile->close(); }
87
88
89
90

	SensorBase& operator=(const SensorBase& other) {
		_name = other._name;
		_mqtt = other._mqtt;
91
		_skipConstVal = other._skipConstVal;
92
		_cacheInterval = other._cacheInterval;
93
94
		_subsamplingFactor = other._subsamplingFactor;
		_subsamplingIndex = 0;
95
		_cache.reset(nullptr);
96
		_delta = other._delta;
97
		_firstReading = true;
98
99
100
101
		_lastRawUValue.timestamp = other._lastRawUValue.timestamp;
        _lastRawUValue.value     = other._lastRawUValue.value;
		_lastRawValue.timestamp = other._lastRawValue.timestamp;
        _lastRawValue.value     = other._lastRawValue.value;
102
103
		_latestValue.timestamp	= other._latestValue.timestamp;
		_latestValue.value		= other._latestValue.value;
104
105
        _lastSentValue.timestamp= other._lastSentValue.timestamp;
        _lastSentValue.value	= other._lastSentValue.value;
Alessio Netti's avatar
Alessio Netti committed
106
107
        _accumulator.timestamp  = other._accumulator.timestamp;
        _accumulator.value      = other._accumulator.value;
108
		_readingQueue.reset(nullptr);
109
		_sinkFile.reset(nullptr);
110
111

		return *this;
112
113
	}

114
	const bool 				isDelta()			const 	{ return _delta;}
115
116
	const std::string& 		getName() 			const	{ return _name; }
	const std::string&		getMqtt() 			const	{ return _mqtt; }
117
	const std::string&		getSinkPath() 		const	{ return _sinkPath; }
118
	bool					getSkipConstVal()	const	{ return _skipConstVal; }
Alessio Netti's avatar
Alessio Netti committed
119
	unsigned				getCacheInterval()	const	{ return _cacheInterval; }
120
	unsigned 				getSubsampling()	const   { return _subsamplingFactor; }
Alessio Netti's avatar
Alessio Netti committed
121
	const CacheEntry* const	getCache() 			const	{ return _cache.get(); }
Micha Mueller's avatar
Micha Mueller committed
122
	const reading_t&		getLatestValue()	const	{ return _latestValue; }
Alessio Netti's avatar
Alessio Netti committed
123
	const bool				isInit()			const 	{ return _cache && _readingQueue; }
124

125
126
127
	// Exposing the reading queue is necessary for publishing sensor data from the collectagent
	boost::lockfree::spsc_queue<reading_t>* getReadingQueue() { return _readingQueue.get(); }

128
	void	setSkipConstVal(bool skipConstVal)				{ _skipConstVal = skipConstVal;	}
129
130
131
	void	setDelta(const bool delta) 						{ _delta = delta; }
	void	setName(const std::string& name, int cpuID=-1)	{ _name = formatName(name, cpuID); }
	void	setMqtt(const std::string& mqtt)				{ _mqtt = mqtt; }
132
	void 	setSinkPath(const std::string& path)			{ _sinkPath = path; }
133
	void 	setCacheInterval(unsigned cacheInterval)		{ _cacheInterval = cacheInterval; }
134
	void	setSubsampling(unsigned factor)					{ _subsamplingFactor = factor; }
135
136
	void    setLastRaw(int64_t raw)                         { _lastRawValue.value = raw; }
	void    setLastURaw(uint64_t raw)                       { _lastRawUValue.value = raw; }
137
138
139

	const std::size_t	getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
	std::size_t 		popReadingQueue(reading_t *reads, std::size_t max) const	{ return _readingQueue->pop(reads, max); }
Alessio Netti's avatar
Alessio Netti committed
140
	void 				clearReadingQueue() const	{ reading_t buf; while(_readingQueue->pop(buf)) {} }
141
142
	void				pushReadingQueue(reading_t *reads, std::size_t count) const	{ _readingQueue->push(reads, count); }

143
	void initSensor(unsigned interval) {
Alessio Netti's avatar
Alessio Netti committed
144
		uint64_t cacheSize = _cacheInterval / interval + 1;
145
		if(!_cache) {
Alessio Netti's avatar
Alessio Netti committed
146
147
			//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
			_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
148
			_cache->updateBatchSize(1, true);
149
150
		}
		if(!_readingQueue) {
lu43jih's avatar
lu43jih committed
151
			_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
152
		}
153
154
155
156
157
		if(!_sinkFile && _sinkPath != "") {
			_sinkFile.reset(new std::ofstream(_sinkPath));
			if(!_sinkFile->is_open())
				_sinkFile.reset(nullptr);
		}
158
159
	}

160
161
	/**
	 * Store a reading, in order to get it pushed to the data base eventually.
162
	 * Also this method takes care of other optional reading post-processing,
163
164
165
166
167
168
169
170
171
	 * e.g. delta computation, subsampling, caching, scaling, etc.
	 *
	 * This is the primary storeReading() and should be used whenever possible.
	 *
	 * @param rawReading  Reading struct with value and timestamp to be stored.
	 * @param factor      Scaling factor, which is applied to the reading value (optional)
	 * @param maxValue    Maximum possible value of the reading; required for the
	 *                    delta computation to detect an overflow.
	 */
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
	void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) {
        reading_t reading = rawReading;
        if( _delta ) {
            if (!_firstReading) {
                if (rawReading.value < _lastRawValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawValue.value) * factor;
            } else {
                _firstReading = false;
                _lastRawValue = rawReading;
                return;
            }
            _lastRawValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;
189

190
191
192
193
		storeReadingLocal(reading);
		if (storeGlobal) {
		    storeReadingGlobal(reading);
		}
194
195
	}

196
197
	/**
     * Store an unsigned reading, in order to get it pushed to the data base eventually.
198
     * Also this method takes care of other optional reading post-processing,
199
200
201
202
203
204
205
206
207
208
209
210
211
212
     * e.g. delta computation, subsampling, caching, scaling, etc.
     *
     * This is a variant of the primary storeReading() for monotonically increasing
     * sensors reading unsigned 64bit values which may require more than the 63bit
     * offered by a signed reading_t. The readings are still stored as signed int64
     * in the database, therefore all such sensors should enable storage of deltas!
     *
     * This variant only adapts the delta computation for ureading_t actually.
     *
     * @param rawReading  Reading struct with (usigned) value and timestamp to be stored.
     * @param factor      Scaling factor, which is applied to the reading value (optional)
     * @param maxValue    Maximum possible value of the reading; required for the
     *                    delta computation to detect an overflow.
     */
213
	void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) {
214
215
        reading_t reading;
        reading.timestamp = rawReading.timestamp;
216
217
        if( _delta ) {
            if (!_firstReading) {
218
219
220
221
                if (rawReading.value < _lastRawUValue.value)
                    reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
                else
                    reading.value = (rawReading.value - _lastRawUValue.value) * factor;
222
            } else {
223
224
225
                _firstReading = false;
                _lastRawUValue = rawReading;
                return;
226
227
228
229
230
231
            }
            _lastRawUValue = rawReading;
        }
        else
            reading.value = rawReading.value * factor;

232
233
234
235
        storeReadingLocal(reading);
        if (storeGlobal) {
            storeReadingGlobal(reading);
        }
236
237
    }

238
239
240
241
242
243
244
    /**
     * Store reading within the sensor, but do not put it in the readingQueue
     * so the reading does not get pushed but the caches are still updated.
     */
    inline
    void storeReadingLocal(reading_t reading) {
        if (_sinkFile) {
245
246
247
248
249
250
251
252
            try {
                _sinkFile->seekp(0, std::ios::beg);
                *_sinkFile << reading.value << std::endl;
            } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
        }

        _cache->store(reading);
        _latestValue = reading;
253
    }
254

255
256
257
258
259
260
    /**
     * Store the reading in the readingQueue so it can get pushed.
     */
    inline
    void storeReadingGlobal(reading_t reading) {
        if( _delta )
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
            // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
            _accumulator.value += reading.value;
        else
            _accumulator.value = reading.value;

        if (_subsamplingIndex++ % _subsamplingFactor == 0) {
            _accumulator.timestamp = reading.timestamp;
            //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
            if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
                _readingQueue->push(_accumulator);
                _lastSentValue = _accumulator;
            }
            // We reset the accumulator's value for the correct accumulation of deltas
            _accumulator.value = 0;
        }
276
277
278
279
280
281
282
283
284
285
286
287
    }

	static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}

	virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
	  std::string leading(leadingSpaces, ' ');
	  LOG_VAR(ll) << leading << "Sensor: " << _name;
	  LOG_VAR(ll) << leading << "    MQTT Topic:  " << _mqtt;
	  LOG_VAR(ll) << leading << "    Sink:        " << (getSinkPath() != "" ? getSinkPath() : "none");
      LOG_VAR(ll) << leading << "    SubSampling: " << getSubsampling();
      LOG_VAR(ll) << leading << (_skipConstVal ? "    Skipping constant values" : "    No skipping of constant values");
      LOG_VAR(ll) << leading << (_delta ? "    Storing delta readings" : "    Storing absolute readings");
288
289
	}

290
291
protected:

292
293
	std::string _name;
	std::string _mqtt;
294
	std::string _sinkPath;
295
	bool _skipConstVal;
296
	unsigned int _cacheInterval;
297
298
	unsigned int _subsamplingFactor;
	unsigned int _subsamplingIndex;
Alessio Netti's avatar
Alessio Netti committed
299
	std::unique_ptr<CacheEntry> _cache;
300
	bool _delta;
301
	bool _firstReading;
302
	ureading_t _lastRawUValue;
303
	reading_t _lastRawValue;
304
	reading_t _latestValue;
305
	reading_t _lastSentValue;
Alessio Netti's avatar
Alessio Netti committed
306
	reading_t _accumulator;
307
	std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
308
	std::unique_ptr<std::ofstream> _sinkFile;
309
310
};

311
312
313
//for better readability
using SBasePtr = std::shared_ptr<SensorBase>;

314
#endif /* SRC_SENSORBASE_H_ */