//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

Micha Müller's avatar
Micha Müller committed
27
#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
#include <cstring>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <iostream>
#include <mosquitto.h>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unistd.h>

// Upper bound on how long main() keeps calling mosquitto_loop() while waiting
// for the publish callback. It is compared against a difference of two
// getTimestamp() values; presumably nanoseconds (i.e. 60 s) -- TODO confirm
// against timestamp.h.
#define SLURMJOBTIMEOUT 60000000000


// Message id of the job data message; filled in by mosquitto_publish() in
// main(). Stays -1 until a message has been handed to the library.
int msgId = -1;
// Set to true by publishCallback() once it is invoked with the id stored in
// msgId; main() polls this flag to know when it may stop looping.
bool done = false;

/**
 * Publish callback registered with the Mosquitto client in main().
 *
 * Flags completion (done = true) when the reported message id is the one
 * recorded in msgId. Ids reported while msgId is still -1 are ignored.
 *
 * @param mosq  Mosquitto client instance (unused).
 * @param obj   User data pointer (unused).
 * @param mid   Id of the message this callback is reporting on.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	const bool publishPending = (msgId != -1);
	if (publishPending && msgId == mid) {
		done = true;
	}
}


Alessio Netti's avatar
Alessio Netti committed
50
51
52
53
54
/*
 * Print usage information
 *
 * Writes the synopsis and the option summary of dcdbslurmjob to std::cout.
 * The text is kept in one literal so that it is emitted (and flushed) once.
 */
void usage() {
    static const char *const helpText =
        "Usage:\n"
        "  dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop\n"
        "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop\n"
        "  dcdbslurmjob -h\n"
        "\n"
        "Options:\n"
        "  -b<host>      MQTT broker              [default: 127.0.0.1:1883]\n"
        "  -c<host>      Cassandra host           [default: 127.0.0.1:9042]\n"
        "  -u<username>  Cassandra username       [default: none]\n"
        "  -p<password>  Cassandra password       [default: none]\n"
        "  -t<timestamp> Timestamp value          [default: now]\n"
        "  -n<nodelist>  Comma-separated nodelist [default: SLURM_JOB_NODELIST]\n"
        "  -j<jobid>     Numerical job id         [default: SLURM_JOB_ID var]\n"
        "  -i<userid>    Numerical user id        [default: SLURM_JOB_USER var]\n"
        "\n"
        "  -h            This help page\n"
        "\n"
        "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified.\n";

    std::cout << helpText << std::flush;
}
74
75
76
77

/**
 * Look up an environment variable.
 *
 * @param var  Name of the environment variable to read.
 * @return     The value of the variable, or an empty string if it is not set.
 */
std::string getEnv(const char* var) {
    const char* value = std::getenv(var);
    return (value != nullptr) ? std::string(value) : std::string();
}

/**
 * Split a delimiter-separated string into its individual entries.
 *
 * Any previous content of nl is discarded. Tokens are extracted with
 * std::getline(), so an empty input yields an empty list and an empty field
 * between two consecutive delimiters is stored as an empty entry.
 *
 * @param str    String to split.
 * @param nl     Receives the extracted tokens in order of appearance.
 * @param delim  Separator character between entries.
 */
void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',')
{
    nl.clear();
    std::istringstream input(str);
    for (std::string token; std::getline(input, token, delim);) {
        nl.push_back(token);
    }
}

Micha Müller's avatar
Micha Müller committed
94
/**
95
96
97
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
98
 */
99
100
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
101
102
103
104
105
106
107

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

    std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
Micha Müller's avatar
Micha Müller committed
108
    int         brokerPort = 1883;
Alessio Netti's avatar
Alessio Netti committed
109
110
    std::string nodelist="", jobId="", userId="";
    uint64_t ts=0;
111
    
Alessio Netti's avatar
Alessio Netti committed
112
    // Defining options
113
    const char *opts = "b:c:u:p:n:t:j:i:h";
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
118
119
120
121
122
123
124

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
125
126
    }
    
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
132
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
133
134
    }
    
Alessio Netti's avatar
Alessio Netti committed
135
136
137
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
138
		case 'b': {
139
140
			cassandra = false;
			host = parseNetworkHost(optarg);
Micha Müller's avatar
Micha Müller committed
141
142
143
144
145
146
147
148
			std::string port = parseNetworkPort(optarg);
			if (port != "") {
				brokerPort = std::stoi(port);
			} else {
				brokerPort = 1883;
			}
			break;
		}
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
		case 'c':
			cassandra = true;
			host = parseNetworkHost(optarg);
			cassandraPort = parseNetworkPort(optarg);
			if (cassandraPort == "")
				cassandraPort = std::string("9042");
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
171
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
            case 'h':
            default:
                usage();
                return 1;
        }
    }

190
191
192
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
	    dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
Micha Müller's avatar
Micha Müller committed
193

194
195
196
197
198
199
200
201
	    if (!dcdbConn->connect()) {
		    std::cerr << "Cannot connect to Cassandra!" << std::endl;
		    return 1;
	    }
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
202

203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

	    if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
219
    }
Micha Müller's avatar
Micha Müller committed
220
221

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
222
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
223
224
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
    if(ts==0)
        ts = getTimestamp();
    if(jobId=="")
        jobId = getEnv("SLURM_JOB_ID");
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
        if(nodelist=="")
            nodelist = getEnv("SLURM_JOB_NODELIST");
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "USER  = " << userId << std::endl;
        std::cout << "START = " << ts << std::endl;
        std::cout << "NODES = " << nodelist << std::endl;
        
        DCDB::NodeList nl;
        splitNodeList(nodelist, nl, ',');
        
        try {
246
247
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
248
249
250
251
            jd.startTime = DCDB::TimeStamp(ts);
            jd.endTime   = DCDB::TimeStamp((uint64_t)0);
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
252
253
254
255
256
257
258
259
260
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
261
	}
Alessio Netti's avatar
Alessio Netti committed
262
263
264
265
266
267
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
268
269
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
288
289
290
	}
    }

291
292
293
294
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
314
315
316
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
Micha Müller's avatar
Micha Müller committed
317
	    //send it to broker
318
	    if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
319
		    std::cerr << "Broker not reachable! Job data was not published." << std::endl;
Micha Müller's avatar
Micha Müller committed
320
321
322
		    retCode = 1;
		    goto exit;
	    }
323
324
325
326
327
328
329
330
	    
	    do {
			if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
				std::cerr << "Error in mosquitto_loop!" << std::endl;
				retCode = 1;
				goto exit;
			}
		} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
Alessio Netti's avatar
Alessio Netti committed
331
332
    }

Micha Müller's avatar
Micha Müller committed
333
334
//hasta la vista
exit:
335
336
337
338
339
340
341
342
343
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
344
	return retCode;
345
}