//================================================================================ // Name : dcdbslurmjob.cpp // Author : Michael Ott, Micha Mueller // Copyright : Leibniz Supercomputing Centre // Description : Main file of the dcdbslurmjob command line utility //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2019 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. 
//================================================================================ #include "../../common/include/globalconfiguration.h" #include "timestamp.h" #include #include #include #include #include #include #include #include #include #include "dcdb/version.h" #include "version.h" #define SLURMJOBTIMEOUT 60000000000 int msgId = -1; bool done = false; void publishCallback(struct mosquitto *mosq, void *obj, int mid) { if(msgId != -1 && mid == msgId) done = true; } /* * Print usage information */ void usage() { std::cout << "Usage:" << std::endl; std::cout << " dcdbslurmjob [-b] [-t] [-n] [-j] [-i] start|stop" << std::endl; std::cout << " dcdbslurmjob [-c] [-u] [-p] [-t] [-n] [-j] [-i] [-s] start|stop" << std::endl; std::cout << " dcdbslurmjob -h" << std::endl; std::cout << std::endl; std::cout << "Options:" << std::endl; std::cout << " -b MQTT broker [default: 127.0.0.1:1883]" << std::endl; std::cout << " -c Cassandra host [default: 127.0.0.1:9042]" << std::endl; std::cout << " -u Cassandra username [default: none]" << std::endl; std::cout << " -p Cassandra password [default: none]" << std::endl; std::cout << " -t Timestamp value [default: now]" << std::endl; std::cout << " -n Comma-separated nodelist [default: SLURM_JOB_NODELIST]" << std::endl; std::cout << " -j Numerical job id [default: SLURM_JOB_ID var]" << std::endl; std::cout << " -i Numerical user id [default: SLURM_JOB_USER var]" << std::endl; std::cout << " -s Nodelist substitution pattern [default: none]" << std::endl; std::cout << std::endl; std::cout << " -h This help page" << std::endl; std::cout << std::endl; std::cout << "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." 
<< std::endl; } std::string getEnv(const char* var) { char* str = std::getenv(var); if (str != NULL) { return std::string(str); } else { return std::string(""); } } void splitNodeList(const std::string& str, DCDB::NodeList& nl) { nl.clear(); std::string s1 = str; boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended); boost::smatch m1; while (boost::regex_search(s1, m1, r1)) { std::string hostBase = m1[1].str(); if (m1[2].str().size() == 0) { nl.push_back(hostBase); } else { std::string s2 = m1[2].str(); boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended); boost::smatch m2; while (boost::regex_search(s2, m2, r2)) { if (m2[2] == "") { nl.push_back(hostBase + m2[1].str()); } else { int start = atoi(m2[1].str().c_str()); int stop = atoi(m2[2].str().c_str()); for (int i=start; i<=stop; i++) { std::stringstream ss; ss << std::setw(m2[2].str().length()) << std::setfill('0') << i; nl.push_back(hostBase + ss.str()); } } s2 = m2.suffix().str(); } } s1 = m1.suffix().str(); } } void convertNodeList(DCDB::NodeList& nl, std::string substitution) { //check if input has sed format of "s/.../.../" for substitution boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1"); boost::smatch matchResults; if (regex_match(substitution, matchResults, checkSubstitute)) { //input has substitute format boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended)); std::string fmt = matchResults[3].str(); for (auto &n: nl) { n = boost::regex_replace(n, re, fmt); //std::cout << n <<" => " << mqtt << std::endl; } } } /** * Retrieves Slurm job data from environment variables and sends it to either a * CollectAgent or a Cassandra database. Job data can also be passed as command * line options. 
*/
int main(int argc, char** argv) {
    // Exit status: 0 on success (and for -h), 1 on any usage, connection,
    // storage or publish error.
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;

    bool cassandra = false;
    DCDB::Connection *dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *_mosq = nullptr;
    std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
    int brokerPort = 1883;
    std::string nodelist = "", jobId = "", userId = "";
    std::string substitution = "";
    uint64_t ts = 0;

    // Defining options
    const char *opts = "b:c:u:p:n:t:j:i:s:h";

    // getopt() returns an int. Storing the result in a plain char makes the
    // comparison with -1 always false on platforms where char is unsigned,
    // which would turn the option loops into endless loops.
    int ret;

    // First pass: only look for -h, so that the help page is available even
    // when the mandatory action argument is missing.
    while ((ret = getopt(argc, argv, opts)) != -1) {
        if (ret == 'h') {
            usage();
            return 0;
        }
    }

    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if (!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
    }

    // Second pass: evaluate all options. Restart the scan from the first argument.
    optind = 1;
    try {
        while ((ret = getopt(argc, argv, opts)) != -1) {
            switch (ret) {
                case 'b': {
                    cassandra = false;
                    host = parseNetworkHost(optarg);
                    const std::string port = parseNetworkPort(optarg);
                    brokerPort = (port != "") ? std::stoi(port) : 1883;
                    break;
                }
                case 'c':
                    cassandra = true;
                    host = parseNetworkHost(optarg);
                    cassandraPort = parseNetworkPort(optarg);
                    if (cassandraPort == "") {
                        cassandraPort = std::string("9042");
                    }
                    break;
                case 'u':
                    cassandra = true;
                    cassandraUser = optarg;
                    break;
                case 'p': {
                    cassandra = true;
                    cassandraPassword = optarg;
                    // Overwrite the password inside argv after copying it: up to
                    // three characters become 'x', everything behind is zeroed.
                    // Presumably this keeps the clear-text password out of
                    // process listings.
                    const size_t pwdLen = strlen(optarg);
                    memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
                    if (pwdLen > 3) {
                        memset(optarg + 3, 0, pwdLen - 3);
                    }
                    break;
                }
                case 'n':
                    nodelist = optarg;
                    break;
                case 't':
                    ts = std::stoull(optarg);
                    break;
                case 'j':
                    jobId = optarg;
                    break;
                case 'i':
                    userId = optarg;
                    break;
                case 's':
                    substitution = optarg;
                    // "SNG" is a shorthand for a predefined substitution pattern.
                    if (substitution == "SNG") {
                        substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
                    }
                    break;
                case 'h':
                default:
                    usage();
                    return 1;
            }
        }
    } catch (const std::logic_error&) {
        // std::stoi/std::stoull throw std::invalid_argument or std::out_of_range
        // for malformed numbers; report this instead of terminating.
        std::cerr << "Invalid numeric value for option -" << static_cast<char>(ret) << "!" << std::endl;
        return 1;
    }

    if (cassandra) {
        //Allocate and initialize connection to Cassandra.
        dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
        if (!dcdbConn->connect()) {
            std::cerr << "Cannot connect to Cassandra!" << std::endl;
            delete dcdbConn;
            return 1;
        }
        myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
        //Initialize Mosquitto library and connect to broker
        char hostname[256];
        if (gethostname(hostname, 255) != 0) {
            std::cerr << "Cannot get hostname!" << std::endl;
            return 1;
        }
        hostname[255] = '\0';

        mosquitto_lib_init();
        _mosq = mosquitto_new(hostname, false, NULL);
        if (!_mosq) {
            perror(NULL);
            mosquitto_lib_cleanup();
            return 1;
        }
        if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
            std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
            mosquitto_destroy(_mosq);
            mosquitto_lib_cleanup();
            return 1;
        }
    }

    // From here on exactly one backend is fully set up. Every exit path goes
    // through finish() so that the backend is always released.
    auto finish = [&](int code) -> int {
        if (cassandra) {
            delete myJobDataStore;
            dcdbConn->disconnect();
            delete dcdbConn;
        } else {
            mosquitto_disconnect(_mosq);
            mosquitto_destroy(_mosq);
            mosquitto_lib_cleanup();
        }
        return code;
    };

    //collect job data
    DCDB::JobData jd;
    if (ts == 0) {
        ts = getTimestamp();
    }
    if (jobId == "") {
        jobId = getEnv("SLURM_JOB_ID");
        if (jobId == "") {
            jobId = getEnv("SLURM_JOBID");
        }
    }

    if (boost::iequals(argv[argc-1], "start")) {
        if (userId == "") {
            userId = getEnv("SLURM_JOB_USER");
        }
        if (nodelist == "") {
            nodelist = getEnv("SLURM_JOB_NODELIST");
            if (nodelist == "") {
                nodelist = getEnv("SLURM_NODELIST");
            }
        }

        DCDB::NodeList nl;
        splitNodeList(nodelist, nl);
        convertNodeList(nl, substitution);

        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "USER = " << userId << std::endl;
        std::cout << "START = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
        std::cout << "SUBST = " << substitution << std::endl;
        std::cout << "NODES =";
        for (const auto &n : nl) {
            std::cout << " " << n;
        }
        std::cout << std::endl;

        try {
            jd.jobId = jobId;
            jd.userId = userId;
            jd.startTime = DCDB::TimeStamp(ts);
            // An end time of 0 is what the start record carries.
            jd.endTime = DCDB::TimeStamp(static_cast<uint64_t>(0));
            jd.nodes = nl;
        } catch (const std::invalid_argument&) {
            std::cerr << "Invalid input format!" << std::endl;
            return finish(1);
        }

        if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
            std::cerr << "Job data insert failed!" << std::endl;
            return finish(1);
        }
    } else if (boost::iequals(argv[argc-1], "stop")) {
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP = " << ts << std::endl;

        try {
            jd.jobId = jobId;
            jd.endTime = DCDB::TimeStamp(ts);
        } catch (const std::invalid_argument&) {
            std::cerr << "Invalid input format!" << std::endl;
            return finish(1);
        }

        if (cassandra) {
            // The stored start time is needed to address the record to update.
            DCDB::JobData jobStart;
            if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
                std::cerr << "Could not retrieve job to be updated!" << std::endl;
                return finish(1);
            }
            if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
                std::cerr << "Could not update end time of job!" << std::endl;
                return finish(1);
            }
        }
    }

    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
        using boost::property_tree::ptree;

        //create job data string in JSON format
        const std::string topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h

        ptree config;
        config.push_back(ptree::value_type("jobid", ptree(jd.jobId)));
        config.push_back(ptree::value_type("userid", ptree(jd.userId)));
        config.push_back(ptree::value_type("starttime", ptree(std::to_string(jd.startTime.getRaw()))));
        config.push_back(ptree::value_type("endtime", ptree(std::to_string(jd.endTime.getRaw()))));

        // Children with empty keys are written as a JSON array.
        ptree nodes;
        for (const auto &n : jd.nodes) {
            nodes.push_back(ptree::value_type("", ptree(n)));
        }
        config.push_back(ptree::value_type("nodes", nodes));

        std::ostringstream output;
        boost::property_tree::write_json(output, config, true);
        const std::string payload = output.str();

        mosquitto_publish_callback_set(_mosq, publishCallback);
        const uint64_t startTs = getTimestamp();

        //send it to broker
        if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
            std::cerr << "Broker not reachable! Job data was not published." << std::endl;
            return finish(1);
        }

        // Drive the network loop until publishCallback() reports completion or
        // the timeout expires.
        do {
            if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
                std::cerr << "Error in mosquitto_loop!" << std::endl;
                return finish(1);
            }
        } while (!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);

        if (!done) {
            // Do not report success if the publish was never confirmed.
            std::cerr << "Timeout while waiting for publish confirmation! Job data may not have been published." << std::endl;
            return finish(1);
        }
    }

    //hasta la vista
    return finish(0);
}