Commit 5d131975 authored by Micha Mueller's avatar Micha Mueller
Browse files

Add possibility to define a local MQTT-prefix for every plugin

Every sensor stores its full MQTT-topic now
parent fa86d2b9
globalHost {
sessiontimeout 500
retransmissiontimeout 200
mqttprefix /AABBAABBAABBAACCDDCCDDCC
}
sensors {
templateSensors {
sensor energy {
type raw
freq 1000
......
global {
mqttPrefix /FF112233445566778899AABBFFFF
}
SensorTemplate {
sensor def1 {
path /home/micha/LRZ/dcdbOwnFork/sysfspusher/temp
......
......@@ -44,7 +44,7 @@ Configuration::~Configuration() {
}
}
bool Configuration::read() {
bool Configuration::read(global_t cmdParams) {
std::string globalConfig = _cfgFilePath;
globalConfig.append("global.conf");
......@@ -81,6 +81,23 @@ bool Configuration::read() {
}
}
//overwrite global variables with params from cmd-line if provided
if (cmdParams.brokerHost != "") {
_global.brokerHost = cmdParams.brokerHost;
}
if (cmdParams.brokerPort != -1) {
_global.brokerPort = cmdParams.brokerPort;
}
if (cmdParams.mqttPrefix != "") {
_global.mqttPrefix = cmdParams.mqttPrefix;
}
if (cmdParams.threads != 0) {
_global.threads = cmdParams.threads;
}
if (cmdParams.daemonize != -1) {
_global.daemonize = cmdParams.daemonize;
}
//read plugins
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("plugins")) {
if (boost::iequals(plugin.first, "plugin")) {
......@@ -145,6 +162,8 @@ bool Configuration::read() {
}
dynLib.configurator = dynLib.create();
//set prefix to global prefix (may be overwritten)
dynLib.configurator->setMqttPrefix(_global.mqttPrefix);
//read in config
std::vector<Sensor*>& sensors = dynLib.configurator->readConfig(pluginConfig);
......@@ -187,6 +206,12 @@ bool Configuration::readSensorVals(Sensor& sensor, boost::property_tree::iptree&
}*/
bool Configuration::checkMqtt(const std::string& mqtt) {
//MQTT topic must have at least 128 bit = 16 bytes = 32 hex chars
//but can have more (with some extra '/')
if (mqtt.length() < 32) {
cout << "MQTT-Topic \"" << mqtt << "\" not long enough!" << endl;
return false;
}
auto returnIt = _mqttSuffixes.insert(mqtt);
if (!returnIt.second) {
return false;
......
......@@ -54,9 +54,10 @@ public:
* Reads and sets global values as well as all sensors.
* Detects which sensor types are required and dynamically opens required libraries.
*
* @param cmdParams struct with global values provided from cmd-line
* @return true on success, false otherwise
*/
bool read();
bool read(global_t cmdParams);
/**
* Read and set general sensor values (like interval, minvalues, ...).
......
......@@ -27,6 +27,13 @@ public:
* @return Reference to the vector of sensors which were created
*/
virtual std::vector<Sensor*>& readConfig(std::string cfgPath) = 0;
void setMqttPrefix(const std::string& mqttPrefix) {
_mqttPrefix = mqttPrefix;
}
protected:
std::string _mqttPrefix;
};
//typedef for more readable usage of create()- and destroy()-methods, required for dynamic libraries
......
......@@ -15,7 +15,7 @@ extern volatile int keepRunning;
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, sensorVector_t& sensors) :
_brokerPort(brokerPort), _brokerHost(brokerHost),
_mqttPrefix(mqttPrefix),_sensors(sensors),_connected(false) {
_sensors(sensors),_connected(false) {
//first print some info
int mosqMajor, mosqMinor, mosqRevision;
......@@ -32,8 +32,8 @@ MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
//init mosquitto-struct
mosquitto_lib_init();
std::string clientID(_mqttPrefix);
_mosq = mosquitto_new(_mqttPrefix.c_str(), false, NULL);
std::string clientID(mqttPrefix);
_mosq = mosquitto_new(mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
......@@ -90,7 +90,7 @@ void MQTTPusher::push() {
}
#endif
//try to send them to the broker
if (mosquitto_publish(_mosq, NULL, (_mqttPrefix+s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, (s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
//could not send them --> push the sensor values back into the queue
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
......
......@@ -25,7 +25,6 @@ public:
private:
int _brokerPort;
std::string _brokerHost;
std::string _mqttPrefix;
sensorVector_t& _sensors;
struct mosquitto* _mosq;
bool _connected;
......
......@@ -20,7 +20,7 @@ debug: CXXFLAGS += -DDEBUG
debug: $(TARGET)
clean:
rm -f *.o *.so $(TARGET)
rm -f *.o *.so* $(TARGET)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
......
......@@ -74,18 +74,42 @@ int main(int argc, char** argv) {
return 1;
}
//check if help-flag specified
//struct with global settings as provided by cmd-line.
//Use clear default values to identify which ones were overwritten with cmd line parameters later.
global_t cmdParams;
cmdParams.brokerHost = "";
cmdParams.brokerPort = -1;
cmdParams.mqttPrefix = "";
cmdParams.threads = 0;
cmdParams.daemonize = -1;
//read in params
char c;
while ((c = getopt(argc, argv, "b:p:m:t:dh")) != -1) {
switch (c)
{
case 'b':
cmdParams.brokerHost = optarg;
break;
case 'p':
cmdParams.brokerPort = atoi(optarg);
break;
case 'm':
cmdParams.mqttPrefix = optarg;
break;
case 't':
cmdParams.threads = stoul(optarg);
break;
case 'd':
cmdParams.daemonize = 1;
break;
case 'h':
printSyntax();
return 1;
break;
default:
//do nothing (other options are read later)
break;
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
......@@ -98,44 +122,13 @@ int main(int argc, char** argv) {
Configuration cfg(argv[argc-1]);
//Read in Configuration. Also creates all Sensors
if(!cfg.read()) {
if(!cfg.read(cmdParams)) {
return 1;
}
global_t globalSettings = cfg.getGlobal();
sensorVector_t sensors = cfg.getSensors();
//reset getopt()
optind = 1;
//read in options (overwrite global.conf settings if necessary)
while ((c = getopt(argc, argv, "b:p:m:t:dh")) != -1) {
switch (c)
{
case 'b':
globalSettings.brokerHost = optarg;
break;
case 'p':
globalSettings.brokerPort = atoi(optarg);
break;
case 'm':
globalSettings.mqttPrefix = optarg;
break;
case 't':
globalSettings.threads = stoul(optarg);
break;
case 'd':
globalSettings.daemonize = 1;
break;
case 'h':
printSyntax();
return 1;
break;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
//give user some feedback
cout << endl << "Using global settings:" << endl;
cout << " Broker: " << globalSettings.brokerHost << ":" << globalSettings.brokerPort << endl;
......
......@@ -23,7 +23,7 @@ namespace DCDB {
}
IPMIConfigurator::~IPMIConfigurator() {
for(auto s : _sensors) {
for(auto s : _templateSensors) {
delete s;
}
}
......@@ -38,10 +38,12 @@ namespace DCDB {
_globalHost.sessionTimeout = stoi(global.second.data());
} else if(boost::iequals(global.first, "RetransmissionTimeout")) {
_globalHost.retransmissionTimeout = stoi(global.second.data());
} else if(boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
}
}
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("sensors")) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("templateSensors")) {
if (boost::iequals(sensor.first, "sensor")) {
std::cout << "Sensor " << sensor.second.data() << std::endl;
if (!sensor.second.empty()) {
......@@ -62,7 +64,7 @@ namespace DCDB {
}
}
}
_sensors.insert(sensorMap_t::value_type(ipmiSensor.getName(), ipmiSensor));
_templateSensors.insert(sensorMap_t::value_type(ipmiSensor.getName(), ipmiSensor));
}
}
}
......@@ -72,6 +74,7 @@ namespace DCDB {
std::cout << "Host " << host.second.data() << std::endl;
_hosts.push_back(DCDB::IPMIHost(host.second.data(), *this));
DCDB::IPMIHost &ipmiHost = _hosts.back();
ipmiHost.setMqttPrefix(_mqttPrefix);
if (!host.second.empty()) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &h, host.second) {
std::cout << " " << h.first << " " << h.second.data() << std::endl;
......@@ -82,8 +85,8 @@ namespace DCDB {
} else if (boost::iequals(h.first, "sensors")) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, h.second) {
std::cout << " " << s.first << " " << s.second.data() << std::endl;
sensorMap_t::iterator it = _sensors.find(s.second.data());
if (it != _sensors.end()) {
sensorMap_t::iterator it = _templateSensors.find(s.second.data());
if (it != _templateSensors.end()) {
DCDB::IPMISensor sensor = it->second;
boost::optional< boost::property_tree::iptree& > mqtt = s.second.get_child_optional("mqttsuffix");
if (mqtt) {
......@@ -99,12 +102,13 @@ namespace DCDB {
}
}
}
for (auto s : _sensors) {
//FIXME mqtt problem
s.second.setMqtt(std::string(s.second.getHost()->getMqttPrefix()+s.second.getMqtt()));
_sensorP.push_back(&s.second);
for (auto& h : _hosts) {
for (auto& s : h.getSensors()) {
s.setMqtt(std::string(h.getMqttPrefix()+s.getMqtt()));
_sensors.push_back(&s);
}
}
return _sensorP;
return _sensors;
}
} /* namespace DCDB */
......@@ -35,8 +35,8 @@ namespace DCDB {
std::vector<Sensor*>& readConfig(std::string cfgPath);
private:
std::vector<Sensor*> _sensorP;
sensorMap_t _sensors;
std::vector<Sensor*> _sensors;
sensorMap_t _templateSensors;
hostList_t _hosts;
globalHost_t _globalHost;
};
......
......@@ -87,14 +87,4 @@ namespace DCDB {
}
}
/* void IPMISensor::setMqttSuffix(const std::string& mqttSuffix) {
_mqttSuffix = mqttSuffix;
if (_mqttSuffix.front() != '/') {
_mqttSuffix.insert(0, "/");
}
if (_mqttSuffix.back() == '/') {
_mqttSuffix.erase(_mqttSuffix.size()-1);
}
}*/
} /* namespace DCDB */
......@@ -67,6 +67,23 @@ std::vector<Sensor*>& PerfeventConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if(boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
cout << " Using own MQTT-Prefix " << _mqttPrefix << endl;
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
}
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("CounterTemplate")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Template Counter \"" << counter.second.data() << "\"" << endl;
......@@ -114,8 +131,8 @@ std::vector<Sensor*>& PerfeventConfigurator::readConfig(std::string cfgPath) {
string incMqtt = increaseMqtt(startMqtt, i);
perfCC->setCpuId(i);
perfCC->setMqtt(incMqtt);
std::cout << " CPU " << perfCC->getCpuId() << " using MQTT-Suffix " << incMqtt << std::endl;
perfCC->setMqtt(_mqttPrefix + incMqtt);
std::cout << " CPU " << perfCC->getCpuId() << " using MQTT-Topic " << perfCC->getMqtt() << std::endl;
_sensors.push_back(perfCC);
}
} else {
......@@ -139,7 +156,7 @@ bool PerfeventConfigurator::readCounter(PerfCounter& counter, boost::property_tr
enumMap_t::iterator it = _enumType.find(val.second.data());
if(it != _enumType.end()) {
counter.setType(it->second);
cout << " Type: " << val.second.data() << " (=" << counter.getType() << ")" << endl;
cout << " Type: " << val.second.data() << " (= " << counter.getType() << ")" << endl;
} else {
cout << " Type \"" << val.second.data() << "\" not known." << endl;
return false;
......
......@@ -28,6 +28,23 @@ std::vector<Sensor*>& SysfsConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if(boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
_mqttPrefix.append("/");
}
cout << " Using own MQTT-Prefix " << _mqttPrefix << endl;
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
}
//read template sensors
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("SensorTemplate")) {
if (boost::iequals(sensor.first, "sensor")) {
......@@ -75,7 +92,7 @@ void SysfsConfigurator::readSensor(SysfsSensor& sensor, boost::property_tree::ip
} else if (boost::iequals(s.first, "interval")) {
sensor.setInterval(stoull(s.second.data()));
} else if (boost::iequals(s.first, "mqttsuffix")) {
sensor.setMqtt(s.second.data());
sensor.setMqtt(_mqttPrefix + s.second.data());
} else if (boost::iequals(s.first, "minValues")) {
sensor.setMinValues(stoull(s.second.data()));
} else if (boost::iequals(s.first, "filter")) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment