Commit b07598c8 authored by Alessio Netti's avatar Alessio Netti
Browse files

Data analytics framework merge

parents 0b0635f1 a59764f7
......@@ -15,11 +15,12 @@ include $(DCDBCOREPATH)/common.mk
VERSION = $(shell git describe --long|sed 's/-\([0-9]*\)/.\1/')
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_ENABLE_HTTPS -O2 -g -Wall -Wno-unused-function -Wno-unused-local-typedef -Wno-deprecated-declarations -Wno-unused-variable -DBOOST_LOG_DYN_LINK -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lboost_regex -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o src/analytics/AnalyticsManager.o src/analytics/SensorNavigator.o
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon tester
ANALYZERS = average
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -33,16 +34,17 @@ else
PLUGINFLAGS = -fPIC
endif
PLUGIN_LIBS = $(foreach p,$(PLUGINS),libdcdbplugin_$(p).$(LIBEXT))
ANALYZER_LIBS = $(foreach p,$(ANALYZERS),libdcdbanalyzer_$(p).$(LIBEXT))
$(TARGET): $(foreach f,$(DISTFILESPATHS),$(DCDBDEPSPATH)/$(f)/.installed) $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET) $(PLUGIN_LIBS)
all: $(TARGET) $(PLUGIN_LIBS) $(ANALYZER_LIBS)
debug: CXXFLAGS += -DDEBUG
debug: all
clean:
rm -f $(PLUGIN_LIBS) $(TARGET) $(shell find -name "*.o")
rm -f $(PLUGIN_LIBS) $(ANALYZER_LIBS) $(TARGET) $(shell find -name "*.o")
$(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.installed: $(DCDBDEPSPATH)/bacnet-stack-$(BACNET-STACK_VERSION)/.patched
@echo ""
......@@ -67,10 +69,13 @@ $(OBJS) : %.o : %.cpp
install_lib: $(PLUGIN_LIBS)
install $^ $(DCDBDEPLOYPATH)/lib/
install_conf: $(foreach p,global $(PLUGINS),config/$(p).conf)
install_analyzer: $(ANALYZER_LIBS)
install $^ $(DCDBDEPLOYPATH)/lib/
install_conf: $(foreach p,global $(PLUGINS) $(ANALYZERS),config/$(p).conf)
install -m 644 $^ $(DCDBDEPLOYPATH)/etc/
install: $(TARGET) install_lib
install: $(TARGET) install_lib install_analyzer
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
@echo "Done with installation."
@echo "====================================="
......@@ -86,28 +91,31 @@ libdcdbplugin_sysfs.$(LIBEXT): src/sensors/sysfs/SysfsSensorGroup.o src/sensors/
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_perfevent.$(LIBEXT): src/sensors/perfevent/PerfSensorGroup.o src/sensors/perfevent/PerfeventConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_ipmi.$(LIBEXT): src/sensors/ipmi/IPMISensorGroup.o src/sensors/ipmi/IPMIHost.o src/sensors/ipmi/IPMIConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lfreeipmi -lboost_regex
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lfreeipmi
libdcdbplugin_pdu.$(LIBEXT): src/sensors/pdu/PDUSensorGroup.o src/sensors/pdu/PDUUnit.o src/sensors/pdu/PDUConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_regex -lboost_system
libdcdbplugin_bacnet.$(LIBEXT): src/sensors/bacnet/BACnetSensorGroup.o src/sensors/bacnet/BACnetClient.o src/sensors/bacnet/BACnetConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lbacnet
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lbacnet
libdcdbplugin_snmp.$(LIBEXT): src/sensors/snmp/SNMPSensorGroup.o src/sensors/snmp/SNMPConnection.o src/sensors/snmp/SNMPConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lnetsnmp -lnetsnmpagent
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lnetsnmp -lnetsnmpagent
libdcdbplugin_procfs.$(LIBEXT): src/sensors/procfs/ProcfsSensorGroup.o src/sensors/procfs/ProcfsParser.o src/sensors/procfs/ProcfsConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_tester.$(LIBEXT): src/sensors/tester/TesterSensorGroup.o src/sensors/tester/TesterConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_gpfsmon.$(LIBEXT): src/sensors/gpfsmon/GpfsmonSensorGroup.o src/sensors/gpfsmon/GpfsmonConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
#libdcdbplugin_opa.$(LIBEXT): src/sensors/opa/OpaSensorGroup.o src/sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopamgt -libverbs -libumad -lssl
libdcdbanalyzer_average.$(LIBEXT): src/analytics/analyzers/average/AverageAnalyzer.o src/analytics/analyzers/average/AverageConfigurator.o src/analytics/SensorNavigator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
global {
mqttPrefix /FF112233445566778899AAB
}
template_average def1 {
interval 1000
minValues 3
mqttPart FF0
duplicate false
streaming true
}
average avg1 {
default def1
mqttPart FF0
mqttStart 00
input {
sensor "<unit>col_user"
sensor "<unit-1>MemFree"
}
output {
sensor "<unit, filter cpu250>sum" {
mqttsuffix 76
}
sensor "<unit, filter cpu250>max" {
mqttsuffix 77
}
sensor "<unit, filter cpu250>avg" {
mqttsuffix 78
}
}
}
average avg2 {
default def1
interval 1500
mqttPart FF1
mqttStart 00
input {
sensor "<unit>col_user"
sensor "<unit - 1>MemFree"
}
output {
sensor "<unit - 1>sum" {
mqttsuffix 76
}
sensor "<unit - 1>max" {
mqttsuffix 77
}
sensor "<unit - 1>avg" {
mqttsuffix 78
}
}
}
average avg3 {
default def1
interval 1500
mqttPart FF2
mqttStart 00
input {
all-recursive
}
output {
sensor "<unit - 1>sumall" {
mqttsuffix 80
}
sensor "<unit - 1>maxall" {
mqttsuffix 81
}
sensor "<unit - 1>avgall" {
mqttsuffix 82
}
}
}
......@@ -59,3 +59,7 @@ plugins {
}
}
analyzerPlugins {
}
......@@ -26,6 +26,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
//set default values for global variables
_global.qosLevel = 1;
_global.daemonize = 0;
_global.hierarchy = "";
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.threads = 1;
......@@ -84,6 +85,8 @@ bool Configuration::readGlobal() {
}
} else if (boost::iequals(global.first, "sensorpattern")) {
_global.pluginSettings.sensorPattern = global.second.data();
} else if (boost::iequals(global.first, "hierarchy")) {
_global.hierarchy = global.second.data();
} else if (boost::iequals(global.first, "tempdir")) {
_global.pluginSettings.tempdir = global.second.data();
if (_global.pluginSettings.tempdir[_global.pluginSettings.tempdir.length()-1] != '/') {
......@@ -340,7 +343,7 @@ bool Configuration::checkMqtt(const std::string& mqtt) {
return false;
}
auto returnIt = _mqttTopics.insert(mqtt);
auto returnIt = _mqttTopics.insert(str);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << mqtt << "\" used twice!";
return false;
......
......@@ -15,18 +15,19 @@
#include <boost/log/trivial.hpp>
#include "includes/PluginDefinitions.h"
typedef struct {
typedef struct {
int daemonize;
int brokerPort;
int qosLevel;
std::string brokerHost;
std::string hierarchy;
uint32_t threads;
unsigned int maxMsgNum;
boost::log::trivial::severity_level logLevelFile;
boost::log::trivial::severity_level logLevelCmd;
pluginSettings_t pluginSettings;
restAPISettings_t restAPISettings;
} global_t;
} global_t;
/**
* Class responsible of reading the global configuration as well as loading and invoking required dynamic libraries.
......
......@@ -14,7 +14,7 @@
#include <string>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string/split.hpp>
#define LOGH(sev) LOG(sev) << "HttpsServer: "
......@@ -105,6 +105,8 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
if (pathStrs[0] == "help") {
response = "dcdbpusher RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /analytics/help\n"
" An help message for data analytics commands\n"
" /plugins List of currently loaded plugins (Discovery)\n"
" /[plugin]/sensors\n"
" List of currently running sensors which belong\n"
......@@ -116,10 +118,11 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
" -PUT: /[plugin]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
"\n";
//"All resources have to be prepended by host:port and need at\n"
//"least the query ?authkey=[token] at the end. Multiple queries\n"
//"need to be separated by semicolons(';')\n";
response += _httpsServer._manager->restCheatSheet;
} else {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::GETReq)) {
......@@ -128,14 +131,32 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
goto error;
}
if (pathStrs[0] == "plugins") {
//Managing REST GET commands to the data analytics framework
if(pathStrs[0] == "analytics") {
try {
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::internal_server_error);
goto error;
}
} else if (pathStrs[0] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _httpsServer._plugins) {
plugins.put(p.id, "");
}
root.add_child("plugins", plugins);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for(auto& p : _httpsServer._plugins) {
data << p.id << "\n";
......@@ -166,7 +187,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
sensors.add_child(g->getGroupName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
......@@ -205,6 +226,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
//process actual request
bool found = false;
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
......@@ -214,6 +236,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor) {
found = true;
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, time);
connection->set_status(server::connection::ok);
break;
......@@ -222,6 +245,23 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
}
if(!found) {
for(auto& p : _httpsServer._manager->getPlugins())
if (p.id == pathStrs[0]) {
response = "Sensor not found!";
for(const auto& a : p.configurator->getAnalyzers())
if(a->getStreaming())
for(const auto& u : a->getUnits())
for (const auto& s : u->getBaseOutputs())
if (s->getName() == sensor) {
found = true;
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, time);
connection->set_status(server::connection::ok);
break;
}
}
}
}
}
}
......@@ -234,6 +274,33 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
goto error;
}
//Managing REST PUT commands to the data analytics framework
if(pathStrs[0] == "analytics") {
if( pathStrs.back() == "reload" ) {
_httpsServer._mqttPusher->halt();
// Wait until MQTTPusher is paused in order to reload plugins
while (!_httpsServer._mqttPusher->isHalted()) { sleep(1); }
}
try {
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
response = e.what();
connection->set_status(server::connection::internal_server_error);
}
// Continue MQTTPusher when a reload was performed
if( pathStrs.back() == "reload" )
_httpsServer._mqttPusher->cont();
} else {
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
......@@ -248,9 +315,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
//switch code depending on selected action
if (action == "start") {
for(auto& p : _httpsServer._plugins) {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(const auto& g : p.configurator->getSensorGroups()) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->start();
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
......@@ -259,9 +326,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
} else if (action == "stop") {
for(auto& p : _httpsServer._plugins) {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(const auto& g : p.configurator->getSensorGroups()) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->stop();
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
......@@ -270,7 +337,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
} else if (action == "reload") {
for(auto& p : _httpsServer._plugins) {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
......@@ -288,7 +355,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::internal_server_error);
}
for(const auto& g : p.configurator->getSensorGroups()) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
......@@ -303,6 +370,21 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
connection->set_status(server::connection::not_supported);
goto error;
}
//Updating the SensorNavigator on plugin changes
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
for (const auto &p : _httpsServer._plugins)
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
}
LOGH(info) << "Responding: " << response;
......@@ -320,8 +402,8 @@ void HttpsServer::requestHandler::log(const server::string_type& message) {
LOGH(error) << message;
}
HttpsServer::HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, boost::asio::io_service& io) :
_plugins(plugins), _mqttPusher(mqttPusher), _io(io), _handler(*this) {
HttpsServer::HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, AnalyticsManager* manager, boost::asio::io_service& io) :
_plugins(plugins), _mqttPusher(mqttPusher), _manager(manager), _io(io), _handler(*this) {
std::shared_ptr<asio::ssl::context> ctx = std::make_shared<asio::ssl::context>(asio::ssl::context::sslv23);
ctx->set_options(asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use);
......
......@@ -25,6 +25,7 @@
#include "includes/Logging.h"
#include "includes/PluginDefinitions.h"
#include "MQTTPusher.h"
#include "analytics/AnalyticsManager.h"
typedef struct {
std::string restHost;
......@@ -47,7 +48,7 @@ typedef std::map<std::string, std::bitset<NUM_PERMISSIONS>> authkeyMap_t;
class HttpsServer {
public:
HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, boost::asio::io_service& io);
HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, AnalyticsManager* manager, boost::asio::io_service& io);
virtual ~HttpsServer();
bool addAuthkey(authkeyMap_t::value_type authkey) {
......@@ -108,6 +109,7 @@ private:
pluginVector_t& _plugins;
authkeyMap_t _authkeys;
MQTTPusher* _mqttPusher;
AnalyticsManager* _manager;
boost::asio::io_service& _io;
server* _server;
......
......@@ -9,17 +9,18 @@
#include <iostream>
#include <string>
#include <unistd.h>
#include <boost/regex.hpp>
#include "timestamp.h"
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel, pluginVector_t& plugins, unsigned int maxNumberOfMessages) :
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, unsigned int maxNumberOfMessages) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_plugins(plugins),
_analyticsPlugins(aPlugins),
_connected(false),
_keepRunning(true),
_overrideMsgCap(true),
......@@ -77,7 +78,7 @@ void MQTTPusher::push() {
}
}
//Performing sensor name auto-publish if necessary
//Performing auto-publish if necessary
sendMappings();
computeMsgRate();
......@@ -92,6 +93,7 @@ void MQTTPusher::push() {
}
_halted = false;
totalCount = 0;
// Push sensor data
for(auto& p : _plugins) {
if(_doHalt) {
//for faster response
......@@ -109,6 +111,25 @@ void MQTTPusher::push() {
}
}
}
// Push output analytics sensors
for(auto& p : _analyticsPlugins) {
if(_doHalt) {
break;
}
for(const auto& a : p.configurator->getAnalyzers()) {
for(const auto& u : a->getUnits()) {
for(const auto& s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_overrideMsgCap || totalCount < _maxNumberOfMessages) {
sendReadings(*s, reads, totalCount);
} else {
break;
}
}
}
}
}
}
int mosqErr;
if ((mosqErr = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Error in mosquitto_loop: " << mosquitto_strerror(mosqErr);
......@@ -159,27 +180,37 @@ void MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& tota
}
bool MQTTPusher::sendMappings() {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN), pluginReg(PLUGIN_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return false;
else if ( !boost::regex_search(_sensorPattern.c_str(), match, sensorReg) ) {
LOGM(error) << "Invalid sensor naming pattern. You must at least include " << SENSOR_PATTERN << "!";
return true;
}
std::string topic, name;
unsigned int publishCtr=0;
// Performing auto-publish for sensors
for(auto& p: _plugins)
for(auto g: p.configurator->getSensorGroups())
for(auto s: g->getSensors()) {
for(auto& g: p.configurator->getSensorGroups())
for(auto& s: g->getSensors()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
name = boost::regex_replace(name, pluginReg, p.id);
name = s->getName();
//try to send mapping to the broker
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
return true;
}
else
publishCtr++;
}
// Performing auto-publish for analytics output sensors
for(auto& p: _analyticsPlugins)
for(auto& a: p.configurator->getAnalyzers())
for(auto& u: a->getUnits())
for(auto& s: u->getBaseOutputs()) {
topic = std::string(DCDB_MAP) + s->getMqtt();
name = s->getName();
// Try to send mapping to the broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), name.length(), name.c_str(), _qosLevel, false) != MOSQ_ERR_SUCCESS) {
LOGM(error) << "Broker not reachable! Only " << publishCtr << " sensors were published.";
_connected = false;
......@@ -191,12 +222,17 @@ bool MQTTPusher::sendMappings() {
LOGM(info) << "Sensor name auto-publish performed for all sensors!";
return true;
}
void MQTTPusher::computeMsgRate() {
// Computing number of sent MQTT messages per second
float msgRate = 0;
for(auto& p : _plugins)