Commit 0b053866 authored by Alessio Netti's avatar Alessio Netti
Browse files

dcdblib: jobID and userID type in JobDataStore is now std::string

parent a0a1deb3
......@@ -51,8 +51,8 @@ namespace DCDB {
/* Forward-declaration of the implementation-internal classes */
class JobDataStoreImpl;
using JobId = uint32_t;
using UserId = uint32_t;
using JobId = std::string;
using UserId = std::string;
using NodeList = std::list<std::string>;
/**
......
......@@ -89,7 +89,7 @@ namespace DCDB {
private:
// Private utility method to avoid code duplication
JDError parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<uint32_t>* jobIds);
JDError parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<JobId>* jobIds);
};
} /* End of namespace DCDB */
......
......@@ -546,8 +546,8 @@ bool ConnectionImpl::initSchema() {
if (!existsColumnFamily(CF_JOBDATA)) {
std::cout << "Creating Column Family " CF_JOBDATA "...\n";
createColumnFamily(CF_JOBDATA,
"jid bigint, " /* Job Id */
"uid bigint, " /* User Id */
"jid varchar, " /* Job Id */
"uid varchar, " /* User Id */
"start_ts bigint, " /* Start timestamp of the job */
"end_ts bigint, " /* End timestamp of the job */
"nodes set<varchar>", /* Set of nodes used by the job */
......
......@@ -108,12 +108,10 @@ JDError JobDataStoreImpl::insertJob(JobData& jdata) {
statement = cass_prepared_bind(preparedInsert);
cass_statement_bind_int64_by_name(statement, "jid", jdata.jobId);
cass_statement_bind_int64_by_name(statement, "uid", jdata.userId);
cass_statement_bind_int64_by_name(statement, "start_ts",
jdata.startTime.getRaw());
cass_statement_bind_int64_by_name(statement, "end_ts",
jdata.endTime.getRaw());
cass_statement_bind_string_by_name(statement, "jid", jdata.jobId.c_str());
cass_statement_bind_string_by_name(statement, "uid", jdata.userId.c_str());
cass_statement_bind_int64_by_name(statement, "start_ts", jdata.startTime.getRaw());
cass_statement_bind_int64_by_name(statement, "end_ts", jdata.endTime.getRaw());
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
......@@ -173,14 +171,13 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
statement = cass_statement_new(query, 5);
cass_statement_bind_int64(statement, 3, jdata.jobId);
cass_statement_bind_int64(statement, 0, jdata.userId);
cass_statement_bind_string(statement, 3, jdata.jobId.c_str());
cass_statement_bind_string(statement, 0, jdata.userId.c_str());
cass_statement_bind_int64(statement, 4, jdata.startTime.getRaw());
cass_statement_bind_int64(statement, 1, jdata.endTime.getRaw());
/* Copy the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, jdata.nodes.size());
for (auto& s : jdata.nodes) {
cass_collection_append_string(set, s.c_str());
}
......@@ -216,8 +213,7 @@ JDError JobDataStoreImpl::updateJob(JobData& jdata) {
* provided end time. If no such job exists in the data store yet, it is
* inserted.
*/
JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs,
TimeStamp endTime) {
JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime) {
/* Check if the input for the primary key is valid and reasonable */
if (startTs.getRaw() == 0) {
return JD_BADPARAMS;
......@@ -234,7 +230,7 @@ JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs,
statement = cass_statement_new(query, 3);
cass_statement_bind_int64(statement, 1, jobId);
cass_statement_bind_string(statement, 1, jobId.c_str());
cass_statement_bind_int64(statement, 2, startTs.getRaw());
cass_statement_bind_int64(statement, 0, endTime.getRaw());
......@@ -274,7 +270,7 @@ JDError JobDataStoreImpl::deleteJob(JobId jid, TimeStamp startTs) {
statement = cass_statement_new(query, 2);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_string(statement, 0, jid.c_str());
cass_statement_bind_int64(statement, 1, startTs.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
......@@ -315,7 +311,7 @@ JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid,
statement = cass_statement_new(query, 2);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_string(statement, 0, jid.c_str());
cass_statement_bind_int64(statement, 1, startTs.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
......@@ -341,26 +337,25 @@ JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid,
/* Retrieve data from result */
const CassRow* row = cass_result_first_row(cresult);
cass_int64_t jobId, userId, startTs, endTs;
cass_int64_t startTs, endTs;
const char *jobId, *userId;
size_t jobId_len, userId_len;
/* jid and start_ts are always set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"),
&userId) != CASS_OK) {
userId = 0;
cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len);
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs);
if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
userId = "";
userId_len = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data in the JobData object */
job.jobId = (JobId) jobId;
job.userId = (UserId) userId;
job.jobId = (JobId) std::string(jobId, jobId_len);
job.userId = (UserId) std::string(userId, userId_len);
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
......@@ -372,8 +367,7 @@ JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid,
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
......@@ -407,7 +401,7 @@ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
statement = cass_statement_new(query, 1);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_string(statement, 0, jid.c_str());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
......@@ -431,27 +425,27 @@ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) {
} else {
/* Retrieve data from result */
const CassRow* row = cass_result_first_row(cresult);
cass_int64_t jobId, userId, startTs, endTs;
cass_int64_t startTs, endTs;
const char *jobId, *userId;
size_t jobId_len, userId_len;
/* jid and start_ts are always set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"),
&userId) != CASS_OK) {
userId = 0;
cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len);
cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs);
if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
userId = "";
userId_len = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data in the JobData object */
job.jobId = (JobId) jobId;
job.userId = (UserId) userId;
job.jobId = (JobId) std::string(jobId, jobId_len);
job.userId = (UserId) std::string(userId, userId_len);
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
......@@ -560,7 +554,7 @@ JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
/* +++ First SELECT +++ */
/* Select entries from Cassandra where start_ts lays within the interval */
std::unordered_set<uint32_t> jobIds;
std::unordered_set<JobId> jobIds;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
......@@ -659,7 +653,7 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
/* +++ First SELECT +++ */
/* Select entries from Cassandra where start_ts lays within the interval */
std::unordered_set<uint32_t> jobIds;
std::unordered_set<JobId> jobIds;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
......@@ -734,16 +728,22 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
return error;
}
JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<uint32_t>* jobIds) {
JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<JobId>* jobIds) {
JDError error = JD_OK;
cass_int64_t jobId, userId, startTs, endTs;
cass_int64_t startTs, endTs;
const char *jobId, *userId;
size_t jobId_len, userId_len;
JobData job;
while (cass_iterator_next(rowIt)) {
const CassRow *row = cass_iterator_get_row(rowIt);
/* jid and uid should always be set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
cass_value_get_string(cass_row_get_column_by_name(row, "jid"), &jobId, &jobId_len);
if (cass_value_get_string(cass_row_get_column_by_name(row, "uid"), &userId, &userId_len) != CASS_OK) {
userId = "";
userId_len = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) {
startTs = 0;
error = JD_PARSINGERROR;
......@@ -754,10 +754,10 @@ JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& job
}
/* Copy the data into job object */
job.jobId = (JobId) jobId;
job.jobId = (JobId) std::string(jobId, jobId_len);
/* Set-based deduplication */
if (jobIds==nullptr || jobIds->insert(job.jobId).second) {
job.userId = (UserId) userId;
job.userId = (UserId) std::string(userId, userId_len);
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
......@@ -769,8 +769,7 @@ JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& job
CassIterator *setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
......@@ -789,8 +788,7 @@ JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& job
* value (= most recent job) and store the
* corresponding nodes in the NodeList.
*/
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid,
TimeStamp startTs) {
JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs) {
JDError error = JD_UNKNOWNERROR;
/* Select entry from Cassandra */
......@@ -802,7 +800,7 @@ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid,
statement = cass_statement_new(query, 1);
cass_statement_bind_int64(statement, 0, jid);
cass_statement_bind_string(statement, 0, jid.c_str());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
......@@ -835,8 +833,7 @@ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid,
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len);
nodes.emplace_back(nodeStr, nodeStr_len);
}
......
......@@ -180,8 +180,8 @@ int main(int argc, char** argv) {
splitNodeList(nodelist, nl, ',');
try {
jd.jobId = std::stoul(jobId);
jd.userId = std::stoul(userId);
jd.jobId = jobId;
jd.userId = userId;
jd.startTime = DCDB::TimeStamp(ts);
jd.endTime = DCDB::TimeStamp((uint64_t)0);
jd.nodes = nl;
......@@ -200,7 +200,7 @@ int main(int argc, char** argv) {
std::cout << "STOP = " << ts << std::endl;
try {
if (myJobDataStore->getJobById(jd, std::stoul(jobId)) != DCDB::JD_OK) {
if (myJobDataStore->getJobById(jd, jobId) != DCDB::JD_OK) {
std::cerr << "Could not retrieve job to be updated!";
return 1;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment