/* * SensorBase.h * * Created on: 09.08.2018 * Author: Micha Mueller */ #ifndef SRC_SENSORBASE_H_ #define SRC_SENSORBASE_H_ #include #include #include #include #include typedef struct { int64_t value; uint64_t timestamp; } reading_t; class SensorBase { public: static const size_t QUEUE_MAXLIMIT=1024; SensorBase(const std::string& name) : _name(name), _mqtt(""), _sinkPath(""), _skipConstVal(false), _cacheInterval(900000), _subsamplingFactor(1), _subsamplingIndex(0), _cacheSize(1), _cacheIndex(0), _cache(nullptr), _delta(false), _firstReading(true), _readingQueue(nullptr), _sinkFile(nullptr) { _latestValue.timestamp = 0; _latestValue.value = 0; _lastSentValue.timestamp= 0; _lastSentValue.value = 0; _lastRawValue.timestamp = 0; _lastRawValue.value = 0; } SensorBase(const SensorBase& other) : _name(other._name), _mqtt(other._mqtt), _sinkPath(other._sinkPath), _skipConstVal(other._skipConstVal), _cacheInterval(other._cacheInterval), _subsamplingFactor(other._subsamplingFactor), _subsamplingIndex(0), _cacheSize(other._cacheSize), _cacheIndex(0), _cache(nullptr), _delta(other._delta), _firstReading(true), _latestValue(other._latestValue), _lastRawValue(other._lastRawValue), _lastSentValue(other._lastSentValue), _readingQueue(nullptr), _sinkFile(nullptr) {} virtual ~SensorBase() {} SensorBase& operator=(const SensorBase& other) { _name = other._name; _mqtt = other._mqtt; _sinkPath = other._sinkPath; _skipConstVal = other._skipConstVal; _cacheInterval = other._cacheInterval; _subsamplingFactor = other._subsamplingFactor; _subsamplingIndex = 0; _cacheSize = other._cacheSize; _cacheIndex = 0; _cache.reset(nullptr); _delta = other._delta; _firstReading = true; _latestValue.timestamp = other._latestValue.timestamp; _latestValue.value = other._latestValue.value; _lastRawValue.timestamp = other._lastRawValue.timestamp; _lastRawValue.value = other._lastRawValue.value; _lastSentValue.timestamp= other._lastSentValue.timestamp; _lastSentValue.value = other._lastSentValue.value; _readingQueue.reset(nullptr); _sinkFile.reset(nullptr); return *this; } const bool isDelta() const { return _delta;} const std::string& getName() const { return _name; } const std::string& getMqtt() const { return _mqtt; } const std::string& getSinkPath() const { return _sinkPath; } bool getSkipConstVal() const { return _skipConstVal; } unsigned getCacheSize() const { return _cacheSize; } unsigned getSubsampling() const { return _subsamplingFactor; } const reading_t * const getCache() const { return _cache.get(); } const reading_t& getLatestValue() const { return _latestValue; } void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; } void setDelta(const bool delta) { _delta = delta; } void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); } void setMqtt(const std::string& mqtt) { _mqtt = mqtt; } void setSinkPath(const std::string& path) { _sinkPath = path; } void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; } void setSubsampling(unsigned factor) { _subsamplingFactor = factor; } const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); } std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); } void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); } void initSensor(unsigned interval) { _cacheSize = _cacheInterval / interval + 1; if(!_cache) { _cache.reset(new reading_t[_cacheSize]); for(unsigned i = 0; i < _cacheSize; i++) { _cache[i] = _latestValue; //_latestValue should equal (0,0) at this point } } if(!_readingQueue) { _readingQueue.reset(new boost::lockfree::spsc_queue(QUEUE_MAXLIMIT)); } if(!_sinkFile && _sinkPath != "") { _sinkFile.reset(new std::ofstream(_sinkPath)); if(!_sinkFile->is_open()) _sinkFile.reset(nullptr); } } void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) { reading_t reading = rawReading; if( _delta ) { if (!_firstReading) { if (rawReading.value < _lastRawValue.value) reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor; else reading.value = (rawReading.value - _lastRawValue.value) * factor; } else { _firstReading = false; _lastRawValue = rawReading; return; } _lastRawValue = rawReading; } else reading.value = rawReading.value * factor; //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed if (!(_skipConstVal && (reading.value == _lastSentValue.value)) && _subsamplingIndex++ % _subsamplingFactor == 0) { _readingQueue->push(reading); _lastSentValue = reading; } if (_sinkFile) { _sinkFile->seekp(0, std::ios::beg); *_sinkFile << reading.value << std::endl; } _cache[_cacheIndex] = reading; _cacheIndex = (_cacheIndex + 1) % _cacheSize; _latestValue = reading; } static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;} protected: std::string _name; std::string _mqtt; std::string _sinkPath; bool _skipConstVal; unsigned int _cacheInterval; unsigned int _subsamplingFactor; unsigned int _subsamplingIndex; unsigned int _cacheSize; unsigned int _cacheIndex; std::unique_ptr _cache; bool _delta; bool _firstReading; reading_t _latestValue; reading_t _lastRawValue; reading_t _lastSentValue; std::unique_ptr> _readingQueue; std::unique_ptr _sinkFile; }; //for better readability using SBasePtr = std::shared_ptr; #endif /* SRC_SENSORBASE_H_ */