//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include <boost/algorithm/string.hpp>
#include <boost/regex.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstddef>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <iomanip>
#include <iostream>
#include <mosquitto.h>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unistd.h>
#include <vector>

#include "dcdb/version.h"
#include "version.h"

// Message id that mosquitto_publish() assigned to our job-data message;
// -1 as long as nothing has been published.
int msgId = -1;
// Becomes true once publishCallback() has been invoked for message msgId.
bool done = false;

/*
 * Publish callback registered with mosquitto_publish_callback_set().
 * Marks `done` when the reported message id is the one stored in msgId;
 * callbacks for any other id (or before msgId is set) are ignored.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	(void) mosq;
	(void) obj;
	const bool isOurMessage = (msgId != -1) && (mid == msgId);
	if (isOurMessage) {
		done = true;
	}
}

/*
 * Print usage information (synopsis and option summary) to stdout.
 */
void usage() {
    std::cout << "Usage:" << std::endl;
    std::cout << "  dcdbslurmjob [-b<hosts>] [-q<qos>] [-o<timeout>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob [-c<hosts>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
    std::cout << "  -b<hosts>     List of MQTT brokers           [default: localhost:1883]" << std::endl;
    std::cout << "  -q<qos>       MQTT QoS to use                [default: 1]" << std::endl;
    std::cout << "  -o<timeout>   MQTT timeout in seconds        [default: 10]" << std::endl;
    std::cout << "  -c<hosts>     List of Cassandra hosts        [default: none]" << std::endl;
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST var]" << std::endl;
    std::cout << "  -j<jobid>     Numerical job id               [default: SLURM_JOB_ID var]" << std::endl;
    // SLURM_JOB_USER holds a user name, so the value is not necessarily numerical.
    std::cout << "  -i<userid>    User id                        [default: SLURM_JOB_USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
    // -m takes a number of hours, not a pattern.
    std::cout << "  -m<hours>     Maximum job length in h        [default: none]" << std::endl;
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
    std::cout << "Options -b and -c|u|p are mutually exclusive! If both are specified, the latter takes precedence. By default, an MQTT broker is used." << std::endl;
}

/*
 * Look up environment variable `var`.
 *
 * @return its value, or an empty string when the variable is not set.
 */
std::string getEnv(const char* var) {
    const char* value = std::getenv(var);
    return (value != nullptr) ? std::string(value) : std::string();
}

void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
89
90
{
    nl.clear();
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
	std::string hostBase = m1[1].str();
	
	if (m1[2].str().size() == 0) {
	    nl.push_back(hostBase);
	} else {
	    std::string s2 = m1[2].str();
	    boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
	    boost::smatch m2;
	    while (boost::regex_search(s2, m2, r2)) {
		if (m2[2] == "") {
		    nl.push_back(hostBase + m2[1].str());
		} else {
		    int start = atoi(m2[1].str().c_str());
		    int stop = atoi(m2[2].str().c_str());
		    for (int i=start; i<=stop; i++) {
			std::stringstream ss;
			ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
			nl.push_back(hostBase + ss.str());
		    }
		}
		s2 = m2.suffix().str();
	    }
	}
	s1 = m1.suffix().str();
119
120
121
    }
}

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
/*
 * Rewrite every entry of nl with a sed-like substitution expression.
 *
 * `substitution` must have the form "s<d><regex><d><format><d>":
 *   - <d> is a delimiter: any single character other than a backslash.
 *   - <regex> is compiled as a POSIX extended regular expression.
 *   - <format> is the replacement handed to boost::regex_replace().
 * If `substitution` does not have this form (e.g. it is empty), nl is left
 * unchanged. An invalid <regex> makes the boost::regex constructor throw.
 *
 * @param nl            Node names, modified in place.
 * @param substitution  The substitution expression.
 */
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    // Captures: 1 = delimiter, 2 = regex, 3 = replacement format.
    boost::regex  sedExpr("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch parts;

    if (!regex_match(substitution, parts, sedExpr)) {
        return;
    }

    const boost::regex pattern(parts[2].str(), boost::regex_constants::extended);
    const std::string  replacement = parts[3].str();
    for (auto &node : nl) {
        node = boost::regex_replace(node, pattern, replacement);
    }
}

/*
 * Split `str` at every `delim` into hl, which is cleared first.
 *
 * Empty fields between two delimiters or before a leading delimiter are kept.
 * A single trailing delimiter does not produce an extra empty entry, and an
 * empty `str` yields an empty list.
 */
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
    hl.clear();
    std::string::size_type begin = 0;
    while (begin < str.size()) {
        const std::string::size_type end = str.find(delim, begin);
        if (end == std::string::npos) {
            hl.push_back(str.substr(begin));
            break;
        }
        hl.push_back(str.substr(begin, end - begin));
        begin = end + 1;
    }
}

/*
 * Pick a random entry of hl and split it into host name and port.
 *
 * @param hl    Candidate host entries as parsed by parseNetworkHost() /
 *              parseNetworkPort(); must not be empty.
 * @param host  Receives the host part of the chosen entry.
 * @param port  Receives atoi() of the port part, i.e. 0 when the entry has
 *              no numeric port (callers substitute their default then).
 * @param erase If true, the chosen entry is removed from hl, so repeated
 *              calls (e.g. connection retries) never pick it again.
 * @throws std::invalid_argument if hl is empty (previously a division by zero).
 */
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
    if (hl.empty()) {
        throw std::invalid_argument("pickRandomHost: empty host list");
    }

    // Seed once per process. Re-seeding with time(NULL) on every call repeats
    // the same random sequence for all calls made within the same second.
    static std::mt19937 engine{std::random_device{}()};
    std::uniform_int_distribution<std::size_t> dist(0, hl.size() - 1);
    const std::size_t n = dist(engine);

    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());

    if (erase) {
        hl.erase(hl.begin() + static_cast<std::ptrdiff_t>(n));
    }
}

/**
159
160
161
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
162
 */
163
164
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
165
166
167
168
169
170

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

171
172
173
    std::vector<std::string> hostList;
    std::string host = "", cassandraUser = "", cassandraPassword = "";
    int         port;
Alessio Netti's avatar
Alessio Netti committed
174
    std::string nodelist="", jobId="", userId="";
175
    std::string substitution="";
176
    int maxJobLength = -1;
Michael Ott's avatar
Michael Ott committed
177
    int qos = 1;
178
    int timeout = 10;
Alessio Netti's avatar
Alessio Netti committed
179
    uint64_t ts=0;
180
    
Alessio Netti's avatar
Alessio Netti committed
181
    // Defining options
182
    const char *opts = "b:q:o:c:u:p:n:t:j:i:s:m:h";
Alessio Netti's avatar
Alessio Netti committed
183
184
185
186
187
188
189
190
191
192
193

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
194
195
    }
    
Alessio Netti's avatar
Alessio Netti committed
196
197
198
199
200
201
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
202
203
    }
    
Alessio Netti's avatar
Alessio Netti committed
204
205
206
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
207
		case 'b': {
208
			cassandra = false;
209
			splitHostList(optarg, hostList);
Micha Müller's avatar
Micha Müller committed
210
211
			break;
		}
212
213
214
		case 'q':
			qos = atoi(optarg);
			break;
215
216
217
		case 'o':
			timeout = atoi(optarg);
			break;
218
219
		case 'c':
			cassandra = true;
220
			splitHostList(optarg, hostList);
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
237
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
238
239
240
241
242
243
244
245
246
247
248
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
249
	    case 's':
250
251
252
		substitution = optarg;
		if (substitution == "SNG") {
			substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
253
			maxJobLength = 48;
254
		}
255
		break;
256
257
258
	    case 'm':
		maxJobLength = std::stoull(optarg);
		break;
Alessio Netti's avatar
Alessio Netti committed
259
260
261
262
263
264
            case 'h':
            default:
                usage();
                return 1;
        }
    }
265
266
267
268
    
    if (hostList.size() == 0) {
	hostList.push_back("localhost");
    }
Alessio Netti's avatar
Alessio Netti committed
269

270
271
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
272
273
274
275
276
277
	    pickRandomHost(hostList, host, port);
	    if (port == 0) {
		port = 9042;
	    }
	
	    dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
278
	    if (!dcdbConn->connect()) {
279
		    std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
280
281
		    return 1;
	    }
282
	    std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
283
284
285
286
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
287

288
289
290
291
292
293
294
295
296
297
298
299
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
	    int ret = MOSQ_ERR_UNKNOWN;
	    do {
		    pickRandomHost(hostList, host, port, true);
		    if (port == 0) {
			    port = 1883;
		    }
		    
		    if ((ret = mosquitto_connect(_mosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
		    } else {
			    std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
			    break;
		    }
	    } while (hostList.size() > 0);
	    if (ret != MOSQ_ERR_SUCCESS) {
		    std::cout << "No more MQTT brokers left, aborting" << std::endl;
316
317
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
318
    }
Micha Müller's avatar
Micha Müller committed
319
320

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
321
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
322
323
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
324
325
    if(ts==0)
        ts = getTimestamp();
326
    if(jobId=="") {
Alessio Netti's avatar
Alessio Netti committed
327
        jobId = getEnv("SLURM_JOB_ID");
328
329
330
331
	if(jobId=="") {
	    jobId = getEnv("SLURM_JOBID");
	}
    }
Alessio Netti's avatar
Alessio Netti committed
332
333
334
335
336
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
337
        if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
338
            nodelist = getEnv("SLURM_JOB_NODELIST");
339
340
341
342
343
	    if(nodelist=="") {
		nodelist = getEnv("SLURM_NODELIST");
	    }
	}

344
345
	DCDB::NodeList nl;
	splitNodeList(nodelist, nl);
346
	convertNodeList(nl, substitution);
347
348
349
350
351

	std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
352
	std::cout << "SUBST    = " << substitution << std::endl;
353
354
355
	if (maxJobLength >= 0) {
	    std::cout << "JOBLEN   = " << maxJobLength << std::endl;
	}
356
357
358
359
360
361
	std::cout << "NODES    =";
	for (auto &n: nl) {
	    std::cout << " " << n;
	}
	std::cout << std::endl;

362
363
	
	
Alessio Netti's avatar
Alessio Netti committed
364
        try {
365
366
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
367
            jd.startTime = DCDB::TimeStamp(ts);
Michael Ott's avatar
Michael Ott committed
368
            jd.endTime   = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
Alessio Netti's avatar
Alessio Netti committed
369
370
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
371
372
373
374
375
376
377
378
379
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
380
	}
Alessio Netti's avatar
Alessio Netti committed
381
382
383
384
385
386
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
387
388
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
407
408
409
	}
    }

410
411
412
413
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
433
434
435
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
436
	    int ret = MOSQ_ERR_UNKNOWN;
Micha Müller's avatar
Micha Müller committed
437
	    //send it to broker
438
439
	    if ((ret = mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not publish job data via MQTT: " << mosquitto_strerror(ret) << std::endl;
Micha Müller's avatar
Micha Müller committed
440
441
442
		    retCode = 1;
		    goto exit;
	    }
443
444
	    
	    do {
445
446
447
448
449
		    if ((ret = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Error in mosquitto_loop: " << mosquitto_strerror(ret) << std::endl;
			    retCode = 1;
			    goto exit;
		    }
450
	    } while(!done && getTimestamp() - startTs < S_TO_NS(timeout));
Alessio Netti's avatar
Alessio Netti committed
451
452
    }

Micha Müller's avatar
Micha Müller committed
453
454
//hasta la vista
exit:
455
456
457
458
459
460
461
462
463
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
464
	return retCode;
465
}