Commit 6e0123a2 authored by Micha Mueller's avatar Micha Mueller
Browse files

Make caching interval configurable

parent a20dd95f
......@@ -43,6 +43,7 @@ global {
verbosity 5
daemonize false
tempdir ./../testDir/
cacheInterval 900
}
plugins {
......@@ -65,11 +66,12 @@ Explanation of the values:
| global | Wrapper structure for the global values.
|   restAddr | Define address and port where the REST API should run on. See the corresponding [section](#restApi) for more information.
|   mqttBroker | Define address and port of the MQTT-broker which collects the messages (sensor values) send by dcdbpusher.
|   mqttprefix | To not rewrite a full MQTT-topic for every sensor one can specify here a consistend prefix.
|   mqttprefix | To not rewrite a full MQTT-topic for every sensor one can specify here a consistent prefix.
|   threads | Specify how many threads should be created to handle the sensors async. Default value of threads is 1. Note that the MQTTPusher always starts an extra thread. So the actual number of started threads is always one more than defined here. Specifying not enough threads can result in a delay for some sensors until they are read.
|   verbosity | Level of detail in the logfile (dcdb.log). Set to a value between 5 (all log-messages, default) and 0 (only fatal messages). NOTE: level of verbosity for the command-line log can be set via the -v flag independently when invoking dcdbpusher.
|   daemonize | Set to 'true' if dcdbpusher should run detached as daemon. Default is false.
|   tempdir | One can specify a writeable directory where dcdbpusher can write its temporary and logging files to. Default is the current (' ./ ' ) directory.
|   cacheInterval | Define a time interval in seconds. The last sensor readings within this time interval will be kept. This value can be overwritten by plugins.
| | |
| plugins | In this section one can specify the plugins which should be used.
|   plugin name | The plugin name is used to build the corresponding lib-name (e.g. sysfs --> libdcdbplugin_sysfs.1.0)
......@@ -101,6 +103,7 @@ Although every plugin requires different configuration parameters for its sensor
|
global { |global {
mqttprefix /00112233445566778899AABB0000 | mqttprefix /00112233445566778899AABB0000
cacheInterval 120 | cacheInterval 120
} |}
|
SensorTemplate { |SensorTemplate {
......@@ -142,8 +145,9 @@ Explanation of the values:
| Value                   | Explanation |
|:----- |:----------- |
| global | Here one can overwrite the global values defined in `global.conf`. The overwritten values are only used in the scope of the specific plugin.
|   mqttprefix | Currently only overwriting of the mqttprefix is supported.
| global | Here one can overwrite the global values defined in `global.conf`. The overwritten values are only used in the scope of the specific plugin. Overwriting of global values is completely optional. However, even if no global values are overwritten at least the `global{}` struct should be present.
|   mqttprefix | Define separate MQTT prefix for this plugin.
|   cacheInterval | Overwrite global caching interval with plugin specific value.
| | |
| SensorTemplate | Define template sensors to be used later in the configuration. This feature is mainly for convenience reasons. One is not obligated to define any template sensors. However it is required to at least define an empty SensorTemplate {} structure.
|   sensor name | Every template sensor needs a name for future reference. A template sensor can define every value (including values specific to a plugin) a normal sensor can (see below).
......
......@@ -6,6 +6,7 @@ global {
timeout 1000
apdu_timeout 200
apdu_retries 1
cacheInterval 90
}
templates {
......
......@@ -6,6 +6,7 @@ global {
verbosity 5
daemonize false
tempdir .
cacheInterval 120
}
plugins {
......
global {
cacheInterval 60
sessiontimeout 500
retransmissiontimeout 200
mqttprefix /AABBAABBAABBAACCDDCCDDCC
......
......@@ -32,6 +32,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
_global.mqttPrefix = "";
_global.threads = 1;
_global.tempdir = "./";
_global.cacheInterval = 900000;
//log levels will get inverted...
_global.logLevelFile = boost::log::trivial::fatal;
_global.logLevelCmd = boost::log::trivial::warning;
......@@ -97,6 +98,9 @@ bool Configuration::readGlobal() {
}
} else if (boost::iequals(global.first, "verbosity")) {
_global.logLevelFile = static_cast<boost::log::trivial::severity_level>(stoi(global.second.data()));
} else if (boost::iequals(global.first, "cacheInterval")) {
_global.cacheInterval = stoul(global.second.data());
_global.cacheInterval *= 1000;
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -22,6 +22,7 @@ typedef struct {
std::string mqttPrefix;
std::string tempdir;
uint32_t threads;
unsigned int cacheInterval;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
} global_t;
......@@ -31,7 +32,7 @@ typedef struct {
*/
class Configurator {
public:
Configurator() {}
Configurator() : _cacheInterval(900000) {}
virtual ~Configurator() {}
/**
......@@ -43,6 +44,7 @@ public:
virtual void setGlobalSettings(const global_t& globalSettings) {
_mqttPrefix = globalSettings.mqttPrefix;
_cacheInterval = globalSettings.cacheInterval;
}
std::vector<Sensor*>& getSensors() {
......@@ -51,6 +53,7 @@ public:
protected:
std::string _mqttPrefix;
unsigned int _cacheInterval;
std::vector<Sensor*> _sensors;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
......@@ -85,13 +85,14 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
if (s->getName() == sensor) {
uint64_t avg = 0;
const reading_t * const cache = s->getCache();
unsigned size = s->getCacheSize();
for(int i = 0; i < CACHE_SIZE; i++) {
for(unsigned i = 0; i < size; i++) {
avg += cache[i].value;
}
avg /= CACHE_SIZE;
avg /= size;
response = plugin + "::" + sensor + " Average of last " + std::to_string(CACHE_SIZE) + " values is " + std::to_string(avg);
response = plugin + "::" + sensor + " Average of last " + std::to_string(size) + " values is " + std::to_string(avg);
connection->set_status(server::connection::ok);
goto end;
}
......
......@@ -12,21 +12,21 @@
#include <functional>
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _keepRunning(0), _minValues(1), _interval(1000) {
_name(name), _mqtt(""), _keepRunning(0), _minValues(1), _interval(1000),
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
_cacheIndex = 0;
for(int i = 0; i < CACHE_SIZE; i++) {
_cache[i] = _latestValue;
}
_cache = NULL;
_timer = NULL;
_readingQueue = NULL;
}
Sensor::~Sensor() {
if(_cache) {
delete[] _cache;
}
if(_readingQueue) {
delete _readingQueue;
}
......@@ -67,6 +67,14 @@ void Sensor::setInterval(unsigned interval) {
_interval = interval;
}
unsigned Sensor::getCacheSize() const {
return _cacheSize;
}
void Sensor::setCacheInterval(unsigned cacheInterval) {
_cacheInterval = cacheInterval;
}
const reading_t * const Sensor::getCache() const {
return _cache;
}
......@@ -84,6 +92,15 @@ void Sensor::pushReadingQueue(reading_t *reads, std::size_t count) const {
}
void Sensor::init(boost::asio::io_service& io) {
if(!_cache) {
_cacheSize = _cacheInterval / _interval + 1;
_cache = new reading_t[_cacheSize];
for(unsigned i = 0; i < _cacheSize; i++) {
_cache[i] = _latestValue; //_latestValue should equal (0,0) at this point
}
}
if(!_readingQueue) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
......@@ -95,5 +112,5 @@ void Sensor::init(boost::asio::io_service& io) {
void Sensor::storeReading(reading_t reading) {
_readingQueue->push(reading);
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % CACHE_SIZE;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
......@@ -14,8 +14,6 @@
#include "Logging.h"
#define CACHE_SIZE 5
typedef struct {
uint64_t value;
uint64_t timestamp;
......@@ -45,6 +43,10 @@ public:
unsigned getInterval() const;
void setInterval(unsigned interval);
unsigned getCacheSize() const;
void setCacheInterval(unsigned cacheInterval);
const reading_t * const getCache() const;
const std::size_t getSizeOfReadingQueue() const;
......@@ -68,9 +70,11 @@ protected:
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
reading_t * _cache;
reading_t _latestValue;
reading_t _cache[CACHE_SIZE];
boost::asio::deadline_timer* _timer;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -278,6 +278,7 @@ int main(int argc, char** argv) {
LOG(info) << " Daemonize: Disabled";
}
LOG(info) << " Write-Dir: " << globalSettings.tempdir;
LOG(info) << " CacheInterval: " << globalSettings.cacheInterval / 1000 << " [s]";
//Init all sensors
LOG(info) << "Init sensors...";
......
......@@ -56,7 +56,14 @@ bool BACnetConfigurator::readConfig(std::string cfgPath) {
LOG(debug) << " apdu_retries " << apdu_retries;
} else if (boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else {
LOG(error) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
......@@ -166,6 +173,8 @@ bool BACnetConfigurator::readSensor(BACnetSensor& sensor, boost::property_tree::
}
}
sensor.setCacheInterval(_cacheInterval);
LOG(debug) << " MQTTtopic:" << sensor.getMqtt();
LOG(debug) << " Interval : " << sensor.getInterval();
LOG(debug) << " minValues: " << sensor.getMinValues();
......
......@@ -42,7 +42,14 @@ bool IPMIConfigurator::readConfig(std::string cfgPath) {
LOG(debug) << " RetransmissionTimeout " << _globalHost.retransmissionTimeout;
} else if (boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else {
LOG(error) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
......@@ -152,6 +159,8 @@ bool IPMIConfigurator::readSensor(DCDB::IPMISensor& sensor, boost::property_tree
}
}
sensor.setCacheInterval(_cacheInterval);
LOG(debug) << " MQTTSuffix:" << sensor.getMqtt();
LOG(debug) << " Interval : " << sensor.getInterval();
LOG(debug) << " minValues: " << sensor.getMinValues();
......
......@@ -32,7 +32,6 @@ bool PDUConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
......@@ -42,6 +41,10 @@ bool PDUConfigurator::readConfig(std::string cfgPath) {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else {
LOG(error) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
......@@ -142,6 +145,7 @@ bool PDUConfigurator::readSensor(PDUSensor& sensor, boost::property_tree::iptree
LOG(warning) << " Value \"" << val.first << "\" not recognized. Omitting...";
}
}
sensor.setCacheInterval(_cacheInterval);
LOG(debug) << " MQTT : " << sensor.getMqtt();
LOG(debug) << " Interval : " << sensor.getInterval();
......
......@@ -69,7 +69,6 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
......@@ -79,6 +78,10 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
......@@ -214,6 +217,8 @@ bool PerfeventConfigurator::readCounter(PerfCounter& counter, boost::property_tr
}
}
counter.setCacheInterval(_cacheInterval);
LOG(debug) << " StartMQTT: " << counter.getMqtt();
LOG(debug) << " Interval : " << counter.getInterval();
LOG(debug) << " minValues: " << counter.getMinValues();
......
......@@ -29,7 +29,6 @@ bool SysfsConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
......@@ -39,6 +38,10 @@ bool SysfsConfigurator::readConfig(std::string cfgPath) {
_mqttPrefix.append("/");
}
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
......@@ -99,18 +102,6 @@ void SysfsConfigurator::readSensor(SysfsSensor& sensor, boost::property_tree::ip
sensor.setFilter(true);
string input = s.second.data();
// Notes on regexes:
// 1. For substitution sed syntax ("s/.../.../") is used. Therefore extended regular expressions (ERE) are used as regex-syntax.
// ERE is closest to Basic RE (BRE), which is actually used by sed, but requires less escaping.
// 2. If a \ ("backslash") is needed in the regex (for escaping), always use \\ ("double backslash") as
// the regex is read in as string and strings also escape with backslash
// 3. Boost uses whitespaces as separators in his property trees. Either use [[:space:]] in the regex or put it in quotation marks ("")
// 4. To be able to reference parts of the match (for substitution) use groups.
// Groups are created with parentheses.
// 5. If using character classes like [[:digit:]] always make sure to use double brackets ("[[" and "]]") or they are not recognized.
// See "https://www.gnu.org/software/sed/manual/html_node/ERE-syntax.html#ERE-syntax" for ERE-syntax
// See "http://www.boost.org/doc/libs/1_65_1/libs/regex/doc/html/boost_regex/format/sed_format.html" for more info substitution syntax.
//check if input has sed format of "s/.../.../" for substitution
regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
smatch matchResults;
......@@ -133,6 +124,9 @@ void SysfsConfigurator::readSensor(SysfsSensor& sensor, boost::property_tree::ip
LOG(warning) << " Value \"" << s.first << "\" not recognized. Omitting...";
}
}
sensor.setCacheInterval(_cacheInterval);
LOG(debug) << " Path : " << sensor.getPath();
LOG(debug) << " MQTT : " << sensor.getMqtt();
LOG(debug) << " Interval: " << sensor.getInterval();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment