Commit 2b89e833 authored by Alessio Netti's avatar Alessio Netti
Browse files

WIP: string MQTT topics in dcdbpusher

parent a5e1dd30
......@@ -11,14 +11,15 @@
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
#include "logging.h"
#include "mqttchecker.h"
// Wrapper class for plugin-specific settings
class pluginSettings_t {
public:
pluginSettings_t() {}
std::string sensorPattern = "";
std::string mqttPrefix = "";
std::string tempdir = "./";
bool autoPublish = false;
unsigned int cacheInterval = 900000;
};
......
......@@ -8,6 +8,8 @@
#include <set>
#include "logging.h"
#define MQTT_SEP '/'
/**
* Class that manages constraint for MQTT topic formatting
*
......@@ -31,6 +33,18 @@ public:
return m;
}
/**
* @brief Sanitizes and formats a MQTT topic or suffix
*
* @param topic The topic or suffix to be processed
* @param cpuID The cpu ID associated to this topic (if any)
* @return The processed MQTT topic or suffix
*/
//TODO: integrate proper topic formatting
static std::string formatTopic(const std::string& topic, int cpuID=-1) {
return cpuID<0 ? topic : "cpu" + std::to_string(cpuID) + std::to_string(MQTT_SEP) + topic;
}
/**
* @brief Resets the internal topics set
*/
......@@ -46,7 +60,7 @@ public:
*/
void removeTopic(const std::string& topic) {
std::string str(topic);
str.erase(std::remove(str.begin(), str.end(), '/'), str.end());
str.erase(std::remove(str.begin(), str.end(), MQTT_SEP), str.end());
_topics.erase(str);
}
......@@ -60,16 +74,9 @@ public:
* @return True if the topic is valid, False otherwise
*/
bool checkTopic(const std::string& topic) {
//MQTT topic must have 112 bit = 14 bytes = 28 hex chars
//but can have more with some extra '/', therefore remove all '/'
//We remove all '/' characters to detect duplicates
std::string str(topic);
str.erase(std::remove(str.begin(), str.end(), '/'), str.end());
if (str.length() != 28) {
LOG(error) << "MQTT-Topic \"" << topic << "\" contains " << str.length() << " hex characters, not 28 as required!";
return false;
}
str.erase(std::remove(str.begin(), str.end(), MQTT_SEP), str.end());
auto returnIt = _topics.insert(str);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << topic << "\" used twice!";
......@@ -86,7 +93,6 @@ public:
*
* @param name The name (string) to be removed
*/
//TODO: get rid of these two methods once MQTT topics and sensor names are unified
void removeName(const std::string& name) {
_names.erase(name);
}
......@@ -100,11 +106,11 @@ public:
* @return True if the name is valid, False otherwise
*/
bool checkName(const std::string& name) {
//auto returnIt = _names.insert(name);
//if (!returnIt.second) {
// LOG(error) << "Name \"" << name << "\" used twice!";
// return false;
//}
auto returnIt = _names.insert(name);
if (!returnIt.second) {
LOG(error) << "Name \"" << name << "\" used twice!";
return false;
}
return true;
}
......@@ -129,11 +135,11 @@ public:
* @return True if the name is valid, False otherwise
*/
bool checkGroup(const std::string& name) {
//auto returnIt = _groups.insert(name);
//if (!returnIt.second) {
// LOG(error) << "Group name \"" << name << "\" used twice!";
// return false;
//}
auto returnIt = _groups.insert(name);
if (!returnIt.second) {
LOG(error) << "Group name \"" << name << "\" used twice!";
return false;
}
return true;
}
......
......@@ -108,7 +108,7 @@ public:
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); }
void setName(const std::string& name) { _name = name; }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
void setSinkPath(const std::string& path) { _sinkPath = path; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
......@@ -256,8 +256,6 @@ public:
}
}
static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}
virtual void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << "Sensor: " << _name;
......
......@@ -30,8 +30,8 @@ bool GlobalConfiguration::readConfig() {
pluginSettings.mqttPrefix = global.second.data();
if (pluginSettings.mqttPrefix[pluginSettings.mqttPrefix.length()-1] != '/')
pluginSettings.mqttPrefix.append("/");
} else if (boost::iequals(global.first, "sensorpattern")) {
pluginSettings.sensorPattern = global.second.data();
} else if (boost::iequals(global.first, "autoPublish")) {
pluginSettings.autoPublish = to_bool(global.second.data());
} else if (boost::iequals(global.first, "tempdir")) {
pluginSettings.tempdir = global.second.data();
if (pluginSettings.tempdir[pluginSettings.tempdir.length() - 1] != '/')
......
......@@ -13,7 +13,6 @@
#include "HttpsServer.h"
#include <boost/log/trivial.hpp>
#include "includes/PluginDefinitions.h"
#include "mqttchecker.h"
#define BROKERPORT 1883
#define BROKERHOST "127.0.0.1"
......
......@@ -13,12 +13,12 @@
#define LOGM(sev) LOG(sev) << "Mosquitto: "
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost, bool autoPublish, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum) :
_qosLevel(qosLevel),
_brokerPort(brokerPort),
_brokerHost(brokerHost),
_sensorPattern(sensorPattern),
_autoPublish(autoPublish),
_plugins(plugins),
_analyticsPlugins(aPlugins),
_connected(false),
......@@ -198,8 +198,7 @@ int MQTTPusher::sendReadings(SensorBase& s, reading_t* reads, std::size_t& total
}
bool MQTTPusher::sendMappings() {
if(_sensorPattern == "")
return false;
if(!_autoPublish) return false;
std::string topic, name;
unsigned int publishCtr=0;
......
......@@ -24,7 +24,7 @@ enum msgCap_t {DISABLED = 1, ENABLED = 2, MINIMUM = 3};
*/
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& brokerHost, const std::string& sensorPattern, int qosLevel,
MQTTPusher(int brokerPort, const std::string& brokerHost, bool autoPublish, int qosLevel,
pluginVector_t& plugins, an_pluginVector_t& aPlugins, int maxNumberOfMessages, unsigned int maxInflightMsgNum, unsigned int maxQueuedMsgNum);
virtual ~MQTTPusher();
......@@ -59,7 +59,7 @@ private:
int _qosLevel;
int _brokerPort;
std::string _brokerHost;
std::string _sensorPattern;
bool _autoPublish;
pluginVector_t& _plugins;
an_pluginVector_t& _analyticsPlugins;
struct mosquitto* _mosq;
......
......@@ -55,28 +55,28 @@ libdcdbplugin_sysfs.$(LIBEXT): sensors/sysfs/SysfsSensorGroup.o sensors/sysfs/Sy
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_perfevent.$(LIBEXT): sensors/perfevent/PerfSensorGroup.o sensors/perfevent/PerfeventConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_ipmi.$(LIBEXT): sensors/ipmi/IPMISensorGroup.o sensors/ipmi/IPMIHost.o sensors/ipmi/IPMIConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lfreeipmi
libdcdbplugin_pdu.$(LIBEXT): sensors/pdu/PDUSensorGroup.o sensors/pdu/PDUUnit.o sensors/pdu/PDUConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_regex -lboost_system
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lcrypto -lssl -lboost_log -lboost_system
libdcdbplugin_bacnet.$(LIBEXT): sensors/bacnet/BACnetSensorGroup.o sensors/bacnet/BACnetClient.o sensors/bacnet/BACnetConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lbacnet
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lbacnet
libdcdbplugin_snmp.$(LIBEXT): sensors/snmp/SNMPSensorGroup.o sensors/snmp/SNMPConnection.o sensors/snmp/SNMPConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lnetsnmp -lnetsnmpagent
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lnetsnmp -lnetsnmpagent
libdcdbplugin_procfs.$(LIBEXT): sensors/procfs/ProcfsSensorGroup.o sensors/procfs/ProcfsParser.o sensors/procfs/ProcfsConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
libdcdbplugin_tester.$(LIBEXT): sensors/tester/TesterSensorGroup.o sensors/tester/TesterConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_gpfsmon.$(LIBEXT): sensors/gpfsmon/GpfsmonSensorGroup.o sensors/gpfsmon/GpfsmonConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
#libdcdbplugin_opa.$(LIBEXT): sensors/opa/OpaSensorGroup.o sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex -lopamgt -libverbs -libumad -lssl
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
......@@ -128,12 +128,11 @@ void printSyntax()
*/
_configuration = new Configuration("", "dcdbpusher.conf");
cout << "Usage:" << endl;
cout << " dcdbpusher [-d] [-x] [-a<string>] [-b<host>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher [-d] [-x] [-a] [-b<host>] [-m<string>] [-w<path>] [-v<level>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -a <string> Auto-publish pattern [default: none]" << endl;
cout << " -b <host> MQTT broker [default: " << _configuration->brokerHost << ":" << _configuration->brokerPort << "]" << endl;
cout << " -m <string> MQTT topic prefix [default: none]" << endl;
cout << " -w <path> Writable temp dir [default: .]" << endl;
......@@ -142,6 +141,7 @@ void printSyntax()
cout << endl;
cout << " -d Daemonize" << endl;
cout << " -x Parse and print the config but do not actually start dcdbpusher" << endl;
cout << " -a Enable sensor auto-publish" << endl;
cout << " -h This help page" << endl;
cout << endl;
......@@ -161,7 +161,7 @@ int main(int argc, char** argv) {
}
//define allowed command-line options once
const char opts[] = "a:b:p:m:v:w:dxh";
const char opts[] = "b:p:m:v:w:dxah";
//check if help flag specified
char c;
......@@ -209,7 +209,7 @@ int main(int argc, char** argv) {
switch (c)
{
case 'a':
pluginSettings.sensorPattern = optarg;
pluginSettings.autoPublish = true;
break;
case 'b':
globalSettings.brokerHost = parseNetworkHost(optarg);
......@@ -306,6 +306,7 @@ int main(int argc, char** argv) {
LOG(info) << " MaxQueuedMsgNum: " << globalSettings.maxQueuedMsgNum;
LOG(info) << " MQTT-QoS: " << globalSettings.qosLevel;
LOG(info) << " MQTT-prefix: " << pluginSettings.mqttPrefix;
LOG(info) << " Auto-publish: " << (pluginSettings.autoPublish ? "Enabled" : "Disabled");
LOG(info) << " Write-Dir: " << pluginSettings.tempdir;
LOG(info) << " CacheInterval: " << pluginSettings.cacheInterval / 1000 << " [s]";
if(globalSettings.validateConfig) {
......@@ -345,7 +346,7 @@ int main(int argc, char** argv) {
}
//MQTTPusher and Https server get their own threads
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.sensorPattern, globalSettings.qosLevel,
_mqttPusher = new MQTTPusher(globalSettings.brokerPort, globalSettings.brokerHost, pluginSettings.autoPublish, globalSettings.qosLevel,
_configuration->getPlugins(), _analyticsManager->getPlugins(), globalSettings.maxMsgNum, globalSettings.maxInflightMsgNum, globalSettings.maxQueuedMsgNum);
_httpsServer = new HttpsServer(restAPISettings, _configuration->getPlugins(), _mqttPusher, _analyticsManager, io);
_configuration->readAuthkeys(_httpsServer);
......
......@@ -15,7 +15,6 @@
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/regex.hpp>
#include "ConfiguratorInterface.h"
#include "sensorbase.h"
#include "SensorGroupTemplate.h"
......@@ -57,9 +56,6 @@ protected:
const char CLOSE_SQBRKET = ']';
const char DASH = '-';
const std::string SENSOR_PATTERN = "(?i)<sensor>";
const std::string GROUP_PATTERN = "(?i)<group>";
public:
ConfiguratorTemplate() :
_entityName("INVALID"),
......@@ -67,7 +63,6 @@ public:
_baseName("INVALID"),
_cfgPath(""),
_mqttPrefix(""),
_sensorPattern(""),
_cacheInterval(DEFAULT_CACHE_INTERVAL) {}
ConfiguratorTemplate(const ConfiguratorTemplate&) = delete;
......@@ -255,14 +250,7 @@ public:
}
}
//read of config finished. Now we build the mqtt-topic for every sensor
if(!constructSensorNames())
return false;
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
}
}
return true;
return constructSensorTopics();
}
/**
......@@ -324,9 +312,6 @@ public:
} else {
LOG_VAR(ll) << " MQTT-Prefix: DEFAULT";
}
if (_sensorPattern != "") {
LOG_VAR(ll) << " Sensor Pattern: " << _sensorPattern;
}
if (_cacheInterval != DEFAULT_CACHE_INTERVAL) {
LOG_VAR(ll) << " Cache interval: " << _cacheInterval << " ms";
} else {
......@@ -353,9 +338,7 @@ public:
*/
void setGlobalSettings(const pluginSettings_t& pluginSettings) final {
_mqttPrefix = pluginSettings.mqttPrefix;
_sensorPattern = pluginSettings.sensorPattern;
_cacheInterval = pluginSettings.cacheInterval;
derivedSetGlobalSettings(pluginSettings);
}
......@@ -629,8 +612,6 @@ protected:
} else if (boost::iequals(global.first, "cacheInterval")) {
_cacheInterval = stoul(global.second.data());
_cacheInterval *= 1000;
} else if (boost::iequals(global.first, "sensorpattern")) {
_sensorPattern = global.second.data();
}
}
global(config.get_child("global"));
......@@ -734,61 +715,6 @@ protected:
*/
virtual void global(CFG_VAL config) {}
/**
* Increases by a certain value the input MQTT hex topic.
*
* Example: a mqtt="AAB7" and val=5 produce "AAC2" as output.
*
* @param mqtt: the MQTT hex string whose value has to be increased
* @param val: the value by which mqtt has to be increased
*
* @return the increased MQTT string
*
*/
const std::string increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::uppercase << std::hex << mqttDigits;
return stream.str();
}
/**
* Replaces occurences of 'x' characters by a hex representation of a
* numerical CPU core ID. If no 'x' characters are found only the CPU hex
* string with specified width is returned.
*
* Examples: mqttPart= "xx", val=11 --> return "0B"
* mqttPart="A3xx", val=11 --> return "A30B"
* mqttPart="A3YY", val=11 --> return "000B"
*
* @param mqttPart: a template MQTT string, defines the length of the final string
* @param val: the value of the CPU core ID
*
* @return the hex string representation of the input CPU core ID
*
*/
const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val) {
std::stringstream stream;
size_t n = std::count(mqttPart.begin(), mqttPart.end(), 'x');
if (n==0) {
stream << std::setfill ('0') << std::setw(mqttPart.length()) << std::uppercase << std::hex << val;
return stream.str();
} else {
std::string result(mqttPart);
stream << std::setfill ('0') << std::setw(n) << std::uppercase << std::hex << val;
std::string replacement = stream.str();
std::string pattern(n, 'x');
size_t index = result.find(pattern, index);
result.replace(index, n, replacement);
return result;
}
}
/**
* Tries to parse the given cpuString as integer numbers. On success, the specified numbers will be inserted
* into a set, which will be returned. On failure, an empty set is returned. A set is used to maintain uniqueness
......@@ -857,30 +783,16 @@ protected:
}
/**
* Adjusts the names of the sensors in generated groups according to the sensorPattern specified in the global
* settings. Operates in tandem with the auto-publish feature.
* @brief Adjusts the names of the sensors in generated groups.
*
* @return true if successful, false otherwise
*/
bool constructSensorNames() {
boost::regex sensorReg(SENSOR_PATTERN), groupReg(GROUP_PATTERN);
boost::cmatch match;
if(_sensorPattern == "")
return true;
else if (!boost::regex_search(_sensorPattern.c_str(), match, sensorReg)) {
LOG(error) << "Invalid sensor naming pattern " << _sensorPattern << ". You must at least include <sensor>!";
return false;
}
std::string name;
// Performing auto-publish for sensors
bool constructSensorTopics() {
// Sensor names are adjusted according to the respective MQTT topics
for(auto& g: _sensorGroups)
for(auto& s: g->getSensors()) {
name = _sensorPattern;
name = boost::regex_replace(name, sensorReg, s->getName());
name = boost::regex_replace(name, groupReg, g->getGroupName());
// Setting the auto-publish name back to the sensor
s->setName(name);
s->setMqtt(MQTTChecker::formatTopic(_mqttPrefix) + MQTTChecker::formatTopic(g->getMqttPart()) + MQTTChecker::formatTopic(s->getMqtt()));
s->setName(s->getMqtt());
}
return true;
}
......@@ -891,7 +803,6 @@ protected:
std::string _cfgPath;
std::string _mqttPrefix;
std::string _sensorPattern;
unsigned int _cacheInterval;
std::vector<SGroupPtr> _sensorGroupInterfaces;
std::vector<SG_Ptr> _sensorGroups;
......
......@@ -158,49 +158,37 @@ bool MSRConfigurator::readConfig(std::string cfgPath) {
}
}
//read of config finished. Now we build the mqtt-topic for every sensor
if(!constructSensorNames())
return false;
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
LOG(debug) << g->getGroupName() << "::" << s->getName() << " using MQTT-topic \"" << s->getMqtt() << "\"";
}
}
return true;
return constructSensorTopics();
}
void MSRConfigurator::customizeAndStore(SG_Ptr g) {
bool begin = true;
std::vector<unsigned> gCpus = g->getCpus();
std::vector<unsigned>::iterator it = gCpus.begin();
std::vector<SB_Ptr> original;
for(auto cpu : g->getCpus()) {
if (begin){
// Initializing the vector of "reference" sensors and configuring the first CPU
for(auto s : g->getSensors()) {
SB_Ptr sensor = std::dynamic_pointer_cast<MSRSensorBase>(s);
sensor->setCpu(cpu);
s->setName(s->getName(), cpu);
auto size = s->getMqtt().size();
s->setMqtt(formatMqttCPU("XX", cpu) + s->getMqtt().substr(size-2));
original.push_back(sensor);
// Copying the original sensors for reference
original.push_back(std::make_shared<MSRSensorBase>(*sensor));
sensor->setCpu(*it);
s->setMqtt(MQTTChecker::formatTopic(s->getMqtt(), *it));
}
begin = false;
} else {
for (auto s: original) {
auto s_otherCPUs = std::make_shared<MSRSensorBase>(s->getName());
std::size_t found = s->getName().find_first_of(".");
if( found != std::string::npos){
found++; //to skip the point
s_otherCPUs->setName(s->getName().substr(found), cpu);
}
s_otherCPUs->setCpu(cpu);
it++;
// Duplicating sensors for the remaining CPUs from the "reference" vector
for (; it != gCpus.end(); ++it) {
for (auto s : original) {
auto s_otherCPUs = std::make_shared<MSRSensorBase>(*s);
s_otherCPUs->setCpu(*it);
s_otherCPUs->setMetric(s->getMetric());
auto size = s->getMqtt().size();
s_otherCPUs->setMqtt(formatMqttCPU("XX", cpu) + s->getMqtt().substr(size-2));
s_otherCPUs->setMqtt(MQTTChecker::formatTopic(s->getMqtt(), *it));
g->pushBackSensor(s_otherCPUs);
}
}
}
storeSensorGroup(g);
}
......@@ -242,17 +242,10 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
return false;
}
}
//read of config finished. Now we build the mqtt-topic for every sensor
if(!constructSensorNames())
return false;
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
}
}
//we do not need them anymore
_templateCpus.clear();
return true;
//read of config finished. Now we build the mqtt-topic for every sensor
return constructSensorTopics();
}
void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cfg) {
......@@ -290,8 +283,7 @@ void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cf
for(const auto& s : SG->getPerfSensors()) {
s->setCpu(*it);
s->setName(s->getName(), *it);
s->setMqtt(formatMqttCPU(s->getMqtt(), *it));
s->setMqtt(MQTTChecker::formatTopic(s->getMqtt(), *it));
}
it++;
......@@ -299,11 +291,8 @@ void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cf
for (; it != cpuSet.end(); ++it) {
for(auto s : sensors) {
std::shared_ptr<PerfSensorBase> sensor = std::make_shared<PerfSensorBase>(*s);
sensor->setCpu(*it);
sensor->setName(s->getName(), *it);
sensor->setMqtt(formatMqttCPU(s->getMqtt(), *it));
sensor->setMqtt(MQTTChecker::formatTopic(s->getMqtt(), *it));
SG->pushBackSensor(sensor);
}
}
......
......@@ -74,9 +74,7 @@ bool ProcfsConfigurator::readConfig(std::string cfgPath) {
return false;
}
}
if(!constructSensorNames())
return false;
return true;
return constructSensorTopics();
}
/**
......@@ -114,8 +112,6 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
sGroup.setType(val.second.data());
} else if (boost::iequals(val.first, "path")) {
sGroup.setPath(val.second.data());
} else if (boost::iequals(val.first, "mqttStart")) {
sGroup.setMqttStart(val.second.data());
} else if (boost::iequals(val.first, "cpus")) {
sGroup.setCpuSet(parseCpuString(val.second.data()));
} else if (boost::iequals(val.first, "htVal")) {
......@@ -141,9 +137,6 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
LOG(warning) << _groupName << " " << sGroup.getGroupName() << "::" << "Unspecified or invalid type! Available types are vmstat, meminfo, procstat, sar";
return; }
// if any sensor objects specified by users in the configuration file are present,
// automatic MQTT topic assignment is disabled and the topics supplied in the config are used
bool autoMQTT = sGroup.getSensors().empty();