Commit 8933e3f9 authored by Alessio Netti's avatar Alessio Netti
Browse files

dcdbslurmjob: support for job steps and other fixes

- If the tool is launched within a job step, the job id will be updated
accordingly (e.g., 12345.1)
- If the SLURM_JOB_USER variable is not defined, we use USER
- Added node substitution for the DEEP-EST prototype
parent 3455eb84
......@@ -38,6 +38,8 @@
#include "dcdb/version.h"
#include "version.h"
#define SLURM_JOBSTEP_SEP "."
int msgId = -1;
bool done = false;
......@@ -92,30 +94,30 @@ void splitNodeList(const std::string& str, DCDB::NodeList& nl)
boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
boost::smatch m1;
while (boost::regex_search(s1, m1, r1)) {
std::string hostBase = m1[1].str();
if (m1[2].str().size() == 0) {
nl.push_back(hostBase);
} else {
std::string s2 = m1[2].str();
boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
boost::smatch m2;
while (boost::regex_search(s2, m2, r2)) {
if (m2[2] == "") {
nl.push_back(hostBase + m2[1].str());
std::string hostBase = m1[1].str();
if (m1[2].str().size() == 0) {
nl.push_back(hostBase);
} else {
int start = atoi(m2[1].str().c_str());
int stop = atoi(m2[2].str().c_str());
for (int i=start; i<=stop; i++) {
std::stringstream ss;
ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
nl.push_back(hostBase + ss.str());
}
std::string s2 = m1[2].str();
boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
boost::smatch m2;
while (boost::regex_search(s2, m2, r2)) {
if (m2[2] == "") {
nl.push_back(hostBase + m2[1].str());
} else {
int start = atoi(m2[1].str().c_str());
int stop = atoi(m2[2].str().c_str());
for (int i=start; i<=stop; i++) {
std::stringstream ss;
ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
nl.push_back(hostBase + ss.str());
}
}
s2 = m2.suffix().str();
}
}
s2 = m2.suffix().str();
}
}
s1 = m1.suffix().str();
s1 = m1.suffix().str();
}
}
......@@ -125,13 +127,13 @@ void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
boost::smatch matchResults;
if (regex_match(substitution, matchResults, checkSubstitute)) {
//input has substitute format
boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
std::string fmt = matchResults[3].str();
for (auto &n: nl) {
n = boost::regex_replace(n, re, fmt);
//std::cout << n <<" => " << mqtt << std::endl;
}
//input has substitute format
boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
std::string fmt = matchResults[3].str();
for (auto &n: nl) {
n = boost::regex_replace(n, re, fmt);
//std::cout << n <<" => " << mqtt << std::endl;
}
}
}
......@@ -141,7 +143,7 @@ void splitHostList(const std::string& str, std::vector<std::string>& hl, char de
std::stringstream ss(str);
std::string token;
while (std::getline(ss, token, delim)) {
hl.push_back(token);
hl.push_back(token);
}
}
......@@ -151,7 +153,7 @@ void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port,
host = parseNetworkHost(hl[n]);
port = atoi(parseNetworkPort(hl[n]).c_str());
if (erase) {
hl.erase(hl.begin()+n);
hl.erase(hl.begin()+n);
}
}
......@@ -163,15 +165,15 @@ void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port,
int main(int argc, char** argv) {
std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
bool cassandra = false;
bool cassandra = false;
DCDB::Connection * dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto * _mosq = nullptr;
std::vector<std::string> hostList;
std::string host = "", cassandraUser = "", cassandraPassword = "";
int port;
std::string nodelist="", jobId="", userId="";
int port;
std::string nodelist="", jobId="", userId="", stepId="";
std::string substitution="";
int maxJobLength = -1;
int qos = 1;
......@@ -235,43 +237,46 @@ int main(int argc, char** argv) {
break;
}
case 'n':
nodelist = optarg;
break;
case 't':
ts = std::stoull(optarg);
break;
case 'j':
jobId = optarg;
break;
case 'i':
userId = optarg;
break;
nodelist = optarg;
break;
case 't':
ts = std::stoull(optarg);
break;
case 'j':
jobId = optarg;
break;
case 'i':
userId = optarg;
break;
case 's':
substitution = optarg;
if (substitution == "SNG") {
substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
maxJobLength = 48;
}
break;
substitution = optarg;
if (substitution == "SNG") {
substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
maxJobLength = 48;
} else if (substitution == "DEEPEST") {
substitution = "s%dp-(cn|dam|esb)([0-9]{2})%/deepest/\\1/s\\2%";
maxJobLength = 20;
}
break;
case 'm':
maxJobLength = std::stoull(optarg);
break;
case 'h':
default:
usage();
return 1;
maxJobLength = std::stoull(optarg);
break;
case 'h':
default:
usage();
return 1;
}
}
if (hostList.size() == 0) {
hostList.push_back("localhost");
hostList.push_back("localhost");
}
if (cassandra) {
//Allocate and initialize connection to Cassandra.
pickRandomHost(hostList, host, port);
if (port == 0) {
port = 9042;
port = 9042;
}
dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
......@@ -311,6 +316,7 @@ int main(int argc, char** argv) {
break;
}
} while (hostList.size() > 0);
if (ret != MOSQ_ERR_SUCCESS) {
std::cout << "No more MQTT brokers left, aborting" << std::endl;
return 1;
......@@ -319,48 +325,58 @@ int main(int argc, char** argv) {
//collect job data
DCDB::JobData jd;
int retCode = 0;
int retCode = 0;
if(ts==0)
ts = getTimestamp();
if(jobId=="") {
jobId = getEnv("SLURM_JOB_ID");
if(jobId=="") {
jobId = getEnv("SLURM_JOBID");
jobId = getEnv("SLURM_JOB_ID");
if (jobId == "") {
jobId = getEnv("SLURM_JOBID");
}
stepId = getEnv("SLURM_STEP_ID");
if (stepId=="") {
stepId = getEnv("SLURM_STEPID");
}
if (stepId!="" && jobId!="")
jobId = jobId + SLURM_JOBSTEP_SEP + stepId;
}
}
if (boost::iequals(argv[argc-1], "start")) {
if(userId=="") {
userId = getEnv("SLURM_JOB_USER");
if(userId=="") {
userId = getEnv("USER");
}
}
if(userId=="")
userId = getEnv("SLURM_JOB_USER");
if(nodelist=="") {
if(nodelist=="") {
nodelist = getEnv("SLURM_JOB_NODELIST");
if(nodelist=="") {
nodelist = getEnv("SLURM_NODELIST");
}
}
if(nodelist=="") {
nodelist = getEnv("SLURM_NODELIST");
}
}
DCDB::NodeList nl;
splitNodeList(nodelist, nl);
convertNodeList(nl, substitution);
DCDB::NodeList nl;
splitNodeList(nodelist, nl);
convertNodeList(nl, substitution);
std::cout << "JOBID = " << jobId << std::endl;
std::cout << "JOBID = " << jobId << std::endl;
std::cout << "USER = " << userId << std::endl;
std::cout << "START = " << ts << std::endl;
std::cout << "NODELIST = " << nodelist << std::endl;
std::cout << "SUBST = " << substitution << std::endl;
if (maxJobLength >= 0) {
std::cout << "JOBLEN = " << maxJobLength << std::endl;
}
std::cout << "NODES =";
for (auto &n: nl) {
std::cout << " " << n;
}
std::cout << std::endl;
std::cout << "SUBST = " << substitution << std::endl;
if (maxJobLength >= 0) {
std::cout << "JOBLEN = " << maxJobLength << std::endl;
}
std::cout << "NODES =";
for (auto &n: nl) {
std::cout << " " << n;
}
std::cout << std::endl;
try {
jd.jobId = jobId;
jd.userId = userId;
......@@ -368,43 +384,42 @@ int main(int argc, char** argv) {
jd.endTime = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
jd.nodes = nl;
} catch(const std::invalid_argument& e) {
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
std::cerr << "Job data insert failed!" << std::endl;
retCode = 1;
goto exit;
}
if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
std::cerr << "Job data insert failed!" << std::endl;
retCode = 1;
goto exit;
}
} else if (boost::iequals(argv[argc-1], "stop")) {
std::cout << "JOBID = " << jobId << std::endl;
std::cout << "STOP = " << ts << std::endl;
try {
jd.jobId = jobId;
jd.endTime = DCDB::TimeStamp(ts);
} catch (const std::invalid_argument &e) {
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
if (cassandra) {
DCDB::JobData jobStart;
if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job to be updated!" << std::endl;
jd.jobId = jobId;
jd.endTime = DCDB::TimeStamp(ts);
} catch (const std::invalid_argument &e) {
std::cerr << "Invalid input format!" << std::endl;
retCode = 1;
goto exit;
}
if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
std::cerr << "Could not update end time of job!" << std::endl;
retCode = 1;
goto exit;
if (cassandra) {
DCDB::JobData jobStart;
if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job to be updated!" << std::endl;
retCode = 1;
goto exit;
}
if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
std::cerr << "Could not update end time of job!" << std::endl;
retCode = 1;
goto exit;
}
}
}
}
//Message sent to CollectAgent is independent of start/stop. We send the
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment