Commit 1d865069 authored by Alessio Netti's avatar Alessio Netti

Handling inconsistent timestamps in JobDataStore

parent a5b6bb4e
......@@ -569,13 +569,14 @@ JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE start_ts <= ? AND end_ts = ? ALLOW FILTERING;";
" WHERE start_ts <= ? AND start_ts > ? AND end_ts = ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
statement = cass_statement_new(query, 3);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, (int64_t)0);
cass_statement_bind_int64(statement, 2, (int64_t)0);
bool morePages = false;
do {
......@@ -616,13 +617,14 @@ JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
/* +++ Second SELECT +++ */
/* Select entries from Cassandra where end_ts lays within the interval */
query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE start_ts <= ? AND end_ts >= ? ALLOW FILTERING;";
" WHERE start_ts <= ? AND start_ts > ? AND end_ts >= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
statement = cass_statement_new(query, 3);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, intervalStart.getRaw());
cass_statement_bind_int64(statement, 1, (int64_t)0);
cass_statement_bind_int64(statement, 2, intervalStart.getRaw());
morePages = false;
do {
......
......@@ -158,17 +158,19 @@ void JobAction::doList() {
}
}
void JobAction::doRunning() {
void JobAction::doPending() {
DCDB::JobDataStore jobDataStore(connection);
DCDB::TimeStamp tsEnd;
DCDB::TimeStamp tsStart(tsEnd.getRaw() - JOB_ACTION_OFFSET);
DCDB::TimeStamp tsNow;
DCDB::TimeStamp tsEnd((uint64_t)LLONG_MAX);
DCDB::TimeStamp tsStart((uint64_t)0);
std::list<DCDB::JobData> jobList;
DCDB::JDError err = jobDataStore.getJobsInIntervalRunning(jobList, tsStart, tsEnd);
DCDB::JDError err = jobDataStore.getJobsInIntervalIncl(jobList, tsStart, tsEnd);
switch (err) {
case DCDB::JD_OK:
std::cout << "Job ID, User ID" << std::endl;
for(const auto &j : jobList) {
std::cout << j.jobId << "," << j.userId << std::endl;
if(j.startTime.getRaw()>tsNow.getRaw() || j.startTime.getRaw()==0)
std::cout << j.jobId << "," << j.userId << std::endl;
}
std::cout << std::endl;
break;
......@@ -177,19 +179,17 @@ void JobAction::doRunning() {
}
}
void JobAction::doPending() {
void JobAction::doRunning() {
DCDB::JobDataStore jobDataStore(connection);
DCDB::TimeStamp tsNow;
DCDB::TimeStamp tsEnd((uint64_t)LLONG_MAX);
DCDB::TimeStamp tsStart(tsNow.getRaw() + JOB_ACTION_OFFSET);
DCDB::TimeStamp tsEnd;
DCDB::TimeStamp tsStart(tsEnd.getRaw() - JOB_ACTION_OFFSET);
std::list<DCDB::JobData> jobList;
DCDB::JDError err = jobDataStore.getJobsInIntervalExcl(jobList, tsStart, tsEnd);
DCDB::JDError err = jobDataStore.getJobsInIntervalRunning(jobList, tsStart, tsEnd);
switch (err) {
case DCDB::JD_OK:
std::cout << "Job ID, User ID" << std::endl;
for(const auto &j : jobList) {
if(j.startTime.getRaw()>tsNow.getRaw())
std::cout << j.jobId << "," << j.userId << std::endl;
std::cout << j.jobId << "," << j.userId << std::endl;
}
std::cout << std::endl;
break;
......@@ -209,7 +209,7 @@ void JobAction::doFinished() {
case DCDB::JD_OK:
std::cout << "Job ID, User ID" << std::endl;
for(const auto &j : jobList) {
if(j.endTime.getRaw()!=0)
if(j.startTime.getRaw()!=0 && j.endTime.getRaw()!=0)
std::cout << j.jobId << "," << j.userId << std::endl;
}
std::cout << std::endl;
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment