Commit 7d197c01 authored by Micha Müller
Browse files

Rework tool dcdbslurmjob

- Sends job data as JSON to the CollectAgent instead of storing it in Cassandra directly
- Adapts the CollectAgent (CA) to support job data
parent 13351bfc
...@@ -50,6 +50,9 @@ ...@@ -50,6 +50,9 @@
#include <string> #include <string>
#include <boost/date_time/posix_time/posix_time.hpp> #include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/foreach.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <dcdb/connection.h> #include <dcdb/connection.h>
#include <dcdb/sensordatastore.h> #include <dcdb/sensordatastore.h>
...@@ -363,34 +366,102 @@ int mqttCallback(SimpleMQTTMessage *msg) ...@@ -363,34 +366,102 @@ int mqttCallback(SimpleMQTTMessage *msg)
} else { } else {
LOG(error) << "Topic could not be converted to SID"; LOG(error) << "Topic could not be converted to SID";
} }
} else { } else if (strncmp(topic, DCDB_JOBDATA, DCDB_JOBDATA_LEN) == 0) {
mqttPayload buf, *payload; /**
* This message contains Slurm job data in JSON format. We need to
len = msg->getPayloadLength(); * parse the payload and store it within the JobDataStore.
//In the 64 bit message case, the collect agent provides a timestamp */
if (len == sizeof(uint64_t)) { if ((len = msg->getPayloadLength()) == 0) {
payload = &buf; LOG(error) << "Empty job data message received!";
payload->value = *((int64_t *) msg->getPayload()); return 1;
payload->timestamp = Messaging::calculateTimestamp(); }
len = sizeof(uint64_t) * 2;
} //parse JSON into JobData object
//...otherwise it just retrieves it from the MQTT message payload. string payload((char *)msg->getPayload(), len);
else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) { DCDB::JobData jd;
payload = (mqttPayload *) msg->getPayload(); try {
} boost::property_tree::iptree config;
//...otherwise this message is malformed -> ignore... std::istringstream str(payload);
else { boost::property_tree::read_json(str, config);
LOG(error) << "Message malformed"; BOOST_FOREACH (boost::property_tree::iptree::value_type &val, config) {
return 1; if (boost::iequals(val.first, "jobid")) {
} jd.jobId = val.second.data();
} else if (boost::iequals(val.first, "userid")) {
jd.userId = val.second.data();
} else if (boost::iequals(val.first, "starttime")) {
jd.startTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
} else if (boost::iequals(val.first, "endtime")) {
jd.endTime = DCDB::TimeStamp((uint64_t)stoull(val.second.data()));
} else if (boost::iequals(val.first, "nodes")) {
BOOST_FOREACH (boost::property_tree::iptree::value_type &node, val.second) {
jd.nodes.push_back(node.second.data());
}
}
}
} catch (const std::exception &e) {
LOG(error) << "Invalid job data packet received!";
return 1;
}
#ifdef DEBUG
LOG(debug) << "JobId = " << jd.jobId;
LOG(debug) << "UserId = " << jd.userId;
LOG(debug) << "Start = " << jd.startTime.getString();
LOG(debug) << "End = " << jd.endTime.getString();
LOG(debug) << "Nodes: ";
for (const auto &n : jd.nodes) {
LOG(debug) << " " << n;
}
#endif
/* //store JobData into Storage Backend
if (jd.endTime == DCDB::TimeStamp((uint64_t)0)) {
//starting job data
if (myJobDataStore->insertJob(jd) != DCDB::JD_OK) {
LOG(error) << "Job data insert failed!";
return 1;
}
} else {
//ending job data
DCDB::JobData tmp;
if (myJobDataStore->getJobById(tmp, jd.jobId) != DCDB::JD_OK) {
LOG(error) << "Could not retrieve job to be updated!";
return 1;
}
if (myJobDataStore->updateEndtime(tmp.jobId, tmp.startTime, jd.endTime) != DCDB::JD_OK) {
LOG(error) << "Could not update end time of job!";
return 1;
}
}
} else {
mqttPayload buf, *payload;
len = msg->getPayloadLength();
//In the 64 bit message case, the collect agent provides a timestamp
if (len == sizeof(uint64_t)) {
payload = &buf;
payload->value = *((int64_t *)msg->getPayload());
payload->timestamp = Messaging::calculateTimestamp();
len = sizeof(uint64_t) * 2;
}
//...otherwise it just retrieves it from the MQTT message payload.
else if ((len % sizeof(mqttPayload) == 0) && (len > 0)) {
payload = (mqttPayload *)msg->getPayload();
}
//...otherwise this message is malformed -> ignore...
else {
LOG(error) << "Message malformed";
return 1;
}
/*
* Check if we can decode the message topic * Check if we can decode the message topic
* into a valid SensorId. If successful, store * into a valid SensorId. If successful, store
* the record in the database. * the record in the database.
*/ */
DCDB::SensorId sid; DCDB::SensorId sid;
if (sid.mqttTopicConvert(msg->getTopic())) { if (sid.mqttTopicConvert(msg->getTopic())) {
#if 0 #if 0
cout << "Topic decode successful:" << endl cout << "Topic decode successful:" << endl
<< " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl << " Raw: " << hex << setw(16) << setfill('0') << sid.getRaw()[0] << hex << setw(16) << setfill('0') << sid.getRaw()[1] << endl
......
...@@ -57,6 +57,8 @@ ...@@ -57,6 +57,8 @@
#define DCDB_MET_LEN 19 #define DCDB_MET_LEN 19
#define DCDB_CALIEVT "/DCDB_CE/" #define DCDB_CALIEVT "/DCDB_CE/"
#define DCDB_CALIEVT_LEN 9 #define DCDB_CALIEVT_LEN 9
#define DCDB_JOBDATA "/DCDB_JOBDATA/"
#define DCDB_JOBDATA_LEN 14
#pragma pack(push,1) #pragma pack(push,1)
......
...@@ -2,7 +2,7 @@ include ../../config.mk ...@@ -2,7 +2,7 @@ include ../../config.mk
CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include CXXFLAGS += -I../../common/include/ -I../../lib/include -I$(DCDBDEPLOYPATH)/include
OBJS = dcdbslurmjob.o OBJS = dcdbslurmjob.o
LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto LIBS = -L../../lib -L$(DCDBDEPLOYPATH)/lib -ldcdb -lcassandra -lmosquitto -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
TARGET = dcdbslurmjob TARGET = dcdbslurmjob
.PHONY : clean install .PHONY : clean install
......
//================================================================================ //================================================================================
// Name : dcdbslurmjob.cpp // Name : dcdbslurmjob.cpp
// Author : Michael Ott // Author : Michael Ott, Micha Mueller
// Copyright : Leibniz Supercomputing Centre // Copyright : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility // Description : Main file of the dcdbslurmjob command line utility
//================================================================================ //================================================================================
...@@ -24,34 +24,27 @@ ...@@ -24,34 +24,27 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================ //================================================================================
#include <cstdlib> #include "../../common/include/globalconfiguration.h"
#include <iostream>
#include <boost/algorithm/string.hpp>
#include "timestamp.h" #include "timestamp.h"
#include <dcdb/connection.h> #include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
#include <dcdb/jobdatastore.h> #include <dcdb/jobdatastore.h>
#include "../../common/include/globalconfiguration.h" #include <iostream>
#include <mosquitto.h>
DCDB::Connection* dcdbConn;
DCDB::JobDataStore *myJobDataStore;
/* /*
* Print usage information * Print usage information
*/ */
void usage() { void usage() {
/*
1 2 3 4 5 6 7 8
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
std::cout << "Usage:" << std::endl; std::cout << "Usage:" << std::endl;
std::cout << " dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop" << std::endl; std::cout << " dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop" << std::endl;
std::cout << " dcdbslurmjob -h" << std::endl; std::cout << " dcdbslurmjob -h" << std::endl;
std::cout << std::endl; std::cout << std::endl;
std::cout << "Options:" << std::endl; std::cout << "Options:" << std::endl;
std::cout << " -c<host> Cassandra host [default: 127.0.0.1:9042]" << std::endl; std::cout << " -b<host> MQTT broker [default: 127.0.0.1:1883]" << std::endl;
std::cout << " -u<username> Cassandra username [default: none]" << std::endl;
std::cout << " -p<password> Cassandra password [default: none]" << std::endl;
std::cout << " -t<timestamp> Timestamp value [default: now]" << std::endl; std::cout << " -t<timestamp> Timestamp value [default: now]" << std::endl;
std::cout << " -n<nodelist> Comma-separated nodelist [default: SLURM_JOB_NODELIST]" << std::endl; std::cout << " -n<nodelist> Comma-separated nodelist [default: SLURM_JOB_NODELIST]" << std::endl;
std::cout << " -j<jobid> Numerical job id [default: SLURM_JOB_ID var]" << std::endl; std::cout << " -j<jobid> Numerical job id [default: SLURM_JOB_ID var]" << std::endl;
...@@ -80,14 +73,20 @@ void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',') ...@@ -80,14 +73,20 @@ void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',')
} }
} }
/**
* Retrieves Slurm job data from environment variables and pushes them to the
* specified CollectAgent. Alternatively, job data can be passed as command line
* options.
*/
int main(int argc, char** argv) { int main(int argc, char** argv) {
std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl; std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
std::string cassandraHost="127.0.0.1", cassandraPort="9042", cassandraUser="", cassandraPassword=""; std::string brokerHost = "127.0.0.1";
int brokerPort = 1883;
std::string nodelist="", jobId="", userId=""; std::string nodelist="", jobId="", userId="";
uint64_t ts=0; uint64_t ts=0;
// Defining options // Defining options
const char* opts = "c:u:p:n:t:j:i:h"; const char *opts = "b:n:t:j:i:h";
char ret; char ret;
while ((ret = getopt(argc, argv, opts))!=-1) { while ((ret = getopt(argc, argv, opts))!=-1) {
...@@ -112,25 +111,17 @@ int main(int argc, char** argv) { ...@@ -112,25 +111,17 @@ int main(int argc, char** argv) {
optind = 1; optind = 1;
while ((ret=getopt(argc, argv, opts))!=-1) { while ((ret=getopt(argc, argv, opts))!=-1) {
switch(ret) { switch(ret) {
case 'c': case 'b': {
cassandraHost = parseNetworkHost(optarg); brokerHost = parseNetworkHost(optarg);
cassandraPort = parseNetworkPort(optarg); std::string port = parseNetworkPort(optarg);
if(cassandraPort=="") cassandraPort = std::string("9042"); if (port != "") {
break; brokerPort = std::stoi(port);
case 'u': } else {
cassandraUser = optarg; brokerPort = 1883;
break; }
case 'p': { break;
cassandraPassword = optarg; }
// What does this do? Mask the password? case 'n':
size_t pwdLen = strlen(optarg);
memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
if (pwdLen > 3) {
memset(optarg+3, 0, pwdLen-3);
}
break;
}
case 'n':
nodelist = optarg; nodelist = optarg;
break; break;
case 't': case 't':
...@@ -149,16 +140,31 @@ int main(int argc, char** argv) { ...@@ -149,16 +140,31 @@ int main(int argc, char** argv) {
} }
} }
//Allocate and initialize connection to Cassandra. //Initialize Mosquitto library and connect to broker
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword); struct mosquitto *_mosq;
char hostname[256];
if (!dcdbConn->connect()) {
std::cerr << "Cannot connect to Cassandra!"; if (gethostname(hostname, 255) != 0) {
return 1; std::cerr << "Cannot get hostname!";
return 1;
}
hostname[255] = '\0';
mosquitto_lib_init();
_mosq = mosquitto_new(hostname, false, NULL);
if (!_mosq) {
perror(NULL);
return 1;
}
if (mosquitto_connect(_mosq, brokerHost.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cerr << "Could not connect to MQTT broker " << brokerHost << ":" << std::to_string(brokerPort);
return 1;
} }
myJobDataStore = new DCDB::JobDataStore(dcdbConn);
//collect job data
DCDB::JobData jd; DCDB::JobData jd;
int retCode = 0;
if(ts==0) if(ts==0)
ts = getTimestamp(); ts = getTimestamp();
if(jobId=="") if(jobId=="")
...@@ -187,37 +193,63 @@ int main(int argc, char** argv) { ...@@ -187,37 +193,63 @@ int main(int argc, char** argv) {
jd.nodes = nl; jd.nodes = nl;
} catch(const std::invalid_argument& e) { } catch(const std::invalid_argument& e) {
std::cerr << "Invalid input format!"; std::cerr << "Invalid input format!";
return 1; retCode = 1;
} goto exit;
if(myJobDataStore->insertJob(jd) != DCDB::JD_OK) { }
std::cerr << "Job data insert failed!";
return 1;
}
} else if (boost::iequals(argv[argc-1], "stop")) { } else if (boost::iequals(argv[argc-1], "stop")) {
std::cout << "JOBID = " << jobId << std::endl; std::cout << "JOBID = " << jobId << std::endl;
std::cout << "STOP = " << ts << std::endl; std::cout << "STOP = " << ts << std::endl;
try { try {
if (myJobDataStore->getJobById(jd, jobId) != DCDB::JD_OK) { jd.jobId = jobId;
std::cerr << "Could not retrieve job to be updated!"; jd.endTime = DCDB::TimeStamp(ts);
return 1; } catch(const std::invalid_argument& e) {
}
} catch(const std::invalid_argument& e) {
std::cerr << "Invalid input format!"; std::cerr << "Invalid input format!";
return 1; retCode = 1;
} goto exit;
}
if(myJobDataStore->updateEndtime(jd.jobId, jd.startTime, DCDB::TimeStamp(ts)) != DCDB::JD_OK) { }
std::cerr << "Could not update end time of job!";
return 1; {
} //create job data string in JSON format
std::string payload = "";
std::string topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
boost::property_tree::ptree config;
std::ostringstream output;
config.clear();
config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
boost::property_tree::ptree nodes;
for (const auto &n : jd.nodes) {
nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
}
config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
boost::property_tree::write_json(output, config, true);
payload = output.str();
//std::cout << "Payload:\n" << payload << std::endl;
//send it to broker
if (mosquitto_publish(_mosq, NULL, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "Broker not reachable! Job data was not published.";
retCode = 1;
goto exit;
}
if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
std::cerr << "Error in mosquitto_loop!";
retCode = 1;
goto exit;
}
} }
delete myJobDataStore; //hasta la vista
dcdbConn->disconnect(); exit:
delete dcdbConn; mosquitto_disconnect(_mosq);
return 0; mosquitto_destroy(_mosq);
mosquitto_lib_cleanup();
return retCode;
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment