Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

dcdbslurmjob.cpp 11.6 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

Micha Müller's avatar
Micha Müller committed
27
#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <exception>
#include <iostream>
#include <mosquitto.h>
#include <sstream>
#include <string>
#include <unistd.h>
Alessio Netti's avatar
Alessio Netti committed
37

38
39
40
41
42
43
44
45
46
47
48
49
// Upper bound on how long main() keeps waiting for the publish acknowledgement
// of the job data message. It is compared against a difference of two
// getTimestamp() values.
// NOTE(review): presumably nanoseconds, i.e. 60 seconds - confirm against getTimestamp().
#define SLURMJOBTIMEOUT 60000000000


// Message id of the job data message; stays -1 until mosquitto_publish() in
// main() has filled it in.
int msgId = -1;
// Becomes true once the publish callback has been invoked for msgId.
bool done = false;

/**
 * Publish callback registered with the mosquitto handle in main().
 *
 * Flags completion (done = true) when the reported message id matches the id
 * of the job data message. Calls for any other message id, or calls made
 * before a message id has been recorded, are ignored.
 *
 * @param mosq  Mosquitto instance invoking the callback (unused).
 * @param obj   User data passed to the mosquitto instance (unused).
 * @param mid   Message id of the message this callback refers to.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	// Nothing to match against yet, or a notification for some other message.
	if (msgId == -1 || mid != msgId) {
		return;
	}
	done = true;
}


Alessio Netti's avatar
Alessio Netti committed
50
51
52
53
54
/*
 * Print usage information
 *
 * Writes the help page (invocation synopsis followed by the list of
 * supported options) to standard output, flushing after every line.
 */
void usage() {
    // One entry per output line; empty entries yield blank separator lines.
    static const char* const helpLines[] = {
        "Usage:",
        "  dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop",
        "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop",
        "  dcdbslurmjob -h",
        "",
        "Options:",
        "  -b<host>      MQTT broker              [default: 127.0.0.1:1883]",
        "  -c<host>      Cassandra host           [default: 127.0.0.1:9042]",
        "  -u<username>  Cassandra username       [default: none]",
        "  -p<password>  Cassandra password       [default: none]",
        "  -t<timestamp> Timestamp value          [default: now]",
        "  -n<nodelist>  Comma-separated nodelist [default: SLURM_JOB_NODELIST]",
        "  -j<jobid>     Numerical job id         [default: SLURM_JOB_ID var]",
        "  -i<userid>    Numerical user id        [default: SLURM_JOB_USER var]",
        "",
        "  -h            This help page",
        "",
        "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified.",
    };

    for (const char* line : helpLines) {
        std::cout << line << std::endl;
    }
}
74
75
76
77

/**
 * Looks up an environment variable.
 *
 * @param var  Name of the environment variable to read.
 * @return     The value of the variable, or an empty string if it is not set.
 */
std::string getEnv(const char* var) {
    const char* const value = std::getenv(var);
    return (value == nullptr) ? std::string("") : std::string(value);
}

void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',')
{
    nl.clear();
    std::stringstream ss(str);
    std::string token;
    while (std::getline(ss, token, delim)) {
        nl.push_back(token);
91
92
93
    }
}

Micha Müller's avatar
Micha Müller committed
94
/**
95
96
97
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
98
 */
99
100
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
101
102
103
104
105
106
107

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

    std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
Micha Müller's avatar
Micha Müller committed
108
    int         brokerPort = 1883;
Alessio Netti's avatar
Alessio Netti committed
109
110
    std::string nodelist="", jobId="", userId="";
    uint64_t ts=0;
111
    
Alessio Netti's avatar
Alessio Netti committed
112
    // Defining options
113
    const char *opts = "b:c:u:p:n:t:j:i:h";
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
118
119
120
121
122
123
124

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
125
126
    }
    
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
132
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
133
134
    }
    
Alessio Netti's avatar
Alessio Netti committed
135
136
137
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
138
		case 'b': {
139
140
			cassandra = false;
			host = parseNetworkHost(optarg);
Micha Müller's avatar
Micha Müller committed
141
142
143
144
145
146
147
148
			std::string port = parseNetworkPort(optarg);
			if (port != "") {
				brokerPort = std::stoi(port);
			} else {
				brokerPort = 1883;
			}
			break;
		}
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
		case 'c':
			cassandra = true;
			host = parseNetworkHost(optarg);
			cassandraPort = parseNetworkPort(optarg);
			if (cassandraPort == "")
				cassandraPort = std::string("9042");
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
171
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
            case 'h':
            default:
                usage();
                return 1;
        }
    }

190
191
192
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
	    dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
Micha Müller's avatar
Micha Müller committed
193

194
195
196
197
198
199
200
201
	    if (!dcdbConn->connect()) {
		    std::cerr << "Cannot connect to Cassandra!" << std::endl;
		    return 1;
	    }
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
202

203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

	    if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
219
    }
Micha Müller's avatar
Micha Müller committed
220
221

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
222
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
223
224
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
225
226
    if(ts==0)
        ts = getTimestamp();
227
    if(jobId=="") {
Alessio Netti's avatar
Alessio Netti committed
228
        jobId = getEnv("SLURM_JOB_ID");
229
230
231
232
	if(jobId=="") {
	    jobId = getEnv("SLURM_JOBID");
	}
    }
Alessio Netti's avatar
Alessio Netti committed
233
234
235
236
237
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
238
        if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
239
            nodelist = getEnv("SLURM_JOB_NODELIST");
240
241
242
243
244
	    if(nodelist=="") {
		nodelist = getEnv("SLURM_NODELIST");
	    }
	}

Alessio Netti's avatar
Alessio Netti committed
245
246
247
248
249
250
251
252
253
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "USER  = " << userId << std::endl;
        std::cout << "START = " << ts << std::endl;
        std::cout << "NODES = " << nodelist << std::endl;
        
        DCDB::NodeList nl;
        splitNodeList(nodelist, nl, ',');
        
        try {
254
255
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
256
257
258
259
            jd.startTime = DCDB::TimeStamp(ts);
            jd.endTime   = DCDB::TimeStamp((uint64_t)0);
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
260
261
262
263
264
265
266
267
268
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
269
	}
Alessio Netti's avatar
Alessio Netti committed
270
271
272
273
274
275
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
276
277
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
296
297
298
	}
    }

299
300
301
302
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
322
323
324
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
Micha Müller's avatar
Micha Müller committed
325
	    //send it to broker
326
	    if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
327
		    std::cerr << "Broker not reachable! Job data was not published." << std::endl;
Micha Müller's avatar
Micha Müller committed
328
329
330
		    retCode = 1;
		    goto exit;
	    }
331
332
333
334
335
336
337
338
	    
	    do {
			if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
				std::cerr << "Error in mosquitto_loop!" << std::endl;
				retCode = 1;
				goto exit;
			}
		} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
Alessio Netti's avatar
Alessio Netti committed
339
340
    }

Micha Müller's avatar
Micha Müller committed
341
342
//hasta la vista
exit:
343
344
345
346
347
348
349
350
351
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
352
	return retCode;
353
}