Commit d76931a3 authored by Alessio Netti's avatar Alessio Netti
Browse files

Sensor subsampling and file output features

- Now sensors can be configured to be automatically written to files
(sink param)
- A per-sensor subsampling feature has also been added, allowing to
decimate the frequency of readings sent to MQTT compared to those that
have been sampled
- Also fixed a bug in MQTTpusher that caused reduced message rates
parent 5c1d54c2
......@@ -192,6 +192,8 @@ All the different plugins share some same general principles in common regarding
3. Sensors hold only those attributes which are necessary to uniquely identify the target sensor. Common base attributes:
* __mqttsuffix__ (to make its [mqtt-topic](#mqttTopic) unique)
* __delta__ (identifies a monotonic sensor. If set to "on", differences between successive readings are collected)
* __sink__ (a path to a file to which sensor readings should be written, disabled by default)
* __subSampling__ (subsampling factor S. If > 1, only one reading every S is sent over MQTT, and the others are kept locally)
5. Be aware that naming of sensor/group/entity is not fixed. A plugin developer can name them as he likes, e.g. counter/multicounter/host.
6. It is possible to define template groups or entities in the config file, but not template sensors (as a sensor should only consists of attributs which make him unique this would not be too useful). To specify a template group/entity simply prefix its definition with `template_` (see the example below). You can reference them later by using the `default` attribute. A template entity can consist of groups and these in turn can consist of sensors. When using a template, all of its attribute values are copied to the actual sensor. Copied attributes can be overwritten in the actual entity/sensor (some of them even should be overwritten, e.g. the mqttPart). However, groups/sensors associated with a template are copied to the actual entity/group and can NOT be overwritten. One can specify further groups/sensors which are then added to those copied from the template. Template entitys/groups itself or sensors within them are never used in live operation of the plugin. They are purely cosmetic for convenient configuration.
......
......@@ -15,17 +15,17 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins, unsigned int maxNumberOfMessages) :
_brokerPort(brokerPort),
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_qosLevel(qosLevel),
_plugins(plugins),
_maxNumberOfMessages(maxNumberOfMessages),
_plugins(plugins),
_connected(false),
_keepRunning(true),
_overrideMsgCap(true),
_doHalt(false),
_halted(false) {
_halted(false),
_maxNumberOfMessages(maxNumberOfMessages) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -113,7 +113,6 @@ void MQTTPusher::push() {
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
sleep(1);
}
mosquitto_disconnect(_mosq);
}
......@@ -201,5 +200,5 @@ void MQTTPusher::computeMsgRate() {
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
_overrideMsgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages;
if( _overrideMsgCap && _maxNumberOfMessages != 0 )
LOGM(error) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
LOGM(warning) << "Cannot enforce max rate of " << _maxNumberOfMessages << " msg/s lower than actual " << msgRate << " msg/s!";
}
......@@ -365,9 +365,12 @@ protected:
} else {
sBase.setSkipConstVal(false);
}
}
else if (boost::iequals(val.first, "delta")) {
} else if (boost::iequals(val.first, "delta")) {
sBase.setDelta( val.second.data() == "on" );
} else if (boost::iequals(val.first, "sink")) {
sBase.setSinkPath( val.second.data() );
} else if (boost::iequals(val.first, "subSampling")) {
sBase.setSubsampling( std::stoul(val.second.data()) );
}
}
sensorBase(sBase, config);
......
......@@ -8,6 +8,7 @@
#ifndef SRC_SENSORBASE_H_
#define SRC_SENSORBASE_H_
#include <fstream>
#include <memory>
#include <string>
#include <limits.h>
......@@ -25,16 +26,22 @@ public:
SensorBase(const std::string& name) :
_name(name),
_mqtt(""),
_sinkPath(""),
_skipConstVal(false),
_cacheInterval(900000),
_subsamplingFactor(1),
_subsamplingIndex(0),
_cacheSize(1),
_cacheIndex(0),
_cache(nullptr),
_delta(false),
_readingQueue(nullptr) {
_readingQueue(nullptr),
_sinkFile(nullptr) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
_lastSentValue.timestamp= 0;
_lastSentValue.value = 0;
_lastRawValue.timestamp = 0;
_lastRawValue.value = 0;
}
......@@ -42,23 +49,31 @@ public:
SensorBase(const SensorBase& other) :
_name(other._name),
_mqtt(other._mqtt),
_sinkPath(other._sinkPath),
_skipConstVal(other._skipConstVal),
_cacheInterval(other._cacheInterval),
_subsamplingFactor(other._subsamplingFactor),
_subsamplingIndex(0),
_cacheSize(other._cacheSize),
_cacheIndex(0),
_cache(nullptr),
_delta(other._delta),
_latestValue(other._latestValue),
_lastRawValue(other._lastRawValue),
_readingQueue(nullptr) {}
_lastSentValue(other._lastSentValue),
_readingQueue(nullptr),
_sinkFile(nullptr) {}
virtual ~SensorBase() {}
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
_mqtt = other._mqtt;
_sinkPath = other._sinkPath;
_skipConstVal = other._skipConstVal;
_cacheInterval = other._cacheInterval;
_subsamplingFactor = other._subsamplingFactor;
_subsamplingIndex = 0;
_cacheSize = other._cacheSize;
_cacheIndex = 0;
_cache.reset(nullptr);
......@@ -67,7 +82,10 @@ public:
_latestValue.value = other._latestValue.value;
_lastRawValue.timestamp = other._lastRawValue.timestamp;
_lastRawValue.value = other._lastRawValue.value;
_lastSentValue.timestamp= other._lastSentValue.timestamp;
_lastSentValue.value = other._lastSentValue.value;
_readingQueue.reset(nullptr);
_sinkFile.reset(nullptr);
return *this;
}
......@@ -75,16 +93,20 @@ public:
const bool isDelta() const { return _delta;}
const std::string& getName() const { return _name; }
const std::string& getMqtt() const { return _mqtt; }
const std::string& getSinkPath() const { return _sinkPath; }
bool getSkipConstVal() const { return _skipConstVal; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getSubsampling() const { return _subsamplingFactor; }
const reading_t * const getCache() const { return _cache.get(); }
const reading_t& getLatestValue() const { return _latestValue; }
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
void setSinkPath(const std::string& path) { _sinkPath = path; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setSubsampling(unsigned factor) { _subsamplingFactor = factor; }
const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); }
......@@ -101,6 +123,11 @@ public:
if(!_readingQueue) {
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
}
if(!_sinkFile && _sinkPath != "") {
_sinkFile.reset(new std::ofstream(_sinkPath));
if(!_sinkFile->is_open())
_sinkFile.reset(nullptr);
}
}
void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
......@@ -115,9 +142,16 @@ public:
else
reading.value = rawReading.value * factor;
if (!(_skipConstVal && (reading.value == _latestValue.value))) {
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
if (!(_skipConstVal && (reading.value == _lastSentValue.value)) && _subsamplingIndex++ % _subsamplingFactor == 0) {
_readingQueue->push(reading);
_lastSentValue = reading;
}
if (_sinkFile) {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
}
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
_latestValue = reading;
......@@ -129,15 +163,20 @@ protected:
std::string _name;
std::string _mqtt;
std::string _sinkPath;
bool _skipConstVal;
unsigned int _cacheInterval;
unsigned int _subsamplingFactor;
unsigned int _subsamplingIndex;
unsigned int _cacheSize;
unsigned int _cacheIndex;
std::unique_ptr<reading_t[]> _cache;
bool _delta;
reading_t _latestValue;
reading_t _lastRawValue;
reading_t _lastSentValue;
std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
std::unique_ptr<std::ofstream> _sinkFile;
};
//for better readability
......
......@@ -45,6 +45,15 @@ public:
else
_latestValue.value = value * factor;
if (!(_skipConstVal && (_latestValue.value == _lastSentValue.value)) && _subsamplingIndex++ % _subsamplingFactor == 0) {
_readingQueue->push(_latestValue);
_lastSentValue = _latestValue;
}
if (_sinkFile) {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << _latestValue.value;
}
_readingQueue->push(_latestValue);
_cache[_cacheIndex] = _latestValue;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment