Commit 5dba5007 authored by Micha Mueller's avatar Micha Mueller
Browse files

JobDataStore fix: make JobId and StartTs primary key, as JobId alone is not unique

Refactor JobDataStore functionality for the expected use case
parent a5ca9c7a
......@@ -218,14 +218,13 @@ DCDB_C_RESULT disconnectFromDatabase(DCDB::Connection* conn);
DCDB::JobDataStore* constructJobDataStore(DCDB::Connection* conn);
/**
* @brief Insert a job into the database.
* @brief Insert a starting job into the database.
*
* @param jds Pointer to JobDataStore object, which shall be used to
* insert the job.
* @param jid SLURM id of the job.
* @param uid SLURM user id of the job owner.
* @param startTs Start time of the job (in ms since Unix epoch).
* @param endTs End time of the job (in ms since Unix epoch).
* @param nodes String array of node names used by the job.
* @param nodeSize Size of the nodes array.
*
......@@ -236,10 +235,23 @@ DCDB::JobDataStore* constructJobDataStore(DCDB::Connection* conn);
* Builds a JobData struct from (jid, uid, startTs, endTs, nodes, nodeSize) and
* then tries to insert it by calling the corresponding insert function of jds.
*/
DCDB_C_RESULT insertJobIntoDatabase(DCDB::JobDataStore* jds, DCDB::JobId jid,
DCDB::UserId uid, uint64_t startTs,
uint64_t endTs, const char ** nodes,
unsigned nodeSize);
DCDB_C_RESULT insertJobStart(DCDB::JobDataStore* jds, DCDB::JobId jid,
DCDB::UserId uid, uint64_t startTs,
const char ** nodes, unsigned nodeSize);
/**
* @brief Update the end time of the most recent job with Id jid.
*
* @param jds Pointer to JobDataStore object, which shall be used to insert
* the job.
* @param jid SLURM id of the job.
* @param endTs End time of the job (in ms since Unix epoch).
*
* @return DCDB_C_OK if the job was successfully updated. DCDB_BAD_PARAMS if
* no job with the given JobId exists. DCDB_C_UNKNOWN otherwise.
*/
DCDB_C_RESULT updateJobEnd(DCDB::JobDataStore* jds, DCDB::JobId jid,
uint64_t endTs);
/**
* @brief For Debugging. Print the jobdata or an appropriate error message.
......
......@@ -54,7 +54,8 @@ namespace DCDB {
/**
* @brief This struct is a container for the information DCDB keeps about
* SLURM jobs.
* SLURM jobs. Both jobId and startTime are required to uniquely identify a
* job.
*/
struct JobData {
JobId jobId; /**< SLURM job id of the job. */
......@@ -67,6 +68,7 @@ namespace DCDB {
typedef enum {
JD_OK, /**< Everything went fine. */
JD_JOBKEYNOTFOUND,/**< Not job with matching primary key was found */
JD_JOBIDNOTFOUND, /**< The given JobId was not found in the data store. */
JD_BADPARAMS, /**< The provided parameters are ill-formed. Either
because they are erroneous or incomplete. */
......@@ -89,27 +91,20 @@ namespace DCDB {
* @brief This function inserts a single job into the database.
*
* @param jdata Reference to a JobData object filled with all the
* information about the job.
* information about the job. At least jobId and startTime
* have to be filled in as they form the primary key.
* Other JobData values may be left out and can be updated
* later with updateJob().
* @return See JDError
*/
JDError insertJob(JobData& jdata);
/**
* @brief This function inserts a single job which was submitted but not
* yet executed into the database.
*
* @param jid Id of the job.
* @param uid Id of the user the job belongs to.
* @return See JDError
*/
JDError insertSubmittedJob(JobId jid, UserId uid);
/**
* @brief Update a job.
*
* @details Updates the job in the database whose JobId matches
* jdata.JobId. If no such job is found, no job data is
* inserted.
* @details Updates the job in the database whose primary key matches
* jdata. If no entry is found a new one is created (upsert).
* Updates all values of the JobData struct.
*
* @param jdata Reference to a JobData object filled with all the
* information about the job.
......@@ -117,20 +112,48 @@ namespace DCDB {
*/
JDError updateJob(JobData& jdata);
/**
* @brief Update the end time of the job with matching primary key.
*
* @param jobId JobId of the job to be updated. Makes up the
* primary key together with startTime.
* @param startTime Start time of the job. Part of the primary key.
* @param endTime New endTime to be inserted.
*
* return See JDError
*/
JDError updateEndtime(JobId jobId, TimeStamp startTs,
TimeStamp endTime);
/**
* @brief Deletes a job from the job data list.
*
* @param jid JobId.
* @return See JDError.
* @param jid JobId. Makes up the primary key together with startTs.
* @param startTs Start timestamp of the job. Part of the primary key.
* @return See JDError.
*/
JDError deleteJob(JobId jid, TimeStamp startTs);
/**
* @brief Retrieve a job by its primary key.
*
* @param job Reference to a JobData object that will be populated
* with the job data.
* @param jid Id of the job to be retrieved. Makes up the primary key
* together with startTs.
* @param startTs Start time of the job. Part of the primary key.
* @return See JDError.
*/
JDError deleteJob(JobId jid);
JDError getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs);
/**
* @brief Retrieve a job by id.
* @brief Retrieve the most recent job with jid.
*
* @param job Reference to a JobData object that will be populated with
* the job data.
* @param jid Id of the job whose information should be retrieved.
* @param jid Id of the job whose information should be retrieved. If
* multiple jobs with the same jid are present the most
* recent one is returned.
* @return See JDError.
*/
JDError getJobById(JobData& job, JobId jid);
......@@ -177,9 +200,11 @@ namespace DCDB {
* @param nodes Reference to a NodeList which will be populated with
* the nodes.
* @param jid Id of the job whose nodes should be retrieved.
* @param startTs Start timestamp of the job to make up the full primary
* key.
* @return See JDError.
*/
JDError getNodeList(NodeList& nodes, JobId jid);
JDError getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs);
/**
* @brief A shortcut constructor for a JobDataStore object that allows
......
......@@ -64,10 +64,11 @@ namespace DCDB {
/* See jobdatastore.h for documentation */
JDError insertJob(JobData& jdata);
JDError insertSubmittedJob(JobId jid, UserId uid);
JDError updateJob(JobData& jdata);
JDError deleteJob(JobId jid);
JDError updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime);
JDError deleteJob(JobId jid, TimeStamp startTs);
JDError getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs);
JDError getJobById(JobData& job, JobId jid);
JDError getJobsInIntervalExcl(std::list<JobData>& jobs,
TimeStamp intervalStart,
......@@ -75,7 +76,7 @@ namespace DCDB {
JDError getJobsInIntervalIncl(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
JDError getNodeList(NodeList& nodes, JobId jid);
JDError getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs);
JobDataStoreImpl(Connection* conn);
virtual ~JobDataStoreImpl();
......
......@@ -463,17 +463,16 @@ JobDataStore* constructJobDataStore(Connection* conn) {
return NULL;
}
DCDB_C_RESULT insertJobIntoDatabase(JobDataStore* jds, JobId jid,
UserId uid, uint64_t startTs,
uint64_t endTs, const char ** nodes,
unsigned nodeSize) {
DCDB_C_RESULT insertJobStart(JobDataStore* jds, JobId jid, UserId uid,
uint64_t startTs, const char ** nodes,
unsigned nodeSize) {
JobData jdata;
JDError ret;
jdata.jobId = jid;
jdata.userId = uid;
jdata.startTime = startTs;
jdata.endTime = endTs;
jdata.endTime = (uint64_t) 0;
for(unsigned i = 0; i < nodeSize; i++) {
jdata.nodes.push_back(nodes[i]);
}
......@@ -488,6 +487,27 @@ DCDB_C_RESULT insertJobIntoDatabase(JobDataStore* jds, JobId jid,
return DCDB_C_UNKNOWN;
}
DCDB_C_RESULT updateJobEnd(JobDataStore* jds, JobId jid, uint64_t endTs) {
JobData jdata;
JDError ret;
ret = jds->getJobById(jdata, jid);
if (ret == JD_UNKNOWNERROR || ret == JD_PARSINGERROR) {
return DCDB_C_UNKNOWN;
} else if (ret == JD_JOBIDNOTFOUND) {
return DCDB_C_BADPARAMS;
} else if (ret != JD_OK) {
return DCDB_C_UNKNOWN;
}
if (jds->updateEndtime(jid, jdata.startTime, endTs) != JD_OK) {
return DCDB_C_UNKNOWN;
}
return DCDB_C_OK;
}
DCDB_C_RESULT printJob(JobDataStore* jds, JobId jid) {
JobData jdata;
JDError ret;
......
......@@ -487,7 +487,8 @@ bool ConnectionImpl::initSchema() {
"end_ts bigint, " /* End timestamp of the job */
"nodes set<varchar>", /* Set of nodes used by the job */
"jid" /* Make the "jid" column the primary key */
"jid, start_ts" /* Make jid + start_ts columns the primary key*/
/* Together they should be unique */
); /* No further options required */
}
......
......@@ -93,14 +93,8 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
* Extract all data from the JobData object and push it into the data store.
*/
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
/* Check if the input is valid and reasonable */
if (jdata.startTime.getRaw() == 0 || jdata.endTime.getRaw() == 0) {
return JD_BADPARAMS;
}
if (jdata.startTime >= jdata.endTime) {
return JD_BADPARAMS;
}
if (jdata.nodes.size() == 0) {
/* Check if the input for the primary key is valid and reasonable */
if (jdata.startTime.getRaw() == 0) {
return JD_BADPARAMS;
}
......@@ -153,29 +147,44 @@ JDError JobDataStoreImpl::insertJob(JobData& jdata) {
/**
* @details
* Push JobId and UserId into the data store. The remaining fields are filled
* with dummy values and are expected to be updated with their actual values
* once the job finished.
* Update the job with matching JobId and StartTs in the data store with the
* values provided by the given JobData object. If no such job exists in the
* data store yet, it is inserted.
* The JobData object is expected to be complete. Partial
* updates of only selected fields are not supported. Instead, one has to
* retrieve the other JobData information via getJobById() or
* getJobByPrimaryKey() first and complete its JobData object for the update.
*/
JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
/* Check if the input for the primary key is valid and reasonable */
if (jdata.startTime.getRaw() == 0) {
return JD_BADPARAMS;
}
JDError error = JD_UNKNOWNERROR;
/* Insert into Cassandra */
/* Update entry in Cassandra (actually upserts) */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA
" SET uid = ?, end_ts = ?, nodes = ? WHERE jid = ? AND start_ts = ? ;";
statement = cass_prepared_bind(preparedInsert);
statement = cass_statement_new(query, 5);
cass_statement_bind_int64_by_name(statement, "jid", jid);
cass_statement_bind_int64_by_name(statement, "uid", uid);
cass_statement_bind_int64_by_name(statement, "start_ts", 0);
cass_statement_bind_int64_by_name(statement, "end_ts", 0);
cass_statement_bind_int64(statement, 3, jdata.jobId);
cass_statement_bind_int64(statement, 0, jdata.userId);
cass_statement_bind_int64(statement, 4, jdata.startTime.getRaw());
cass_statement_bind_int64(statement, 1, jdata.endTime.getRaw());
/* Use an empty varchar set as dummy value */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, 0);
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
for (auto& s : jdata.nodes) {
cass_collection_append_string(set, s.c_str());
}
cass_statement_bind_collection_by_name(statement, "nodes", set);
cass_statement_bind_collection(statement, 2, set);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
......@@ -196,33 +205,20 @@ JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
cass_future_free(future);
cass_statement_free(statement);
return error;
}
/**
* TODO: Delimit this method clearer from insert: append an if-condition to the
* statement, to only update if the job already exists in the database.
*
* @details
* Update the job with matching JobId in the data store with the values
* provided by the given JobData object. If no such job exists in the data
* store yet, it is inserted.
* The JobData object is expected to be complete. Partial
* updates of only selected fields are not supported. Instead, one has to
* retrieve the other JobData information via getJobById() first and complete
* its JobData object for the update. This method is intended to update jobs
* which were inserted via insertSubmittedJob() but can be used to correct
* values of every other job, too.
* Update the job with matching JobId and StartTs in the data store with the
* provided end time. If no such job exists in the data store yet, it is
* inserted.
*/
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
/* Check if the input is valid and reasonable */
if (jdata.startTime.getRaw() == 0 || jdata.endTime.getRaw() == 0) {
return JD_BADPARAMS;
}
if (jdata.startTime >= jdata.endTime) {
return JD_BADPARAMS;
}
if (jdata.nodes.size() == 0) {
JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs,
TimeStamp endTime) {
/* Check if the input for the primary key is valid and reasonable */
if (startTs.getRaw() == 0) {
return JD_BADPARAMS;
}
......@@ -233,30 +229,56 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA
" SET uid = ?, start_ts = ?, end_ts = ?, nodes = ? WHERE jid = ? ;";
" SET end_ts = ? WHERE jid = ?, start_ts = ? ;";
statement = cass_statement_new(query, 5);
statement = cass_statement_new(query, 3);
cass_statement_bind_int64(statement, 4, jdata.jobId);
cass_statement_bind_int64(statement, 0, jdata.userId);
cass_statement_bind_int64(statement, 1, jdata.startTime.getRaw());
cass_statement_bind_int64(statement, 2, jdata.endTime.getRaw());
cass_statement_bind_int64(statement, 1, jobId);
cass_statement_bind_int64(statement, 2, startTs.getRaw());
cass_statement_bind_int64(statement, 0, endTime.getRaw());
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
for (auto& s : jdata.nodes) {
cass_collection_append_string(set, s.c_str());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
}
cass_statement_bind_collection(statement, 3, set);
cass_future_free(future);
cass_statement_free(statement);
return error;
}
/**
* @details
* Delete the entry with matching JobId and start TimeStamp from the data store.
*/
JDError JobDataStoreImpl::deleteJob(JobId jid, TimeStamp startTs) {
JDError error = JD_UNKNOWNERROR;
/* Remove entry from Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "DELETE FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? AND start_ts = ?;";
statement = cass_statement_new(query, 2);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_int64(statement, 1, startTs.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Clean up in the meantime */
cass_collection_free(set);
/* Wait for the statement to finish */
cass_future_wait(future);
......@@ -276,21 +298,24 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
/**
* @details
* Delete the entry with matching JobId from the data store.
* Find the entry in the data store with matching JobId and start_ts and store
* the corresponding values in the JobData object.
*/
JDError JobDataStoreImpl::deleteJob(JobId jid) {
JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid,
TimeStamp startTs) {
JDError error = JD_UNKNOWNERROR;
/* Remove entry from Cassandra */
/* Select entry from Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "DELETE FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? ;";
const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? AND start_ts = ?;";
statement = cass_statement_new(query, 1);
statement = cass_statement_new(query, 2);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_int64(statement, 1, startTs.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
......@@ -304,6 +329,57 @@ JDError JobDataStoreImpl::deleteJob(JobId jid) {
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
const CassResult* cresult = cass_future_get_result(future);
size_t rowCnt = cass_result_row_count(cresult);
/* Check if the returned data is reasonable */
if (rowCnt == 0) {
error = JD_JOBKEYNOTFOUND;
} else {
/* Retrieve data from result */
const CassRow* row = cass_result_first_row(cresult);
cass_int64_t jobId, userId, startTs, endTs;
/* jid and start_ts are always set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"),
&userId) != CASS_OK) {
userId = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data in the JobData object */
job.jobId = (JobId) jobId;
job.userId = (UserId) userId;
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
/* Do not forget about the nodes... */
const char* nodeStr;
size_t nodeStr_len;
const CassValue* set = cass_row_get_column_by_name(row, "nodes");
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
cass_iterator_free(setIt);
}
cass_result_free(cresult);
}
cass_future_free(future);
......@@ -314,7 +390,8 @@ JDError JobDataStoreImpl::deleteJob(JobId jid) {
/**
* @details
* Find the entry in the data store with matching JobId and store the
* Find the entry in the data store with matching JobId and highest start_ts
* value (= most recent job) and store the
* corresponding values in the JobData object.
*/
JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
......@@ -325,7 +402,7 @@ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? ;";
" WHERE jid = ? ORDER BY start_ts DESC LIMIT 1;";
statement = cass_statement_new(query, 1);
......@@ -356,12 +433,13 @@ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
cass_int64_t jobId, userId, startTs, endTs;
/* jid and uid should always be set. Other values should be checked */
/* jid and start_ts are always set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs) != CASS_OK) {
startTs = 0;
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"),
&userId) != CASS_OK) {
userId = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
......@@ -704,10 +782,12 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
/**
* @details
* Find the entry in the data store with matching JobId and store the
* Find the entry in the data store with matching JobId and highest start_ts
* value (= most recent job) and store the
* corresponding nodes in the NodeList.
*/
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) {
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid,
TimeStamp startTs) {
JDError error = JD_UNKNOWNERROR;
/* Select entry from Cassandra */
......@@ -715,7 +795,7 @@ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) {
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT nodes FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? ;";
" WHERE jid = ? ORDER BY start_ts LIMIT 1;";
statement = cass_statement_new(query, 1);
......@@ -812,8 +892,18 @@ JDError JobDataStore::insertJob(JobData& jdata) {
* Instead of doing the actual work, this function simply forwards to the
* corresponding function of the JobDataStoreImpl class.
*/
JDError JobDataStore::insertSubmittedJob(JobId jid, UserId uid) {
return impl->insertSubmittedJob(jid, uid);
JDError JobDataStore::updateJob(JobData& jdata) {
return impl->updateJob(jdata);
}
/**
* @details
* Instead of doing the actual work, this function simply forwards to the
* corresponding function of the JobDataStoreImpl class.
*/
JDError JobDataStore::updateEndtime(JobId jobId, TimeStamp startTs,
TimeStamp endTime) {
return impl->updateEndtime(jobId, startTs, endTime);
}
/**
......@@ -821,8 +911,8 @@ JDError JobDataStore::insertSubmittedJob(JobId jid, UserId uid) {
* Instead of doing the actual work, this function simply forwards to the
* corresponding function of the JobDataStoreImpl class.
*/
JDError JobDataStore::updateJob(JobData& jdata) {
return impl->updateJob(jdata);
JDError JobDataStore::deleteJob(JobId jid, TimeStamp startTs) {
return impl->deleteJob(jid, startTs);
}
/**
......@@ -830,8 +920,9 @@ JDError JobDataStore::updateJob(JobData& jdata) {
* Instead of doing the actual work, this function simply forwards to the
* corresponding function of the JobDataStoreImpl class.
*/
JDError JobDataStore::deleteJob(JobId jid) {
return impl->deleteJob(jid);
JDError JobDataStore::getJobByPrimaryKey(JobData& job, JobId jid,
TimeStamp startTs</