/* * SensorBase.h * * Created on: 09.08.2018 * Author: Micha Mueller */ #ifndef SRC_SENSORBASE_H_ #define SRC_SENSORBASE_H_ #include #include #include #include #include #include "logging.h" typedef struct { int64_t value; uint64_t timestamp; } reading_t; typedef struct { uint64_t value; uint64_t timestamp; } ureading_t; class SensorBase { public: static const size_t QUEUE_MAXLIMIT=1024; SensorBase(const std::string& name) : _name(name), _mqtt(""), _sinkPath(""), _skipConstVal(false), _cacheInterval(900000), _subsamplingFactor(1), _subsamplingIndex(0), _cacheSize(1), _cacheIndex(0), _cache(nullptr), _delta(false), _firstReading(true), _readingQueue(nullptr), _sinkFile(nullptr) { _lastRawUValue.timestamp = 0; _lastRawUValue.value = 0; _lastRawValue.timestamp = 0; _lastRawValue.value = 0; _latestValue.timestamp = 0; _latestValue.value = 0; _lastSentValue.timestamp= 0; _lastSentValue.value = 0; _accumulator.timestamp = 0; _accumulator.value = 0; } SensorBase(const SensorBase& other) : _name(other._name), _mqtt(other._mqtt), _skipConstVal(other._skipConstVal), _cacheInterval(other._cacheInterval), _subsamplingFactor(other._subsamplingFactor), _subsamplingIndex(0), _cacheSize(other._cacheSize), _cacheIndex(0), _cache(nullptr), _delta(other._delta), _firstReading(true), _lastRawUValue(other._lastRawUValue), _lastRawValue(other._lastRawValue), _latestValue(other._latestValue), _lastSentValue(other._lastSentValue), _accumulator(other._accumulator), _readingQueue(nullptr), _sinkFile(nullptr) {} virtual ~SensorBase() {} SensorBase& operator=(const SensorBase& other) { _name = other._name; _mqtt = other._mqtt; _skipConstVal = other._skipConstVal; _cacheInterval = other._cacheInterval; _subsamplingFactor = other._subsamplingFactor; _subsamplingIndex = 0; _cacheSize = other._cacheSize; _cacheIndex = 0; _cache.reset(nullptr); _delta = other._delta; _firstReading = true; _lastRawUValue.timestamp = other._lastRawUValue.timestamp; _lastRawUValue.value = other._lastRawUValue.value; _lastRawValue.timestamp = other._lastRawValue.timestamp; _lastRawValue.value = other._lastRawValue.value; _latestValue.timestamp = other._latestValue.timestamp; _latestValue.value = other._latestValue.value; _lastSentValue.timestamp= other._lastSentValue.timestamp; _lastSentValue.value = other._lastSentValue.value; _accumulator.timestamp = other._accumulator.timestamp; _accumulator.value = other._accumulator.value; _readingQueue.reset(nullptr); _sinkFile.reset(nullptr); return *this; } const bool isDelta() const { return _delta;} const std::string& getName() const { return _name; } const std::string& getMqtt() const { return _mqtt; } const std::string& getSinkPath() const { return _sinkPath; } bool getSkipConstVal() const { return _skipConstVal; } unsigned getCacheSize() const { return _cacheSize; } unsigned getCacheInterval() const { return _cacheInterval; } unsigned getSubsampling() const { return _subsamplingFactor; } const reading_t * const getCache() const { return _cache.get(); } const reading_t& getLatestValue() const { return _latestValue; } const bool isInit() const { return _cache && _readingQueue; } void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; } void setDelta(const bool delta) { _delta = delta; } void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); } void setMqtt(const std::string& mqtt) { _mqtt = mqtt; } void setSinkPath(const std::string& path) { _sinkPath = path; } void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; } void setSubsampling(unsigned factor) { _subsamplingFactor = factor; } void setLastRaw(int64_t raw) { _lastRawValue.value = raw; } void setLastURaw(uint64_t raw) { _lastRawUValue.value = raw; } const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); } std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); } void clearReadingQueue() const { reading_t buf; while(_readingQueue->pop(buf)) {} } void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); } void initSensor(unsigned interval) { _cacheSize = _cacheInterval / interval + 1; if(!_cache) { _cache.reset(new reading_t[_cacheSize]); for(unsigned i = 0; i < _cacheSize; i++) { _cache[i] = _latestValue; //_latestValue should equal (0,0) at this point } } if(!_readingQueue) { _readingQueue.reset(new boost::lockfree::spsc_queue(QUEUE_MAXLIMIT)); } if(!_sinkFile && _sinkPath != "") { _sinkFile.reset(new std::ofstream(_sinkPath)); if(!_sinkFile->is_open()) _sinkFile.reset(nullptr); } } /** * Store a reading, in order to get it pushed to the data base eventually. * Also this methods takes care of other optional reading post-processing, * e.g. delta computation, subsampling, caching, scaling, etc. * * This is the primary storeReading() and should be used whenever possible. * * @param rawReading Reading struct with value and timestamp to be stored. * @param factor Scaling factor, which is applied to the reading value (optional) * @param maxValue Maximum possible value of the reading; required for the * delta computation to detect an overflow. */ void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX) { reading_t reading = rawReading; if( _delta ) { if (!_firstReading) { if (rawReading.value < _lastRawValue.value) reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor; else reading.value = (rawReading.value - _lastRawValue.value) * factor; } else { _firstReading = false; _lastRawValue = rawReading; return; } _lastRawValue = rawReading; } else reading.value = rawReading.value * factor; if( _delta ) // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period _accumulator.value += reading.value; else _accumulator.value = reading.value; if (_subsamplingIndex++ % _subsamplingFactor == 0) { _accumulator.timestamp = reading.timestamp; //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) { _readingQueue->push(_accumulator); _lastSentValue = _accumulator; } // We reset the accumulator's value for the correct accumulation of deltas _accumulator.value = 0; } if (_sinkFile) { try { _sinkFile->seekp(0, std::ios::beg); *_sinkFile << reading.value << std::endl; } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); } } _cache[_cacheIndex] = reading; _cacheIndex = (_cacheIndex + 1) % _cacheSize; _latestValue = reading; } int64_t getCacheOffset(int64_t t) { if( t < 0) return -1; // Converting from milliseconds to nanoseconds int64_t offset = ( ( (int64_t)_cacheSize * t ) / ( (int64_t)_cacheInterval * 1000000 ) ) + 1; if(offset > _cacheSize) return -1; return ( _cacheSize + _cacheIndex - offset ) % _cacheSize; } /** * Store an unsigned reading, in order to get it pushed to the data base eventually. * Also this methods takes care of other optional reading post-processing, * e.g. delta computation, subsampling, caching, scaling, etc. * * This is a variant of the primary storeReading() for monotonically increasing * sensors reading unsigned 64bit values which may require more than the 63bit * offered by a signed reading_t. The readings are still stored as signed int64 * in the database, therefore all such sensors should enable storage of deltas! * * This variant only adapts the delta computation for ureading_t actually. * FIXME: Avoid code duplication * * @param rawReading Reading struct with (usigned) value and timestamp to be stored. * @param factor Scaling factor, which is applied to the reading value (optional) * @param maxValue Maximum possible value of the reading; required for the * delta computation to detect an overflow. */ void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) { reading_t reading; reading.timestamp = rawReading.timestamp; if( _delta ) { if (!_firstReading) { if (rawReading.value < _lastRawUValue.value) reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor; else reading.value = (rawReading.value - _lastRawUValue.value) * factor; } else { _firstReading = false; _lastRawUValue = rawReading; return; } _lastRawUValue = rawReading; } else reading.value = rawReading.value * factor; if( _delta ) // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period _accumulator.value += reading.value; else _accumulator.value = reading.value; if (_subsamplingIndex++ % _subsamplingFactor == 0) { _accumulator.timestamp = reading.timestamp; //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) { _readingQueue->push(_accumulator); _lastSentValue = _accumulator; } // We reset the accumulator's value for the correct accumulation of deltas _accumulator.value = 0; } if (_sinkFile) { try { _sinkFile->seekp(0, std::ios::beg); *_sinkFile << reading.value << std::endl; } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); } } _cache[_cacheIndex] = reading; _cacheIndex = (_cacheIndex + 1) % _cacheSize; _latestValue = reading; } static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;} virtual void printConfig(LOG_LEVEL ll, LOGGER& lg) { LOG_VAR(ll) << " Sensor: " << _name; LOG_VAR(ll) << " MQTT Topic: " << _mqtt; LOG_VAR(ll) << " sink: " << getSinkPath(); LOG_VAR(ll) << " subSampling: " << getSubsampling(); if(_skipConstVal) { LOG_VAR(ll) << " Skipping constant values"; } else { LOG_VAR(ll) << " No skipping of constant values"; } if(_delta) { LOG_VAR(ll) << " Storing delta readings"; } else { LOG_VAR(ll) << " Storing absolute readings"; } } protected: std::string _name; std::string _mqtt; std::string _sinkPath; bool _skipConstVal; unsigned int _cacheInterval; unsigned int _subsamplingFactor; unsigned int _subsamplingIndex; unsigned int _cacheSize; unsigned int _cacheIndex; std::unique_ptr _cache; bool _delta; bool _firstReading; ureading_t _lastRawUValue; reading_t _lastRawValue; reading_t _latestValue; reading_t _lastSentValue; reading_t _accumulator; std::unique_ptr> _readingQueue; std::unique_ptr _sinkFile; }; //for better readability using SBasePtr = std::shared_ptr; #endif /* SRC_SENSORBASE_H_ */