//================================================================================ // Name : dcdbslurmjob.cpp // Author : Michael Ott, Micha Mueller // Copyright : Leibniz Supercomputing Centre // Description : Main file of the dcdbslurmjob command line utility //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2019 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include "../../common/include/globalconfiguration.h" #include "timestamp.h" #include #include #include #include #include #include #include #include #include #include "dcdb/version.h" #include "version.h" #define SLURMJOBTIMEOUT 60000000000 int msgId = -1; bool done = false; void publishCallback(struct mosquitto *mosq, void *obj, int mid) { if(msgId != -1 && mid == msgId) done = true; } /* * Print usage information */ void usage() { std::cout << "Usage:" << std::endl; std::cout << " dcdbslurmjob [-b] [-t] [-n] [-j] [-i] start|stop" << std::endl; std::cout << " dcdbslurmjob [-c] [-u] [-p] [-t] [-n] [-j] [-i] [-s] start|stop" << std::endl; std::cout << " dcdbslurmjob -h" << std::endl; std::cout << std::endl; std::cout << "Options:" << std::endl; std::cout << " -b List of MQTT brokers [default: localhost:1883]" << std::endl; std::cout << " -q MQTT QoS to use [default: 0]" << std::endl; std::cout << " -c List of Cassandra hosts [default: none]" << std::endl; std::cout << " -u Cassandra username [default: none]" << std::endl; std::cout << " -p Cassandra password [default: none]" << std::endl; std::cout << " -t Timestamp value [default: now]" << std::endl; std::cout << " -n Comma-separated nodelist [default: SLURM_JOB_NODELIST]" << std::endl; std::cout << " -j Numerical job id [default: SLURM_JOB_ID var]" << std::endl; std::cout << " -i Numerical user id [default: SLURM_JOB_USER var]" << std::endl; std::cout << " -s Nodelist substitution pattern [default: none]" << std::endl; std::cout << " -m Maximum job length in h [default: none]" << std::endl; std::cout << std::endl; std::cout << " -h This help page" << std::endl; std::cout << std::endl; std::cout << "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." << std::endl; } std::string getEnv(const char* var) { char* str = std::getenv(var); if (str != NULL) { return std::string(str); } else { return std::string(""); } } void splitNodeList(const std::string& str, DCDB::NodeList& nl) { nl.clear(); std::string s1 = str; boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended); boost::smatch m1; while (boost::regex_search(s1, m1, r1)) { std::string hostBase = m1[1].str(); if (m1[2].str().size() == 0) { nl.push_back(hostBase); } else { std::string s2 = m1[2].str(); boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended); boost::smatch m2; while (boost::regex_search(s2, m2, r2)) { if (m2[2] == "") { nl.push_back(hostBase + m2[1].str()); } else { int start = atoi(m2[1].str().c_str()); int stop = atoi(m2[2].str().c_str()); for (int i=start; i<=stop; i++) { std::stringstream ss; ss << std::setw(m2[2].str().length()) << std::setfill('0') << i; nl.push_back(hostBase + ss.str()); } } s2 = m2.suffix().str(); } } s1 = m1.suffix().str(); } } void convertNodeList(DCDB::NodeList& nl, std::string substitution) { //check if input has sed format of "s/.../.../" for substitution boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1"); boost::smatch matchResults; if (regex_match(substitution, matchResults, checkSubstitute)) { //input has substitute format boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended)); std::string fmt = matchResults[3].str(); for (auto &n: nl) { n = boost::regex_replace(n, re, fmt); //std::cout << n <<" => " << mqtt << std::endl; } } } void splitHostList(const std::string& str, std::vector& hl, char delim = ',') { hl.clear(); std::stringstream ss(str); std::string token; while (std::getline(ss, token, delim)) { hl.push_back(token); } } void pickRandomHost(const std::vector& hl, std::string& host, int& port) { srand (time(NULL)); int n = rand() % hl.size(); host = parseNetworkHost(hl[n]); port = atoi(parseNetworkPort(hl[n]).c_str()); } /** * Retrieves Slurm job data from environment variables and sends it to either a * CollectAgent or a Cassandra database. Job data can also be passed as command * line options. */ int main(int argc, char** argv) { std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl; bool cassandra = false; DCDB::Connection * dcdbConn = nullptr; DCDB::JobDataStore *myJobDataStore = nullptr; struct mosquitto * _mosq = nullptr; std::vector hostList; std::string host = "", cassandraUser = "", cassandraPassword = ""; int port; std::string nodelist="", jobId="", userId=""; std::string substitution=""; int maxJobLength = -1; int qos = 0; uint64_t ts=0; // Defining options const char *opts = "b:q:c:u:p:n:t:j:i:s:m:h"; char ret; while ((ret = getopt(argc, argv, opts))!=-1) { switch (ret) { case 'h': usage(); return 0; default: break; } } if (argc < 2) { std::cerr << "At least one argument is required: start or stop" << std::endl; return 1; } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) { std::cerr << "Unsupported action: must either be start or stop" << std::endl; return 1; } optind = 1; while ((ret=getopt(argc, argv, opts))!=-1) { switch(ret) { case 'b': { cassandra = false; splitHostList(optarg, hostList); break; } case 'q': qos = atoi(optarg); break; case 'c': cassandra = true; splitHostList(optarg, hostList); break; case 'u': cassandra = true; cassandraUser = optarg; break; case 'p': { cassandra = true; cassandraPassword = optarg; // What does this do? Mask the password? size_t pwdLen = strlen(optarg); memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen); if (pwdLen > 3) { memset(optarg + 3, 0, pwdLen - 3); } break; } case 'n': nodelist = optarg; break; case 't': ts = std::stoull(optarg); break; case 'j': jobId = optarg; break; case 'i': userId = optarg; break; case 's': substitution = optarg; if (substitution == "SNG") { substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%"; maxJobLength = 48; } break; case 'm': maxJobLength = std::stoull(optarg); break; case 'h': default: usage(); return 1; } } if (hostList.size() == 0) { hostList.push_back("localhost"); } if (cassandra) { //Allocate and initialize connection to Cassandra. pickRandomHost(hostList, host, port); if (port == 0) { port = 9042; } dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword); if (!dcdbConn->connect()) { std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl; return 1; } std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl; myJobDataStore = new DCDB::JobDataStore(dcdbConn); } else { //Initialize Mosquitto library and connect to broker char hostname[256]; if (gethostname(hostname, 255) != 0) { std::cerr << "Cannot get hostname!" << std::endl; return 1; } hostname[255] = '\0'; mosquitto_lib_init(); _mosq = mosquitto_new(hostname, false, NULL); if (!_mosq) { perror(NULL); return 1; } pickRandomHost(hostList, host, port); if (port == 0) { port = 1883; } if (mosquitto_connect(_mosq, host.c_str(), port, 1000) != MOSQ_ERR_SUCCESS) { std::cerr << "Could not connect to MQTT broker " << host << ":" << port << std::endl; return 1; } std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl; } //collect job data DCDB::JobData jd; int retCode = 0; if(ts==0) ts = getTimestamp(); if(jobId=="") { jobId = getEnv("SLURM_JOB_ID"); if(jobId=="") { jobId = getEnv("SLURM_JOBID"); } } if (boost::iequals(argv[argc-1], "start")) { if(userId=="") userId = getEnv("SLURM_JOB_USER"); if(nodelist=="") { nodelist = getEnv("SLURM_JOB_NODELIST"); if(nodelist=="") { nodelist = getEnv("SLURM_NODELIST"); } } DCDB::NodeList nl; splitNodeList(nodelist, nl); convertNodeList(nl, substitution); std::cout << "JOBID = " << jobId << std::endl; std::cout << "USER = " << userId << std::endl; std::cout << "START = " << ts << std::endl; std::cout << "NODELIST = " << nodelist << std::endl; std::cout << "SUBST = " << substitution << std::endl; if (maxJobLength >= 0) { std::cout << "JOBLEN = " << maxJobLength << std::endl; } std::cout << "NODES ="; for (auto &n: nl) { std::cout << " " << n; } std::cout << std::endl; try { jd.jobId = jobId; jd.userId = userId; jd.startTime = DCDB::TimeStamp(ts); jd.endTime = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0); jd.nodes = nl; } catch(const std::invalid_argument& e) { std::cerr << "Invalid input format!" << std::endl; retCode = 1; goto exit; } if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) { std::cerr << "Job data insert failed!" << std::endl; retCode = 1; goto exit; } } else if (boost::iequals(argv[argc-1], "stop")) { std::cout << "JOBID = " << jobId << std::endl; std::cout << "STOP = " << ts << std::endl; try { jd.jobId = jobId; jd.endTime = DCDB::TimeStamp(ts); } catch (const std::invalid_argument &e) { std::cerr << "Invalid input format!" << std::endl; retCode = 1; goto exit; } if (cassandra) { DCDB::JobData jobStart; if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) { std::cerr << "Could not retrieve job to be updated!" << std::endl; retCode = 1; goto exit; } if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) { std::cerr << "Could not update end time of job!" << std::endl; retCode = 1; goto exit; } } } //Message sent to CollectAgent is independent of start/stop. We send the //same JSON in either case. CA does job insert or update depending //on job endtime value. if (!cassandra) { //create job data string in JSON format std::string payload = ""; std::string topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h boost::property_tree::ptree config; std::ostringstream output; config.clear(); config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId))); config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId))); config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw())))); config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw())))); boost::property_tree::ptree nodes; for (const auto &n : jd.nodes) { nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n))); } config.push_back(boost::property_tree::ptree::value_type("nodes", nodes)); boost::property_tree::write_json(output, config, true); payload = output.str(); //std::cout << "Payload:\n" << payload << std::endl; mosquitto_publish_callback_set(_mosq, publishCallback); uint64_t startTs = getTimestamp(); //send it to broker if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false) != MOSQ_ERR_SUCCESS) { std::cerr << "Broker not reachable! Job data was not published." << std::endl; retCode = 1; goto exit; } do { if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) { std::cerr << "Error in mosquitto_loop!" << std::endl; retCode = 1; goto exit; } } while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT); } //hasta la vista exit: if (cassandra) { delete myJobDataStore; dcdbConn->disconnect(); delete dcdbConn; } else { mosquitto_disconnect(_mosq); mosquitto_destroy(_mosq); mosquitto_lib_cleanup(); } return retCode; }