jobdatastore.cpp 9.08 KB
Newer Older
1
2
3
4
5
6
7
8
9
//================================================================================
// Name        : jobdatastore.cpp
// Author      : Axel Auweter, Micha Mueller
// Copyright   : Leibniz Supercomputing Centre
// Description : C++ API implementation for inserting and querying DCDB job data.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
10
// Copyright (C) 2011-2018 Leibniz Supercomputing Centre
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
// License as published by the Free Software Foundation; either
// version 2.1 of the License, or (at your option) any later version.
//
// This library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
// Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with this library; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
//================================================================================

/**
28
29
30
31
 * @file
 * @brief This file contains actual implementations of the jobdatastore
 * interface. The interface itself forwards all functions to the internal
 * JobDataStoreImpl. The real logic is implemented in the JobDataStoreImpl.
32
33
 */

34
#include <cinttypes>
35
36
37
38
39
40
41
42
43
44
45
46
#include <list>
#include <string>

#include "cassandra.h"

#include "dcdb/jobdatastore.h"
#include "jobdatastore_internal.h"
#include "dcdb/connection.h"
#include "dcdbglobals.h"

using namespace DCDB;

47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
/**
 * @details
 * Since we want high-performance inserts, we prepare the insert CQL query in
 * advance and only bind it on the actual insert.
 */
void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
  CassError rc = CASS_OK;
  CassFuture* future = NULL;
  const char* query;

  /*
   * Free the old prepared if necessary.
   */
  if (preparedInsert) {
    cass_prepared_free(preparedInsert);
  }

  char *queryBuf = NULL;
  if (ttl == 0) {
66
67
    query = "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
        " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?);";
68
69
70
  }
  else {
    queryBuf = (char*)malloc(256);
71
72
73
    snprintf(queryBuf, 256, "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
        " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?) "
        "USING TTL %" PRIu64 " ;", ttl);
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
    query = queryBuf;
  }

  future = cass_session_prepare(session, query);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
    connection->printError(future);
  } else {
    preparedInsert = cass_future_get_prepared(future);
  }

  cass_future_free(future);
  if (queryBuf) {
    free(queryBuf);
  }
}

93
94
/**
 * @details
95
 * Extract all data from the JobData object and push it into the data store.
96
 */
97
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
  /* Insert into Cassandra */
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;

  statement = cass_prepared_bind(preparedInsert);

  cass_statement_bind_int64_by_name(statement, "jid", jdata.jobId);
  cass_statement_bind_int64_by_name(statement, "uid", jdata.userId);
  cass_statement_bind_int64_by_name(statement, "start_ts",
                                    jdata.startTime.getRaw());
  cass_statement_bind_int64_by_name(statement, "end_ts",
                                    jdata.endTime.getRaw());

  /* Bind the string node list to a varchar set */
  CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
                                            jdata.nodes.size());
  for (auto& s : jdata.nodes) {
    cass_collection_append_string(set, s.c_str());
  }

  cass_statement_bind_collection_by_name(statement, "nodes", set);
  /* The collection can be freed after binding */
  cass_collection_free(set);

  future = cass_session_execute(session, statement);
  cass_future_wait(future);

  rc = cass_future_error_code(future);
  if (rc != CASS_OK) {
      connection->printError(future);

      cass_future_free(future);
      cass_statement_free(statement);

      return JD_UNKNOWNERROR;
  }

  cass_future_free(future);
  cass_statement_free(statement);
  return JD_OK;
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
}

/**
 * @details
 * //TODO
 */
JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * //TODO
 */
154
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
155
156
157
158
159
160
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
161
 * //unnecessary but kept for completeness
162
 */
163
JDError JobDataStoreImpl::removeJob(JobId jid) {
164
165
166
167
168
169
170
171
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * //TODO
 */
172
JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
173
174
175
176
177
178
179
180
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * //TODO
 */
181
182
183
JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
                                                TimeStamp intervalEnd) {
184
185
186
187
188
189
190
191
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * //TODO
 */
192
193
194
JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
                                                TimeStamp intervalEnd) {
195
196
197
198
199
200
201
202
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * //TODO
 */
203
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) {
204
205
206
207
208
209
210
211
212
213
214
215
216
  //TODO
  return JD_UNKNOWNERROR;
}

/**
 * @details
 * This constructor sets the internal connection variable to
 * the externally provided Connection object and also
 * retrieves the CassSession pointer of the connection.
 */
JobDataStoreImpl::JobDataStoreImpl(Connection* conn) {
  connection = conn;
  session = connection->getSessionHandle();
217
218
219

  preparedInsert = nullptr;
  prepareInsert(0);
220
221
222
223
224
}

/**
 * @details
 * The destructor just resets the internal pointers. Deletion of the pointers
225
 * (except preparedInsert) is not our responsibility.
226
227
228
229
 */
JobDataStoreImpl::~JobDataStoreImpl() {
  connection = nullptr;
  session = nullptr;
230
231
232
  if (preparedInsert) {
      cass_prepared_free(preparedInsert);
  }
233
234
235
236
237
238
239
240
241
}

/* ########################################################################## */

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
242
243
JDError JobDataStore::insertJob(JobData& jdata) {
  return impl->insertJob(jdata);
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
JDError JobDataStore::insertSubmittedJob(JobId jid, UserId uid) {
  return impl->insertSubmittedJob(jid, uid);
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
260
261
JDError JobDataStore::updateJob(JobData& jdata) {
  return impl->updateJob(jdata);
262
263
264
265
266
267
268
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
269
270
JDError JobDataStore::removeJob(JobId jid) {
  return impl->removeJob(jid);
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
JDError JobDataStore::getJobById(JobData& job, JobId jid) {
  return impl->getJobById(job, jid);
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
287
288
289
290
JDError JobDataStore::getJobsInIntervalExcl(std::list<JobData>& jobs,
                                            TimeStamp intervalStart,
                                            TimeStamp intervalEnd) {
  return impl->getJobsInIntervalExcl(jobs, intervalStart, intervalEnd);
291
292
293
294
295
296
297
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
298
299
300
301
JDError JobDataStore::getJobsInIntervalIncl(std::list<JobData>& jobs,
                                            TimeStamp intervalStart,
                                            TimeStamp intervalEnd) {
  return impl->getJobsInIntervalIncl(jobs, intervalStart, intervalEnd);
302
303
304
305
306
307
308
}

/**
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class.
 */
309
310
JDError JobDataStore::getNodeList(NodeList& nodes, JobId jid) {
  return impl->getNodeList(nodes, jid);
311
312
313
314
315
316
317
318
319
320
321
322
323
}

/**
 * @details
 * This constructor allocates the implementation class which
 * holds the actual implementation of the class functionality.
 */
JobDataStore::JobDataStore(Connection* conn) {
  impl = new JobDataStoreImpl(conn);
}

/**
 * @details
324
 * The JobDataStore destructor deallocates the
325
326
327
328
 * JobDataStoreImpl and CassandraBackend objects.
 */
JobDataStore::~JobDataStore() {
  /* Clean up... */
329
330
331
  if (impl) {
    delete impl;
  }
332
}