Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

dcdbslurmjob.cpp 15.3 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"

#include <unistd.h>

#include <cstdlib>
#include <cstring>
#include <iomanip>
#include <iostream>
#include <random>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/regex.hpp>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <mosquitto.h>

#include "dcdb/version.h"
#include "version.h"

// Message id assigned by mosquitto_publish() to the job-data message; -1 while
// nothing has been published yet.
int msgId = -1;
// Becomes true once publishCallback has seen the id stored in msgId.
bool done = false;

/*
 * libmosquitto publish callback: flags completion when the reported message id
 * is the one recorded in msgId. The client handle and user data are unused.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	const bool publishedOurMessage = (msgId != -1) && (mid == msgId);
	if (publishedOurMessage) {
		done = true;
	}
}


/*
 * Print usage information (synopsis, option list with defaults) to stdout.
 */
void usage() {
    std::cout << "Usage:" << std::endl;
    std::cout << "  dcdbslurmjob [-b<hosts>] [-q<qos>] [-o<timeout>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob [-c<hosts>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
    std::cout << "  -b<hosts>     List of MQTT brokers           [default: localhost:1883]" << std::endl;
    std::cout << "  -q<qos>       MQTT QoS to use                [default: 0]" << std::endl;
    std::cout << "  -o<timeout>   MQTT timeout in seconds        [default: 10]" << std::endl;
    // main() falls back to "localhost" and port 9042 for Cassandra as well.
    std::cout << "  -c<hosts>     List of Cassandra hosts        [default: localhost:9042]" << std::endl;
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST var]" << std::endl;
    std::cout << "  -j<jobid>     Numerical job id               [default: SLURM_JOB_ID var]" << std::endl;
    std::cout << "  -i<userid>    Numerical user id              [default: SLURM_JOB_USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
    std::cout << "  -m<hours>     Maximum job length in h        [default: none]" << std::endl;
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
    std::cout << "Option -s takes a sed-style substitution s/<regex>/<format>/ or the preset SNG." << std::endl;
    std::cout << "Options -b and -c|u|p are mutually exclusive! If both are specified, the one given last takes precedence. By default an MQTT broker is used." << std::endl;
}
/*
 * Look up environment variable `var`.
 * Returns its value, or an empty string when the variable is not set.
 */
std::string getEnv(const char* var) {
    const char* value = std::getenv(var);
    if (value == nullptr) {
        return std::string();
    }
    return value;
}

88
void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
89
90
{
    nl.clear();
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
	std::string hostBase = m1[1].str();
	
	if (m1[2].str().size() == 0) {
	    nl.push_back(hostBase);
	} else {
	    std::string s2 = m1[2].str();
	    boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
	    boost::smatch m2;
	    while (boost::regex_search(s2, m2, r2)) {
		if (m2[2] == "") {
		    nl.push_back(hostBase + m2[1].str());
		} else {
		    int start = atoi(m2[1].str().c_str());
		    int stop = atoi(m2[2].str().c_str());
		    for (int i=start; i<=stop; i++) {
			std::stringstream ss;
			ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
			nl.push_back(hostBase + ss.str());
		    }
		}
		s2 = m2.suffix().str();
	    }
	}
	s1 = m1.suffix().str();
119
120
121
    }
}

122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    //check if input has sed format of "s/.../.../" for substitution
    boost::regex  checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch matchResults;
    
    if (regex_match(substitution, matchResults, checkSubstitute)) {
	//input has substitute format
	boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
	std::string fmt = matchResults[3].str();
	for (auto &n: nl) {
	     n = boost::regex_replace(n, re, fmt);
	     //std::cout << n <<" => " << mqtt << std::endl;
	 }
    }
}

/*
 * Split `str` at every `delim` into `hl` (which is cleared first).
 * Empty fields between delimiters are kept; a trailing delimiter does not
 * produce an extra empty field, and an empty input yields an empty list.
 */
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
    hl.clear();
    std::string::size_type begin = 0;
    while (begin < str.size()) {
        const std::string::size_type end = str.find(delim, begin);
        if (end == std::string::npos) {
            hl.push_back(str.substr(begin));
            break;
        }
        hl.push_back(str.substr(begin, end - begin));
        begin = end + 1;
    }
}

148
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
149
150
151
152
    srand (time(NULL));
    int n = rand() % hl.size();
    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());
153
154
155
    if (erase) {
	hl.erase(hl.begin()+n);
    }
156
157
}

Micha Müller's avatar
Micha Müller committed
158
/**
159
160
161
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
162
 */
163
164
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
165
166
167
168
169
170

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

171
172
173
    std::vector<std::string> hostList;
    std::string host = "", cassandraUser = "", cassandraPassword = "";
    int         port;
Alessio Netti's avatar
Alessio Netti committed
174
    std::string nodelist="", jobId="", userId="";
175
    std::string substitution="";
176
    int maxJobLength = -1;
177
    int qos = 0;
178
    int timeout = 10;
Alessio Netti's avatar
Alessio Netti committed
179
    uint64_t ts=0;
180
    
Alessio Netti's avatar
Alessio Netti committed
181
    // Defining options
182
    const char *opts = "b:q:o:c:u:p:n:t:j:i:s:m:h";
Alessio Netti's avatar
Alessio Netti committed
183
184
185
186
187
188
189
190
191
192
193

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
194
195
    }
    
Alessio Netti's avatar
Alessio Netti committed
196
197
198
199
200
201
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
202
203
    }
    
Alessio Netti's avatar
Alessio Netti committed
204
205
206
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
207
		case 'b': {
208
			cassandra = false;
209
			splitHostList(optarg, hostList);
Micha Müller's avatar
Micha Müller committed
210
211
			break;
		}
212
213
214
		case 'q':
			qos = atoi(optarg);
			break;
215
216
217
		case 'o':
			timeout = atoi(optarg);
			break;
218
219
		case 'c':
			cassandra = true;
220
			splitHostList(optarg, hostList);
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
237
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
238
239
240
241
242
243
244
245
246
247
248
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
249
	    case 's':
250
251
252
		substitution = optarg;
		if (substitution == "SNG") {
			substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
253
			maxJobLength = 48;
254
		}
255
		break;
256
257
258
	    case 'm':
		maxJobLength = std::stoull(optarg);
		break;
Alessio Netti's avatar
Alessio Netti committed
259
260
261
262
263
264
            case 'h':
            default:
                usage();
                return 1;
        }
    }
265
266
267
268
    
    if (hostList.size() == 0) {
	hostList.push_back("localhost");
    }
Alessio Netti's avatar
Alessio Netti committed
269

270
271
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
272
273
274
275
276
277
	    pickRandomHost(hostList, host, port);
	    if (port == 0) {
		port = 9042;
	    }
	
	    dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
278
	    if (!dcdbConn->connect()) {
279
		    std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
280
281
		    return 1;
	    }
282
	    std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
283
284
285
286
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
287

288
289
290
291
292
293
294
295
296
297
298
299
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
	    int ret = MOSQ_ERR_UNKNOWN;
	    do {
		    pickRandomHost(hostList, host, port, true);
		    if (port == 0) {
			    port = 1883;
		    }
		    
		    if ((ret = mosquitto_connect(_mosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
		    } else {
			    std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
			    break;
		    }
	    } while (hostList.size() > 0);
	    if (ret != MOSQ_ERR_SUCCESS) {
		    std::cout << "No more MQTT brokers left, aborting" << std::endl;
316
317
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
318
    }
Micha Müller's avatar
Micha Müller committed
319
320

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
321
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
322
323
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
324
325
    if(ts==0)
        ts = getTimestamp();
326
    if(jobId=="") {
Alessio Netti's avatar
Alessio Netti committed
327
        jobId = getEnv("SLURM_JOB_ID");
328
329
330
331
	if(jobId=="") {
	    jobId = getEnv("SLURM_JOBID");
	}
    }
Alessio Netti's avatar
Alessio Netti committed
332
333
334
335
336
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
337
        if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
338
            nodelist = getEnv("SLURM_JOB_NODELIST");
339
340
341
342
343
	    if(nodelist=="") {
		nodelist = getEnv("SLURM_NODELIST");
	    }
	}

344
345
	DCDB::NodeList nl;
	splitNodeList(nodelist, nl);
346
	convertNodeList(nl, substitution);
347
348
349
350
351

	std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
352
	std::cout << "SUBST    = " << substitution << std::endl;
353
354
355
	if (maxJobLength >= 0) {
	    std::cout << "JOBLEN   = " << maxJobLength << std::endl;
	}
356
357
358
359
360
361
	std::cout << "NODES    =";
	for (auto &n: nl) {
	    std::cout << " " << n;
	}
	std::cout << std::endl;

362
363
	
	
Alessio Netti's avatar
Alessio Netti committed
364
        try {
365
366
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
367
            jd.startTime = DCDB::TimeStamp(ts);
Michael Ott's avatar
Michael Ott committed
368
            jd.endTime   = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
Alessio Netti's avatar
Alessio Netti committed
369
370
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
371
372
373
374
375
376
377
378
379
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
380
	}
Alessio Netti's avatar
Alessio Netti committed
381
382
383
384
385
386
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
387
388
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
407
408
409
	}
    }

410
411
412
413
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
433
434
435
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
436
	    int ret = MOSQ_ERR_UNKNOWN;
Micha Müller's avatar
Micha Müller committed
437
	    //send it to broker
438
439
	    if ((ret = mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not publish job data via MQTT: " << mosquitto_strerror(ret) << std::endl;
Micha Müller's avatar
Micha Müller committed
440
441
442
		    retCode = 1;
		    goto exit;
	    }
443
444
	    
	    do {
445
446
447
448
449
		    if ((ret = mosquitto_loop(_mosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
			    std::cerr << "Error in mosquitto_loop: " << mosquitto_strerror(ret) << std::endl;
			    retCode = 1;
			    goto exit;
		    }
450
	    } while(!done && getTimestamp() - startTs < S_TO_NS(timeout));
Alessio Netti's avatar
Alessio Netti committed
451
452
    }

Micha Müller's avatar
Micha Müller committed
453
454
//hasta la vista
exit:
455
456
457
458
459
460
461
462
463
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
464
	return retCode;
465
}