Commit 7bfd5aab authored by Micha Mueller's avatar Micha Mueller
Browse files

Finish jobdatastore interface implementation

parent 9b1426b0
......@@ -17,7 +17,8 @@ OBJS = src/connection.o \
src/unitconv.o \
src/virtualsensor.o \
src/c_api.o \
src/sensoroperations.o
src/sensoroperations.o \
src/jobdatastore.o
# List of public header files necessary to use this library
PUBHEADERS = $(shell find include -type f -iname "*.h")
......
......@@ -66,12 +66,14 @@ namespace DCDB {
};
/**
 * Status codes returned by the job data store operations.
 */
typedef enum {
    JD_OK,            /**< Everything went fine. */
    JD_JOBIDNOTFOUND, /**< The given JobId was not found in the data store. */
    JD_INVALIDPARAMS, /**< The provided parameters are invalid. Either
                           because they are erroneous or incomplete. */
    JD_PARSINGERROR,  /**< Data retrieved from the data store could not be
                           parsed and a default value was returned instead.
                           Use results with care and at your own risk. */
    JD_UNKNOWNERROR   /**< An unknown error occurred. */
} JDError;
/**
......@@ -116,12 +118,12 @@ namespace DCDB {
JDError updateJob(JobData& jdata);
/**
* @brief Removes a job from the job data list.
* @brief Deletes a job from the job data list.
*
* @param jid JobId.
* @return See JDError.
*/
JDError removeJob(JobId jid);
JDError deleteJob(JobId jid);
/**
* @brief Retrieve a job by id.
......@@ -148,8 +150,8 @@ namespace DCDB {
* @return See JDError.
*/
JDError getJobsInIntervalExcl(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
TimeStamp intervalStart,
TimeStamp intervalEnd);
/**
* @brief Retrieve an inclusive list of jobs which were run in the given
......
......@@ -66,7 +66,7 @@ namespace DCDB {
JDError insertJob(JobData& jdata);
JDError insertSubmittedJob(JobId jid, UserId uid);
JDError updateJob(JobData& jdata);
JDError removeJob(JobId jid);
JDError deleteJob(JobId jid);
JDError getJobById(JobData& job, JobId jid);
JDError getJobsInIntervalExcl(std::list<JobData>& jobs,
......
......@@ -94,14 +94,14 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
*/
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
/* Check if the input is valid and reasonable */
if (jdata.startTime == 0 || jdata.endTime == 0) {
return JD_INVALIDJOBDATA;
if (jdata.startTime.getRaw() == 0 || jdata.endTime.getRaw() == 0) {
return JD_INVALIDPARAMS;
}
if (jdata.startTime >= jdata.endTime) {
return JD_INVALIDJOBDATA;
return JD_INVALIDPARAMS;
}
if (jdata.nodes.size() == 0) {
return JD_INVALIDJOBDATA;
return JD_INVALIDPARAMS;
}
JDError error = JD_UNKNOWNERROR;
......@@ -216,18 +216,19 @@ JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
*/
JDError JobDataStoreImpl::updateJob(JobData& jdata) {
/* Check if the input is valid and reasonable */
if (jdata.startTime == 0 || jdata.endTime == 0) {
return JD_INVALIDJOBDATA;
if (jdata.startTime.getRaw() == 0 || jdata.endTime.getRaw() == 0) {
return JD_INVALIDPARAMS;
}
if (jdata.startTime >= jdata.endTime) {
return JD_INVALIDJOBDATA;
return JD_INVALIDPARAMS;
}
if (jdata.nodes.size() == 0) {
return JD_INVALIDJOBDATA;
return JD_INVALIDPARAMS;
}
JDError error = JD_UNKNOWNERROR;
/* Update entry in Cassandra (actually upserts) */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
......@@ -267,6 +268,7 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
error = JD_OK;
}
cass_future_free(future);
cass_statement_free(statement);
return error;
......@@ -274,51 +276,497 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
/**
* @details
* //unnecessary but kept for completeness
* Delete the entry with matching JobId from the data store.
*/
JDError JobDataStoreImpl::removeJob(JobId jid) {
//TODO
return JD_UNKNOWNERROR;
JDError JobDataStoreImpl::deleteJob(JobId jid) {
    /*
     * Issues a DELETE for the row whose jid column equals the given JobId.
     * Returns JD_OK if the statement completed with CASS_OK and
     * JD_UNKNOWNERROR otherwise (the error is printed via the connection).
     */
    const char* query = "DELETE FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
                        " WHERE jid = ? ;";

    /* One placeholder in the statement: the job id */
    CassStatement* statement = cass_statement_new(query, 1);
    cass_statement_bind_int64(statement, 0, jid);

    /* Hand the statement to the session and block until it has finished */
    CassFuture* future = cass_session_execute(session, statement);
    cass_future_wait(future);

    const CassError rc = cass_future_error_code(future);
    const bool failed = (rc != CASS_OK);
    if (failed) {
        connection->printError(future);
    }

    /* Driver objects are released on both outcomes */
    cass_future_free(future);
    cass_statement_free(statement);

    return failed ? JD_UNKNOWNERROR : JD_OK;
}
/**
* @details
* //TODO
* Find the entry in the data store with matching JobId and store the
* corresponding values in the JobData object.
*/
JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
    /*
     * Fetches the row whose jid column equals `jid` and copies its columns
     * into `job`.
     *
     * Returns:
     *   JD_OK            - a row was found and every column could be read.
     *   JD_JOBIDNOTFOUND - the query returned no row; `job` is not modified.
     *   JD_PARSINGERROR  - a row was found but at least one column could not
     *                      be read; 0 is stored for an unreadable numeric
     *                      column and unreadable node entries are skipped.
     *   JD_UNKNOWNERROR  - the query itself failed.
     */
    const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
                        " WHERE jid = ? ;";

    CassStatement* statement = cass_statement_new(query, 1);
    cass_statement_bind_int64(statement, 0, jid);

    /* Execute the statement and block until the result is available */
    CassFuture* future = cass_session_execute(session, statement);
    cass_future_wait(future);

    JDError error = JD_UNKNOWNERROR;

    if (cass_future_error_code(future) != CASS_OK) {
        connection->printError(future);
    } else {
        const CassResult* cresult = cass_future_get_result(future);

        if (cass_result_row_count(cresult) == 0) {
            error = JD_JOBIDNOTFOUND;
        } else {
            error = JD_OK;
            const CassRow* row = cass_result_first_row(cresult);

            /* Reads one 64-bit column; on failure yields 0 and flags the
             * result as only partially parsed instead of returning an
             * uninitialised value. */
            auto readInt64 = [&](const char* column) -> cass_int64_t {
                cass_int64_t value = 0;
                if (cass_value_get_int64(cass_row_get_column_by_name(row, column),
                                         &value) != CASS_OK) {
                    value = 0;
                    error = JD_PARSINGERROR;
                }
                return value;
            };

            const cass_int64_t jobId   = readInt64("jid");
            const cass_int64_t userId  = readInt64("uid");
            const cass_int64_t startTs = readInt64("start_ts");
            const cass_int64_t endTs   = readInt64("end_ts");

            job.jobId     = static_cast<JobId>(jobId);
            job.userId    = static_cast<UserId>(userId);
            job.startTime = static_cast<uint64_t>(startTs);
            job.endTime   = static_cast<uint64_t>(endTs);

            /* Replace, rather than append to, whatever nodes the caller's
             * object already held. */
            job.nodes.clear();

            /* The iterator is NULL when the column does not hold a
             * collection value (e.g. it was never set); that is treated as
             * an empty node list. */
            const CassValue* nodeSet = cass_row_get_column_by_name(row, "nodes");
            CassIterator* nodeIt =
                (nodeSet != nullptr) ? cass_iterator_from_collection(nodeSet)
                                     : nullptr;
            if (nodeIt != nullptr) {
                while (cass_iterator_next(nodeIt)) {
                    const char* nodeStr = nullptr;
                    size_t nodeStrLen = 0;
                    if (cass_value_get_string(cass_iterator_get_value(nodeIt),
                                              &nodeStr, &nodeStrLen) == CASS_OK) {
                        job.nodes.emplace_back(nodeStr, nodeStrLen);
                    } else {
                        error = JD_PARSINGERROR;
                    }
                }
                cass_iterator_free(nodeIt);
            }
        }
        cass_result_free(cresult);
    }

    cass_future_free(future);
    cass_statement_free(statement);

    return error;
}
/**
* @details
* //TODO
* Find all entries in the data store whose start_ts AND end_ts lie within
* the specified interval. Store the found entries in the JobData list.
*/
JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
                                                TimeStamp intervalEnd) {
    /*
     * Appends to `jobs` every job whose start_ts is >= intervalStart and
     * whose end_ts is <= intervalEnd.
     *
     * Returns:
     *   JD_INVALIDPARAMS - intervalEnd is 0 or intervalStart >= intervalEnd;
     *                      no query is issued.
     *   JD_OK            - the query succeeded and all rows could be read.
     *   JD_PARSINGERROR  - the query succeeded but at least one column could
     *                      not be read; 0 is stored for an unreadable numeric
     *                      column and unreadable node entries are skipped.
     *   JD_UNKNOWNERROR  - the query itself failed.
     */
    if (intervalEnd.getRaw() == 0) {
        return JD_INVALIDPARAMS;
    }
    if (intervalStart >= intervalEnd) {
        return JD_INVALIDPARAMS;
    }

    const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
                        " WHERE start_ts >= ? AND end_ts <= ? ;";

    CassStatement* statement = cass_statement_new(query, 2);
    cass_statement_bind_int64(statement, 0, intervalStart.getRaw());
    cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());

    /* Execute the statement and block until the result is available */
    CassFuture* future = cass_session_execute(session, statement);
    cass_future_wait(future);

    JDError error = JD_UNKNOWNERROR;

    if (cass_future_error_code(future) != CASS_OK) {
        connection->printError(future);
    } else {
        error = JD_OK;

        const CassResult* cresult = cass_future_get_result(future);
        CassIterator* rowIt = cass_iterator_from_result(cresult);

        while (cass_iterator_next(rowIt)) {
            const CassRow* row = cass_iterator_get_row(rowIt);

            /* Reads one 64-bit column; on failure yields 0 and flags the
             * overall result as only partially parsed. */
            auto readInt64 = [&](const char* column) -> cass_int64_t {
                cass_int64_t value = 0;
                if (cass_value_get_int64(cass_row_get_column_by_name(row, column),
                                         &value) != CASS_OK) {
                    value = 0;
                    error = JD_PARSINGERROR;
                }
                return value;
            };

            /* A fresh object per row, so nothing carries over between rows */
            JobData job;
            job.jobId     = static_cast<JobId>(readInt64("jid"));
            job.userId    = static_cast<UserId>(readInt64("uid"));
            job.startTime = static_cast<uint64_t>(readInt64("start_ts"));
            job.endTime   = static_cast<uint64_t>(readInt64("end_ts"));

            /* The iterator is NULL when the column does not hold a
             * collection value (e.g. it was never set); that is treated as
             * an empty node list. */
            const CassValue* nodeSet = cass_row_get_column_by_name(row, "nodes");
            CassIterator* nodeIt =
                (nodeSet != nullptr) ? cass_iterator_from_collection(nodeSet)
                                     : nullptr;
            if (nodeIt != nullptr) {
                while (cass_iterator_next(nodeIt)) {
                    const char* nodeStr = nullptr;
                    size_t nodeStrLen = 0;
                    if (cass_value_get_string(cass_iterator_get_value(nodeIt),
                                              &nodeStr, &nodeStrLen) == CASS_OK) {
                        job.nodes.emplace_back(nodeStr, nodeStrLen);
                    } else {
                        error = JD_PARSINGERROR;
                    }
                }
                cass_iterator_free(nodeIt);
            }

            /* push_back stores its own copy; the local is discarded at the
             * end of this iteration. */
            jobs.push_back(job);
        }

        cass_iterator_free(rowIt);
        cass_result_free(cresult);
    }

    cass_future_free(future);
    cass_statement_free(statement);

    return error;
}
/**
* @details
* //TODO
* Find all entries in the data store whose start_ts OR end_ts lies within
* the specified interval. Store the found entries in the JobData list.
* Cassandra only supports AND conditions in its query language. Therefore
* we cannot SELECT the required jobs directly. Instead we have to do two
* selects and manually deduplicate the results.
* TODO Doing two requests successively may tangle up error codes.
*/
JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
                                                TimeStamp intervalStart,
                                                TimeStamp intervalEnd) {
    /*
     * Appends to `jobs` every job whose start_ts OR end_ts lies within
     * [intervalStart, intervalEnd]. Two SELECTs are issued (one per column)
     * and rows of the second one whose job id is already in `jobs` are
     * skipped.
     *
     * Returns:
     *   JD_INVALIDPARAMS - intervalEnd is 0 or intervalStart >= intervalEnd;
     *                      no query is issued.
     *   JD_UNKNOWNERROR  - at least one of the two queries failed. Rows of a
     *                      query that did succeed are still appended.
     *   JD_PARSINGERROR  - both queries succeeded but at least one column
     *                      could not be read; 0 is stored for an unreadable
     *                      numeric column and unreadable node entries are
     *                      skipped.
     *   JD_OK            - both queries succeeded and all rows could be read.
     *
     * NOTE(review): a job that starts before intervalStart and ends after
     * intervalEnd matches neither query - confirm whether such jobs are
     * meant to be returned.
     */
    if (intervalEnd.getRaw() == 0) {
        return JD_INVALIDPARAMS;
    }
    if (intervalStart >= intervalEnd) {
        return JD_INVALIDPARAMS;
    }

    /* Tracked separately so that a parse error in one query can never mask
     * the failure of the other query. */
    bool queryFailed = false;
    bool parseFailed = false;

    /* Runs one two-placeholder SELECT bound to the interval and appends its
     * rows to `jobs`. With skipKnownJobs set, rows whose job id is already
     * present in `jobs` are dropped. */
    auto selectInto = [&](const char* query, bool skipKnownJobs) {
        CassStatement* statement = cass_statement_new(query, 2);
        cass_statement_bind_int64(statement, 0, intervalStart.getRaw());
        cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());

        /* Execute the statement and block until the result is available */
        CassFuture* future = cass_session_execute(session, statement);
        cass_future_wait(future);

        if (cass_future_error_code(future) != CASS_OK) {
            connection->printError(future);
            queryFailed = true;
        } else {
            const CassResult* cresult = cass_future_get_result(future);
            CassIterator* rowIt = cass_iterator_from_result(cresult);

            while (cass_iterator_next(rowIt)) {
                const CassRow* row = cass_iterator_get_row(rowIt);

                /* Reads one 64-bit column; on failure yields 0 and records
                 * that the result is only partially parsed. */
                auto readInt64 = [&](const char* column) -> cass_int64_t {
                    cass_int64_t value = 0;
                    if (cass_value_get_int64(
                            cass_row_get_column_by_name(row, column),
                            &value) != CASS_OK) {
                        value = 0;
                        parseFailed = true;
                    }
                    return value;
                };

                const JobId id = static_cast<JobId>(readInt64("jid"));

                if (skipKnownJobs) {
                    /* Manual deduplication against everything collected so
                     * far; stop scanning at the first match. */
                    bool alreadyPresent = false;
                    for (const auto& known : jobs) {
                        if (known.jobId == id) {
                            alreadyPresent = true;
                            break;
                        }
                    }
                    if (alreadyPresent) {
                        continue;
                    }
                }

                /* A fresh object per row, so nothing carries over */
                JobData job;
                job.jobId     = id;
                job.userId    = static_cast<UserId>(readInt64("uid"));
                job.startTime = static_cast<uint64_t>(readInt64("start_ts"));
                job.endTime   = static_cast<uint64_t>(readInt64("end_ts"));

                /* The iterator is NULL when the column does not hold a
                 * collection value (e.g. it was never set); that is treated
                 * as an empty node list. */
                const CassValue* nodeSet =
                    cass_row_get_column_by_name(row, "nodes");
                CassIterator* nodeIt =
                    (nodeSet != nullptr) ? cass_iterator_from_collection(nodeSet)
                                         : nullptr;
                if (nodeIt != nullptr) {
                    while (cass_iterator_next(nodeIt)) {
                        const char* nodeStr = nullptr;
                        size_t nodeStrLen = 0;
                        if (cass_value_get_string(
                                cass_iterator_get_value(nodeIt),
                                &nodeStr, &nodeStrLen) == CASS_OK) {
                            job.nodes.emplace_back(nodeStr, nodeStrLen);
                        } else {
                            parseFailed = true;
                        }
                    }
                    cass_iterator_free(nodeIt);
                }

                /* push_back stores its own copy of the job */
                jobs.push_back(job);
            }

            cass_iterator_free(rowIt);
            cass_result_free(cresult);
        }

        cass_future_free(future);
        cass_statement_free(statement);
    };

    /* +++ First SELECT: start_ts lies within the interval +++ */
    const char* startQuery = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
                             " WHERE start_ts >= ? AND start_ts <= ? ;";
    selectInto(startQuery, false);

    /* +++ Second SELECT: end_ts lies within the interval +++ */
    const char* endQuery = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
                           " WHERE end_ts >= ? AND end_ts <= ? ;";
    selectInto(endQuery, true);

    if (queryFailed) {
        return JD_UNKNOWNERROR;
    }
    return parseFailed ? JD_PARSINGERROR : JD_OK;
}
/**
* @details
* //TODO
* Find the entry in the data store with matching JobId and store the
* corresponding nodes in the NodeList.
*/
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) {
//TODO
return JD_UNKNOWNERROR;
JDError error = JD_UNKNOWNERROR;
/* Select entry from Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT nodes FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE jid = ? ;";
statement = cass_statement_new(query, 1);
cass_statement_bind_int64(statement, 0, jid);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
const CassResult* cresult = cass_future_get_result(future);
size_t rowCnt = cass_result_row_count(cresult);
/* Check if the returned data is reasonable */
if (rowCnt == 0) {
error = JD_JOBIDNOTFOUND;
} else {
/* Retrieve data from result */
const CassRow* row = cass_result_first_row(cresult);
/* Copy the nodes in the NodeList */
const char* nodeStr;
size_t nodeStr_len;
const CassValue* set = cass_row_get_column_by_name(row, "nodes");
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
nodes.emplace_back(nodeStr, nodeStr_len);
}
cass_iterator_free(setIt);
}
cass_result_free(cresult);