Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 18aae967 authored by Alessio Netti's avatar Alessio Netti
Browse files

Sensor name auto-publish feature

- dcdbpusher can now be configured to automatically publish its
sensors by communicating the MQTT topic -> name mapping to the
collectagent through appropriate messages
- See the README for further details
- As of now, dcdbpusher performs auto-publish (when required to) and
then proceeds with normal operation; this should be changed so that
the pusher terminates gracefully after having published the sensors
parent 94c8543b
......@@ -14,7 +14,7 @@ DISTFILES_HASHES = bacnet-stack-$(BACNET-STACK_VERSION).tgz|66b69111d91432fa67a7
include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o
PLUGINS = procfs pdu sysfs ipmi bacnet snmp
......
......@@ -66,6 +66,7 @@ Please have a look at the provided `config/global.conf` example to get familiar
| global | Wrapper structure for the global values.
| mqttBroker | Define address and port of the MQTT-broker which collects the messages (sensor values) send by dcdbpusher.
| mqttprefix | To not rewrite a full MQTT-topic for every sensor one can specify here a consistent prefix.
| sensorpattern | pattern used to perform automatic sensor name publishing. See the corresponding [section](#autopublish) for more information.
| threads | Specify how many threads should be created to handle the sensors async. Default value of threads is 1. Note that the MQTTPusher always starts an extra thread. So the actual number of started threads is always one more than defined here. Specifying not enough threads can result in a delay for some sensors until they are read.
| verbosity | Level of detail in the logfile (dcdb.log). Set to a value between 5 (all log-messages, default) and 0 (only fatal messages). NOTE: level of verbosity for the command-line log can be set via the -v flag independently when invoking dcdbpusher.
| daemonize | Set to 'true' if dcdbpusher should run detached as daemon. Default is false.
......@@ -134,7 +135,7 @@ PUT https://localhost:8000/bacnet/stop?authkey=myToken
## MQTT topic <a name="mqttTopic"></a>
For communication between the different DCDB-components (database, dcdbpusher) the [MQTT protocol](https://mqtt.org/) is used. In order to identify each sensor, everyone has to have a unique MQTT topic assigned. A MQTT topic for DCDB consists of exactly 128 bits (= 32 hex characters), not including '/' separators. The topic for a sensor is build by appending up to 4 parts:
For communication between the different DCDB-components (database, dcdbpusher) the [MQTT protocol](https://mqtt.org/) is used. In order to identify each sensor, everyone has to have a unique MQTT topic assigned. A MQTT topic for DCDB consists of exactly 112 bits (= 28 hex characters), not including '/' separators. The topic for a sensor is built by appending up to 4 parts:
1. mqttprefix (e.g. /00112233445566778899AA)
2. mqttpart of entity (if supported by plugin, e.g. /BB)
3. mqttpart of group (e.g. /1122)
......@@ -142,6 +143,32 @@ For communication between the different DCDB-components (database, dcdbpusher) t
Then the topic for the sensor is /00112233445566778899AA/BB/1122/3344.
### Automatic sensor name publishing <a name="autopublish"></a>
In order to perform queries through the *dcdbquery* tool for a certain sensor, a mapping from its MQTT topic to the desired displayed name must be supplied through the *dcdbconfig* tool, which stores this information in the underlying Cassandra datastore.
However, dcdbpusher can also handle this task automatically, and perform the publishing of all configured sensors. To enable this feature, users must use the *sensorpattern* global configuration parameter, which can also be supplied through the -a command line option.
Such sensor pattern is a string defining the naming scheme to be used by dcdbpusher: it is composed of a fixed part, which will be present in the names of all sensors, and of several wildcards, which are automatically replaced by dcdbpusher with the information of the specific sensor. Current supported wildcards are:
* \<sensor\>: the sensor's name as set in the configuration file of the corresponding plugin;
* \<group\>: the name of the sensor group to which the single sensor belongs;
* \<plugin\>: the name of the plugin managing the sensor group.
Note that the \<sensor\> wildcard *must* be supplied in order for the sensor pattern to be considered valid. If not, the sensor pattern will be discarded and automatic sensor name publishing will not be performed. An example of a valid sensor pattern is the following:
```
myHostname.<group>.<sensor>
```
This sensor pattern will include the hostname (in this case, *myHostname*) to distinguish between the sensors of different hosts, plus the sensor group and name. For the *MemFree* and *nr_alloc_batch* sensors defined in the default [ProcFS config file](config/procfs.conf), this pattern will produce the following names:
```
myHostname.meminfo.MemFree
myHostname.vmstat.nr_alloc_batch
```
It is advised to always include the group name together with the sensor name in the pattern, as dcdbpusher does not perform any checks on the uniqueness of sensor names.
# Plugins <a name ="plugins"></a>
The core of dcdbpusher is responsible of collecting all the values read by the sensors and sending them to the database. However, the main functionality of the sensors comes from the various plugins. Every plugin corresponds to a special sensor functionality.
......
......@@ -31,6 +31,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
_global.logLevelFile = boost::log::trivial::trace;
_global.logLevelCmd = boost::log::trivial::info;
_global.pluginSettings.sensorPattern = "";
_global.pluginSettings.mqttPrefix = "";
_global.pluginSettings.tempdir = "./";
_global.pluginSettings.cacheInterval = 900000;
......@@ -79,6 +80,8 @@ bool Configuration::readGlobal() {
if (_global.pluginSettings.mqttPrefix[_global.pluginSettings.mqttPrefix.length()-1] != '/') {
_global.pluginSettings.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "sensorpattern")) {
_global.pluginSettings.sensorPattern = global.second.data();
} else if (boost::iequals(global.first, "tempdir")) {
_global.pluginSettings.tempdir = global.second.data();
if (_global.pluginSettings.tempdir[_global.pluginSettings.tempdir.length()-1] != '/') {
......
......@@ -9,14 +9,16 @@
#include <iostream>
#include <string>
#include <unistd.h>
#include <boost/regex.hpp>
#include "timestamp.h"
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, pluginVector_t& plugins) :
const std::string& mqttPrefix, const std::string& sensorPattern, pluginVector_t& plugins) :
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_plugins(plugins),
_connected(false),
_keepRunning(true),
......@@ -72,36 +74,39 @@ void MQTTPusher::push() {
LOGM(info) << "Connection established!";
}
}
//collect sensor-data
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (_keepRunning || totalCount) {
if (_doHalt) {
_halted = true;
sleep(2);
continue;
}
_halted = false;
totalCount = 0;
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
break;
}
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
sendReadings(s, reads, totalCount);
}
}
}
}
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
}
//Performing sensor name auto-publish if necessary
sendMappings();
//collect sensor-data
reading_t *reads = new reading_t[1024];
std::size_t totalCount = 0;
while (_keepRunning || totalCount) {
if (_doHalt) {
_halted = true;
sleep(2);
continue;
}
_halted = false;
totalCount = 0;
for (auto &p : _plugins) {
if (_doHalt) {
//for faster response
break;
}
for (auto g : p.configurator->getSensorGroups()) {
for (auto s : g->getSensors()) {
if (s->getSizeOfReadingQueue() >= g->getMinValues()) {
sendReadings(s, reads, totalCount);
}
}
}
}
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
}
}
mosquitto_disconnect(_mosq);
}
......@@ -144,3 +149,37 @@ void MQTTPusher::sendReadings(SensorBase* s, reading_t* reads, std::size_t& tota
}
}
}
bool MQTTPusher::sendMappings() {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return false;
else if ( !boost::regex_search(_sensorPattern.c_str(), match, sensorReg) ) {
LOGM(error) << "Invalid sensor naming pattern. You must at least include " << SENSOR_PATTERN << "!";
return true;
}
std::string topic, name;
unsigned int publishCtr=0;
for(auto& p: _plugins)
for(auto g: p.configurator->getSensorGroups())
for(auto s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
name = boost::regex_replace(name, pluginReg, p.id);
//try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
return true;
}
......@@ -8,6 +8,11 @@
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#define DCDB_MAP "/DCDB_MAP/"
#define SENSOR_PATTERN "<sensor>"
#define GROUP_PATTERN "<group>"
#define PLUGIN_PATTERN "<plugin>"
#include <mosquitto.h>
#include "includes/PluginDefinitions.h"
......@@ -17,7 +22,7 @@
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& _brokerHost,
const std::string& mqttPrefix, pluginVector_t& plugins);
const std::string& mqttPrefix, const std::string& sensorPattern, pluginVector_t& plugins);
virtual ~MQTTPusher();
void push();
......@@ -44,9 +49,11 @@ public:
private:
void sendReadings(SensorBase* s, reading_t* reads, std::size_t& totalCount);
bool sendMappings();
int _brokerPort;
std::string _brokerHost;
std::string _sensorPattern;
pluginVector_t& _plugins;
struct mosquitto* _mosq;
bool _connected;
......
......@@ -85,11 +85,12 @@ void printSyntax()
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout << "Usage:" << endl;
cout << " dcdbpusher [-d] [-b<host>] [-p<port>] [-m<string>] [-t<number>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher [-d] [-a<string>] [-b<host>] [-p<port>] [-m<string>] [-t<number>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -a <string> Enable sensor name auto-publish using the specified string as pattern" << endl;
cout << " -b <host> MQTT broker" << endl;
cout << " -p <port> MQTT broker port" << endl;
cout << " -m <string> MQTT topic prefix" << endl;
......@@ -116,7 +117,7 @@ int main(int argc, char** argv) {
}
//define allowed command-line options once
const char opts[] = "b:p:m:t:v:w:c:dh";
const char opts[] = "a:b:p:m:t:v:w:c:dh";
//check if help flag specified
char c;
......@@ -170,6 +171,9 @@ int main(int argc, char** argv) {
while ((c = getopt(argc, argv, opts)) != -1) {
switch (c)
{
case 'a':
pluginSettings.sensorPattern = optarg;
break;
case 'b':
globalSettings.brokerHost = optarg;
break;
......@@ -261,7 +265,7 @@ int main(int argc, char** argv) {
#endif
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, _configuration->getPlugins());
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.mqttPrefix, pluginSettings.sensorPattern, _configuration->getPlugins());
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, io);
_configuration->readAuthkeys(_httpsServer);
......
......@@ -14,6 +14,7 @@
#include "SensorGroupTemplate.h"
typedef struct {
std::string sensorPattern;
std::string mqttPrefix;
std::string tempdir;
unsigned int cacheInterval;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment