Commit 44503d5d authored by Michael Ott's avatar Michael Ott
Browse files

Allow for multiple Cassandra/MQTT hosts to be specified, picking one randomly to connect to

parent bb8597e8
...@@ -62,8 +62,8 @@ void usage() { ...@@ -62,8 +62,8 @@ void usage() {
std::cout << std::endl; std::cout << std::endl;
std::cout << "Options:" << std::endl; std::cout << "Options:" << std::endl;
std::cout << " -b<host> MQTT broker [default: 127.0.0.1:1883]" << std::endl; std::cout << " -b<hosts> List of MQTT brokers [default: localhost:1883]" << std::endl;
std::cout << " -c<host> Cassandra host [default: 127.0.0.1:9042]" << std::endl; std::cout << " -c<hosts> List of Cassandra hosts [default: none]" << std::endl;
std::cout << " -u<username> Cassandra username [default: none]" << std::endl; std::cout << " -u<username> Cassandra username [default: none]" << std::endl;
std::cout << " -p<password> Cassandra password [default: none]" << std::endl; std::cout << " -p<password> Cassandra password [default: none]" << std::endl;
std::cout << " -t<timestamp> Timestamp value [default: now]" << std::endl; std::cout << " -t<timestamp> Timestamp value [default: now]" << std::endl;
...@@ -136,6 +136,23 @@ void convertNodeList(DCDB::NodeList& nl, std::string substitution) { ...@@ -136,6 +136,23 @@ void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
} }
} }
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
hl.clear();
std::stringstream ss(str);
std::string token;
while (std::getline(ss, token, delim)) {
hl.push_back(token);
}
}
void pickRandomHost(const std::vector<std::string>& hl, std::string& host, int& port) {
srand (time(NULL));
int n = rand() % hl.size();
host = parseNetworkHost(hl[n]);
port = atoi(parseNetworkPort(hl[n]).c_str());
}
/** /**
* Retrieves Slurm job data from environment variables and sends it to either a * Retrieves Slurm job data from environment variables and sends it to either a
* CollectAgent or a Cassandra database. Job data can also be passed as command * CollectAgent or a Cassandra database. Job data can also be passed as command
...@@ -149,8 +166,9 @@ int main(int argc, char** argv) { ...@@ -149,8 +166,9 @@ int main(int argc, char** argv) {
DCDB::JobDataStore *myJobDataStore = nullptr; DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto * _mosq = nullptr; struct mosquitto * _mosq = nullptr;
std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = ""; std::vector<std::string> hostList;
int brokerPort = 1883; std::string host = "", cassandraUser = "", cassandraPassword = "";
int port;
std::string nodelist="", jobId="", userId=""; std::string nodelist="", jobId="", userId="";
std::string substitution=""; std::string substitution="";
uint64_t ts=0; uint64_t ts=0;
...@@ -183,21 +201,12 @@ int main(int argc, char** argv) { ...@@ -183,21 +201,12 @@ int main(int argc, char** argv) {
switch(ret) { switch(ret) {
case 'b': { case 'b': {
cassandra = false; cassandra = false;
host = parseNetworkHost(optarg); splitHostList(optarg, hostList);
std::string port = parseNetworkPort(optarg);
if (port != "") {
brokerPort = std::stoi(port);
} else {
brokerPort = 1883;
}
break; break;
} }
case 'c': case 'c':
cassandra = true; cassandra = true;
host = parseNetworkHost(optarg); splitHostList(optarg, hostList);
cassandraPort = parseNetworkPort(optarg);
if (cassandraPort == "")
cassandraPort = std::string("9042");
break; break;
case 'u': case 'u':
cassandra = true; cassandra = true;
...@@ -238,15 +247,24 @@ int main(int argc, char** argv) { ...@@ -238,15 +247,24 @@ int main(int argc, char** argv) {
return 1; return 1;
} }
} }
if (hostList.size() == 0) {
hostList.push_back("localhost");
}
if (cassandra) { if (cassandra) {
//Allocate and initialize connection to Cassandra. //Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword); pickRandomHost(hostList, host, port);
if (port == 0) {
port = 9042;
}
dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
if (!dcdbConn->connect()) { if (!dcdbConn->connect()) {
std::cerr << "Cannot connect to Cassandra!" << std::endl; std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
return 1; return 1;
} }
std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
myJobDataStore = new DCDB::JobDataStore(dcdbConn); myJobDataStore = new DCDB::JobDataStore(dcdbConn);
} else { } else {
//Initialize Mosquitto library and connect to broker //Initialize Mosquitto library and connect to broker
...@@ -264,10 +282,16 @@ int main(int argc, char** argv) { ...@@ -264,10 +282,16 @@ int main(int argc, char** argv) {
return 1; return 1;
} }
if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) { pickRandomHost(hostList, host, port);
std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl; if (port == 0) {
port = 1883;
}
if (mosquitto_connect(_mosq, host.c_str(), port, 1000) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << host << ":" << port << std::endl;
return 1; return 1;
} }
std::cout << "Connected to MQTT broker " << host << ":" << port << std::endl;
} }
//collect job data //collect job data
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment