The expiration time for new job artifacts in CI/CD pipelines is now 30 days (GitLab default). Previously generated artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

dcdbslurmjob.cpp 18.9 KB
Newer Older
1
2
//================================================================================
// Name        : dcdbslurmjob.cpp
Micha Müller's avatar
Micha Müller committed
3
// Author      : Michael Ott, Micha Mueller
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// Copyright   : Leibniz Supercomputing Centre
// Description : Main file of the dcdbslurmjob command line utility
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

Micha Müller's avatar
Micha Müller committed
27
#include "../../common/include/globalconfiguration.h"
#include "timestamp.h"
#include <boost/algorithm/string.hpp>
#include <boost/regex.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/property_tree/ptree.hpp>
#include <cstdint>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>
#include <dcdb/connection.h>
#include <dcdb/jobdatastore.h>
#include <iomanip>
#include <iostream>
#include <mosquitto.h>
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>
#include "dcdb/version.h"
#include "version.h"
#include "fcntl.h"
#include "unistd.h"
42

43
44
// Separator placed between the job id and the step id when both are known
#define SLURM_JOBSTEP_SEP "."

// Id of the MQTT message whose publication is being awaited (-1: none)
int msgId = -1;
// Set by publishCallback() once the awaited message id has been reported
bool done = false;

// Cassandra connection and job data store; only set up in Cassandra mode
DCDB::Connection *  dcdbConn = nullptr;
DCDB::JobDataStore *myJobDataStore = nullptr;
// Mosquitto client handle; only set up in MQTT mode
struct mosquitto *  myMosq = nullptr;
// Maximum time in seconds to wait for an MQTT publish to complete (-o option)
int timeout = 10;
// MQTT QoS level used when publishing (-q option)
int qos = 1;

54
55
56
57
58
/**
 * Publish callback registered with libmosquitto. Marks the pending publish as
 * finished when the reported message id matches the one being awaited.
 *
 * @param mosq  Mosquitto client instance (unused)
 * @param obj   User data pointer (unused)
 * @param mid   Id of the message the callback is reporting on
 */
void publishCallback(struct mosquitto *mosq, void *obj, int mid) {
	const bool awaitingMessage = (msgId != -1);
	if (awaitingMessage && mid == msgId) {
		done = true;
	}
}

59
60
61
62
63
64
65
66
67
68
69
70
71
72
// Makes sure STDIN, STDOUT and STDERR refer to open file descriptors: every
// channel found closed is re-opened onto /dev/null.
// Prevents libuv from making dcdbslurmjob crash when using Cassandra.
void fixFileDescriptors() {
    const std::string channelNames[3] = {"STDIN", "STDOUT", "STDERR"};
    for (int channel = 0; channel < 3; ++channel) {
        // F_GETFD succeeds on any open descriptor, nothing to do then
        if (fcntl(channel, F_GETFD) >= 0) {
            continue;
        }
        std::cerr << "Warning: detected closed " << channelNames[channel] << " channel. Fixing..." << std::endl;
        const int nullFd = open("/dev/null", O_RDWR);
        if (nullFd < 0 || dup2(nullFd, channel) < 0) {
            std::cerr << "Error: cannot re-open " << channelNames[channel] << " channel." << std::endl;
        }
    }
}
73

Alessio Netti's avatar
Alessio Netti committed
74
75
76
77
78
/*
 * Print usage information to stdout: the two invocation forms (MQTT broker
 * or Cassandra), followed by the description of every supported option.
 */
void usage() {
    std::cout << "Usage:" << std::endl;
    std::cout << "  dcdbslurmjob [-b<host>] [-q<qos>] [-o<timeout>] [-t<timestamp>] [-n<nodelist>] [-d<domainid>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob [-c<host>] [-u<username>] [-p<password>] [-t<timestamp>] [-n<nodelist>] [-d<domainid>] [-j<jobid>] [-i<userid>] [-s<pattern>] [-m<hours>] start|stop" << std::endl;
    std::cout << "  dcdbslurmjob -h" << std::endl;
    std::cout << std::endl;

    std::cout << "Options:" << std::endl;
    std::cout << "  -b<hosts>     List of MQTT brokers           [default: localhost:1883]" << std::endl;
    std::cout << "  -q<qos>       MQTT QoS to use                [default: 1]" << std::endl;
    std::cout << "  -o<timeout>   MQTT timeout in seconds        [default: 10]" << std::endl;
    std::cout << "  -c<hosts>     List of Cassandra hosts        [default: none]" << std::endl;
    std::cout << "  -u<username>  Cassandra username             [default: none]" << std::endl;
    std::cout << "  -p<password>  Cassandra password             [default: none]" << std::endl;
    std::cout << "  -t<timestamp> Timestamp value                [default: now]" << std::endl;
    std::cout << "  -n<nodelist>  Comma-separated nodelist       [default: SLURM_JOB_NODELIST]" << std::endl;
    std::cout << "  -d<domainid>  Job domain id                  [default: default]" << std::endl;
    std::cout << "  -j<jobid>     String job id                  [default: SLURM_JOB_ID var]" << std::endl;
    std::cout << "  -i<userid>    Numerical user id              [default: SLURM_JOB_USER var]" << std::endl;
    std::cout << "  -s<pattern>   Nodelist substitution pattern  [default: none]" << std::endl;
    std::cout << "  -m<hours>     Maximum job length in h        [default: none]" << std::endl;
    std::cout << std::endl;
    std::cout << "  -h            This help page" << std::endl;
    std::cout << std::endl;
    std::cout << "Options -b and -c|u|p are mutually exclusive! If both are specified, the latter takes precedence. By default MQTT broker is specified." << std::endl;
}
103
104
105
106

/**
 * Looks up an environment variable.
 *
 * @param var  Name of the environment variable
 * @return     The variable's value, or an empty string if it is not set
 *             (or if var itself is a null pointer)
 */
std::string getEnv(const char* var) {
    if (var == nullptr) {
        return std::string();
    }
    const char* value = std::getenv(var);
    return (value != nullptr) ? std::string(value) : std::string();
}

113
void splitNodeList(const std::string& str, DCDB::NodeList& nl)
Alessio Netti's avatar
Alessio Netti committed
114
115
{
    nl.clear();
116
117
118
119
    std::string s1 = str;
    boost::regex r1("([^,[]+)(\\[[0-9,-]+\\])?(,|$)", boost::regex::extended);
    boost::smatch m1;
    while (boost::regex_search(s1, m1, r1)) {
120
121
122
123
		std::string hostBase = m1[1].str();
		
		if (m1[2].str().size() == 0) {
			nl.push_back(hostBase);
124
		} else {
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
			std::string s2 = m1[2].str();
			boost::regex r2("([0-9]+)-?([0-9]+)?(,|\\])", boost::regex::extended);
			boost::smatch m2;
			while (boost::regex_search(s2, m2, r2)) {
				if (m2[2] == "") {
					nl.push_back(hostBase + m2[1].str());
				} else {
					int start = atoi(m2[1].str().c_str());
					int stop = atoi(m2[2].str().c_str());
					for (int i=start; i<=stop; i++) {
						std::stringstream ss;
						ss << std::setw(m2[2].str().length()) << std::setfill('0') << i;
						nl.push_back(hostBase + ss.str());
					}
				}
				s2 = m2.suffix().str();
			}
142
		}
143
		s1 = m1.suffix().str();
144
145
146
    }
}

147
148
149
150
151
152
/**
 * Applies a sed-style substitution ("s<d>pattern<d>replacement<d>") to every
 * entry of a node list. <d> may be any single character except a backslash.
 *
 * If substitution does not have that format, the list is left untouched. If
 * the embedded pattern is not a valid regular expression, an error is printed
 * to stderr and the list is left untouched as well.
 *
 * @param nl            Node list whose entries are rewritten in place
 * @param substitution  Substitution expression in sed format
 */
void convertNodeList(DCDB::NodeList& nl, std::string substitution) {
    // Check if input has sed format of "s/.../.../" for substitution
    const boost::regex checkSubstitute("s([^\\\\]{1})([\\S|\\s]*)\\1([\\S|\\s]*)\\1");
    boost::smatch matchResults;

    if (!boost::regex_match(substitution, matchResults, checkSubstitute)) {
        return;
    }

    try {
        // Group 2 holds the search pattern, group 3 the replacement format
        const boost::regex re(matchResults[2].str(), boost::regex_constants::extended);
        const std::string fmt = matchResults[3].str();
        for (auto &n : nl) {
            n = boost::regex_replace(n, re, fmt);
        }
    } catch (const boost::regex_error &e) {
        std::cerr << "Invalid substitution pattern \"" << substitution << "\": " << e.what() << std::endl;
    }
}

163
164
165
166
167
168
/**
 * Splits a delimiter-separated host list into its individual entries.
 * Empty entries (e.g. from "a,,b" or a leading delimiter) are skipped.
 *
 * @param str    Host list string
 * @param hl     Receives the entries; cleared first
 * @param delim  Delimiter character, ',' by default
 */
void splitHostList(const std::string& str, std::vector<std::string>& hl, char delim = ',')
{
    hl.clear();
    std::stringstream ss(str);
    std::string token;
    while (std::getline(ss, token, delim)) {
        if (!token.empty()) {
            hl.push_back(token);
        }
    }
}

173
/**
 * Picks a random entry from a host list and splits it into host and port
 * using parseNetworkHost() / parseNetworkPort().
 *
 * @param hl     Candidate hosts; the picked entry is removed if erase is set
 * @param host   Receives the host part of the picked entry
 * @param port   Receives the port of the picked entry as converted by atoi()
 * @param erase  Whether to remove the picked entry from hl
 *
 * If hl is empty, host is cleared and port is set to 0.
 */
void pickRandomHost(std::vector<std::string>& hl, std::string& host, int& port, bool erase = false) {
    if (hl.empty()) {
        // Nothing to pick from; also avoids a modulo by zero below
        host.clear();
        port = 0;
        return;
    }

    // Seed only once: re-seeding with the current second on every call would
    // restart the same random sequence for calls made within that second
    static bool seeded = false;
    if (!seeded) {
        srand(time(nullptr));
        seeded = true;
    }

    const int n = rand() % static_cast<int>(hl.size());
    host = parseNetworkHost(hl[n]);
    port = atoi(parseNetworkPort(hl[n]).c_str());
    if (erase) {
        hl.erase(hl.begin() + n);
    }
}

183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
/**
 * Stores the start or the end of a job, either directly in Cassandra or by
 * publishing it via MQTT.
 *
 * Cassandra: on start the job is inserted; on stop the stored job is looked
 * up by job id and domain id and its end time is updated.
 * MQTT: the job is serialized to JSON and published to the /DCDB_JOBDATA/
 * topic; the function then waits up to `timeout` seconds for the publish
 * callback to confirm the message.
 *
 * @param job        Job data to insert or update
 * @param start      True for a job start, false for a job stop
 * @param cassandra  True to use Cassandra, false to use MQTT
 * @return           0 on success, 1 on failure (including an MQTT publish
 *                   that was not confirmed within the timeout)
 */
int insertJob(DCDB::JobData job, bool start, bool cassandra) {
    // Cassandra-based job insertion
    if (cassandra) {
        if (start) {
            if (myJobDataStore->insertJob(job) != DCDB::JD_OK) {
                std::cerr << "Job data insert for job " << job.jobId << " failed!" << std::endl;
                return 1;
            }
        } else {
            // The stored start time is needed to address the job being updated
            DCDB::JobData jobStart;
            if (myJobDataStore->getJobById(jobStart, job.jobId, job.domainId) != DCDB::JD_OK) {
                std::cerr << "Could not retrieve job " << job.jobId << " to be updated!" << std::endl;
                return 1;
            }
            if (myJobDataStore->updateEndtime(jobStart.jobId, jobStart.startTime, job.endTime, job.domainId) != DCDB::JD_OK) {
                std::cerr << "Could not update end time of job " << job.jobId << "!" << std::endl;
                return 1;
            }
        }
        return 0;
    }

    // MQTT-based job insertion
    // Create job data string in JSON format
    const std::string           topic = "/DCDB_JOBDATA/"; // Do not change or keep in sync with simplemqttservermessage.h
    boost::property_tree::ptree config;
    std::ostringstream          output;
    config.push_back(boost::property_tree::ptree::value_type("domainid", boost::property_tree::ptree(job.domainId)));
    config.push_back(boost::property_tree::ptree::value_type("jobid", boost::property_tree::ptree(job.jobId)));
    config.push_back(boost::property_tree::ptree::value_type("userid", boost::property_tree::ptree(job.userId)));
    config.push_back(boost::property_tree::ptree::value_type("starttime", boost::property_tree::ptree(std::to_string(job.startTime.getRaw()))));
    config.push_back(boost::property_tree::ptree::value_type("endtime", boost::property_tree::ptree(std::to_string(job.endTime.getRaw()))));

    // Children with empty keys are written as a JSON array
    boost::property_tree::ptree nodes;
    for (const auto &n : job.nodes) {
        nodes.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(n)));
    }
    config.push_back(boost::property_tree::ptree::value_type("nodes", nodes));

    boost::property_tree::write_json(output, config, true);
    const std::string payload = output.str();

    mosquitto_publish_callback_set(myMosq, publishCallback);
    const uint64_t startTs = getTimestamp();
    msgId = -1;
    done = false;
    int ret = MOSQ_ERR_UNKNOWN;

    // Message sent to CollectAgent is independent of start/stop. We send the
    // same JSON in either case. CA does job insert or update depending
    // on job endtime value.
    if ((ret = mosquitto_publish(myMosq, &msgId, topic.c_str(), payload.length(), payload.c_str(), qos, false)) != MOSQ_ERR_SUCCESS) {
        std::cerr << "Could not publish data for job " << job.jobId << " via MQTT: " << mosquitto_strerror(ret) << std::endl;
        return 1;
    }

    // Drive the network loop until publishCallback() reports our message or the timeout expires
    do {
        if ((ret = mosquitto_loop(myMosq, -1, 1)) != MOSQ_ERR_SUCCESS) {
            std::cerr << "Error in mosquitto_loop for job " << job.jobId << ": " << mosquitto_strerror(ret) << std::endl;
            return 1;
        }
    } while (!done && getTimestamp() - startTs < (uint64_t)S_TO_NS(timeout));

    if (!done) {
        std::cerr << "Timed out after " << timeout << "s waiting for publish confirmation of job " << job.jobId << std::endl;
        return 1;
    }
    return 0;
}


Micha Müller's avatar
Micha Müller committed
249
/**
250
251
252
 * Retrieves Slurm job data from environment variables and sends it to either a
 * CollectAgent or a Cassandra database. Job data can also be passed as command
 * line options.
Micha Müller's avatar
Micha Müller committed
253
 */
254
255
int main(int argc, char** argv) {
    std::cout << "dcdbslurmjob " << VERSION << std::endl << std::endl;
256

257
    bool cassandra = false;
258
    
259
260
    std::vector<std::string> hostList;
    std::string host = "", cassandraUser = "", cassandraPassword = "";
261
    int port;
262
    std::string nodelist="", pnodelist="", jobId="", userId="", stepId="", packId="", taskId="";
263
    std::string domainId = JOB_DEFAULT_DOMAIN;
264
    std::string substitution="";
265
    int maxJobLength = -1;
Alessio Netti's avatar
Alessio Netti committed
266
    uint64_t ts=0;
267
    
Alessio Netti's avatar
Alessio Netti committed
268
    // Defining options
269
    const char *opts = "b:q:o:c:u:p:n:t:d:j:i:s:m:h";
Alessio Netti's avatar
Alessio Netti committed
270
271
272
273
274
275
276
277
278
279
280

    char ret;
    while ((ret = getopt(argc, argv, opts))!=-1) {
        switch (ret)
        {
            case 'h':
                usage();
                return 0;
            default:
                break;
        }
281
282
    }
    
Alessio Netti's avatar
Alessio Netti committed
283
284
285
286
287
288
    if (argc < 2) {
        std::cerr << "At least one argument is required: start or stop" << std::endl;
        return 1;
    } else if(!boost::iequals(argv[argc-1], "start") && !boost::iequals(argv[argc-1], "stop")) {
        std::cerr << "Unsupported action: must either be start or stop" << std::endl;
        return 1;
289
290
    }
    
Alessio Netti's avatar
Alessio Netti committed
291
292
293
    optind = 1;
    while ((ret=getopt(argc, argv, opts))!=-1) {
        switch(ret) {
Micha Müller's avatar
Micha Müller committed
294
		case 'b': {
295
			cassandra = false;
296
			splitHostList(optarg, hostList);
Micha Müller's avatar
Micha Müller committed
297
298
			break;
		}
299
300
301
		case 'q':
			qos = atoi(optarg);
			break;
302
303
304
		case 'o':
			timeout = atoi(optarg);
			break;
305
306
		case 'c':
			cassandra = true;
307
			splitHostList(optarg, hostList);
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
			break;
		case 'u':
			cassandra = true;
			cassandraUser = optarg;
			break;
		case 'p': {
			cassandra = true;
			cassandraPassword = optarg;
			// What does this do? Mask the password?
			size_t pwdLen = strlen(optarg);
			memset(optarg, 'x', (pwdLen >= 3) ? 3 : pwdLen);
			if (pwdLen > 3) {
				memset(optarg + 3, 0, pwdLen - 3);
			}
			break;
		}
Micha Müller's avatar
Micha Müller committed
324
	    case 'n':
325
326
327
328
329
			nodelist = optarg;
			break;
		case 't':
			ts = std::stoull(optarg);
			break;
330
331
332
        case 'd':
            domainId = optarg;
            break;
333
334
335
336
337
338
		case 'j':
			jobId = optarg;
			break;
		case 'i':
			userId = optarg;
			break;
339
	    case 's':
340
341
342
343
344
345
346
347
348
			substitution = optarg;
			if (substitution == "SNG") {
				substitution = "s%([fi][0-9]{2})(r[0-9]{2})(c[0-9]{2})(s[0-9]{2})%/sng/\\1/\\2/\\3/\\4%";
				maxJobLength = 48;
			} else if (substitution == "DEEPEST") {
				substitution = "s%dp-(cn|dam|esb)([0-9]{2})%/deepest/\\1/s\\2%";
				maxJobLength = 20;
			}
			break;
349
	    case 'm':
350
351
352
353
354
355
			maxJobLength = std::stoull(optarg);
			break;
		case 'h':
		default:
			usage();
			return 1;
Alessio Netti's avatar
Alessio Netti committed
356
357
        }
    }
358
359
    
    if (hostList.size() == 0) {
360
		hostList.push_back("localhost");
361
    }
Alessio Netti's avatar
Alessio Netti committed
362

363
    // Initialize transport
364
    if (cassandra) {
365
        fixFileDescriptors();
366
	    // Allocate and initialize connection to Cassandra.
367
368
	    pickRandomHost(hostList, host, port);
	    if (port == 0) {
369
			port = 9042;
370
371
372
	    }
	
	    dcdbConn = new DCDB::Connection(host, port, cassandraUser, cassandraPassword);
373
	    if (!dcdbConn->connect()) {
374
		    std::cerr << "Cannot connect to Cassandra server " << host << ":" << port << std::endl;
375
376
		    return 1;
	    }
377
	    std::cout << "Connected to Cassandra server " << host << ":" << port << std::endl;
378
379
	    myJobDataStore = new DCDB::JobDataStore(dcdbConn);
    } else {
380
	    // Initialize Mosquitto library and connect to broker
381
	    char hostname[256];
Micha Müller's avatar
Micha Müller committed
382

383
384
385
386
387
388
	    if (gethostname(hostname, 255) != 0) {
		    std::cerr << "Cannot get hostname!" << std::endl;
		    return 1;
	    }
	    hostname[255] = '\0';
	    mosquitto_lib_init();
389
390
	    myMosq = mosquitto_new(hostname, false, NULL);
	    if (!myMosq) {
391
392
393
394
		    perror(NULL);
		    return 1;
	    }

395
396
397
398
399
400
401
	    int ret = MOSQ_ERR_UNKNOWN;
	    do {
		    pickRandomHost(hostList, host, port, true);
		    if (port == 0) {
			    port = 1883;
		    }
		    
402
		    if ((ret = mosquitto_connect(myMosq, host.c_str(), port, 1000)) != MOSQ_ERR_SUCCESS) {
403
404
405
406
407
408
			    std::cerr << "Could not connect to MQTT broker " << host << ":" << port << " (" << mosquitto_strerror(ret) << ")" <<std::endl;
		    } else {
			    std::cout << "Connected to MQTT broker " << host << ":" << port << ", using QoS " << qos << std::endl;
			    break;
		    }
	    } while (hostList.size() > 0);
409
	    
410
411
	    if (ret != MOSQ_ERR_SUCCESS) {
		    std::cout << "No more MQTT brokers left, aborting" << std::endl;
412
413
		    return 1;
	    }
Alessio Netti's avatar
Alessio Netti committed
414
    }
Micha Müller's avatar
Micha Müller committed
415

416
417
418
    // Collect job data
    bool start = boost::iequals(argv[argc-1], "start");
    bool isPackLeader=false;
Alessio Netti's avatar
Alessio Netti committed
419
    DCDB::JobData jd;
420
    int retCode = 0;
Micha Müller's avatar
Micha Müller committed
421

422
423
    // Fetching timestamp
    if(ts==0) {
Alessio Netti's avatar
Alessio Netti committed
424
        ts = getTimestamp();
425
    }
426
    
427
    // Fetching job ID
428
    if(jobId=="") {
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
        // Is this a job array?
        if((jobId=getEnv("SLURM_ARRAY_JOB_ID")) != "" && (taskId = getEnv("SLURM_ARRAY_TASK_ID")) != "") {
            jobId = jobId + "_" + taskId;
        // Is this a job pack? Packs and arrays cannot be combined in SLURM
        } else if ((jobId=getEnv("SLURM_PACK_JOB_ID")) != "" && (packId = getEnv("SLURM_PACK_JOB_OFFSET")) != "") {
            isPackLeader = packId=="0";
            jobId = jobId + "+" + packId;
            // In this case, packId contains the job ID of the whole pack
            packId = getEnv("SLURM_PACK_JOB_ID");
        // Is this an ordinary job?
        } else {
            jobId = getEnv("SLURM_JOB_ID");
            if (jobId == "") {
                jobId = getEnv("SLURM_JOBID");
            }
        }
        
		// Is this a step within a job/pack/array?
447
448
449
450
451
452
		stepId = getEnv("SLURM_STEP_ID");
		if (stepId=="") {
			stepId = getEnv("SLURM_STEPID");
		}
		if (stepId!="" && jobId!="")
			jobId = jobId + SLURM_JOBSTEP_SEP + stepId;
453
	}
454
455
456
457

	// Fetching user ID
    if(userId=="") {
        userId = getEnv("SLURM_JOB_USER");
458
        if(userId=="") {
459
460
461
462
463
464
            userId = getEnv("USER");
        }
    }

    DCDB::NodeList nl, pnl;
    if (start) {
465
		if(nodelist=="") {
Alessio Netti's avatar
Alessio Netti committed
466
            nodelist = getEnv("SLURM_JOB_NODELIST");
467
468
469
470
471
472
473
            if (nodelist == "") {
                nodelist = getEnv("SLURM_NODELIST");
            }
            // Getting the whole pack's node list, if necessary
            if(isPackLeader) {
                pnodelist = getEnv("SLURM_PACK_JOB_NODELIST");
            }
474
		}
475
		
476
477
		splitNodeList(nodelist, nl);
		convertNodeList(nl, substitution);
478
		
479
		std::cout << "DOMAINID = " << domainId << std::endl;
480
		std::cout << "JOBID    = " << jobId << std::endl;
481
482
483
        std::cout << "USER     = " << userId << std::endl;
        std::cout << "START    = " << ts << std::endl;
        std::cout << "NODELIST = " << nodelist << std::endl;
484
485
486
487
488
489
490
491
492
493
		std::cout << "SUBST    = " << substitution << std::endl;
		if (maxJobLength >= 0) {
			std::cout << "JOBLEN   = " << maxJobLength << std::endl;
		}
		std::cout << "NODES    =";
		for (auto &n: nl) {
			std::cout << " " << n;
		}
		std::cout << std::endl;
		
494
495
496
497
498
		// Only for job pack leaders that are starting up
		if(isPackLeader) {
            splitNodeList(pnodelist, pnl);
            convertNodeList(pnl, substitution);
            std::cout << "PACK     =";
Alessio Netti's avatar
Alessio Netti committed
499
            for (auto &n: pnl) {
500
501
502
503
504
                std::cout << " " << n;
            }
            std::cout << std::endl;
		}
		
Alessio Netti's avatar
Alessio Netti committed
505
        try {
506
            jd.domainId  = domainId;
507
508
            jd.jobId     = jobId;
            jd.userId    = userId;
Alessio Netti's avatar
Alessio Netti committed
509
            jd.startTime = DCDB::TimeStamp(ts);
Michael Ott's avatar
Michael Ott committed
510
            jd.endTime   = (maxJobLength >= 0) ? DCDB::TimeStamp((uint64_t) (ts + S_TO_NS((uint64_t)maxJobLength * 3600ull) + 1)) : DCDB::TimeStamp((uint64_t)0);
Alessio Netti's avatar
Alessio Netti committed
511
512
            jd.nodes     = nl;
        } catch(const std::invalid_argument& e) {
513
514
515
516
			std::cerr << "Invalid input format!" << std::endl;
			retCode = 1;
			goto exit;
		}
517
		
Alessio Netti's avatar
Alessio Netti committed
518
    } else if (boost::iequals(argv[argc-1], "stop")) {
519
520
521
        std::cout << "DOMAINID = " << domainId << std::endl;
        std::cout << "JOBID    = " << jobId << std::endl;
        std::cout << "STOP     = " << ts << std::endl;
Alessio Netti's avatar
Alessio Netti committed
522
523
        
        try {
524
            jd.domainId = domainId;
525
526
527
528
			jd.jobId = jobId;
			jd.endTime = DCDB::TimeStamp(ts);
		} catch (const std::invalid_argument &e) {
			std::cerr << "Invalid input format!" << std::endl;
529
530
531
			retCode = 1;
			goto exit;
		}
Micha Müller's avatar
Micha Müller committed
532
533
    }

534
535
536
537
538
539
540
    retCode = insertJob(jd, start, cassandra);
    if(isPackLeader) {
        if(start) {
            jd.nodes = pnl;
        }
        jd.jobId = packId;
        retCode = insertJob(jd, start, cassandra);
Alessio Netti's avatar
Alessio Netti committed
541
    }
542
    
Micha Müller's avatar
Micha Müller committed
543
exit:
544
545
546
547
548
	if (cassandra) {
		delete myJobDataStore;
		dcdbConn->disconnect();
		delete dcdbConn;
	} else {
549
550
		mosquitto_disconnect(myMosq);
		mosquitto_destroy(myMosq);
551
552
		mosquitto_lib_cleanup();
	}
Micha Müller's avatar
Micha Müller committed
553
	return retCode;
554
}