Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

dcdbslurmjob.cpp 16.1 KB
Newer Older
//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include "dcdb/version.h"
#include "version.h"

#include <unistd.h>

#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/regex.hpp>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <mosquitto.h>

#define SLURM_JOBSTEP_SEP "."

// Message id filled in by mosquitto_publish() for the job-data message;
// -1 means "nothing published yet".
int msgId = -1;
// Becomes true once publishCallback() has seen msgId.
bool done = false;

/*
 * Publish callback registered with mosquitto_publish_callback_set().
 * Sets `done` when the reported message id `mid` equals the id of the job-data
 * message stored in `msgId`; callbacks for any other id are ignored.
 * `mosq` and `obj` are unused.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
    const bool nothingPublished = (msgId == -1);
    if (nothingPublished || mid != msgId) {
        return;
    }
    done = true;
}


/*
 * Print usage information to stdout.
 * Every option accepted by main()'s getopt string ("b:q:o:c:u:p:n:t:d:j:i:s:m:h")
 * is listed here.
 */
void usage() {
    std::cout << "Usage:" << std::endl;
    std::cout << "  dcdbslurmjob [-b<host>] [-q<qos>] [-o<timeout>] [-t<timestamp>] [-n<nodelist>] [-d<domainid>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-d<domainid>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
    std::cout << "  -b<hosts>     List of MQTT brokers           [default: localhost:1883]" << std::endl;
    std::cout << "  -q<qos>       MQTT QoS to use                [default: 1]" << std::endl;
    std::cout << "  -o<timeout>   MQTT timeout in seconds        [default: 10]" << std::endl;
    std::cout << "  -c<hosts>     List of Cassandra hosts        [default: none]" << std::endl;
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST]" << std::endl;
    std::cout << "  -d<domainid>  Job domain id                  [default: default]" << std::endl;
    std::cout << "  -j<jobid>     String job id                  [default: SLURM_JOB_ID var]" << std::endl;
    std::cout << "  -i<userid>    User name or id                [default: SLURM_JOB_USER or USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
    std::cout << "                s/<regex>/<format>/ or one of the presets SNG, DEEPEST" << std::endl;
    std::cout << "  -m<hours>     Maximum job length in h        [default: none]" << std::endl;
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
    std::cout << "Options -b and -c|u|p are mutually exclusive! If both are specified, the latter takes precedence. By default, an MQTT broker is used." << std::endl;
}
/*
 * Return the value of environment variable `var`, or an empty string when the
 * variable is not set.
 */
std::string getEnv(const char* var) {
    const char* value = std::getenv(var);
    return (value == NULL) ? std::string("") : std::string(value);
}

91
void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
92
93
{
    nl.clear();
94
95
96
97
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
98
99
100
101
		std::string hostBase = m1[1].str();
		
		if (m1[2].str().size() == 0) {
			nl.push_back(hostBase);
102
		} else {
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
			std::string s2 = m1[2].str();
			boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
			boost::smatch m2;
			while (boost::regex_search(s2, m2, r2)) {
				if (m2[2] == "") {
					nl.push_back(hostBase + m2[1].str());
				} else {
					int start = atoi(m2[1].str().c_str());
					int stop = atoi(m2[2].str().c_str());
					for (int i=start; i<=stop; i++) {
						std::stringstream ss;
						ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
						nl.push_back(hostBase + ss.str());
					}
				}
				s2 = m2.suffix().str();
			}
120
		}
121
		s1 = m1.suffix().str();
122
123
124
    }
}

125
126
127
128
129
130
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    //check if input has sed format of "s/.../.../" for substitution
    boost::regex  checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch matchResults;
    
    if (regex_match(substitution, matchResults, checkSubstitute)) {
131
132
133
134
135
136
137
		//input has substitute format
		boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
		std::string fmt = matchResults[3].str();
		for (auto &n: nl) {
			 n = boost::regex_replace(n, re, fmt);
			 //std::cout << n <<" => " << mqtt << std::endl;
		}
138
139
140
    }
}

/*
 * Split a `delim`-separated host list (e.g. "host1:1883,host2") into `hl`,
 * which is cleared first. Leading and trailing whitespace is trimmed from each
 * entry. Empty or whitespace-only entries (as in "a,,b" or "a, ,b") are dropped,
 * since they cannot name a host.
 */
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
    static const char* const kWhitespace = " \t\n\v\f\r";

    hl.clear();
    std::stringstream ss(str);
    std::string token;
    while (std::getline(ss, token, delim)) {
        const std::string::size_type first = token.find_first_not_of(kWhitespace);
        if (first == std::string::npos) {
            continue; // empty or whitespace-only entry
        }
        const std::string::size_type last = token.find_last_not_of(kWhitespace);
        hl.push_back(token.substr(first, last - first + 1));
    }
}

151
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
152
153
154
155
    srand (time(NULL));
    int n = rand() % hl.size();
    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());
156
    if (erase) {
157
		hl.erase(hl.begin()+n);
158
    }
159
160
}

Micha Müller's avatar
Micha Müller committed
161
/**
162
163
164
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
165
 */
166
167
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
168

169
    bool cassandra = false;
170
171
172
173
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

174
175
    std::vector<std::string> hostList;
    std::string host = "", cassandraUser = "", cassandraPassword = "";
176
177
    int port;
    std::string nodelist="", jobId="", userId="", stepId="";
178
    std::string domainId = JOB_DEFAULT_DOMAIN;
179
    std::string substitution="";
180
    int maxJobLength = -1;
Michael Ott's avatar
Michael Ott committed
181
    int qos = 1;
182
    int timeout = 10;
Alessio Netti's avatar
Alessio Netti committed
183
    uint64_t ts=0;
184
    
Alessio Netti's avatar
Alessio Netti committed
185
    // Defining options
186
    const char *opts = "b:q:o:c:u:p:n:t:d:j:i:s:m:h";
Alessio Netti's avatar
Alessio Netti committed
187
188
189
190
191
192
193
194
195
196
197

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
198
199
    }
    
Alessio Netti's avatar
Alessio Netti committed
200
201
202
203
204
205
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
206
207
    }
    
Alessio Netti's avatar
Alessio Netti committed
208
209
210
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
211
		case 'b': {
212
			cassandra = false;
213
			splitHostList(optarg, hostList);
Micha Müller's avatar
Micha Müller committed
214
215
			break;
		}
216
217
218
		case 'q':
			qos = atoi(optarg);
			break;
219
220
221
		case 'o':
			timeout = atoi(optarg);
			break;
222
223
		case 'c':
			cassandra = true;
224
			splitHostList(optarg, hostList);
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
241
	    case 'n':
242
243
244
245
246
			nodelist = optarg;
			break;
		case 't':
			ts = std::stoull(optarg);
			break;
247
248
249
        case 'd':
            domainId = optarg;
            break;
250
251
252
253
254
255
		case 'j':
			jobId = optarg;
			break;
		case 'i':
			userId = optarg;
			break;
256
	    case 's':
257
258
259
260
261
262
263
264
265
			substitution = optarg;
			if (substitution == "SNG") {
				substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
				maxJobLength = 48;
			} else if (substitution == "DEEPEST") {
				substitution = "s%dp-(cn|dam|esb)([0-9]{2})%/deepest/\\1/s\\2%";
				maxJobLength = 20;
			}
			break;
266
	    case 'm':
267
268
269
270
271
272
			maxJobLength = std::stoull(optarg);
			break;
		case 'h':
		default:
			usage();
			return 1;
Alessio Netti's avatar
Alessio Netti committed
273
274
        }
    }
275
276
    
    if (hostList.size() == 0) {
277
		hostList.push_back("localhost");
278
    }
Alessio Netti's avatar
Alessio Netti committed
279

280
281
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
282
283
	    pickRandomHost(hostList, host, port);
	    if (port == 0) {
284
			port = 9042;
285
286
287
	    }
	
	    dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
288
	    if (!dcdbConn->connect()) {
289
		    std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
290
291
		    return 1;
	    }
292
	    std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
293
294
295
296
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
297

298
299
300
301
302
303
304
305
306
307
308
309
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

310
311
312
313
314
315
316
317
318
319
320
321
322
323
	    int ret = MOSQ_ERR_UNKNOWN;
	    do {
		    pickRandomHost(hostList, host, port, true);
		    if (port == 0) {
			    port = 1883;
		    }
		    
		    if ((ret = mosquitto_connect(_mosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
		    } else {
			    std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
			    break;
		    }
	    } while (hostList.size() > 0);
324
	    
325
326
	    if (ret != MOSQ_ERR_SUCCESS) {
		    std::cout << "No more MQTT brokers left, aborting" << std::endl;
327
328
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
329
    }
Micha Müller's avatar
Micha Müller committed
330
331

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
332
    DCDB::JobData jd;
333
    int retCode = 0;
Micha Müller's avatar
Micha Müller committed
334

Alessio Netti's avatar
Alessio Netti committed
335
336
    if(ts==0)
        ts = getTimestamp();
337
    
338
    if(jobId=="") {
339
340
341
342
343
344
345
346
347
348
349
		jobId = getEnv("SLURM_JOB_ID");
		if (jobId == "") {
			jobId = getEnv("SLURM_JOBID");
		}

		stepId = getEnv("SLURM_STEP_ID");
		if (stepId=="") {
			stepId = getEnv("SLURM_STEPID");
		}
		if (stepId!="" && jobId!="")
			jobId = jobId + SLURM_JOBSTEP_SEP + stepId;
350
	}
351
	
Alessio Netti's avatar
Alessio Netti committed
352
    if (boost::iequals(argv[argc-1], "start")) {
353
354
355
356
357
358
        if(userId=="") {
			userId = getEnv("SLURM_JOB_USER");
			if(userId=="") {
				userId = getEnv("USER");
			}
		}
Alessio Netti's avatar
Alessio Netti committed
359
        
360
		if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
361
            nodelist = getEnv("SLURM_JOB_NODELIST");
362
363
364
365
	    	if(nodelist=="") {
				nodelist = getEnv("SLURM_NODELIST");
	    	}
		}
366

367
368
369
		DCDB::NodeList nl;
		splitNodeList(nodelist, nl);
		convertNodeList(nl, substitution);
370

371
		std::cout << "DOMAINID = " << domainId << std::endl;
372
		std::cout << "JOBID    = " << jobId << std::endl;
373
374
375
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
376
377
378
379
380
381
382
383
384
385
		std::cout << "SUBST    = " << substitution << std::endl;
		if (maxJobLength >= 0) {
			std::cout << "JOBLEN   = " << maxJobLength << std::endl;
		}
		std::cout << "NODES    =";
		for (auto &n: nl) {
			std::cout << " " << n;
		}
		std::cout << std::endl;
		
Alessio Netti's avatar
Alessio Netti committed
386
        try {
387
            jd.domainId  = domainId;
388
389
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
390
            jd.startTime = DCDB::TimeStamp(ts);
Michael Ott's avatar
Michael Ott committed
391
            jd.endTime   = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
Alessio Netti's avatar
Alessio Netti committed
392
393
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
394
395
396
397
			std::cerr << "Invalid input format!" << std::endl;
			retCode = 1;
			goto exit;
		}
398

399
400
401
402
403
		if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
			std::cerr << "Job data insert failed!" << std::endl;
			retCode = 1;
			goto exit;
		}
Alessio Netti's avatar
Alessio Netti committed
404
    } else if (boost::iequals(argv[argc-1], "stop")) {
405
406
407
        std::cout << "DOMAINID = " << domainId << std::endl;
        std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "STOP     = " << ts << std::endl;
Alessio Netti's avatar
Alessio Netti committed
408
409
        
        try {
410
            jd.domainId = domainId;
411
412
413
414
			jd.jobId = jobId;
			jd.endTime = DCDB::TimeStamp(ts);
		} catch (const std::invalid_argument &e) {
			std::cerr << "Invalid input format!" << std::endl;
415
416
417
			retCode = 1;
			goto exit;
		}
418
419
420

		if (cassandra) {
			DCDB::JobData jobStart;
421
			if (myJobDataStore->getJobById(jobStart, jd.jobId, jd.domainId) != DCDB::JD_OK) {
422
423
424
425
				std::cerr << "Could not retrieve job to be updated!" << std::endl;
				retCode = 1;
				goto exit;
			}
426
			if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime, jd.domainId) != DCDB::JD_OK) {
427
428
429
430
				std::cerr << "Could not update end time of job!" << std::endl;
				retCode = 1;
				goto exit;
			}
431
		}
Micha Müller's avatar
Micha Müller committed
432
433
    }

434
435
436
437
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
438
439
440
441
442
443
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
444
        config.push_back(boost::property_tree::ptree::value_type("domainid", boost::property_tree::ptree(jd.domainId)));
Micha Müller's avatar
Micha Müller committed
445
446
447
448
449
450
451
452
453
454
455
456
457
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
458
459
460
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
461
	    int ret = MOSQ_ERR_UNKNOWN;
Micha Müller's avatar
Micha Müller committed
462
	    //send it to broker
463
464
	    if ((ret = mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not publish job data via MQTT: " << mosquitto_strerror(ret) << std::endl;
Micha Müller's avatar
Micha Müller committed
465
466
467
		    retCode = 1;
		    goto exit;
	    }
468
469
	    
	    do {
470
471
472
473
474
		    if ((ret = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Error in mosquitto_loop: " << mosquitto_strerror(ret) << std::endl;
			    retCode = 1;
			    goto exit;
		    }
475
	    } while(!done && getTimestamp() - startTs < (uint64_t)S_TO_NS(timeout));
Alessio Netti's avatar
Alessio Netti committed
476
477
    }

Micha Müller's avatar
Micha Müller committed
478
479
//hasta la vista
exit:
480
481
482
483
484
485
486
487
488
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
489
	return retCode;
490
}