2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit c6498a8b authored by Alessio Netti's avatar Alessio Netti
Browse files

Data Analytics Working Prototype

- First working version of Data Analytics Framework
- A test plugin that performs the sum and max of its input sensors was
implemented
parent 18f03ef4
......@@ -17,10 +17,11 @@ VERSION = $(shell git describe --long|sed 's/-\([0-9]*\)/.\1/')
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o src/analytics/AnalyticsManager.o src/analytics/SensorNavigator.o
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon tester
ANALYZERS = average
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
LIBEXT = dylib
......@@ -33,30 +34,31 @@ else
PLUGINFLAGS = -fPIC
endif
PLUGIN_LIBS = $(foreach p,$(PLUGINS),libdcdbplugin_$(p).$(LIBEXT))
ANALYZER_LIBS = $(foreach p,$(ANALYZERS),libdcdbanalyzer_$(p).$(LIBEXT))
$(TARGET): $(foreach f,$(DISTFILESPATHS),$(DCDBDEPSPATH)/$(f)/.installed) $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET) $(PLUGIN_LIBS)
all: $(TARGET) $(PLUGIN_LIBS) $(ANALYZER_LIBS)
debug: CXXFLAGS += -DDEBUG
debug: all
clean:
rm -f $(PLUGIN_LIBS) $(TARGET) $(shell find -name "*.o")
rm -f $(PLUGIN_LIBS) $(ANALYZER_LIBS) $(TARGET) $(shell find -name "*.o")
$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.installed: $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.patched
$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.installed: $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.patched
@echo ""
@echo "Building BACNet-Stack..."
cd $(@D) && BACNET_PORT=$(BACNET_PORT) MAKE_DEFINE=-fpic make -j $(MAKETHREADS) library && \
install $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/lib/libbacnet.a /$(DCDBDEPLOYPATH)/lib/ && touch $(@)
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.installed: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.patched
$(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.installed: $(DCDBDEPSPATH)/freeipmi-$(FREEIPMI_VERSION)/.patched
@echo ""
@echo "Building FreeIPMI library..."
cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --without-argp
cd $(@D) && make -j $(MAKETHREADS) && make install && touch $(@)
$(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.installed: $(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.patched
$(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.installed: $(DCDBDEPSPATH)/net-snmp-$(NET-SNMP_VERSION)/.patched
@echo ""
@echo "Building net-SNMP library..."
cd $(@D) && ./configure --prefix=$(DCDBDEPLOYPATH) --with-default-snmp-version=3 --with-sys-contact= --with-sys-location= --with-logfile=none --with-persistent-directory=$(DCDBDEPLOYPATH)/var/net-snmp --disable-embedded-perl --disable-perl-cc-checks --without-perl-modules --disable-agent --disable-applications --disable-manuals --disable-scripts --disable-mibs
......@@ -67,16 +69,19 @@ $(OBJS) : %.o : %.cpp
install_lib: $(PLUGIN_LIBS)
install $^ $(DCDBDEPLOYPATH)/lib/
install_conf: $(foreach p,global $(PLUGINS),config/$(p).conf)
install_analyzer: $(ANALYZER_LIBS)
install $^ $(DCDBDEPLOYPATH)/lib/
install_conf: $(foreach p,global $(PLUGINS) $(ANALYZERS),config/$(p).conf)
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: $(TARGET) install_lib
install: $(TARGET) install_lib install_analyzer
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
@echo "Done with installation."
@echo "====================================="
@echo "To copy the configuration files type:"
@echo " > make install_conf"
src/Sensor.o: CXXFLAGS+= $(PLUGINFLAGS)
src/SensorGroup.o: CXXFLAGS+= $(PLUGINFLAGS)
src/sensors/%.o: CXXFLAGS+= $(PLUGINFLAGS) -I$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/include -I$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/ports/$(BACNET_PORT)
......@@ -93,10 +98,10 @@ libdcdbplugin_ipmi.$(LIBEXT): src/sensors/ipmi/IPMISensorGroup.o src/sensors/ipm
libdcdbplugin_pdu.$(LIBEXT): src/sensors/pdu/PDUSensorGroup.o src/sensors/pdu/PDUUnit.o src/sensors/pdu/PDUConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_system
libdcdbplugin_bacnet.$(LIBEXT): src/sensors/bacnet/BACnetSensorGroup.o src/sensors/bacnet/BACnetClient.o src/sensors/bacnet/BACnetConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lbacnet
libdcdbplugin_snmp.$(LIBEXT): src/sensors/snmp/SNMPSensorGroup.o src/sensors/snmp/SNMPConnection.o src/sensors/snmp/SNMPConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lnetsnmp -lnetsnmpagent
......@@ -105,9 +110,12 @@ libdcdbplugin_procfs.$(LIBEXT): src/sensors/procfs/ProcfsSensorGroup.o src/senso
libdcdbplugin_tester.$(LIBEXT): src/sensors/tester/TesterSensorGroup.o src/sensors/tester/TesterConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_gpfsmon.$(LIBEXT): src/sensors/gpfsmon/GpfsmonSensorGroup.o src/sensors/gpfsmon/GpfsmonConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
#libdcdbplugin_opa.$(LIBEXT): src/sensors/opa/OpaSensorGroup.o src/sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
libdcdbanalyzer_average.$(LIBEXT): src/analytics/analyzers/average/AverageAnalyzer.o src/analytics/analyzers/average/AverageConfigurator.o src/analytics/SensorNavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
global {
mqttPrefix /FF112233445566778899AAB
}
template_analyzer def1 {
interval 1000
minValues 3
mqttPart FF0
duplicate false
streaming true
}
analyzer average1 {
default def1
mqttPart FF0
mqttStart 00
input {
sensor <unit>col_user
sensor <unit-1>MemFree
}
output {
sensor <unit>sum {
mqttsuffix 36
}
sensor <unit>max {
mqttsuffix 37
}
}
}
......@@ -26,6 +26,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
//set default values for global variables
_global.qosLevel = 1;
_global.daemonize = 0;
_global.hierarchy = "";
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.threads = 1;
......@@ -84,6 +85,8 @@ bool Configuration::readGlobal() {
}
} else if (boost::iequals(global.first, "sensorpattern")) {
_global.pluginSettings.sensorPattern = global.second.data();
} else if (boost::iequals(global.first, "hierarchy")) {
_global.hierarchy = global.second.data();
} else if (boost::iequals(global.first, "tempdir")) {
_global.pluginSettings.tempdir = global.second.data();
if (_global.pluginSettings.tempdir[_global.pluginSettings.tempdir.length()-1] != '/') {
......@@ -340,6 +343,7 @@ bool Configuration::checkMqtt(const std::string& mqtt) {
return false;
}
//TODO: fix bug here
auto returnIt = _mqttTopics.insert(mqtt);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << mqtt << "\" used twice!";
......
......@@ -15,18 +15,19 @@
#include <boost/log/trivial.hpp>
#include "includes/PluginDefinitions.h"
typedef struct {
int daemonize;
int brokerPort;
int qosLevel;
std::string brokerHost;
uint32_t threads;
unsigned int maxMsgNum;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
pluginSettings_t pluginSettings;
restAPISettings_t restAPISettings;
} global_t;
typedef struct {
int daemonize;
int brokerPort;
int qosLevel;
std::string brokerHost;
std::string hierarchy;
uint32_t threads;
unsigned int maxMsgNum;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
pluginSettings_t pluginSettings;
restAPISettings_t restAPISettings;
} global_t;
/**
* Class responsible of reading the global configuration as well as loading and invoking required dynamic libraries.
......@@ -84,7 +85,7 @@ public:
* @return The boost::log severity level
*/
boost::log::trivial::severity_level translateLogLevel(int logLevel);
/**
* Check if the mqtt-suffix is already in use.
* @param mqtt The MQTT-suffix to check
......
......@@ -104,22 +104,22 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
if (method == "GET") {
if (pathStrs[0] == "help") {
response = "dcdbpusher RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /plugins List of currently loaded plugins (Discovery)\n"
" /[plugin]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified plugin (Discovery)\n"
" /[plugin]/[sensor]/avg?interval=[timeInSec]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
" -PUT: /[plugin]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
" -GET: /help This help message\n"
" /plugins List of currently loaded plugins (Discovery)\n"
" /[plugin]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified plugin (Discovery)\n"
" /[plugin]/[sensor]/avg?interval=[timeInSec]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
" -PUT: /[plugin]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
} else {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::GETReq)) {
......@@ -127,7 +127,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::unauthorized);
goto error;
}
if (pathStrs[0] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
......@@ -148,16 +148,16 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::bad_request);
goto error;
}
if (pathStrs[1] == "sensors") {
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
if (json) {
boost::property_tree::ptree root, sensors;
for(auto g : p.configurator->getSensorGroups()) {
boost::property_tree::ptree group;
for(auto s : g->getSensors()) {
......@@ -180,23 +180,23 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
} else {
if (pathStrs.size() < 3) {
LOGH(warning) << "Received malformed request: No third path part";
connection->set_status(server::connection::bad_request);
goto error;
}
if (pathStrs[2] != "avg") {
LOGH(warning) << "Unknown action " << pathStrs[2] << " requested";
connection->set_status(server::connection::not_supported);
goto error;
}
std::string sensor = pathStrs[1];
std::string action = pathStrs[2];
uint64_t time = 0;
for (auto& p : queries) {
if (p.first == "interval") {
time = getTimestamp();
......@@ -207,7 +207,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
//process actual request
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
response = "Sensor not found!";
......@@ -226,26 +226,26 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
} else if (method == "PUT") {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::PUTReq)) {
LOGH(warning) << "Provided authentication token has insufficient permissions";
connection->set_status(server::connection::unauthorized);
goto error;
}
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
goto error;
}
std::string action = pathStrs[1];
//process actual request
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
//switch code depending on selected action
if (action == "start") {
for(auto& p : _httpsServer._plugins) {
......@@ -279,7 +279,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
while (!_httpsServer._mqttPusher->isHalted()) {
sleep(1);
}
if (p.configurator->reReadConfig()) {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
......@@ -292,7 +292,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
g->init(_httpsServer._io);
g->start();
}
//continue MQTTPusher
_httpsServer._mqttPusher->cont();
break;
......@@ -303,6 +303,21 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::not_supported);
goto error;
}
//Updating the SensorNavigator on plugin changes
QueryEngine& qEngine = QueryEngine::getInstance();
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector<std::string> names, topics;
for(const auto& p : _httpsServer._plugins)
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.updated = true;
}
LOGH(info) << "Responding: " << response;
......
......@@ -14,12 +14,14 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins, unsigned int maxNumberOfMessages) :
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_plugins(plugins),
_analyticsPlugins(aPlugins),
_connected(false),
_keepRunning(true),
_overrideMsgCap(true),
......@@ -47,7 +49,7 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std:
perror(NULL);
exit(EXIT_FAILURE);
}
mosquitto_threaded_set(_mosq, true);
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
......@@ -77,9 +79,6 @@ void MQTTPusher::push() {
}
}
//Performing sensor name auto-publish if necessary
sendMappings();
computeMsgRate();
//collect sensor-data
reading_t* reads = new reading_t[SensorBase::QUEUE_MAXLIMIT];
......@@ -92,6 +91,7 @@ void MQTTPusher::push() {
}
_halted = false;
totalCount = 0;
// Push sensor data
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
......@@ -109,6 +109,25 @@ void MQTTPusher::push() {
}
}
}
// Push output analytics sensors
for(auto& p : _analyticsPlugins) {
if(_doHalt) {
break;
}
for(const auto& a : p.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits()) {
for(const auto& s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
sendReadings(*s, reads, totalCount);
} else {
break;
}
}
}
}
}
}
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
......@@ -159,6 +178,17 @@ void MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& tota
}
bool MQTTPusher::sendMappings() {
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
......@@ -168,35 +198,100 @@ bool MQTTPusher::sendMappings() {
return true;
}
std::string topic, name;
unsigned int publishCtr=0;
for(auto& p: _plugins)
for(auto g: p.configurator->getSensorGroups())
for(auto s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
std::string topic, name;
unsigned int publishCtr=0;
// Performing auto-publish for sensors
for(auto& p: _plugins)
for(auto& g: p.configurator->getSensorGroups())
for(auto& s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
name = boost::regex_replace(name, pluginReg, p.id);
//try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
// Setting the auto-publish name back to the sensor
s->setName(name);
//try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
return true;
}
bool MQTTPusher::sendAnalyticsMappings() {
//connect to broker (if necessary)
while (_keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Could not connect to MQTT broker " << _brokerHost << ":" << _brokerPort;
sleep(1);
} else {
_connected = true;
LOGM(info) << "Connection established!";
}
}
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return false;
else if ( !boost::regex_search(_sensorPattern.c_str(), match, sensorReg) ) {
LOGM(error) << "Invalid sensor naming pattern. You must at least include " << SENSOR_PATTERN << "!";
return true;
}
// Performing auto-publish for analytics output sensors
std::string topic, name;
unsigned int publishCtr=0;
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
for(auto& u: a->getUnits())
for(auto& s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
// If the unit is related to a system component all of its sensors will have their names adjusted already
// If it is root, then we apply normal auto-publish. This means that adding the analyzer's name
// or plugin ID do not always work, as of now
if( u->getName() == "root") {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, a->getName());
name = boost::regex_replace(name, pluginReg, p.id);
}
s->setName(name);
//try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " analytics output sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
LOGM(info) << "Sensor name auto-publish performed for all analytics output sensors!";
return true;
}
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
for(auto& p : _plugins)
for(const auto& g : p.configurator->getSensorGroups())
msgRate += g->getSensors().size() * ( 1000 / g->getInterval() ) / g->getMinValues();
for(auto& p : _analyticsPlugins)
for(const auto& a : p.configurator->getAnalyzers())
for(const auto& u : a->getUnits())
msgRate += u->getBaseOutputs().size() * ( 1000 / a->getInterval() ) / a->getMinValues();
// The formula below assumes the pusher's sleep time is 1 sec; if not, change accordingly
_overrideMsgCap = _maxNumberOfMessages == 0 || msgRate > _maxNumberOfMessages;
if( _overrideMsgCap && _maxNumberOfMessages != 0 )
......
<
......@@ -15,16 +15,22 @@
#include <mosquitto.h>
#include "includes/PluginDefinitions.h"
#include "includes/SensorBase.h"
#include "analytics/AnalyticsManager.h"
#include <map>
/**
* Class responsible for collecting values from the sensors and pushing them to the database.
*/
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins, unsigned int maxNumberOfMessages);
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages);
virtual ~MQTTPusher();
void push();
bool sendMappings();
bool sendAnalyticsMappings();
void start() {
_keepRunning = true;
......@@ -49,7 +55,6 @@ public:
private:
void sendReadings(SensorBase& s, reading_t* reads, std::size_t& totalCount);
bool sendMappings();
void computeMsgRate();
int _qosLevel;
......@@ -57,6 +62,7 @@ private:
std::string _brokerHost;
std::string _sensorPattern;
pluginVector_t& _plugins;
an_pluginVector_t& _analyticsPlugins;
struct mosquitto* _mosq;
bool _connected;
bool _keepRunning;
......