Commit 9b1426b0 authored by Micha Mueller's avatar Micha Mueller
Browse files

WIP2: Fill jobdatastore interface with life

parent 898e2bc1
......@@ -54,9 +54,7 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
CassFuture* future = NULL;
const char* query;
/*
* Free the old prepared if necessary.
*/
/* Free the old prepared if necessary. */
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
......@@ -95,6 +93,19 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
* Extract all data from the JobData object and push it into the data store.
*/
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
/* Check if the input is valid and reasonable */
if (jdata.startTime == 0 || jdata.endTime == 0) {
return JD_INVALIDJOBDATA;
}
if (jdata.startTime >= jdata.endTime) {
return JD_INVALIDJOBDATA;
}
if (jdata.nodes.size() == 0) {
return JD_INVALIDJOBDATA;
}
JDError error = JD_UNKNOWNERROR;
/* Insert into Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
......@@ -109,7 +120,7 @@ JDError JobDataStoreImpl::insertJob(JobData& jdata) {
cass_statement_bind_int64_by_name(statement, "end_ts",
jdata.endTime.getRaw());
/* Bind the string node list to a varchar set */
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
for (auto& s : jdata.nodes) {
......@@ -117,43 +128,148 @@ JDError JobDataStoreImpl::insertJob(JobData& jdata) {
}
cass_statement_bind_collection_by_name(statement, "nodes", set);
/* The collection can be freed after binding */
cass_collection_free(set);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Clean up in the meantime */
cass_collection_free(set);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return JD_UNKNOWNERROR;
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
}
cass_future_free(future);
cass_statement_free(statement);
return JD_OK;
return error;
}
/**
* @details
* //TODO
* Push JobId and UserId into the data store. The remaining fields are filled
* with dummy values and are expected to be updated with their actual values
* once the job finished.
*/
JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
//TODO
return JD_UNKNOWNERROR;
JDError error = JD_UNKNOWNERROR;
/* Insert into Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
statement = cass_prepared_bind(preparedInsert);
cass_statement_bind_int64_by_name(statement, "jid", jid);
cass_statement_bind_int64_by_name(statement, "uid", uid);
cass_statement_bind_int64_by_name(statement, "start_ts", 0);
cass_statement_bind_int64_by_name(statement, "end_ts", 0);
/* Use an empty varchar set as dummy value */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, 0);
cass_statement_bind_collection_by_name(statement, "nodes", set);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Clean up in the meantime */
cass_collection_free(set);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
}
cass_future_free(future);
cass_statement_free(statement);
return error;
}
/**
* TODO: Delimit this method clearer from insert: append an if-condition to the
* statement, to only update if the job already exists in the database.
*
* @details
* //TODO
* Update the job with matching JobId in the data store with the values
* provided by the given JobData object. If no such job exists in the data
* store yet, it is inserted.
* The JobData object is expected to be complete. Partial
* updates of only selected fields are not supported. Instead, one has to
* retrieve the other JobData information via getJobById() first and complete
* its JobData object for the update. This method is intended to update jobs
* which were inserted via insertSubmittedJob() but can be used to correct
* values of every other job, too.
*/
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
//TODO
return JD_UNKNOWNERROR;
/* Check if the input is valid and reasonable */
if (jdata.startTime == 0 || jdata.endTime == 0) {
return JD_INVALIDJOBDATA;
}
if (jdata.startTime >= jdata.endTime) {
return JD_INVALIDJOBDATA;
}
if (jdata.nodes.size() == 0) {
return JD_INVALIDJOBDATA;
}
JDError error = JD_UNKNOWNERROR;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA
" SET uid = ?, start_ts = ?, end_ts = ?, nodes = ? WHERE jid = ? ;";
statement = cass_statement_new(query, 5);
cass_statement_bind_int64(statement, 4, jdata.jobId);
cass_statement_bind_int64(statement, 0, jdata.userId);
cass_statement_bind_int64(statement, 1, jdata.startTime.getRaw());
cass_statement_bind_int64(statement, 2, jdata.endTime.getRaw());
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
for (auto& s : jdata.nodes) {
cass_collection_append_string(set, s.c_str());
}
cass_statement_bind_collection(statement, 3, set);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Clean up in the meantime */
cass_collection_free(set);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
}
cass_statement_free(statement);
return error;
}
/**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment