dcdbslurmjob.cpp 13.5 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbslurmjob.cpp
Micha Müller's avatar
Micha Müller committed
3
// Author      : Michael Ott, Micha Mueller
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

Micha Müller's avatar
Micha Müller committed
27
#include "../../common/include/globalconfiguration.h"
28
#include "timestamp.h"
Micha Müller's avatar
Micha Müller committed
29
#include <boost/algorithm/string.hpp>
30
#include <boost/regex.hpp>
Micha Müller's avatar
Micha Müller committed
31
32
33
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
34
#include <dcdb/connection.h>
Alessio Netti's avatar
Alessio Netti committed
35
#include <dcdb/jobdatastore.h>
Micha Müller's avatar
Micha Müller committed
36
37
#include <iostream>
#include <mosquitto.h>
38
39
40
#include "dcdb/version.h"
#include "version.h"

Alessio Netti's avatar
Alessio Netti committed
41

42
43
44
45
46
47
48
49
50
51
52
53
// Maximum time main() keeps driving the mosquitto network loop while waiting
// for publishCallback() to report the job data message. It is compared against
// differences of getTimestamp() values (presumably nanoseconds, i.e. 60 s —
// TODO confirm the unit of getTimestamp()).
constexpr uint64_t SLURMJOBTIMEOUT = 60000000000ULL;

// Message id that mosquitto_publish() assigned to the job data message;
// -1 while nothing has been published yet.
int msgId = -1;
// Set to true by publishCallback() once the callback reports msgId.
bool done = false;

/*
 * Mosquitto publish callback (registered via mosquitto_publish_callback_set).
 * Marks the job data message as done when the reported message id matches the
 * id stored in msgId. The mosquitto handle and user data are not needed.
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	(void)mosq;
	(void)obj;
	if (msgId != -1 && mid == msgId)
		done = true;
}


Alessio Netti's avatar
Alessio Netti committed
54
55
56
57
58
/*
 * Print usage information to stdout: command synopsis for the MQTT and the
 * Cassandra mode, all options with their defaults, and how the MQTT and
 * Cassandra options interact.
 */
void usage() {
    std::cout << "Usage:" << std::endl;
    std::cout << "  dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
    std::cout << "  -b<host>      MQTT broker                    [default: 127.0.0.1:1883]" << std::endl;
    std::cout << "  -c<host>      Cassandra host                 [default: 127.0.0.1:9042]" << std::endl;
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST var]" << std::endl;
    std::cout << "  -j<jobid>     Numerical job id               [default: SLURM_JOB_ID var]" << std::endl;
    std::cout << "  -i<userid>    User id                        [default: SLURM_JOB_USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
    // -s accepts a sed-style expression or the keyword SNG, which main()
    // expands into a built-in pattern.
    std::cout << "                sed-style s/<regex>/<replacement>/ or SNG (built-in preset)" << std::endl;
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
    std::cout << "Options -b and -c|u|p are mutually exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." << std::endl;
}
79
80
81
82

/*
 * Look up environment variable `var`.
 * Returns its value, or an empty string if the variable is not set.
 */
std::string getEnv(const char* var) {
    const char* value = std::getenv(var);
    if (value == nullptr)
        return std::string("");
    return std::string(value);
}

89
void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
90
91
{
    nl.clear();
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
	std::string hostBase = m1[1].str();
	
	if (m1[2].str().size() == 0) {
	    nl.push_back(hostBase);
	} else {
	    std::string s2 = m1[2].str();
	    boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
	    boost::smatch m2;
	    while (boost::regex_search(s2, m2, r2)) {
		if (m2[2] == "") {
		    nl.push_back(hostBase + m2[1].str());
		} else {
		    int start = atoi(m2[1].str().c_str());
		    int stop = atoi(m2[2].str().c_str());
		    for (int i=start; i<=stop; i++) {
			std::stringstream ss;
			ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
			nl.push_back(hostBase + ss.str());
		    }
		}
		s2 = m2.suffix().str();
	    }
	}
	s1 = m1.suffix().str();
120
121
122
    }
}

123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    //check if input has sed format of "s/.../.../" for substitution
    boost::regex  checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch matchResults;
    
    if (regex_match(substitution, matchResults, checkSubstitute)) {
	//input has substitute format
	boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
	std::string fmt = matchResults[3].str();
	for (auto &n: nl) {
	     n = boost::regex_replace(n, re, fmt);
	     //std::cout << n <<" => " << mqtt << std::endl;
	 }
    }
}

Micha Müller's avatar
Micha Müller committed
139
/**
140
141
142
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
143
 */
144
145
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
146
147
148
149
150
151
152

    bool                cassandra = false;
    DCDB::Connection *  dcdbConn = nullptr;
    DCDB::JobDataStore *myJobDataStore = nullptr;
    struct mosquitto *  _mosq = nullptr;

    std::string host = "127.0.0.1", cassandraPort = "9042", cassandraUser = "", cassandraPassword = "";
Micha Müller's avatar
Micha Müller committed
153
    int         brokerPort = 1883;
Alessio Netti's avatar
Alessio Netti committed
154
    std::string nodelist="", jobId="", userId="";
155
    std::string substitution="";
Alessio Netti's avatar
Alessio Netti committed
156
    uint64_t ts=0;
157
    
Alessio Netti's avatar
Alessio Netti committed
158
    // Defining options
159
    const char *opts = "b:c:u:p:n:t:j:i:s:h";
Alessio Netti's avatar
Alessio Netti committed
160
161
162
163
164
165
166
167
168
169
170

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
171
172
    }
    
Alessio Netti's avatar
Alessio Netti committed
173
174
175
176
177
178
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
179
180
    }
    
Alessio Netti's avatar
Alessio Netti committed
181
182
183
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
184
		case 'b': {
185
186
			cassandra = false;
			host = parseNetworkHost(optarg);
Micha Müller's avatar
Micha Müller committed
187
188
189
190
191
192
193
194
			std::string port = parseNetworkPort(optarg);
			if (port != "") {
				brokerPort = std::stoi(port);
			} else {
				brokerPort = 1883;
			}
			break;
		}
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
		case 'c':
			cassandra = true;
			host = parseNetworkHost(optarg);
			cassandraPort = parseNetworkPort(optarg);
			if (cassandraPort == "")
				cassandraPort = std::string("9042");
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
217
	    case 'n':
Alessio Netti's avatar
Alessio Netti committed
218
219
220
221
222
223
224
225
226
227
228
                nodelist = optarg;
                break;
            case 't':
                ts = std::stoull(optarg);
                break;
            case 'j':
                jobId = optarg;
                break;
            case 'i':
                userId = optarg;
                break;
229
	    case 's':
230
231
232
233
		substitution = optarg;
		if (substitution == "SNG") {
			substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
		}
234
		break;
Alessio Netti's avatar
Alessio Netti committed
235
236
237
238
239
240
241
            case 'h':
            default:
                usage();
                return 1;
        }
    }

242
243
244
    if (cassandra) {
	    //Allocate and initialize connection to Cassandra.
	    dcdbConn = new DCDB::Connection(host, atoi(cassandraPort.c_str()), cassandraUser, cassandraPassword);
Micha Müller's avatar
Micha Müller committed
245

246
247
248
249
250
251
252
253
	    if (!dcdbConn->connect()) {
		    std::cerr << "Cannot connect to Cassandra!" << std::endl;
		    return 1;
	    }
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
	    //Initialize Mosquitto library and connect to broker
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
254

255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
	    _mosq = mosquitto_new(hostname, false, NULL);
	    if (!_mosq) {
		    perror(NULL);
		    return 1;
	    }

	    if (mosquitto_connect(_mosq, host.c_str(), brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
		    std::cerr << "Could not connect to MQTT broker " << host << ":" << std::to_string(brokerPort) << std::endl;
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
271
    }
Micha Müller's avatar
Micha Müller committed
272
273

    //collect job data
Alessio Netti's avatar
Alessio Netti committed
274
    DCDB::JobData jd;
Micha Müller's avatar
Micha Müller committed
275
276
    int           retCode = 0;

Alessio Netti's avatar
Alessio Netti committed
277
278
    if(ts==0)
        ts = getTimestamp();
279
    if(jobId=="") {
Alessio Netti's avatar
Alessio Netti committed
280
        jobId = getEnv("SLURM_JOB_ID");
281
282
283
284
	if(jobId=="") {
	    jobId = getEnv("SLURM_JOBID");
	}
    }
Alessio Netti's avatar
Alessio Netti committed
285
286
287
288
289
    
    if (boost::iequals(argv[argc-1], "start")) {
        
        if(userId=="")
            userId = getEnv("SLURM_JOB_USER");
290
        if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
291
            nodelist = getEnv("SLURM_JOB_NODELIST");
292
293
294
295
296
	    if(nodelist=="") {
		nodelist = getEnv("SLURM_NODELIST");
	    }
	}

297
298
	DCDB::NodeList nl;
	splitNodeList(nodelist, nl);
299
	convertNodeList(nl, substitution);
300
301
302
303
304

	std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
305
	std::cout << "SUBST    = " << substitution << std::endl;
306
307
308
309
310
311
	std::cout << "NODES    =";
	for (auto &n: nl) {
	    std::cout << " " << n;
	}
	std::cout << std::endl;

312
313
	
	
Alessio Netti's avatar
Alessio Netti committed
314
        try {
315
316
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
317
318
319
320
            jd.startTime = DCDB::TimeStamp(ts);
            jd.endTime   = DCDB::TimeStamp((uint64_t)0);
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
321
322
323
324
325
326
327
328
329
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra && (myJobDataStore->insertJob(jd) != DCDB::JD_OK)) {
		std::cerr << "Job data insert failed!" << std::endl;
		retCode = 1;
		goto exit;
Micha Müller's avatar
Micha Müller committed
330
	}
Alessio Netti's avatar
Alessio Netti committed
331
332
333
334
335
336
    } else if (boost::iequals(argv[argc-1], "stop")) {
        
        std::cout << "JOBID = " << jobId << std::endl;
        std::cout << "STOP  = " << ts << std::endl;
        
        try {
Micha Müller's avatar
Micha Müller committed
337
338
		jd.jobId = jobId;
		jd.endTime = DCDB::TimeStamp(ts);
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
	} catch (const std::invalid_argument &e) {
		std::cerr << "Invalid input format!" << std::endl;
		retCode = 1;
		goto exit;
	}

	if (cassandra) {
		DCDB::JobData jobStart;
		if (myJobDataStore->getJobById(jobStart, jd.jobId) != DCDB::JD_OK) {
			std::cerr << "Could not retrieve job to be updated!" << std::endl;
			retCode = 1;
			goto exit;
		}
		if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, jd.endTime) != DCDB::JD_OK) {
			std::cerr << "Could not update end time of job!" << std::endl;
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
357
358
359
	}
    }

360
361
362
363
    //Message sent to CollectAgent is independent of start/stop. We send the
    //same JSON in either case. CA does job insert or update depending
    //on job endtime value.
    if (!cassandra) {
Micha Müller's avatar
Micha Müller committed
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
	    //create job data string in JSON format
	    std::string                 payload = "";
	    std::string                 topic = "/DCDB_JOBDATA/"; //do not change or keep in sync with simplemqttservermessage.h
	    boost::property_tree::ptree config;
	    std::ostringstream          output;
	    config.clear();
	    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(jd.jobId)));
	    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(jd.userId)));
	    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(jd.startTime.getRaw()))));
	    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(jd.endTime.getRaw()))));
	    boost::property_tree::ptree nodes;
	    for (const auto &n : jd.nodes) {
		    nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
	    }
	    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
	    boost::property_tree::write_json(output, config, true);
	    payload = output.str();

	    //std::cout << "Payload:\n" << payload << std::endl;
383
384
385
	    
	    mosquitto_publish_callback_set(_mosq, publishCallback);
	    uint64_t startTs = getTimestamp();
Micha Müller's avatar
Micha Müller committed
386
	    //send it to broker
387
	    if (mosquitto_publish(_mosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), 1, false) != MOSQ_ERR_SUCCESS) {
388
		    std::cerr << "Broker not reachable! Job data was not published." << std::endl;
Micha Müller's avatar
Micha Müller committed
389
390
391
		    retCode = 1;
		    goto exit;
	    }
392
393
394
395
396
397
398
399
	    
	    do {
			if (mosquitto_loop(_mosq, -1, 1) != MOSQ_ERR_SUCCESS) {
				std::cerr << "Error in mosquitto_loop!" << std::endl;
				retCode = 1;
				goto exit;
			}
		} while(!done && getTimestamp() - startTs < SLURMJOBTIMEOUT);
Alessio Netti's avatar
Alessio Netti committed
400
401
    }

Micha Müller's avatar
Micha Müller committed
402
403
//hasta la vista
exit:
404
405
406
407
408
409
410
411
412
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
		mosquitto_disconnect(_mosq);
		mosquitto_destroy(_mosq);
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
413
	return retCode;
414
}