10.12., 09:00 - 11:00: Due to updates, GitLab may be unavailable for a few minutes between 09:00 and 11:00.

Commit 1e559ae7 authored by Micha Müller

Slurmjob tool: revive functionality to insert into Cassandra directly

parent 4fb4ad5c
......@@ -30,6 +30,7 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <iostream>
#include <mosquitto.h>
......@@ -40,11 +41,15 @@
void usage() {
std::cout << "Usage:" << std::endl;
std::cout << " dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop" << std::endl;
std::cout << " dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop" << std::endl;
std::cout << " dcdbslurmjob -h" << std::endl;
std::cout << std::endl;
std::cout << "Options:" << std::endl;
std::cout << " -b<host> MQTT broker [default: 127.0.0.1:1883]" << std::endl;
std::cout << " -c<host> Cassandra host [default: 127.0.0.1:9042]" << std::endl;
std::cout << " -u<username> Cassandra username [default: none]" << std::endl;
std::cout << " -p<password> Cassandra password [default: none]" << std::endl;
std::cout << " -t<timestamp> Timestamp value [default: now]" << std::endl;
std::cout << " -n<nodelist> Comma-separated nodelist [default: SLURM_JOB_NODELIST]" << std::endl;
std::cout << " -j<jobid> Numerical job id [default: SLURM_JOB_ID var]" << std::endl;
......@@ -52,6 +57,7 @@ void usage() {
std::cout << std::endl;
std::cout << " -h This help page" << std::endl;
std::cout << std::endl;
std::cout << "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." << std::endl;
}
std::string getEnv(const char* var) {
......@@ -74,19 +80,25 @@ void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',')
}
/**
* Retrieves Slurm job data from environment variables and pushes them to the
* specified CollectAgent. Alternatively, job data can be passed as command line
* options.
* Retrieves Slurm job data from environment variables and sends it to either a
* CollectAgent or a Cassandra database. Job data can also be passed as command
* line options.
*/
int main(int argc, char** argv) {
std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
std::string brokerHost = "127.0.0.1";
bool cassandra = false;
DCDB::Connection * dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto * _mosq = nullptr;
std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
int brokerPort = 1883;
std::string nodelist="", jobId="", userId="";
uint64_t ts=0;
// Defining options
const char *opts = "b:n:t:j:i:h";
const char *opts = "b:c:u:p:n:t:j:i:h";
char ret;
while ((ret = getopt(argc, argv, opts))!=-1) {
......@@ -112,7 +124,8 @@ int main(int argc, char** argv) {
while ((ret=getopt(argc, argv, opts))!=-1) {
switch(ret) {
case 'b': {
brokerHost = parseNetworkHost(optarg);
cassandra = false;
host = parseNetworkHost(optarg);
std::string port = parseNetworkPort(optarg);
if (port != "") {
brokerPort = std::stoi(port);
......@@ -121,6 +134,28 @@ int main(int argc, char** argv) {
}
break;
}
case 'c':
cassandra = true;
host = parseNetworkHost(optarg);
cassandraPort = parseNetworkPort(optarg);
if (cassandraPort == "")
cassandraPort = std::string("9042");
break;
case 'u':
cassandra = true;
cassandraUser = optarg;
break;
case 'p': {
cassandra = true;
cassandraPassword = optarg;
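// Overwrite the password in place in the argument buffer: up to the first three
// characters become 'x' and the remainder is zeroed. The copy stored in
// cassandraPassword is unaffected. Presumably this keeps the clear-text
// password out of process listings - TODO confirm.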
size_t pwdLen = strlen(optarg);
memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
if (pwdLen > 3) {
memset(optarg + 3, 0, pwdLen - 3);
}
break;
}
case 'n':
nodelist = optarg;
break;
......@@ -140,25 +175,35 @@ int main(int argc, char** argv) {
}
}
//Initialize Mosquitto library and connect to broker
struct mosquitto *_mosq;
char hostname[256];
if (cassandra) {
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
if (gethostname(hostname, 255) != 0) {
std::cerr << "Cannot get hostname!";
return 1;
}
hostname[255] = '\0';
mosquitto_lib_init();
_mosq = mosquitto_new(hostname, false, NULL);
if (!_mosq) {
perror(NULL);
return 1;
}
if (!dcdbConn->connect()) {
std::cerr << "Cannot connect to Cassandra!" << std::endl;
return 1;
}
myJobDataStore = new DCDB::JobDataStore(dcdbConn);
} else {
//Initialize Mosquitto library and connect to broker
char hostname[256];
if (mosquitto_connect(_mosq, brokerHost.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << brokerHost << ":" << std::to_string(brokerPort);
return 1;
if (gethostname(hostname, 255) != 0) {
std::cerr << "Cannot get hostname!" << std::endl;
return 1;
}
hostname[255] = '\0';
mosquitto_lib_init();
_mosq = mosquitto_new(hostname, false, NULL);
if (!_mosq) {
perror(NULL);
return 1;
}
if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
return 1;
}
}
//collect job data
......@@ -192,9 +237,15 @@ int main(int argc, char** argv) {
jd.endTime = DCDB::TimeStamp((uint64_t)0);
jd.nodes = nl;
} catch(const std::invalid_argument& e) {
std::cerr << "Invalid input format!";
retCode = 1;
goto exit;
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
std::cerr << "Job data insert failed!" << std::endl;
retCode = 1;
goto exit;
}
} else if (boost::iequals(argv[argc-1], "stop")) {
......@@ -204,14 +255,31 @@ int main(int argc, char** argv) {
try {
jd.jobId = jobId;
jd.endTime = DCDB::TimeStamp(ts);
} catch(const std::invalid_argument& e) {
std::cerr << "Invalid input format!";
retCode = 1;
goto exit;
} catch (const std::invalid_argument &e) {
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
if (cassandra) {
DCDB::JobData jobStart;
if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job to be updated!" << std::endl;
retCode = 1;
goto exit;
}
if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
std::cerr << "Could not update end time of job!" << std::endl;
retCode = 1;
goto exit;
}
}
}
{
//Message sent to CollectAgent is independent of start/stop. We send the
//same JSON in either case. CA does job insert or update depending
//on job endtime value.
if (!cassandra) {
//create job data string in JSON format
std::string payload = "";
std::string topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
......@@ -234,13 +302,13 @@ int main(int argc, char** argv) {
//send it to broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "Broker not reachable! Job data was not published.";
std::cerr << "Broker not reachable! Job data was not published." << std::endl;
retCode = 1;
goto exit;
}
if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop!";
std::cerr << "Error in mosquitto_loop!" << std::endl;
retCode = 1;
goto exit;
}
......@@ -248,8 +316,14 @@ int main(int argc, char** argv) {
//hasta la vista
exit:
mosquitto_disconnect(_mosq);
mosquitto_destroy(_mosq);
mosquitto_lib_cleanup();
if (cassandra) {
delete myJobDataStore;
dcdbConn->disconnect();
delete dcdbConn;
} else {
mosquitto_disconnect(_mosq);
mosquitto_destroy(_mosq);
mosquitto_lib_cleanup();
}
return retCode;
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment