dcdbslurmjob.cpp 19.6 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbslurmjob.cpp
Micha Müller's avatar
Micha Müller committed
3
// Author      : Michael Ott, Micha Mueller
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

Micha Müller's avatar
Micha Müller committed
27
#include "../../common/include/globalconfiguration.h"
28
#include "timestamp.h"
Micha Müller's avatar
Micha Müller committed
29
#include <boost/algorithm/string.hpp>
30
#include <boost/regex.hpp>
Micha Müller's avatar
Micha Müller committed
31
32
33
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdlib>
34
#include <dcdb/connection.h>
Alessio Netti's avatar
Alessio Netti committed
35
#include <dcdb/jobdatastore.h>
Micha Müller's avatar
Micha Müller committed
36
37
#include <iostream>
#include <mosquitto.h>
38
39
#include "dcdb/version.h"
#include "version.h"
40
41
#include "fcntl.h"
#include "unistd.h"
42

43
44
#define SLURM_JOBSTEP_SEP "."

45
46
47
int msgId = -1;
bool done = false;

48
49
50
51
52
53
DCDB::Connection *  dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
struct mosquitto *  myMosq = nullptr;
int timeout = 10;
int qos = 1;

54
55
56
57
58
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	if(msgId != -1 && mid == msgId)
		done = true;
}

59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Re-opens STDIN, STDOUT or STDERR to /dev/null if they are closed.
// Prevents libuv from making dcdbslurmjob crash when using Cassandra.
void fixFileDescriptors() {
    std::string stdArr[3] = {"STDIN", "STDOUT", "STDERR"};
    int fd = -1;
    for(int idx=0; idx<=2; idx++) {
        if(fcntl(idx, F_GETFD) < 0) {
            std::cerr << "Warning: detected closed " << stdArr[idx] << " channel. Fixing..." << std::endl;
            if((fd=open("/dev/null", O_RDWR)) < 0 || dup2(fd, idx) < 0) {
                std::cerr << "Error: cannot re-open " << stdArr[idx] << " channel." << std::endl;
            }
        }
    }
}
73

Alessio Netti's avatar
Alessio Netti committed
74
75
76
77
78
/*
 * Print usage information
 */
void usage() {
    std::cout << "Usage:" << std::endl;
79
    std::cout << "  dcdbslurmjob [-b<host>] [-t<timestamp>] [-n<nodelist>] [-d<domainid>] [-j<jobid>] [-i<userid>] start|stop" << std::endl;
80
    std::cout << "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-j<jobid>] [-i<userid>] [-s<pattern>] start|stop" << std::endl;
Alessio Netti's avatar
Alessio Netti committed
81
82
83
84
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
85
    std::cout << "  -b<hosts>     List of MQTT brokers           [default: localhost:1883]" << std::endl;
Michael Ott's avatar
Michael Ott committed
86
    std::cout << "  -q<qos>       MQTT QoS to use                [default: 1]" << std::endl;
87
    std::cout << "  -o<timeout>   MQTT timeout in seconds        [default: 10]" << std::endl;
88
    std::cout << "  -c<hosts>     List of Cassandra hosts        [default: none]" << std::endl;
89
90
91
92
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST]" << std::endl;
93
94
    std::cout << "  -d<domainid>  Job domain id                  [default: default]" << std::endl;
    std::cout << "  -j<jobid>     String job id                  [default: SLURM_JOB_ID var]" << std::endl;
95
96
    std::cout << "  -i<userid>    Numerical user id              [default: SLURM_JOB_USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
97
    std::cout << "  -m<pattern>   Maximum job length in h        [default: none]" << std::endl;
98
    std::cout << "  -f            Force job insert/update        [default: no]" << std::endl;
Alessio Netti's avatar
Alessio Netti committed
99
100
101
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
102
    std::cout << "Options -b and -c|u|p are mutual exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." << std::endl;
Alessio Netti's avatar
Alessio Netti committed
103
}
104
105
106
107

std::string getEnv(const char* var) {
    char* str = std::getenv(var);
    if (str != NULL) {
Alessio Netti's avatar
Alessio Netti committed
108
	    return std::string(str);
109
    } else {
Alessio Netti's avatar
Alessio Netti committed
110
111
112
113
	    return std::string("");
    }
}

114
void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
115
116
{
    nl.clear();
117
118
119
120
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
121
122
123
124
		std::string hostBase = m1[1].str();
		
		if (m1[2].str().size() == 0) {
			nl.push_back(hostBase);
125
		} else {
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
			std::string s2 = m1[2].str();
			boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
			boost::smatch m2;
			while (boost::regex_search(s2, m2, r2)) {
				if (m2[2] == "") {
					nl.push_back(hostBase + m2[1].str());
				} else {
					int start = atoi(m2[1].str().c_str());
					int stop = atoi(m2[2].str().c_str());
					for (int i=start; i<=stop; i++) {
						std::stringstream ss;
						ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
						nl.push_back(hostBase + ss.str());
					}
				}
				s2 = m2.suffix().str();
			}
143
		}
144
		s1 = m1.suffix().str();
145
146
147
    }
}

148
149
150
151
152
153
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    //check if input has sed format of "s/.../.../" for substitution
    boost::regex  checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch matchResults;
    
    if (regex_match(substitution, matchResults, checkSubstitute)) {
154
155
156
157
158
159
160
		//input has substitute format
		boost::regex re = (boost::regex(matchResults[2].str(), boost::regex_constants::extended));
		std::string fmt = matchResults[3].str();
		for (auto &n: nl) {
			 n = boost::regex_replace(n, re, fmt);
			 //std::cout << n <<" => " << mqtt << std::endl;
		}
161
162
163
    }
}

164
165
166
167
168
169
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
    hl.clear();
    std::stringstream ss(str);
    std::string token;
    while (std::getline(ss, token, delim)) {
170
		hl.push_back(token);
171
172
173
    }
}

174
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
175
176
177
178
    srand (time(NULL));
    int n = rand() % hl.size();
    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());
179
    if (erase) {
180
		hl.erase(hl.begin()+n);
181
    }
182
183
}

184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
int insertJob(DCDB::JobData job, bool start, bool cassandra) {
    // Cassandra-based job insertion
    if (cassandra) {
        if (start) {
            if(myJobDataStore->insertJob(job) != DCDB::JD_OK) {
                std::cerr << "Job data insert for job " << job.jobId << " failed!" << std::endl;
                return 1;
            }
        }
        else {
            DCDB::JobData jobStart;
            if (myJobDataStore->getJobById(jobStart, job.jobId, job.domainId) != DCDB::JD_OK) {
                std::cerr << "Could not retrieve job " << job.jobId << " to be updated!" << std::endl;
                return 1;
            }
            if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, job.endTime, job.domainId) != DCDB::JD_OK) {
                std::cerr << "Could not update end time of job " << job.jobId << "!" << std::endl;
                return 1;
            }
        }
    // MQTT-based job insertion
    } else {
        // Create job data string in JSON format
        std::string                 payload = "";
        std::string                 topic = "/DCDB_JOBDATA/"; // Do not change or keep in sync with simplemqttservermessage.h
        boost::property_tree::ptree config;
        std::ostringstream          output;
        config.clear();
        config.push_back(boost::property_tree::ptree::value_type("domainid", boost::property_tree::ptree(job.domainId)));
        config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(job.jobId)));
        config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(job.userId)));
        config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(job.startTime.getRaw()))));
        config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(job.endTime.getRaw()))));
        boost::property_tree::ptree nodes;
        for (const auto &n : job.nodes) {
            nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
        }
        
        config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));
        boost::property_tree::write_json(output, config, true);
        payload = output.str();
        //std::cout << "Payload:\n" << payload << std::endl;
        mosquitto_publish_callback_set(myMosq, publishCallback);
        uint64_t startTs = getTimestamp();
        msgId = -1;
        done = false;
        int ret = MOSQ_ERR_UNKNOWN;
        
        // Message sent to CollectAgent is independent of start/stop. We send the
        // same JSON in either case. CA does job insert or update depending
        // on job endtime value.
        if ((ret = mosquitto_publish(myMosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
            std::cerr << "Could not publish data for job " << job.jobId << " via MQTT: " << mosquitto_strerror(ret) << std::endl;
            return 1;
        }
        do {
            if ((ret = mosquitto_loop(myMosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
                std::cerr << "Error in mosquitto_loop for job " << job.jobId << ": " << mosquitto_strerror(ret) << std::endl;
                return 1;
            }
        } while(!done && getTimestamp() - startTs < (uint64_t)S_TO_NS(timeout));
    }
    return true;
}


Micha Müller's avatar
Micha Müller committed
250
/**
251
252
253
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
254
 */
255
256
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
257

258
    bool cassandra = false;
259
    
260
261
    std::vector<std::string> hostList;
    std::string host = "", cassandraUser = "", cassandraPassword = "";
262
    int port;
263
    std::string nodelist="", pnodelist="", jobId="", userId="", stepId="", packId="", taskId="";
264
    std::string domainId = JOB_DEFAULT_DOMAIN;
265
    std::string substitution="";
266
    int maxJobLength = -1;
267
    bool force = false;
Alessio Netti's avatar
Alessio Netti committed
268
    uint64_t ts=0;
269
    
Alessio Netti's avatar
Alessio Netti committed
270
    // Defining options
271
    const char *opts = "b:q:o:c:u:p:n:t:d:j:i:s:m:fh";
Alessio Netti's avatar
Alessio Netti committed
272
273
274
275
276
277
278
279
280
281
282

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
283
284
    }
    
Alessio Netti's avatar
Alessio Netti committed
285
286
287
288
289
290
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
291
292
    }
    
Alessio Netti's avatar
Alessio Netti committed
293
294
295
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
296
		case 'b': {
297
			cassandra = false;
298
			splitHostList(optarg, hostList);
Micha Müller's avatar
Micha Müller committed
299
300
			break;
		}
301
302
303
		case 'q':
			qos = atoi(optarg);
			break;
304
305
306
		case 'o':
			timeout = atoi(optarg);
			break;
307
308
		case 'c':
			cassandra = true;
309
			splitHostList(optarg, hostList);
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
326
	    case 'n':
327
328
329
330
331
			nodelist = optarg;
			break;
		case 't':
			ts = std::stoull(optarg);
			break;
332
333
334
        case 'd':
            domainId = optarg;
            break;
335
336
337
338
339
340
		case 'j':
			jobId = optarg;
			break;
		case 'i':
			userId = optarg;
			break;
341
	    case 's':
342
343
344
345
346
347
348
349
350
			substitution = optarg;
			if (substitution == "SNG") {
				substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
				maxJobLength = 48;
			} else if (substitution == "DEEPEST") {
				substitution = "s%dp-(cn|dam|esb)([0-9]{2})%/deepest/\\1/s\\2%";
				maxJobLength = 20;
			}
			break;
351
	    case 'm':
352
353
			maxJobLength = std::stoull(optarg);
			break;
354
355
356
	    case 'f':
			force = true;
			break;
357
358
359
360
		case 'h':
		default:
			usage();
			return 1;
Alessio Netti's avatar
Alessio Netti committed
361
362
        }
    }
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377

    // Check whether we are started by slurmd and are the first node in the nodelist
    std::string slurmNodename = getEnv("SLURMD_NODENAME");
    std::string slurmNodelist = getEnv("SLURM_JOB_NODELIST");
    if (slurmNodelist == "") {
	slurmNodelist = getEnv("SLURM_NODELIST");
    }
    DCDB::NodeList nl;
    splitNodeList(slurmNodelist, nl);
    if (!force && (slurmNodename.size() > 0)) {
	if (slurmNodename != nl.front()) {
	    std::cout << "Running in slurmd context but not the first node in nodelist. Exiting." << std::endl;
	    return 0;
	}
    }
378
379
    
    if (hostList.size() == 0) {
380
		hostList.push_back("localhost");
381
    }
Alessio Netti's avatar
Alessio Netti committed
382

383
    // Initialize transport
384
    if (cassandra) {
385
        fixFileDescriptors();
386
	    // Allocate and initialize connection to Cassandra.
387
388
	    pickRandomHost(hostList, host, port);
	    if (port == 0) {
389
			port = 9042;
390
391
392
	    }
	
	    dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
393
	    if (!dcdbConn->connect()) {
394
		    std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
395
396
		    return 1;
	    }
397
	    std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
398
399
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
400
	    // Initialize Mosquitto library and connect to broker
401
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
402

403
404
405
406
407
408
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
409
410
	    myMosq = mosquitto_new(hostname, false, NULL);
	    if (!myMosq) {
411
412
413
414
		    perror(NULL);
		    return 1;
	    }

415
416
417
418
419
420
421
	    int ret = MOSQ_ERR_UNKNOWN;
	    do {
		    pickRandomHost(hostList, host, port, true);
		    if (port == 0) {
			    port = 1883;
		    }
		    
422
		    if ((ret = mosquitto_connect(myMosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
423
424
425
426
427
428
			    std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
		    } else {
			    std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
			    break;
		    }
	    } while (hostList.size() > 0);
429
	    
430
431
	    if (ret != MOSQ_ERR_SUCCESS) {
		    std::cout << "No more MQTT brokers left, aborting" << std::endl;
432
433
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
434
    }
Micha Müller's avatar
Micha Müller committed
435

436
437
438
    // Collect job data
    bool start = boost::iequals(argv[argc-1], "start");
    bool isPackLeader=false;
Alessio Netti's avatar
Alessio Netti committed
439
    DCDB::JobData jd;
440
    int retCode = 0;
Micha Müller's avatar
Micha Müller committed
441

442
443
    // Fetching timestamp
    if(ts==0) {
Alessio Netti's avatar
Alessio Netti committed
444
        ts = getTimestamp();
445
    }
446
    
447
    // Fetching job ID
448
    if(jobId=="") {
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
        // Is this a job array?
        if((jobId=getEnv("SLURM_ARRAY_JOB_ID")) != "" && (taskId = getEnv("SLURM_ARRAY_TASK_ID")) != "") {
            jobId = jobId + "_" + taskId;
        // Is this a job pack? Packs and arrays cannot be combined in SLURM
        } else if ((jobId=getEnv("SLURM_PACK_JOB_ID")) != "" && (packId = getEnv("SLURM_PACK_JOB_OFFSET")) != "") {
            isPackLeader = packId=="0";
            jobId = jobId + "+" + packId;
            // In this case, packId contains the job ID of the whole pack
            packId = getEnv("SLURM_PACK_JOB_ID");
        // Is this an ordinary job?
        } else {
            jobId = getEnv("SLURM_JOB_ID");
            if (jobId == "") {
                jobId = getEnv("SLURM_JOBID");
            }
        }
        
		// Is this a step within a job/pack/array?
467
468
469
470
471
472
		stepId = getEnv("SLURM_STEP_ID");
		if (stepId=="") {
			stepId = getEnv("SLURM_STEPID");
		}
		if (stepId!="" && jobId!="")
			jobId = jobId + SLURM_JOBSTEP_SEP + stepId;
473
	}
474
475
476
477

	// Fetching user ID
    if(userId=="") {
        userId = getEnv("SLURM_JOB_USER");
478
        if(userId=="") {
479
480
481
482
            userId = getEnv("USER");
        }
    }

483
    DCDB::NodeList pnl;
484
    if (start) {
485
486
487
488
489
490
	    // Check whether a nodelist was provided as command line argument.
	    // Otherwise we have populated nl above already.
	    if(nodelist.size() > 0) {
		splitNodeList(nodelist, nl);
	    }
	    convertNodeList(nl, substitution);
491
492
493
494
            // Getting the whole pack's node list, if necessary
            if(isPackLeader) {
                pnodelist = getEnv("SLURM_PACK_JOB_NODELIST");
            }
495
				
496
		std::cout << "DOMAINID = " << domainId << std::endl;
497
		std::cout << "JOBID    = " << jobId << std::endl;
498
499
500
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
501
502
503
504
505
506
507
508
509
510
		std::cout << "SUBST    = " << substitution << std::endl;
		if (maxJobLength >= 0) {
			std::cout << "JOBLEN   = " << maxJobLength << std::endl;
		}
		std::cout << "NODES    =";
		for (auto &n: nl) {
			std::cout << " " << n;
		}
		std::cout << std::endl;
		
511
512
513
514
515
		// Only for job pack leaders that are starting up
		if(isPackLeader) {
            splitNodeList(pnodelist, pnl);
            convertNodeList(pnl, substitution);
            std::cout << "PACK     =";
Alessio Netti's avatar
Alessio Netti committed
516
            for (auto &n: pnl) {
517
518
519
520
521
                std::cout << " " << n;
            }
            std::cout << std::endl;
		}
		
Alessio Netti's avatar
Alessio Netti committed
522
        try {
523
            jd.domainId  = domainId;
524
525
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
526
            jd.startTime = DCDB::TimeStamp(ts);
Michael Ott's avatar
Michael Ott committed
527
            jd.endTime   = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
Alessio Netti's avatar
Alessio Netti committed
528
529
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
530
531
532
533
			std::cerr << "Invalid input format!" << std::endl;
			retCode = 1;
			goto exit;
		}
534
		
Alessio Netti's avatar
Alessio Netti committed
535
    } else if (boost::iequals(argv[argc-1], "stop")) {
536
537
538
        std::cout << "DOMAINID = " << domainId << std::endl;
        std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "STOP     = " << ts << std::endl;
Alessio Netti's avatar
Alessio Netti committed
539
540
        
        try {
541
            jd.domainId = domainId;
542
543
544
545
			jd.jobId = jobId;
			jd.endTime = DCDB::TimeStamp(ts);
		} catch (const std::invalid_argument &e) {
			std::cerr << "Invalid input format!" << std::endl;
546
547
548
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
549
550
    }

551
552
553
554
555
556
557
    retCode = insertJob(jd, start, cassandra);
    if(isPackLeader) {
        if(start) {
            jd.nodes = pnl;
        }
        jd.jobId = packId;
        retCode = insertJob(jd, start, cassandra);
Alessio Netti's avatar
Alessio Netti committed
558
    }
559
    
Micha Müller's avatar
Micha Müller committed
560
exit:
561
562
563
564
565
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
566
567
		mosquitto_disconnect(myMosq);
		mosquitto_destroy(myMosq);
568
569
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
570
	return retCode;
571
}