Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

//================================================================================
// Name        : dcdbslurmjob.cpp
// Author      : Michael Ott, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include <boost/algorithm/string.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
#include <cstring>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <exception>
#include <iostream>
#include <mosquitto.h>
#include <sstream>
#include <stdexcept>
#include <string>

// Upper bound on how long main() keeps polling the broker for the publish
// acknowledgement. It is compared against a difference of two getTimestamp()
// values; presumably those are nanoseconds, which would make this 60 seconds
// -- TODO confirm against timestamp.h.
// The ULL suffix makes the 64-bit unsigned type explicit on every platform.
#define SLURMJOBTIMEOUT 60000000000ULL

// Message id assigned by mosquitto_publish() in main(); -1 as long as nothing
// has been published.
int msgId = -1;
// Set to true by publishCallback() once the message with id msgId is reported.
bool done = false;

/**
 * Publish callback handed to mosquitto_publish_callback_set() in main().
 *
 * Raises the global done flag when the reported message id matches the id
 * stored in msgId. Nothing happens while msgId still holds its initial
 * value of -1.
 *
 * @param mosq  Mosquitto instance issuing the callback (unused).
 * @param obj   User data pointer (unused).
 * @param mid   Id of the message this callback is reporting.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	const bool messageWasSent = (msgId != -1);
	const bool isOurMessage = messageWasSent && (mid == msgId);
	if (isOurMessage) {
		done = true;
	}
}


/*
 * Print usage information
 *
 * Writes the synopsis and the option overview to std::cout. The text is kept
 * in a table with one entry per output line (an empty entry yields a blank
 * line) and the stream is flushed once after the last line.
 */
void usage() {
    static const char *const helpLines[] = {
        "Usage:",
        "  dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop",
        "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] start|stop",
        "  dcdbslurmjob -h",
        "",
        "Options:",
        "  -b<host>      MQTT broker              [default: 127.0.0.1:1883]",
        "  -c<host>      Cassandra host           [default: 127.0.0.1:9042]",
        "  -u<username>  Cassandra username       [default: none]",
        "  -p<password>  Cassandra password       [default: none]",
        "  -t<timestamp> Timestamp value          [default: now]",
        "  -n<nodelist>  Comma-separated nodelist [default: SLURM_JOB_NODELIST]",
        "  -j<jobid>     Numerical job id         [default: SLURM_JOB_ID var]",
        "  -i<userid>    Numerical user id        [default: SLURM_JOB_USER var]",
        "",
        "  -h            This help page",
        "",
        "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified.",
    };

    for (const char *line : helpLines) {
        std::cout << line << '\n';
    }
    std::cout << std::flush;
}
/**
 * Looks up an environment variable.
 *
 * @param var  Name of the environment variable.
 * @return     The variable's value, or an empty string if std::getenv()
 *             reports that the variable does not exist.
 */
std::string getEnv(const char* var) {
    // The returned buffer belongs to the environment; we only read from it.
    const char* value = std::getenv(var);
    return (value != nullptr) ? std::string(value) : std::string();
}

/**
 * Splits a delimiter-separated string into a node list.
 *
 * The previous content of nl is discarded. Every piece produced by
 * std::getline() is appended as is: pieces are not trimmed, and two
 * consecutive delimiters produce an empty entry. An empty input string
 * leaves nl empty.
 *
 * @param str    String holding the node names.
 * @param nl     Node list that receives the pieces.
 * @param delim  Separator character, a comma by default.
 */
void splitNodeList(const std::string& str, DCDB::NodeList& nl, char delim = ',')
{
    nl.clear();
    std::stringstream stream(str);
    for (std::string node; std::getline(stream, node, delim);) {
        nl.push_back(node);
    }
}

Micha Müller's avatar
Micha Müller committed
94
/**
95
96
97
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
98
 */
99
100
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
101
102
103
104
105
106
107

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

    std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
Micha Müller's avatar
Micha Müller committed
108
    int         brokerPort = 1883;
Alessio Netti's avatar
Alessio Netti committed
109
110
    std::string nodelist="", jobId="", userId="";
    uint64_t ts=0;
111
    
Alessio Netti's avatar
Alessio Netti committed
112
    // Defining options
113
    const char *opts = "b:c:u:p:n:t:j:i:h";
Alessio Netti's avatar
Alessio Netti committed
114
115
116
117
118
119
120
121
122
123
124

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
125
126
    }
    
Alessio Netti's avatar
Alessio Netti committed
127
128
129
130
131
132
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
133
134
    }
    
Alessio Netti's avatar
Alessio Netti committed
135
136
137
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
138
		case 'b': {
139
140
			cassandra = false;
			host = parseNetworkHost(optarg);
Micha Müller's avatar
Micha Müller committed
141
142
143
144
145
146
147
148
			std::string port = parseNetworkPort(optarg);
			if (port != "") {
				brokerPort = std::stoi(port);
			} else {
				brokerPort = 1883;
			}
			break;
		}
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
		case 'c':
			cassandra = true;
			host = parseNetworkHost(optarg);
			cassandraPort = parseNetworkPort(optarg);
			if (cassandraPort == "")
				cassandraPort = std::string("9042");
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
171
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
            case 'h':
            default:
                usage();
                return 1;
        }
    }

190
191
192
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
	    dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
Micha Müller's avatar
Micha Müller committed
193

194
195
196
197
198
199
200
201
	    if (!dcdbConn->connect()) {
		    std::cerr << "Cannot connect to Cassandra!" << std::endl;
		    return 1;
	    }
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
202

203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

	    if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
219
    }
Micha Müller's avatar
Micha Müller committed
220
221

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
222
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
223
224
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
225
226
227
228
229
230
231
232
233
    if(ts==0)
        ts = getTimestamp();
    if(jobId=="")
        jobId = getEnv("SLURM_JOB_ID");
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
234
        if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
235
            nodelist = getEnv("SLURM_JOB_NODELIST");
236
237
238
239
240
	    if(nodelist=="") {
		nodelist = getEnv("SLURM_NODELIST");
	    }
	}

Alessio Netti's avatar
Alessio Netti committed
241
242
243
244
245
246
247
248
249
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "USER  = " << userId << std::endl;
        std::cout << "START = " << ts << std::endl;
        std::cout << "NODES = " << nodelist << std::endl;
        
        DCDB::NodeList nl;
        splitNodeList(nodelist, nl, ',');
        
        try {
250
251
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
252
253
254
255
            jd.startTime = DCDB::TimeStamp(ts);
            jd.endTime   = DCDB::TimeStamp((uint64_t)0);
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
256
257
258
259
260
261
262
263
264
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
265
	}
Alessio Netti's avatar
Alessio Netti committed
266
267
268
269
270
271
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
272
273
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
292
293
294
	}
    }

295
296
297
298
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
318
319
320
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
Micha Müller's avatar
Micha Müller committed
321
	    //send it to broker
322
	    if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
323
		    std::cerr << "Broker not reachable! Job data was not published." << std::endl;
Micha Müller's avatar
Micha Müller committed
324
325
326
		    retCode = 1;
		    goto exit;
	    }
327
328
329
330
331
332
333
334
	    
	    do {
			if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
				std::cerr << "Error in mosquitto_loop!" << std::endl;
				retCode = 1;
				goto exit;
			}
		} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
Alessio Netti's avatar
Alessio Netti committed
335
336
    }

Micha Müller's avatar
Micha Müller committed
337
338
//hasta la vista
exit:
339
340
341
342
343
344
345
346
347
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
348
	return retCode;
349
}