Commit 89ebfe7b authored by Alessio Netti's avatar Alessio Netti
Browse files

dcdbslurmjob: added support for SLURM job packs and job arrays

- In the case of job pack leaders, two entries are generated: one for
the sub-job, and another one for the whole pack.
parent 64276567
......@@ -43,6 +43,12 @@
int msgId = -1;
bool done = false;
DCDB::Connection * dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto * myMosq = nullptr;
int timeout = 10;
int qos = 1;
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
if(msgId != -1 && mid == msgId)
done = true;
......@@ -158,6 +164,72 @@ void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port,
}
}
int insertJob(DCDB::JobData job, bool start, bool cassandra) {
// Cassandra-based job insertion
if (cassandra) {
if (start) {
if(myJobDataStore->insertJob(job) != DCDB::JD_OK) {
std::cerr << "Job data insert for job " << job.jobId << " failed!" << std::endl;
return 1;
}
}
else {
DCDB::JobData jobStart;
if (myJobDataStore->getJobById(jobStart, job.jobId, job.domainId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job " << job.jobId << " to be updated!" << std::endl;
return 1;
}
if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, job.endTime, job.domainId) != DCDB::JD_OK) {
std::cerr << "Could not update end time of job " << job.jobId << "!" << std::endl;
return 1;
}
}
// MQTT-based job insertion
} else {
// Create job data string in JSON format
std::string payload = "";
std::string topic = "/DCDB_JOBDATA/"; // Do not change or keep in sync with simplemqttservermessage.h
boost::property_tree::ptree config;
std::ostringstream output;
config.clear();
config.push_back(boost::property_tree::ptree::value_type("domainid", boost::property_tree::ptree(job.domainId)));
config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(job.jobId)));
config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(job.userId)));
config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(job.startTime.getRaw()))));
config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(job.endTime.getRaw()))));
boost::property_tree::ptree nodes;
for (const auto &n : job.nodes) {
nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
}
config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
boost::property_tree::write_json(output, config, true);
payload = output.str();
//std::cout << "Payload:\n" << payload << std::endl;
mosquitto_publish_callback_set(myMosq, publishCallback);
uint64_t startTs = getTimestamp();
msgId = -1;
done = false;
int ret = MOSQ_ERR_UNKNOWN;
// Message sent to CollectAgent is independent of start/stop. We send the
// same JSON in either case. CA does job insert or update depending
// on job endtime value.
if ((ret = mosquitto_publish(myMosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not publish data for job " << job.jobId << " via MQTT: " << mosquitto_strerror(ret) << std::endl;
return 1;
}
do {
if ((ret = mosquitto_loop(myMosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop for job " << job.jobId << ": " << mosquitto_strerror(ret) << std::endl;
return 1;
}
} while(!done && getTimestamp() - startTs < (uint64_t)S_TO_NS(timeout));
}
return true;
}
/**
* Retrieves Slurm job data from environment variables and sends it to either a
* CollectAgent or a Cassandra database. Job data can also be passed as command
......@@ -167,18 +239,14 @@ int main(int argc, char** argv) {
std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
bool cassandra = false;
DCDB::Connection * dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto * _mosq = nullptr;
std::vector<std::string> hostList;
std::string host = "", cassandraUser = "", cassandraPassword = "";
int port;
std::string nodelist="", jobId="", userId="", stepId="";
std::string nodelist="", pnodelist="", jobId="", userId="", stepId="", packId="", taskId="";
std::string domainId = JOB_DEFAULT_DOMAIN;
std::string substitution="";
int maxJobLength = -1;
int qos = 1;
int timeout = 10;
uint64_t ts=0;
......@@ -277,8 +345,9 @@ int main(int argc, char** argv) {
hostList.push_back("localhost");
}
// Initialize transport
if (cassandra) {
//Allocate and initialize connection to Cassandra.
// Allocate and initialize connection to Cassandra.
pickRandomHost(hostList, host, port);
if (port == 0) {
port = 9042;
......@@ -292,7 +361,7 @@ int main(int argc, char** argv) {
std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
myJobDataStore = new DCDB::JobDataStore(dcdbConn);
} else {
//Initialize Mosquitto library and connect to broker
// Initialize Mosquitto library and connect to broker
char hostname[256];
if (gethostname(hostname, 255) != 0) {
......@@ -301,8 +370,8 @@ int main(int argc, char** argv) {
}
hostname[255] = '\0';
mosquitto_lib_init();
_mosq = mosquitto_new(hostname, false, NULL);
if (!_mosq) {
myMosq = mosquitto_new(hostname, false, NULL);
if (!myMosq) {
perror(NULL);
return 1;
}
......@@ -314,7 +383,7 @@ int main(int argc, char** argv) {
port = 1883;
}
if ((ret = mosquitto_connect(_mosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
if ((ret = mosquitto_connect(myMosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
} else {
std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
......@@ -328,19 +397,37 @@ int main(int argc, char** argv) {
}
}
//collect job data
// Collect job data
bool start = boost::iequals(argv[argc-1], "start");
bool isPackLeader=false;
DCDB::JobData jd;
int retCode = 0;
if(ts==0)
// Fetching timestamp
if(ts==0) {
ts = getTimestamp();
}
// Fetching job ID
if(jobId=="") {
// Is this a job array?
if((jobId=getEnv("SLURM_ARRAY_JOB_ID")) != "" && (taskId = getEnv("SLURM_ARRAY_TASK_ID")) != "") {
jobId = jobId + "_" + taskId;
// Is this a job pack? Packs and arrays cannot be combined in SLURM
} else if ((jobId=getEnv("SLURM_PACK_JOB_ID")) != "" && (packId = getEnv("SLURM_PACK_JOB_OFFSET")) != "") {
isPackLeader = packId=="0";
jobId = jobId + "+" + packId;
// In this case, packId contains the job ID of the whole pack
packId = getEnv("SLURM_PACK_JOB_ID");
// Is this an ordinary job?
} else {
jobId = getEnv("SLURM_JOB_ID");
if (jobId == "") {
jobId = getEnv("SLURM_JOBID");
}
}
// Is this a step within a job/pack/array?
stepId = getEnv("SLURM_STEP_ID");
if (stepId=="") {
stepId = getEnv("SLURM_STEPID");
......@@ -349,7 +436,7 @@ int main(int argc, char** argv) {
jobId = jobId + SLURM_JOBSTEP_SEP + stepId;
}
if (boost::iequals(argv[argc-1], "start")) {
// Fetching user ID
if(userId=="") {
userId = getEnv("SLURM_JOB_USER");
if(userId=="") {
......@@ -357,14 +444,19 @@ int main(int argc, char** argv) {
}
}
DCDB::NodeList nl, pnl;
if (start) {
if(nodelist=="") {
nodelist = getEnv("SLURM_JOB_NODELIST");
if(nodelist=="") {
if (nodelist == "") {
nodelist = getEnv("SLURM_NODELIST");
}
// Getting the whole pack's node list, if necessary
if(isPackLeader) {
pnodelist = getEnv("SLURM_PACK_JOB_NODELIST");
}
}
DCDB::NodeList nl;
splitNodeList(nodelist, nl);
convertNodeList(nl, substitution);
......@@ -383,6 +475,17 @@ int main(int argc, char** argv) {
}
std::cout << std::endl;
// Only for job pack leaders that are starting up
if(isPackLeader) {
splitNodeList(pnodelist, pnl);
convertNodeList(pnl, substitution);
std::cout << "PACK =";
for (auto &n: nl) {
std::cout << " " << n;
}
std::cout << std::endl;
}
try {
jd.domainId = domainId;
jd.jobId = jobId;
......@@ -396,11 +499,6 @@ int main(int argc, char** argv) {
goto exit;
}
if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
std::cerr << "Job data insert failed!" << std::endl;
retCode = 1;
goto exit;
}
} else if (boost::iequals(argv[argc-1], "stop")) {
std::cout << "DOMAINID = " << domainId << std::endl;
std::cout << "JOBID = " << jobId << std::endl;
......@@ -415,75 +513,25 @@ int main(int argc, char** argv) {
retCode = 1;
goto exit;
}
if (cassandra) {
DCDB::JobData jobStart;
if (myJobDataStore->getJobById(jobStart, jd.jobId, jd.domainId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job to be updated!" << std::endl;
retCode = 1;
goto exit;
}
if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
std::cerr << "Could not update end time of job!" << std::endl;
retCode = 1;
goto exit;
}
}
}
//Message sent to CollectAgent is independent of start/stop. We send the
//same JSON in either case. CA does job insert or update depending
//on job endtime value.
if (!cassandra) {
//create job data string in JSON format
std::string payload = "";
std::string topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
boost::property_tree::ptree config;
std::ostringstream output;
config.clear();
config.push_back(boost::property_tree::ptree::value_type("domainid", boost::property_tree::ptree(jd.domainId)));
config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
boost::property_tree::ptree nodes;
for (const auto &n : jd.nodes) {
nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
}
config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
boost::property_tree::write_json(output, config, true);
payload = output.str();
//std::cout << "Payload:\n" << payload << std::endl;
mosquitto_publish_callback_set(_mosq, publishCallback);
uint64_t startTs = getTimestamp();
int ret = MOSQ_ERR_UNKNOWN;
//send it to broker
if ((ret = mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not publish job data via MQTT: " << mosquitto_strerror(ret) << std::endl;
retCode = 1;
goto exit;
}
do {
if ((ret = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop: " << mosquitto_strerror(ret) << std::endl;
retCode = 1;
goto exit;
retCode = insertJob(jd, start, cassandra);
if(isPackLeader) {
if(start) {
jd.nodes = pnl;
}
} while(!done && getTimestamp() - startTs < (uint64_t)S_TO_NS(timeout));
jd.jobId = packId;
retCode = insertJob(jd, start, cassandra);
}
//hasta la vista
exit:
if (cassandra) {
delete myJobDataStore;
dcdbConn->disconnect();
delete dcdbConn;
} else {
mosquitto_disconnect(_mosq);
mosquitto_destroy(_mosq);
mosquitto_disconnect(myMosq);
mosquitto_destroy(myMosq);
mosquitto_lib_cleanup();
}
return retCode;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment