2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

jobdatastore.cpp 38.1 KB
Newer Older
1
2
3
//================================================================================
// Name        : jobdatastore.cpp
// Author      : Axel Auweter, Micha Mueller
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : C++ API implementation for inserting and querying DCDB job data.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//================================================================================

/**
 * @file
 * @brief This file contains the actual implementation of the jobdatastore
 * interface. The interface itself forwards all functions to the internal
 * JobDataStoreImpl; the real logic is implemented in JobDataStoreImpl.
 */

35
#include <cinttypes>
36
37
38
39
40
41
42
43
44
45
46
47
#include <list>
#include <string>

#include "cassandra.h"

#include "dcdb/jobdatastore.h"
#include "jobdatastore_internal.h"
#include "dcdb/connection.h"
#include "dcdbglobals.h"

using namespace DCDB;

48
49
50
51
52
53
54
55
56
57
/**
 * @details
 * Since we want high-performance inserts, we prepare the insert CQL query in
 * advance and only bind it on the actual insert.
 */
void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
  CassError rc = CASS_OK;
  CassFuture* future = NULL;
  const char* query;

58
  /* Free the old prepared if necessary. */
59
60
61
62
63
64
  if (preparedInsert) {
    cass_prepared_free(preparedInsert);
  }

  char *queryBuf = NULL;
  if (ttl == 0) {
65
    query = "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
66
        " (domain, jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?, ?);";
67
68
69
  }
  else {
    queryBuf = (char*)malloc(256);
70
    snprintf(queryBuf, 256, "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
71
        " (domain, jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?, ?) "
72
        "USING TTL %" PRIu64 " ;", ttl);
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
    query = queryBuf;
  }

  future = cass_session_prepare(session, query);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
  } else {
    preparedInsert = cass_future_get_prepared(future);
  }

  cass_future_free(future);
  if (queryBuf) {
    free(queryBuf);
  }
}

92
93
/**
 * @details
94
 * Extract all data from the JobData object and push it into the data store.
95
 */
96
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
97
98
  JDError error = JD_UNKNOWNERROR;

99
100
101
102
103
104
105
  /* Insert into Cassandra */
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;

  statement = cass_prepared_bind(preparedInsert);

106
  cass_statement_bind_string_by_name(statement, "domain", jdata.domainId.empty() ? JOB_DEFAULT_DOMAIN : jdata.domainId.c_str());
107
108
109
110
  cass_statement_bind_string_by_name(statement, "jid", jdata.jobId.c_str());
  cass_statement_bind_string_by_name(statement, "uid", jdata.userId.c_str());
  cass_statement_bind_int64_by_name(statement, "start_ts", jdata.startTime.getRaw());
  cass_statement_bind_int64_by_name(statement, "end_ts", jdata.endTime.getRaw());
111

112
  /* Copy the string node list to a varchar set */
113
114
115
116
117
118
119
120
  CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
                                            jdata.nodes.size());
  for (auto& s : jdata.nodes) {
    cass_collection_append_string(set, s.c_str());
  }

  cass_statement_bind_collection_by_name(statement, "nodes", set);

121
  /* All parameters bound. Now execute the statement asynchronously */
122
  future = cass_session_execute(session, statement);
123
124
125
126
127

  /* Clean up in the meantime */
  cass_collection_free(set);

  /* Wait for the statement to finish */
128
129
130
131
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
132
133
134
135
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;
136
137
138
139
  }

  cass_future_free(future);
  cass_statement_free(statement);
140
  return error;
141
142
143
144
}

/**
 * @details
145
146
147
148
149
150
151
 * Update the job with matching JobId and StartTs in the data store with the
 * values provided by the given JobData object. If no such job exists in the
 * data store yet, it is inserted.
 * The JobData object is expected to be complete. Partial
 * updates of only selected fields are not supported. Instead, one has to
 * retrieve the other JobData information via getJobById() or
 * getJobByPrimaryKey() first and complete its JobData object for the update.
152
 */
153
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
154
155
  JDError error = JD_UNKNOWNERROR;

156
  /* Update entry in Cassandra (actually upserts) */
157
  CassError rc = CASS_OK;
158
159
160
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
  const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA
161
      " SET uid = ?, end_ts = ?, nodes = ? WHERE domain = ? AND jid = ? AND start_ts = ? ;";
162

163
164
165
166
167
  statement = cass_statement_new(query, 6);

  cass_statement_bind_string(statement, 3, jdata.domainId.c_str());
  cass_statement_bind_string(statement, 4, jdata.jobId.c_str());
  cass_statement_bind_int64(statement, 5, jdata.startTime.getRaw());
168

169
  cass_statement_bind_string(statement, 0, jdata.userId.c_str());
170
  cass_statement_bind_int64(statement, 1, jdata.endTime.getRaw());
171
  
172
  /* Copy the string node list to a varchar set */
173
  CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, jdata.nodes.size());
174
175
176
  for (auto& s : jdata.nodes) {
    cass_collection_append_string(set, s.c_str());
  }
177

178
  cass_statement_bind_collection(statement, 2, set);
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198

  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Clean up in the meantime */
  cass_collection_free(set);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;
  }

  cass_future_free(future);
  cass_statement_free(statement);
199

200
  return error;
201
202
203
204
}

/**
 * @details
205
206
207
 * Update the job with matching JobId and StartTs in the data store with the
 * provided end time. If no such job exists in the data store yet, it is
 * inserted.
208
 */
209
JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime, std::string domainId) {
210
211
  /* Check if the input for the primary key is valid and reasonable */
  if (startTs.getRaw() == 0) {
212
    return JD_BADPARAMS;
213
214
215
216
  }

  JDError error = JD_UNKNOWNERROR;

217
  /* Update entry in Cassandra (actually upserts) */
218
219
220
221
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
  const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA
222
      " SET end_ts = ? WHERE domain = ? AND jid = ? AND start_ts = ?;";
223

224
225
226
227
228
  statement = cass_statement_new(query, 4);
  
  cass_statement_bind_string(statement, 1, domainId.c_str());
  cass_statement_bind_string(statement, 2, jobId.c_str());
  cass_statement_bind_int64(statement, 3, startTs.getRaw());
229
  cass_statement_bind_int64(statement, 0, endTime.getRaw());
230

231
232
233
234
235
236
237
238
239
240
241
242
  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;
243
244
  }

245
246
247
248
249
250
  cass_future_free(future);
  cass_statement_free(statement);

  return error;
}

251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
/**
 * @details
 * Update the job with matching JobId and StartTs in the data store with the
 * provided new start time. Since the start time is part of the key, the
 * existing entry is read, deleted and re-inserted with the new start time.
 * If no such job exists in the data store yet, a new entry carrying only the
 * key fields is inserted.
 *
 * To avoid losing data, the old entry is only deleted if the lookup did not
 * fail with JD_UNKNOWNERROR, and the new entry is only inserted if the delete
 * succeeded.
 *
 * @param jobId       Job id of the entry to move.
 * @param startTs     Current start time of the entry.
 * @param newStartTs  Start time the entry shall have afterwards.
 * @param domainId    Domain of the entry.
 * @return            Error of the failing lookup or delete, otherwise the
 *                    result of insertJob().
 */
JDError JobDataStoreImpl::updateStartTime(JobId jobId, TimeStamp startTs, TimeStamp newStartTs, std::string domainId) {
    JobData jd;

    /* "Not found" and partially parsed rows are tolerated; a failed query is not,
     * as we would otherwise replace a valid entry by an empty one. */
    const JDError lookup = getJobByPrimaryKey(jd, jobId, startTs, domainId);
    if (lookup == JD_UNKNOWNERROR) {
        return lookup;
    }

    /* Do not insert the moved entry if the old one could not be removed. */
    const JDError removal = deleteJob(jobId, startTs, domainId);
    if (removal != JD_OK) {
        return removal;
    }

    jd.jobId = jobId;
    jd.domainId = domainId;
    jd.startTime = newStartTs;

    return insertJob(jd);
}

271
272
273
274
/**
 * @details
 * Delete the entry with matching JobId and start TimeStamp from the data store.
 */
275
JDError JobDataStoreImpl::deleteJob(JobId jid, TimeStamp startTs, std::string domainId)  {
276
277
278
279
280
281
282
  JDError error = JD_UNKNOWNERROR;

  /* Remove entry from Cassandra */
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
  const char* query = "DELETE FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
283
      " WHERE domain = ? AND jid = ? AND start_ts = ?;";
284

285
  statement = cass_statement_new(query, 3);
286

287
288
289
  cass_statement_bind_string(statement, 0, domainId.c_str());
  cass_statement_bind_string(statement, 1, jid.c_str());
  cass_statement_bind_int64(statement, 2, startTs.getRaw());
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304

  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;
  }

305
  cass_future_free(future);
306
307
308
  cass_statement_free(statement);

  return error;
309
310
311
312
}

/**
 * @details
313
314
 * Find the entry in the data store with matching JobId and start_ts and store
 * the corresponding values in the JobData object.
315
 */
316
JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs, std::string domainId) {
317
318
  JDError error = JD_UNKNOWNERROR;

319
  /* Select entry from Cassandra */
320
321
322
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
323
  const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
324
      " WHERE domain = ? AND jid = ? AND start_ts = ?;";
325

326
  statement = cass_statement_new(query, 3);
327

328
329
330
  cass_statement_bind_string(statement, 0, domainId.c_str());
  cass_statement_bind_string(statement, 1, jid.c_str());
  cass_statement_bind_int64(statement, 2, startTs.getRaw());
331
332
333
334
335
336
337
338
339
340
341
342
343

  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;
344
345
346
347
348
349
350
351
352
353
354

    const CassResult* cresult = cass_future_get_result(future);
    size_t rowCnt = cass_result_row_count(cresult);

    /* Check if the returned data is reasonable */
    if (rowCnt == 0) {
      error = JD_JOBKEYNOTFOUND;
    } else {
      /* Retrieve data from result */
      const CassRow* row = cass_result_first_row(cresult);

355
      cass_int64_t startTs, endTs;
356
357
358
359
      const char *domId, *jobId, *userId;
      size_t domainId_len, jobId_len, userId_len;
      /* domain, jid and start_ts are always set. Other values should be checked */
      cass_value_get_string(cass_row_get_column_by_name(row, "domain"), &domId, &domainId_len);
360
361
362
363
364
      cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len);
      cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs);
      if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
        userId = "";
        userId_len = 0;
365
366
        error = JD_PARSINGERROR;
      }
367
      if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
368
369
370
371
372
        endTs = 0;
        error = JD_PARSINGERROR;
      }

      /* Copy the data in the JobData object */
373
      job.domainId = (DomainId) std::string(domId, domainId_len);
374
375
      job.jobId = (JobId) std::string(jobId, jobId_len);
      job.userId = (UserId) std::string(userId, userId_len);
376
377
378
379
380
381
382
383
      job.startTime = (uint64_t) startTs;
      job.endTime = (uint64_t) endTs;

      /* Do not forget about the nodes... */
      const char* nodeStr;
      size_t nodeStr_len;

      const CassValue* set = cass_row_get_column_by_name(row, "nodes");
384
385
386
387
388
389
390
391
392
393
394
      CassIterator *setIt = nullptr;
      
      if(set && (setIt = cass_iterator_from_collection(set))) {
        while (cass_iterator_next(setIt)) {
            cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len);
            job.nodes.emplace_back(nodeStr, nodeStr_len);
        }
    
        cass_iterator_free(setIt);
      } else { 
        error = JD_PARSINGERROR;
395
396
397
398
      }
    }

    cass_result_free(cresult);
399
400
401
402
403
404
  }

  cass_future_free(future);
  cass_statement_free(statement);

  return error;
405
406
407
408
}

/**
 * @details
409
410
 * Find the entry in the data store with matching JobId and highest start_ts
 * value (= most recent job) and store the
411
 * corresponding values in the JobData object.
412
 */
413
JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid, std::string domainId) {
414
415
416
417
418
419
420
  JDError error = JD_UNKNOWNERROR;

  /* Select entry from Cassandra */
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
  const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
421
      " WHERE domain = ? AND jid = ? ORDER BY jid DESC, start_ts DESC LIMIT 1;";
422

423
  statement = cass_statement_new(query, 2);
424

425
426
  cass_statement_bind_string(statement, 0, domainId.c_str());
  cass_statement_bind_string(statement, 1, jid.c_str());
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449

  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;

    const CassResult* cresult = cass_future_get_result(future);
    size_t rowCnt = cass_result_row_count(cresult);

    /* Check if the returned data is reasonable */
    if (rowCnt == 0) {
      error = JD_JOBIDNOTFOUND;
    } else {
      /* Retrieve data from result */
      const CassRow* row = cass_result_first_row(cresult);
450
451
      
      cass_int64_t startTs, endTs;
452
453
      const char *domId, *jobId, *userId;
      size_t domainId_len, jobId_len, userId_len;
454

455
456
      /* domain, jid and start_ts are always set. Other values should be checked */
      cass_value_get_string(cass_row_get_column_by_name(row, "domain"), &domId, &domainId_len);
457
458
459
460
461
      cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len);
      cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs);
      if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
        userId = "";
        userId_len = 0;
462
463
        error = JD_PARSINGERROR;
      }
464
      if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
465
466
467
468
469
        endTs = 0;
        error = JD_PARSINGERROR;
      }

      /* Copy the data in the JobData object */
470
      job.domainId = (DomainId) std::string(domId, domainId_len);
471
472
      job.jobId = (JobId) std::string(jobId, jobId_len);
      job.userId = (UserId) std::string(userId, userId_len);
473
474
475
476
477
478
479
480
      job.startTime = (uint64_t) startTs;
      job.endTime = (uint64_t) endTs;

      /* Do not forget about the nodes... */
      const char* nodeStr;
      size_t nodeStr_len;

      const CassValue* set = cass_row_get_column_by_name(row, "nodes");
481
482
483
484
485
486
487
488
489
490
491
492
      CassIterator *setIt = nullptr;
      
      if(set && (setIt = cass_iterator_from_collection(set))) {
        while (cass_iterator_next(setIt)) {
            cass_value_get_string(cass_iterator_get_value(setIt),
                                  &nodeStr, &nodeStr_len);
            job.nodes.emplace_back(nodeStr, nodeStr_len);
        }
    
        cass_iterator_free(setIt);
      } else { 
        error = JD_PARSINGERROR;
493
494
495
496
497
498
499
500
501
502
      }
    }

    cass_result_free(cresult);
  }

  cass_future_free(future);
  cass_statement_free(statement);

  return error;
503
504
505
506
}

/**
 * @details
507
508
 * Find all entries in the data store whose start_ts AND end_ts lay within
 * the specified interval. Store the found entries in the JobData list.
509
 */
510
511
JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
512
513
                                                TimeStamp intervalEnd,
                                                std::string domainId) {
514
515
  /* Check if the input is valid and reasonable */
  if (intervalEnd.getRaw() == 0) {
516
    return JD_BADPARAMS;
517
518
  }
  if (intervalStart >= intervalEnd) {
519
    return JD_BADPARAMS;
520
521
522
  }

  JDError error = JD_UNKNOWNERROR;
523
  CassStatement* statement = nullptr;
524
525

  /* Select entries from Cassandra */
526
527
  const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
      " WHERE domain = ? AND end_ts <= ? AND start_ts >= ? ALLOW FILTERING;";
528

529
  statement = cass_statement_new(query, 3);
530
  cass_statement_set_paging_size(statement, JOB_PAGING_SIZE);
531

532
533
534
  cass_statement_bind_string(statement, 0, domainId.c_str());
    cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());
  cass_statement_bind_int64(statement, 2, intervalStart.getRaw());
535
536
537
538
539
  error = runStatement(statement, jobs, NULL);
  cass_statement_free(statement);
  
  return error;
}
540

541
542
543
544
545
546
547
548
549
550
/**
 * @details
 * Collect all entries of the given domain whose start_ts OR end_ts lies
 * within the specified interval (bounds included).
 * Cassandra only supports AND conditions in its query language, so the
 * required jobs cannot be selected directly. Two selects are issued instead,
 * sharing one set of job ids that is passed to runStatement() for
 * deduplication.
 *
 * @param jobs           List that receives the matching jobs.
 * @param intervalStart  Lower bound of the interval.
 * @param intervalEnd    Upper bound of the interval; must not be 0.
 * @param domainId       Domain to search in.
 * @return               JD_BADPARAMS if intervalEnd is 0 or intervalStart is
 *                       not smaller than intervalEnd. Otherwise the result of
 *                       the second select if it is not JD_OK, else the result
 *                       of the first select.
 */
JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
                                                TimeStamp intervalEnd,
                                                std::string domainId) {
  /* Reject an unset interval end as well as an empty or inverted interval */
  if (intervalEnd.getRaw() == 0 || intervalStart >= intervalEnd) {
    return JD_BADPARAMS;
  }

  std::unordered_set<JobId> seenIds;

  /* Both selects take the same three parameters: domain, start, end */
  auto selectWindow = [&](const char* cql) -> JDError {
    CassStatement* stmt = cass_statement_new(cql, 3);
    cass_statement_set_paging_size(stmt, JOB_PAGING_SIZE);

    cass_statement_bind_string(stmt, 0, domainId.c_str());
    cass_statement_bind_int64(stmt, 1, intervalStart.getRaw());
    cass_statement_bind_int64(stmt, 2, intervalEnd.getRaw());

    const JDError status = runStatement(stmt, jobs, &seenIds);
    cass_statement_free(stmt);
    return status;
  };

  /* First select: start_ts lies within the interval */
  const JDError byStart = selectWindow("SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
      " WHERE domain = ? AND start_ts >= ? AND start_ts <= ? ALLOW FILTERING;");

  /* Second select: end_ts lies within the interval */
  const JDError byEnd = selectWindow("SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
      " WHERE domain = ? AND end_ts >= ? AND end_ts <= ? ALLOW FILTERING;");

  return (byEnd != JD_OK) ? byEnd : byStart;
}
599

600
601
602
603
604
605
606
/**
 * @details
 * Collect all entries of the given domain corresponding to jobs that were
 * running in the queried time interval: their start time is greater than 0
 * and less than intervalEnd, and their end time is either 0 or greater than
 * intervalStart. Each of the two end time cases is covered by its own
 * select; both share one set of job ids that is passed to runStatement() for
 * deduplication.
 *
 * @param jobs           List that receives the matching jobs.
 * @param intervalStart  Lower bound of the interval.
 * @param intervalEnd    Upper bound of the interval; must not be 0.
 * @param domainId       Domain to search in.
 * @return               JD_BADPARAMS if intervalEnd is 0 or intervalStart is
 *                       not smaller than intervalEnd. Otherwise the result of
 *                       the second select if it is not JD_OK, else the result
 *                       of the first select.
 */
JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
                                                   TimeStamp intervalStart,
                                                   TimeStamp intervalEnd,
                                                   std::string domainId) {
    /* Reject an unset interval end as well as an empty or inverted interval */
    if (intervalEnd.getRaw() == 0 || intervalStart >= intervalEnd) {
        return JD_BADPARAMS;
    }

    std::unordered_set<JobId> seenIds;

    /* Both selects bind: domain, a value for the end_ts condition,
     * intervalEnd as upper bound for start_ts and 0 as lower bound for start_ts */
    auto selectRunning = [&](const char* cql, cass_int64_t endTsValue) -> JDError {
        CassStatement* stmt = cass_statement_new(cql, 4);
        cass_statement_set_paging_size(stmt, JOB_PAGING_SIZE);

        cass_statement_bind_string(stmt, 0, domainId.c_str());
        cass_statement_bind_int64(stmt, 1, endTsValue);
        cass_statement_bind_int64(stmt, 2, intervalEnd.getRaw());
        cass_statement_bind_int64(stmt, 3, (int64_t)0);

        const JDError status = runStatement(stmt, jobs, &seenIds);
        cass_statement_free(stmt);
        return status;
    };

    /* First select: jobs whose end_ts equals 0 */
    const JDError withoutEnd = selectRunning("SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
    " WHERE domain = ? AND end_ts = ? AND start_ts < ? AND start_ts > ?;", (int64_t)0);

    /* Second select: jobs whose end_ts is greater than intervalStart */
    const JDError withEnd = selectRunning("SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
    " WHERE domain = ? AND end_ts > ? AND start_ts < ? AND start_ts > ? ALLOW FILTERING;", intervalStart.getRaw());

    return (withEnd != JD_OK) ? withEnd : withoutEnd;
}
658

659
660
661
662
663
664
665
/**
 * @details
 * Collect all entries of the given domain corresponding to jobs that have
 * terminated in the queried time interval: their end time is greater than
 * intervalStart and less than intervalEnd, and their start time is greater
 * than 0.
 *
 * @param jobs           List that receives the matching jobs.
 * @param intervalStart  Exclusive lower bound for end_ts.
 * @param intervalEnd    Exclusive upper bound for end_ts; must not be 0.
 * @param domainId       Domain to search in.
 * @return               JD_BADPARAMS if intervalEnd is 0 or intervalStart is
 *                       not smaller than intervalEnd, otherwise the result of
 *                       runStatement().
 */
JDError JobDataStoreImpl::getJobsInIntervalFinished(std::list<JobData>& jobs,
                                                   TimeStamp intervalStart,
                                                   TimeStamp intervalEnd,
                                                   std::string domainId) {
    /* Reject an unset interval end as well as an empty or inverted interval */
    if (intervalEnd.getRaw() == 0 || intervalStart >= intervalEnd) {
        return JD_BADPARAMS;
    }

    const char* const cql = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
    " WHERE domain = ? AND end_ts > ? AND end_ts < ? AND start_ts > ? ALLOW FILTERING;";

    CassStatement* stmt = cass_statement_new(cql, 4);
    cass_statement_set_paging_size(stmt, JOB_PAGING_SIZE);

    cass_statement_bind_string(stmt, 0, domainId.c_str());
    cass_statement_bind_int64(stmt, 1, intervalStart.getRaw());
    cass_statement_bind_int64(stmt, 2, intervalEnd.getRaw());
    cass_statement_bind_int64(stmt, 3, (int64_t)0);

    /* No deduplication set needed for a single query */
    const JDError status = runStatement(stmt, jobs, nullptr);
    cass_statement_free(stmt);

    return status;
}

/**
 * @details
698
699
 * Find all entries in the data store corresponding to jobs that are in pending state
 * in the queried time interval, i.e., their start time is 0 or higher than the end of the interval.
700
 */
701
702
JDError JobDataStoreImpl::getJobsInIntervalPending(std::list<JobData>& jobs,
                                                    TimeStamp intervalStart,
703
704
                                                    TimeStamp intervalEnd,
                                                    std::string domainId) {
705
706
707
708
709
710
711
    /* Check if the input is valid and reasonable */
    if (intervalEnd.getRaw() == 0) {
        return JD_BADPARAMS;
    }
    if (intervalStart >= intervalEnd) {
        return JD_BADPARAMS;
    }
712

713
714
715
716
717
718
719
    JDError error = JD_UNKNOWNERROR;
    JDError error2 = JD_UNKNOWNERROR;
    std::unordered_set<JobId> jobIds;
    CassStatement* statement = nullptr;
    
    // First query: we pick the jobs whose start time is greater than the interval's start (and that were inserted
    // into the table before the interval's end)
720
721
    const char* query = "SELECT domain,writetime(uid),jid,start_ts,end_ts,nodes,uid FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
    " WHERE domain = ? AND start_ts > ? ALLOW FILTERING;";
722

723
    statement = cass_statement_new(query, 2);
724
    cass_statement_set_paging_size(statement, JOB_PAGING_SIZE);
725

726
727
    cass_statement_bind_string(statement, 0, domainId.c_str());
    cass_statement_bind_int64(statement, 1, intervalStart.getRaw());
728
729
    error = runStatement(statement, jobs, &jobIds, intervalEnd.getRaw());
    cass_statement_free(statement);
730

731
732
    // Second query: we pick the jobs whose start time is undefined (and that were inserted
    // into the table before the interval's end)
733
734
    query = "SELECT domain,writetime(uid),jid,start_ts,end_ts,nodes,uid FROM " JD_KEYSPACE_NAME "." CF_JOBDATAVIEW
    " WHERE domain = ? AND start_ts = ? ALLOW FILTERING;";
735

736
    statement = cass_statement_new(query, 2);
737
    cass_statement_set_paging_size(statement, JOB_PAGING_SIZE);
738
739
740

    cass_statement_bind_string(statement, 0, domainId.c_str());
    cass_statement_bind_int64(statement, 1, (int64_t)0);
741
742
    error2 = runStatement(statement, jobs, &jobIds, intervalEnd.getRaw());
    cass_statement_free(statement);
743

744
745
    if(error2 != JD_OK)
        error = error2;
746
    
747
748
    return error;
}
749

750
751
752
753
/**
 * @details
 * Execute the given statement page by page and pass the rows of every page to
 * parseJobs(). After each page that reports further pages, the paging state
 * of the result is set on the statement and the statement is executed again.
 * The statement is not freed here.
 *
 * @param statement        Fully bound statement to execute. If it is null,
 *                         JD_UNKNOWNERROR is returned.
 * @param jobs             List handed to parseJobs() for storing the jobs.
 * @param jobIds           Set handed to parseJobs(); may be null.
 * @param filterWriteTime  Write time filter handed to parseJobs().
 * @return                 JD_UNKNOWNERROR if a query failed (paging stops at
 *                         that point). Otherwise the first non-JD_OK result
 *                         reported by parseJobs() for any page, or JD_OK if
 *                         all pages were parsed without error.
 */
JDError JobDataStoreImpl::runStatement(CassStatement* statement, std::list<JobData>& jobs, std::unordered_set<JobId>* jobIds, uint64_t filterWriteTime) {
    JDError error = JD_UNKNOWNERROR;

    if(!statement)
        return error;

    /* First parsing problem seen on any page; must survive later, clean pages */
    JDError parseStatus = JD_OK;
    bool morePages = false;
    do {
        /* All parameters bound. Now execute the statement asynchronously */
        CassFuture* future = cass_session_execute(session, statement);

        /* Wait for the statement to finish */
        cass_future_wait(future);

        if (cass_future_error_code(future) != CASS_OK) {
            connection->printError(future);
            error = JD_UNKNOWNERROR;
            morePages = false;
        } else {
            error = JD_OK;

            /* Retrieve data from result */
            const CassResult* cresult = cass_future_get_result(future);
            CassIterator* rowIt = cass_iterator_from_result(cresult);

            const JDError pageStatus = parseJobs(rowIt, jobs, jobIds, filterWriteTime);
            if (pageStatus != JD_OK && parseStatus == JD_OK) {
                parseStatus = pageStatus;
            }

            /* Continue with the next page where this one ended */
            if((morePages = cass_result_has_more_pages(cresult)))
                cass_statement_set_paging_state(statement, cresult);

            cass_iterator_free(rowIt);
            cass_result_free(cresult);
        }

        cass_future_free(future);

    }
    while(morePages);

    /* A failed query takes precedence over parsing problems */
    if (error == JD_OK) {
        error = parseStatus;
    }

    return error;
}

795
/**
 * @details
 * Walks over all rows reachable through rowIt and appends one JobData entry
 * to the jobs list for every accepted row.
 *
 * @param rowIt            Iterator over the rows of a query result. It is
 *                         only advanced here, never freed.
 * @param jobs             List to which the parsed jobs are appended.
 * @param jobIds           Optional set used for deduplication. If it is not
 *                         nullptr, a row is only stored when its job id could
 *                         be newly inserted into the set.
 * @param filterWriteTime  If greater than zero, rows are skipped when their
 *                         "writetime(uid)" column cannot be read or when its
 *                         value multiplied by 1000 exceeds this threshold.
 * @return                 JD_OK if every processed row could be parsed
 *                         completely, JD_PARSINGERROR otherwise.
 */
JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<JobId>* jobIds, uint64_t filterWriteTime) {
    JDError error = JD_OK;

    while (cass_iterator_next(rowIt)) {
        const CassRow *row = cass_iterator_get_row(rowIt);

        if (filterWriteTime > 0) {
            /* Skip the job if it does not respect the write time filter */
            cass_int64_t writeTs = 0;
            if (cass_value_get_int64(cass_row_get_column_by_name(row, "writetime(uid)"), &writeTs) != CASS_OK
                || static_cast<uint64_t>(writeTs) * 1000 > filterWriteTime) {
                continue;
            }
        }

        /*
         * domain, jid and start_ts should always be set. The getters leave
         * their output arguments untouched on failure, so a row whose key
         * cannot be read is skipped instead of building strings from
         * uninitialized pointers.
         */
        const char *domId = "";
        const char *jobId = "";
        size_t domainId_len = 0;
        size_t jobId_len = 0;
        cass_int64_t startTs = 0;

        if (cass_value_get_string(cass_row_get_column_by_name(row, "domain"), &domId, &domainId_len) != CASS_OK
            || cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len) != CASS_OK
            || cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) {
            error = JD_PARSINGERROR;
            continue;
        }

        /* uid and end_ts fall back to empty/zero, but the problem is reported */
        const char *userId = "";
        size_t userId_len = 0;
        if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
            userId = "";
            userId_len = 0;
            error = JD_PARSINGERROR;
        }

        cass_int64_t endTs = 0;
        if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
            endTs = 0;
            error = JD_PARSINGERROR;
        }

        const JobId parsedJobId = static_cast<JobId>(std::string(jobId, jobId_len));

        /* Set-based deduplication */
        if (jobIds != nullptr && !jobIds->insert(parsedJobId).second) {
            continue;
        }

        /* Build the job directly inside the list to avoid copying it (and its node list) */
        jobs.emplace_back();
        JobData& job = jobs.back();

        job.jobId = parsedJobId;
        job.domainId = static_cast<DomainId>(std::string(domId, domainId_len));
        job.userId = static_cast<UserId>(std::string(userId, userId_len));
        job.startTime = static_cast<uint64_t>(startTs);
        job.endTime = static_cast<uint64_t>(endTs);

        /* Do not forget about the nodes... */
        const CassValue *set = cass_row_get_column_by_name(row, "nodes");
        CassIterator *setIt = (set != nullptr) ? cass_iterator_from_collection(set) : nullptr;

        if (setIt == nullptr) {
            error = JD_PARSINGERROR;
            continue;
        }

        while (cass_iterator_next(setIt)) {
            const char *nodeStr = nullptr;
            size_t nodeStr_len = 0;

            /* Only store node names that could actually be extracted */
            if (cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len) == CASS_OK) {
                job.nodes.emplace_back(nodeStr, nodeStr_len);
            } else {
                error = JD_PARSINGERROR;
            }
        }

        cass_iterator_free(setIt);
    }
    return error;
}

862
863
/**
 * @details
864
865
 * Find the entry in the data store with matching JobId and highest start_ts
 * value (= most recent job) and store the
866
 * corresponding nodes in the NodeList.
867
 */
868
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs) {
869
870
871
872
873
874
875
  JDError error = JD_UNKNOWNERROR;

  /* Select entry from Cassandra */
  CassError rc = CASS_OK;
  CassStatement* statement = nullptr;
  CassFuture* future = nullptr;
  const char* query = "SELECT nodes FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
876
      " WHERE jid = ? ORDER BY start_ts LIMIT 1;";
877
878
879

  statement = cass_statement_new(query, 1);

880
  cass_statement_bind_string(statement, 0, jid.c_str());
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909

  /* All parameters bound. Now execute the statement asynchronously */
  future = cass_session_execute(session, statement);

  /* Wait for the statement to finish */
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
    error = JD_UNKNOWNERROR;
  } else {
    error = JD_OK;

    const CassResult* cresult = cass_future_get_result(future);
    size_t rowCnt = cass_result_row_count(cresult);

    /* Check if the returned data is reasonable */
    if (rowCnt == 0) {
      error = JD_JOBIDNOTFOUND;
    } else {
      /* Retrieve data from result */
      const CassRow* row = cass_result_first_row(cresult);

      /* Copy the nodes in the NodeList */
      const char* nodeStr;
      size_t nodeStr_len;

      const CassValue* set = cass_row_get_column_by_name(row, "nodes");
910
911
912
913
914
915
916
      CassIterator *setIt = nullptr;
      
      if(set && (setIt = cass_iterator_from_collection(set))) {
          while (cass_iterator_next(setIt)) {
              cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len);
              nodes.emplace_back(nodeStr, nodeStr_len);
          }
917

918
919
920
          cass_iterator_free(setIt);
      } else {
          error = JD_PARSINGERROR;
921
922
923
924
925
926
927
928
929
930
      }
    }

    cass_result_free(cresult);
  }

  cass_future_free(future);
  cass_statement_free(statement);

  return error;
931
932
933
934
935
936
937
938
939
940
941
}

/**
 * @details
 * Stores the externally provided Connection object, fetches the
 * CassSession handle from it and prepares the insert statement.
 */
JobDataStoreImpl::JobDataStoreImpl(Connection* conn) {
  /* No prepared statement exists yet */
  preparedInsert = nullptr;

  /* Remember the connection and the session handle it provides */
  connection = conn;
  session = conn->getSessionHandle();

  prepareInsert(0);
}

/**
 * @details
 * Frees the prepared insert statement, if there is one, and resets the
 * connection and session pointers. Deleting the objects behind those two
 * pointers is not our responsibility.
 */
JobDataStoreImpl::~JobDataStoreImpl() {
  /* The prepared statement is the only thing released here */
  if (preparedInsert != nullptr) {
    cass_prepared_free(preparedInsert);
  }

  /* Only drop our references to connection and session */
  session = nullptr;
  connection = nullptr;
}

/* ########################################################################## */

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
967
968
JDError JobDataStore::insertJob(JobData& jdata) {
  return impl->insertJob(jdata);
969
970
971
972
973
974
975
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
976
977
978
979
980
981
982
983
984
JDError JobDataStore::updateJob(JobData& jdata) {
  return impl->updateJob(jdata);
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
985
986
987
988
989
990
991
992
993
994
995
JDError JobDataStore::updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime, std::string domainId) {
  return impl->updateEndtime(jobId, startTs, endTime, domainId);
}

/**
 * @details
 * Thin wrapper: the call is passed on unchanged to
 * JobDataStoreImpl::updateStartTime() and its result is returned.
 */
JDError JobDataStore::updateStartTime(JobId jobId, TimeStamp startTs, TimeStamp newStartTs, std::string domainId) {
    const JDError result = impl->updateStartTime(jobId, startTs, newStartTs, domainId);
    return result;
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1003
1004
JDError JobDataStore::deleteJob(JobId jid, TimeStamp startTs, std::string domainId) {
  return impl->deleteJob(jid, startTs, domainId);
1005
1006
1007
1008
1009
1010
1011
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1012
1013
JDError JobDataStore::getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs, std::string domainId) {
  return impl->getJobByPrimaryKey(job, jid, startTs, domainId);
1014
1015
1016
1017
1018
1019
1020
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1021
1022
JDError JobDataStore::getJobById(JobData& job, JobId jid, std::string domainId) {
  return impl->getJobById(job, jid, domainId);
1023
1024
1025
1026
1027
1028
1029
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1030
1031
JDError JobDataStore::getJobsInIntervalExcl(std::list<JobData>& jobs,
                                            TimeStamp intervalStart,
1032
1033
1034
                                            TimeStamp intervalEnd,
                                            std::string domainId) {
  return impl->getJobsInIntervalExcl(jobs, intervalStart, intervalEnd, domainId);
1035
1036
}

1037
1038
1039
1040
1041
/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1042
JDError JobDataStore::getJobsInIntervalIncl(std::list<JobData>& jobs,
1043
                                            TimeStamp intervalStart,
1044
1045
1046
                                            TimeStamp intervalEnd,
                                            std::string domainId) {
  return impl->getJobsInIntervalIncl(jobs, intervalStart, intervalEnd, domainId);
1047
1048
1049
1050
1051
1052
1053
1054
1055
}

/**
 * @details
 * Thin wrapper: the call is passed on unchanged to
 * JobDataStoreImpl::getJobsInIntervalRunning() and its result is returned.
 */
JDError JobDataStore::getJobsInIntervalRunning(std::list<JobData>& jobs,
                                               TimeStamp intervalStart,
                                               TimeStamp intervalEnd,
                                               std::string domainId) {
    const JDError result = impl->getJobsInIntervalRunning(jobs, intervalStart, intervalEnd, domainId);
    return result;
}

1061
1062
1063
1064
1065
/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1066
1067
JDError JobDataStore::getJobsInIntervalFinished(std::list<JobData>& jobs,
                                               TimeStamp intervalStart,
1068
1069
1070
                                               TimeStamp intervalEnd,
                                               std::string domainId) {
    return impl->getJobsInIntervalFinished(jobs, intervalStart, intervalEnd, domainId);
1071
1072
1073
1074
1075
1076
1077
1078
1079
}

/**
 * @details
 * Thin wrapper: the call is passed on unchanged to
 * JobDataStoreImpl::getJobsInIntervalPending() and its result is returned.
 */
JDError JobDataStore::getJobsInIntervalPending(std::list<JobData>& jobs,
                                               TimeStamp intervalStart,
                                               TimeStamp intervalEnd,
                                               std::string domainId) {
    const JDError result = impl->getJobsInIntervalPending(jobs, intervalStart, intervalEnd, domainId);
    return result;
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
1090
1091
1092
JDError JobDataStore::getNodeList(NodeList& nodes, JobId jid,
                                  TimeStamp startTs) {
  return impl->getNodeList(nodes, jid, startTs);
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
}

/**
 * @details
 * Creates the JobDataStoreImpl object for the given connection. All
 * member functions of this class forward their work to that object.
 */
JobDataStore::JobDataStore(Connection* conn)
    : impl(new JobDataStoreImpl(conn)) {
}

/**
 * @details
 * The JobDataStore destructor deallocates the JobDataStoreImpl object
 * that was allocated in the constructor.
 */
JobDataStore::~JobDataStore() {
  /* Deleting a null pointer is a no-op, so no explicit check is required */
  delete impl;
}