Commit 626c6c7c authored by Alessio Netti's avatar Alessio Netti
Browse files

Auto-publish changes

- Auto-publish has been split in two parts: construction of the
sensor names is performed within the Configurators, whereas the MQTT
messages are sent from the MQTTPusher
- This way, sensors always have the correct names upon plugin reloads
- Will make switching to textual MQTT topics easier
parent 68a61f34
......@@ -15,7 +15,7 @@ include $(DCDBCOREPATH)/common.mk
VERSION = $(shell git describe --long|sed 's/-\([0-9]*\)/.\1/')
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-unused-local-typedef -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o src/analytics/AnalyticsManager.o src/analytics/SensorNavigator.o
......@@ -91,31 +91,31 @@ libdcdbplugin_sysfs.$(LIBEXT): src/sensors/sysfs/SysfsSensorGroup.o src/sensors/
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_perfevent.$(LIBEXT): src/sensors/perfevent/PerfSensorGroup.o src/sensors/perfevent/PerfeventConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_ipmi.$(LIBEXT): src/sensors/ipmi/IPMISensorGroup.o src/sensors/ipmi/IPMIHost.o src/sensors/ipmi/IPMIConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lfreeipmi -lboost_regex
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lfreeipmi
libdcdbplugin_pdu.$(LIBEXT): src/sensors/pdu/PDUSensorGroup.o src/sensors/pdu/PDUUnit.o src/sensors/pdu/PDUConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_regex -lboost_system
libdcdbplugin_bacnet.$(LIBEXT): src/sensors/bacnet/BACnetSensorGroup.o src/sensors/bacnet/BACnetClient.o src/sensors/bacnet/BACnetConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lbacnet
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lbacnet
libdcdbplugin_snmp.$(LIBEXT): src/sensors/snmp/SNMPSensorGroup.o src/sensors/snmp/SNMPConnection.o src/sensors/snmp/SNMPConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lnetsnmp -lnetsnmpagent
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lnetsnmp -lnetsnmpagent
libdcdbplugin_procfs.$(LIBEXT): src/sensors/procfs/ProcfsSensorGroup.o src/sensors/procfs/ProcfsParser.o src/sensors/procfs/ProcfsConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_tester.$(LIBEXT): src/sensors/tester/TesterSensorGroup.o src/sensors/tester/TesterConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_gpfsmon.$(LIBEXT): src/sensors/gpfsmon/GpfsmonSensorGroup.o src/sensors/gpfsmon/GpfsmonConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
#libdcdbplugin_opa.$(LIBEXT): src/sensors/opa/OpaSensorGroup.o src/sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopamgt -libverbs -libumad -lssl
libdcdbanalyzer_average.$(LIBEXT): src/analytics/analyzers/average/AverageAnalyzer.o src/analytics/analyzers/average/AverageConfigurator.o src/analytics/SensorNavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
......@@ -9,7 +9,6 @@
#include <iostream>
#include <string>
#include <unistd.h>
#include <boost/regex.hpp>
#include "timestamp.h"
#define LOGM(sev) LOG(sev) << "Mosquitto: "
......@@ -79,6 +78,9 @@ void MQTTPusher::push() {
}
}
//Performing auto-publish if necessary
sendMappings();
computeMsgRate();
//collect sensor-data
reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
......@@ -178,25 +180,8 @@ void MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& tota
}
bool MQTTPusher::sendMappings() {
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return false;
else if ( !boost::regex_search(_sensorPattern.c_str(), match, sensorReg) ) {
LOGM(error) << "Invalid sensor naming pattern. You must at least include " << SENSOR_PATTERN << "!";
return true;
}
std::string topic, name;
unsigned int publishCtr=0;
......@@ -205,15 +190,9 @@ bool MQTTPusher::sendMappings() {
for(auto& g: p.configurator->getSensorGroups())
for(auto& s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
name = boost::regex_replace(name, pluginReg, p.id);
name = s->getName();
// Setting the auto-publish name back to the sensor
s->setName(name);
//try to send mapping to the broker
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
......@@ -222,63 +201,25 @@ bool MQTTPusher::sendMappings() {
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
return true;
}
bool MQTTPusher::sendAnalyticsMappings() {
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return false;
else if ( !boost::regex_search(_sensorPattern.c_str(), match, sensorReg) ) {
LOGM(error) << "Invalid sensor naming pattern. You must at least include " << SENSOR_PATTERN << "!";
return true;
}
// Performing auto-publish for analytics output sensors
std::string topic, name;
unsigned int publishCtr=0;
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
for(auto& u: a->getUnits())
for(auto& s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u->getName() == "root") {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, a->getName());
name = boost::regex_replace(name, pluginReg, p.id);
}
s->setName(name);
name = s->getName();
//try to send mapping to the broker
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " analytics output sensors were published.";
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all analytics output sensors!";
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
return true;
}
......
......@@ -9,9 +9,6 @@
#define MQTTPUSHER_H_
#define DCDB_MAP "/DCDB_MAP/"
#define SENSOR_PATTERN "<sensor>"
#define GROUP_PATTERN "<group>"
#define PLUGIN_PATTERN "<plugin>"
#include <mosquitto.h>
#include "includes/PluginDefinitions.h"
......@@ -30,7 +27,6 @@ public:
void push();
bool sendMappings();
bool sendAnalyticsMappings();
void start() {
_keepRunning = true;
......
......@@ -12,7 +12,6 @@
#include "AnalyzerInterface.h"
#include "../../includes/ConfiguratorInterface.h"
//TODO: fix auto-publish behavior on plugin reload
/**
* Interface to configurators for data analyzer plugins
*
......
......@@ -52,6 +52,8 @@ protected:
const string ALL_CLAUSE = "all";
const string ALL_REC_CLAUSE = "all-recursive";
const std::string SENSOR_PATTERN = "<sensor>";
const std::string GROUP_PATTERN = "<group>";
public:
......@@ -64,6 +66,7 @@ public:
_baseName("INVALID"),
_cfgPath(""),
_mqttPrefix(""),
_sensorPattern(""),
_cacheInterval(900000) {}
/**
......@@ -98,6 +101,7 @@ public:
*/
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
_mqttPrefix = pluginSettings.mqttPrefix;
_sensorPattern = pluginSettings.sensorPattern;
_cacheInterval = pluginSettings.cacheInterval;
}
......@@ -339,8 +343,7 @@ protected:
if(!an.getTemplate()) {
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart(),
!an.getStreaming());
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart(), !an.getStreaming());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
......@@ -349,6 +352,7 @@ protected:
}
for (auto &u: *units) {
constructSensorNames(*u, an);
if (an.getStreaming()) {
LOG(debug) << " Unit \"" << u->getName() << "\"";
for (const auto &i : u->getInputs())
......@@ -435,6 +439,8 @@ protected:
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else if (boost::iequals(global.first, "sensorpattern")) {
_sensorPattern = global.second.data();
}
}
global(config.get_child("global"));
......@@ -442,6 +448,41 @@ protected:
return true;
}
//TODO: switch to textual MQTT topics and use only MQTTPrefix
/**
* @brief Adjusts the names of the sensors
*
* Names are modified according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
*/
void constructSensorNames(UnitTemplate<SBase>& u, Analyzer& an) {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
//TODO: move sensorpattern checks somewhere else
if(_sensorPattern == "")
return;
else if (!boost::regex_search(_sensorPattern.c_str(), match, sensorReg)) {
LOG(error) << "Invalid sensor naming pattern " << _sensorPattern << ". You must at least include " << SENSOR_PATTERN << "!";
return;
}
std::string name;
// Performing name construction
for(auto& s: u.getOutputs()) {
name = s->getName();
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u.getName() == "root") {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, an.getName());
}
s->setName(name);
}
return;
}
// Instance of a QueryEngine object
QueryEngine& _queryEngine;
// UnitGenerator object used to create units
......@@ -456,6 +497,8 @@ protected:
std::string _cfgPath;
// Default MQTT prefix to be used when creating output sensors
std::string _mqttPrefix;
// String used to construct sensor names
std::string _sensorPattern;
// Interval in seconds for the cache of each sensor
unsigned int _cacheInterval;
// The vector of analyzers, in the form of pointers to AnalyzerInterface objects
......
......@@ -140,6 +140,7 @@ public:
* @param ondemand If True, no unit resolution is performed and a raw unit template is stored
* @return A vector of shared pointers to the generated unit objects
*/
//TODO: if ondemand and root, resolve unit immediately
vector<shared_ptr<UnitTemplate<SBase>>> *generateUnits(vector<shared_ptr<SBase>>& inputs, vector<shared_ptr<SBase>>& outputs, inputMode_t inputMode, string mqttPrefix="", bool ondemand=false) {
// If no outputs are defined, no units can be instantiated
if((inputs.empty() && inputMode==SELECTIVE) || outputs.empty())
......
......@@ -355,10 +355,6 @@ int main(int argc, char** argv) {
}
}
//Performing sensor name auto-publish if necessary
//This must be done before the SensorNavigator is created and the data analytics manager is initialized,
//so that they have access to finalized sensor names
_mqttPusher->sendMappings();
// Preparing the SensorNavigator
bool failedTree = false;
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
......@@ -395,8 +391,6 @@ int main(int argc, char** argv) {
LOG(info) << "Starting analyzers...";
_analyticsManager->start();
_mqttPusher->sendAnalyticsMappings();
}
LOG(info) << "Sensors started!";
......
......@@ -15,6 +15,7 @@
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/regex.hpp>
#include "ConfiguratorInterface.h"
#include "SensorBase.h"
#include "SensorGroupTemplate.h"
......@@ -24,10 +25,6 @@
#include <sstream>
#include <iomanip>
#include <iostream>
#include <sstream>
#include <iomanip>
//#define STRCMP(node,str) boost::iequals(node.first,str) //DEPRECATED
#define CFG_VAL boost::property_tree::iptree&
......@@ -57,6 +54,9 @@ protected:
const char CLOSE_SQBRKET = ']';
const char DASH = '-';
const std::string SENSOR_PATTERN = "<sensor>";
const std::string GROUP_PATTERN = "<group>";
public:
ConfiguratorTemplate() :
_entityName("INVALID"),
......@@ -64,6 +64,7 @@ public:
_baseName("INVALID"),
_cfgPath(""),
_mqttPrefix(""),
_sensorPattern(""),
_cacheInterval(900000) {}
ConfiguratorTemplate(const ConfiguratorTemplate&) = delete;
......@@ -251,6 +252,7 @@ public:
}
}
//read of config finished. Now we build the mqtt-topic for every sensor
constructSensorNames();
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
......@@ -309,6 +311,7 @@ public:
*/
void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
_mqttPrefix = pluginSettings.mqttPrefix;
_sensorPattern = pluginSettings.sensorPattern;
_cacheInterval = pluginSettings.cacheInterval;
derivedSetGlobalSettings(pluginSettings);
......@@ -583,6 +586,8 @@ protected:
_cacheInterval = stoul(global.second.data());
LOG(debug) << " Using own caching interval " << _cacheInterval << " [s]";
_cacheInterval *= 1000;
} else if (boost::iequals(global.first, "sensorpattern")) {
_sensorPattern = global.second.data();
}
}
global(config.get_child("global"));
......@@ -778,12 +783,40 @@ protected:
return cpus;
}
/**
* Adjusts the names of the sensors in generated groups according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
*/
void constructSensorNames() {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return;
else if (!boost::regex_search(_sensorPattern.c_str(), match, sensorReg)) {
LOG(error) << "Invalid sensor naming pattern " << _sensorPattern << ". You must at least include " << SENSOR_PATTERN << "!";
return;
}
std::string name;
// Performing auto-publish for sensors
for(auto& g: _sensorGroups)
for(auto& s: g->getSensors()) {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
// Setting the auto-publish name back to the sensor
s->setName(name);
}
return;
}
std::string _entityName;
std::string _groupName;
std::string _baseName;
std::string _cfgPath;
std::string _mqttPrefix;
std::string _sensorPattern;
unsigned int _cacheInterval;
std::vector<SGroupPtr> _sensorGroupInterfaces;
std::vector<SG_Ptr> _sensorGroups;
......
......@@ -67,7 +67,7 @@ public:
virtual std::vector<SBasePtr>& getSensors() override { return _baseSensors; }
virtual void init(boost::asio::io_service& io) {
virtual void init(boost::asio::io_service& io) override {
SensorGroupInterface::init(io);
for(auto s : _sensors) {
......
......@@ -229,6 +229,7 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
}
}
//read of config finished. Now we build the mqtt-topic for every sensor
constructSensorNames();
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
......
......@@ -74,6 +74,10 @@ bool ProcfsConfigurator::readConfig(std::string cfgPath) {
return false;
}
}
constructSensorNames();
for(const auto& g : _sensorGroups)
for(const auto& s : g->getSensors())
LOG(debug) << g->getGroupName() << "::" << s->getName() << " using MQTT-topic \"" << s->getMqtt() << "\"";
return true;
}
......@@ -174,7 +178,6 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
// The suffix is automatically increased for each core through the metricsCounter vector
else
sensor->setMqtt(_mqttPrefix + formatMqttCPU(sGroup.getMqttPart(), sensor->getCPUId()) + "/" + sensor->getMqtt());
LOG(debug) << sGroup.getGroupName() << "::" << sensor->getName() << " using MQTT-topic \"" << sensor->getMqtt() << "\"";
}
// De-allocating memory
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment