/* * SensorBase.h * * Created on: 09.08.2018 * Author: Micha Mueller */ #ifndef SRC_SENSORBASE_H_ #define SRC_SENSORBASE_H_ #include #include #include #include #include #include "logging.h" #include "cacheentry.h" class SensorBase { public: static const size_t QUEUE_MAXLIMIT=1024; SensorBase(const std::string& name) : _name(name), _mqtt(""), _sinkPath(""), _skipConstVal(false), _cacheInterval(900000), _subsamplingFactor(1), _subsamplingIndex(0), _cache(nullptr), _delta(false), _firstReading(true), _readingQueue(nullptr), _sinkFile(nullptr) { _lastRawUValue.timestamp = 0; _lastRawUValue.value = 0; _lastRawValue.timestamp = 0; _lastRawValue.value = 0; _latestValue.timestamp = 0; _latestValue.value = 0; _lastSentValue.timestamp= 0; _lastSentValue.value = 0; _accumulator.timestamp = 0; _accumulator.value = 0; } SensorBase(const SensorBase& other) : _name(other._name), _mqtt(other._mqtt), _skipConstVal(other._skipConstVal), _cacheInterval(other._cacheInterval), _subsamplingFactor(other._subsamplingFactor), _subsamplingIndex(0), _cache(nullptr), _delta(other._delta), _firstReading(true), _lastRawUValue(other._lastRawUValue), _lastRawValue(other._lastRawValue), _latestValue(other._latestValue), _lastSentValue(other._lastSentValue), _accumulator(other._accumulator), _readingQueue(nullptr), _sinkFile(nullptr) {} virtual ~SensorBase() {} SensorBase& operator=(const SensorBase& other) { _name = other._name; _mqtt = other._mqtt; _skipConstVal = other._skipConstVal; _cacheInterval = other._cacheInterval; _subsamplingFactor = other._subsamplingFactor; _subsamplingIndex = 0; _cache.reset(nullptr); _delta = other._delta; _firstReading = true; _lastRawUValue.timestamp = other._lastRawUValue.timestamp; _lastRawUValue.value = other._lastRawUValue.value; _lastRawValue.timestamp = other._lastRawValue.timestamp; _lastRawValue.value = other._lastRawValue.value; _latestValue.timestamp = other._latestValue.timestamp; _latestValue.value = other._latestValue.value; _lastSentValue.timestamp= other._lastSentValue.timestamp; _lastSentValue.value = other._lastSentValue.value; _accumulator.timestamp = other._accumulator.timestamp; _accumulator.value = other._accumulator.value; _readingQueue.reset(nullptr); _sinkFile.reset(nullptr); return *this; } const bool isDelta() const { return _delta;} const std::string& getName() const { return _name; } const std::string& getMqtt() const { return _mqtt; } const std::string& getSinkPath() const { return _sinkPath; } bool getSkipConstVal() const { return _skipConstVal; } unsigned getCacheInterval() const { return _cacheInterval; } unsigned getSubsampling() const { return _subsamplingFactor; } const CacheEntry* const getCache() const { return _cache.get(); } const reading_t& getLatestValue() const { return _latestValue; } const bool isInit() const { return _cache && _readingQueue; } // Exposing the reading queue is necessary for publishing sensor data from the collectagent boost::lockfree::spsc_queue* getReadingQueue() { return _readingQueue.get(); } void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; } void setDelta(const bool delta) { _delta = delta; } void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); } void setMqtt(const std::string& mqtt) { _mqtt = mqtt; } void setSinkPath(const std::string& path) { _sinkPath = path; } void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; } void setSubsampling(unsigned factor) { _subsamplingFactor = factor; } void setLastRaw(int64_t raw) { _lastRawValue.value = raw; } void setLastURaw(uint64_t raw) { _lastRawUValue.value = raw; } const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); } std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); } void clearReadingQueue() const { reading_t buf; while(_readingQueue->pop(buf)) {} } void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); } void initSensor(unsigned interval) { uint64_t cacheSize = _cacheInterval / interval + 1; if(!_cache) { //TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds) _cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize)); } if(!_readingQueue) { _readingQueue.reset(new boost::lockfree::spsc_queue(QUEUE_MAXLIMIT)); } if(!_sinkFile && _sinkPath != "") { _sinkFile.reset(new std::ofstream(_sinkPath)); if(!_sinkFile->is_open()) _sinkFile.reset(nullptr); } } /** * Store a reading, in order to get it pushed to the data base eventually. * Also this method takes care of other optional reading post-processing, * e.g. delta computation, subsampling, caching, scaling, etc. * * This is the primary storeReading() and should be used whenever possible. * * @param rawReading Reading struct with value and timestamp to be stored. * @param factor Scaling factor, which is applied to the reading value (optional) * @param maxValue Maximum possible value of the reading; required for the * delta computation to detect an overflow. */ void storeReading(reading_t rawReading, double factor=1.0, long long maxValue=LLONG_MAX, bool storeGlobal=true) { reading_t reading = rawReading; if( _delta ) { if (!_firstReading) { if (rawReading.value < _lastRawValue.value) reading.value = (rawReading.value + (maxValue - _lastRawValue.value)) * factor; else reading.value = (rawReading.value - _lastRawValue.value) * factor; } else { _firstReading = false; _lastRawValue = rawReading; return; } _lastRawValue = rawReading; } else reading.value = rawReading.value * factor; storeReadingLocal(reading); if (storeGlobal) { storeReadingGlobal(reading); } } /** * Store an unsigned reading, in order to get it pushed to the data base eventually. * Also this method takes care of other optional reading post-processing, * e.g. delta computation, subsampling, caching, scaling, etc. * * This is a variant of the primary storeReading() for monotonically increasing * sensors reading unsigned 64bit values which may require more than the 63bit * offered by a signed reading_t. The readings are still stored as signed int64 * in the database, therefore all such sensors should enable storage of deltas! * * This variant only adapts the delta computation for ureading_t actually. * * @param rawReading Reading struct with (usigned) value and timestamp to be stored. * @param factor Scaling factor, which is applied to the reading value (optional) * @param maxValue Maximum possible value of the reading; required for the * delta computation to detect an overflow. */ void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX, bool storeGlobal=true) { reading_t reading; reading.timestamp = rawReading.timestamp; if( _delta ) { if (!_firstReading) { if (rawReading.value < _lastRawUValue.value) reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor; else reading.value = (rawReading.value - _lastRawUValue.value) * factor; } else { _firstReading = false; _lastRawUValue = rawReading; return; } _lastRawUValue = rawReading; } else reading.value = rawReading.value * factor; storeReadingLocal(reading); if (storeGlobal) { storeReadingGlobal(reading); } } static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;} virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) { std::string leading(leadingSpaces, ' '); LOG_VAR(ll) << leading << "Sensor: " << _name; LOG_VAR(ll) << leading << " MQTT Topic: " << _mqtt; LOG_VAR(ll) << leading << " Sink: " << (getSinkPath() != "" ? getSinkPath() : "none"); LOG_VAR(ll) << leading << " SubSampling: " << getSubsampling(); LOG_VAR(ll) << leading << (_skipConstVal ? " Skipping constant values" : " No skipping of constant values"); LOG_VAR(ll) << leading << (_delta ? " Storing delta readings" : " Storing absolute readings"); } protected: /** * Store reading within the sensor, but do not put it in the readingQueue * so the reading does not get pushed but the caches are still updated. */ inline void storeReadingLocal(reading_t reading) { if (_sinkFile) { try { _sinkFile->seekp(0, std::ios::beg); *_sinkFile << reading.value << std::endl; } catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); } } _cache->store(reading); _latestValue = reading; } /** * Store the reading in the readingQueue so it can get pushed. */ inline void storeReadingGlobal(reading_t reading) { if( _delta ) // If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period _accumulator.value += reading.value; else _accumulator.value = reading.value; if (_subsamplingIndex++ % _subsamplingFactor == 0) { _accumulator.timestamp = reading.timestamp; //TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) { _readingQueue->push(_accumulator); _lastSentValue = _accumulator; } // We reset the accumulator's value for the correct accumulation of deltas _accumulator.value = 0; } } std::string _name; std::string _mqtt; std::string _sinkPath; bool _skipConstVal; unsigned int _cacheInterval; unsigned int _subsamplingFactor; unsigned int _subsamplingIndex; std::unique_ptr _cache; bool _delta; bool _firstReading; ureading_t _lastRawUValue; reading_t _lastRawValue; reading_t _latestValue; reading_t _lastSentValue; reading_t _accumulator; std::unique_ptr> _readingQueue; std::unique_ptr _sinkFile; }; //for better readability using SBasePtr = std::shared_ptr; #endif /* SRC_SENSORBASE_H_ */