The name of the initial branch for new projects is now "main" instead of "master". Existing projects remain unchanged. More information: https://doku.lrz.de/display/PUBLIC/GitLab

Commit 5ead00a6 authored by Alessio Netti's avatar Alessio Netti
Browse files

libDCDB: paging for JobDataStore and CaliEvtDataStore queries

parent 70e07df2
......@@ -44,7 +44,7 @@
#define CF_SENSORDATA "sensordata"
#define SENSORDATA_GC_GRACE_SECONDS "600"
#define SENSORDATA_COMPACTION "{'class' : 'TimeWindowCompactionStrategy', 'compaction_window_unit' : 'DAYS', 'compaction_window_size' : 1 }"
#define PAGING_SIZE 50000
#define PAGING_SIZE 10000
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_PUBLISHEDSENSORS "publishedsensors"
......
......@@ -222,41 +222,54 @@ void CaliEvtDataStoreImpl::query(std::list<CaliEvtData>& result, SensorId& sid,
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
CaliEvtData entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
CaliEvtData entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts;
const char* eventStr;
size_t eventStr_len;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_string(cass_row_get_column_by_name(row, "value"), &eventStr, &eventStr_len);
entry.eventId = sid;
entry.timeStamp = (uint64_t)ts;
entry.event = std::string(eventStr, eventStr_len);
result.push_back(entry);
}
cass_int64_t ts;
const char* eventStr;
size_t eventStr_len;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_string(cass_row_get_column_by_name(row, "value"), &eventStr, &eventStr_len);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
}
entry.eventId = sid;
entry.timeStamp = (uint64_t)ts;
entry.event = std::string(eventStr, eventStr_len);
cass_future_free(future);
result.push_back(entry);
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
while(morePages);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
......@@ -417,41 +430,54 @@ void CaliEvtDataStoreImpl::queryCB(CaliEvtDataStore::QueryCECbFunc cbFunc, void*
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
CaliEvtData entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
CaliEvtData entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts;
const char* eventStr;
size_t eventStr_len;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_string(cass_row_get_column_by_name(row, "value"), &eventStr, &eventStr_len);
entry.eventId = sid;
entry.timeStamp = (uint64_t)ts;
entry.event = std::string(eventStr, eventStr_len);
cbFunc(entry, userData);
}
cass_int64_t ts;
const char* eventStr;
size_t eventStr_len;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_string(cass_row_get_column_by_name(row, "value"), &eventStr, &eventStr_len);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
}
entry.eventId = sid;
entry.timeStamp = (uint64_t)ts;
entry.event = std::string(eventStr, eventStr_len);
cass_future_free(future);
cbFunc(entry, userData);
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
while(morePages);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
......
......@@ -500,36 +500,46 @@ JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
" WHERE start_ts >= ? AND end_ts <= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalStart.getRaw());
cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
bool morePages = false;
do {
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
error = parseJobs(rowIt, jobs, NULL);
/* Wait for the statement to finish */
cass_future_wait(future);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
morePages = false;
} else {
error = JD_OK;
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, NULL);
cass_future_free(future);
cass_statement_free(statement);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
}
while(morePages);
cass_statement_free(statement);
return error;
}
......@@ -562,34 +572,45 @@ JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
" WHERE start_ts <= ? AND end_ts = ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, (int64_t)0);
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
bool morePages = false;
do {
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
morePages = false;
} else {
error = JD_OK;
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
error = parseJobs(rowIt, jobs, &jobIds);
cass_future_free(future);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
while(morePages);
cass_statement_free(statement);
/* +++ Second SELECT +++ */
......@@ -598,32 +619,43 @@ JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
" WHERE start_ts <= ? AND end_ts >= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, intervalStart.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
morePages = false;
do {
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
morePages = false;
} else {
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
error = parseJobs(rowIt, jobs, &jobIds);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_iterator_free(rowIt);
cass_result_free(cresult);
cass_future_free(future);
}
cass_future_free(future);
while(morePages);
cass_statement_free(statement);
return error;
......@@ -661,34 +693,45 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
" WHERE start_ts >= ? AND start_ts <= ? ;";
statement = cass_statement_new(query, 2);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalStart.getRaw());
cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
bool morePages = false;
do {
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
morePages = false;
} else {
error = JD_OK;
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
error = parseJobs(rowIt, jobs, &jobIds);
cass_future_free(future);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
while(morePages);
cass_statement_free(statement);
/* +++ Second SELECT +++ */
......@@ -697,32 +740,43 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
" WHERE end_ts >= ? AND end_ts <= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_int64(statement, 0, intervalStart.getRaw());
cass_statement_bind_int64(statement, 1, intervalEnd.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
morePages = false;
do {
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
morePages = false;
} else {
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
error = parseJobs(rowIt, jobs, &jobIds);
cass_future_free(future);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
while(morePages);
cass_statement_free(statement);
return error;
......
......@@ -628,39 +628,52 @@ void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* use
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.sensorId = sid;
entry.timeStamp = (uint64_t)ts;
entry.value = (int64_t)value;
cbFunc(entry, userData);
}
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
}
entry.sensorId = sid;
entry.timeStamp = (uint64_t)ts;
entry.value = (int64_t)value;
cass_future_free(future);
cbFunc(entry, userData);
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
while(morePages);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment