Commit b57cedf4 authored by Michael Ott

Iterate over all MQTT brokers if connections fail

parent be25a587
......@@ -148,11 +148,14 @@ void splitHostList(const std::string& str, std::vector<std::string>& hl, char de
}
}
void pickRandomHost(const std::vector<std::string>& hl, std::string& host, int& port) {
/**
 * Pick a random entry from a list of broker address strings.
 *
 * @param hl     Candidate entries. When erase is true, the selected entry is
 *               removed so that repeated calls never return it again.
 * @param host   Receives parseNetworkHost() of the selected entry, or an
 *               empty string if hl is empty.
 * @param port   Receives atoi() of parseNetworkPort() of the selected entry.
 *               atoi() yields 0 when no number can be parsed; the caller in
 *               main() treats 0 as "use the default port". Set to 0 if hl
 *               is empty.
 * @param erase  If true, remove the selected entry from hl.
 */
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
    // Guard against an empty list: rand() % 0 would be undefined behaviour.
    if (hl.empty()) {
        host.clear();
        port = 0;
        return;
    }

    // Seed the generator only once. Re-seeding with time(NULL) on every call
    // restarts the same pseudo-random sequence for all calls made within the
    // same second.
    static bool seeded = false;
    if (!seeded) {
        srand(time(NULL));
        seeded = true;
    }

    const std::vector<std::string>::size_type n = rand() % hl.size();
    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());

    if (erase) {
        hl.erase(hl.begin() + n);
    }
}
/**
......@@ -293,16 +296,24 @@ int main(int argc, char** argv) {
return 1;
}
pickRandomHost(hostList, host, port);
if (port == 0) {
port = 1883;
}
if (mosquitto_connect(_mosq, host.c_str(), port, 1000) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << host << ":" << port << std::endl;
int ret = MOSQ_ERR_UNKNOWN;
do {
pickRandomHost(hostList, host, port, true);
if (port == 0) {
port = 1883;
}
if ((ret = mosquitto_connect(_mosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
} else {
std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
break;
}
} while (hostList.size() > 0);
if (ret != MOSQ_ERR_SUCCESS) {
std::cout << "No more MQTT brokers left, aborting" << std::endl;
return 1;
}
std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
}
//collect job data
......@@ -421,20 +432,21 @@ int main(int argc, char** argv) {
mosquitto_publish_callback_set(_mosq, publishCallback);
uint64_t startTs = getTimestamp();
int ret = MOSQ_ERR_UNKNOWN;
//send it to broker
if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "Broker not reachable! Job data was not published." << std::endl;
if ((ret = mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not publish job data via MQTT: " << mosquitto_strerror(ret) << std::endl;
retCode = 1;
goto exit;
}
do {
if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop!" << std::endl;
retCode = 1;
goto exit;
}
} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
if ((ret = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop: " << mosquitto_strerror(ret) << std::endl;
retCode = 1;
goto exit;
}
} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
}
//hasta la vista
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment