Commit f7b3ce3c authored by Alessio Netti's avatar Alessio Netti
Browse files

Dropping use of the IN clause for collective queries

- We now use parallel asynchronous queries on multiple sensors
parent 5429c0fd
...@@ -81,6 +81,12 @@ public: ...@@ -81,6 +81,12 @@ public:
*/ */
void setQueueSizeIo(uint32_t s); void setQueueSizeIo(uint32_t s);
/**
* @brief Returns the maximum size of the outbound requests queue.
* @return The maximum queue size.
*/
uint32_t getQueueSizeIo();
/** /**
* @brief Sets implementation-specific parameters * @brief Sets implementation-specific parameters
* @param p A vector of unsigned integers containing a specific number of * @param p A vector of unsigned integers containing a specific number of
......
...@@ -152,6 +152,12 @@ public: ...@@ -152,6 +152,12 @@ public:
*/ */
void setQueueSizeIo(uint32_t s); void setQueueSizeIo(uint32_t s);
/**
* @brief Returns the maximum size of the outbound requests queue.
* @return The maximum queue size.
*/
uint32_t getQueueSizeIo();
/** /**
* @brief Sets implementation-specific parameters * @brief Sets implementation-specific parameters
* *
......
...@@ -56,6 +56,10 @@ void Connection::setQueueSizeIo(uint32_t s) { ...@@ -56,6 +56,10 @@ void Connection::setQueueSizeIo(uint32_t s) {
impl->setQueueSizeIo(s); impl->setQueueSizeIo(s);
} }
uint32_t Connection::getQueueSizeIo() {
return impl->getQueueSizeIo();
}
void Connection::setBackendParams(uint32_t* p) { void Connection::setBackendParams(uint32_t* p) {
impl->setBackendParams(p); impl->setBackendParams(p);
} }
...@@ -335,6 +339,10 @@ void ConnectionImpl::setQueueSizeIo(uint32_t s) { ...@@ -335,6 +339,10 @@ void ConnectionImpl::setQueueSizeIo(uint32_t s) {
queueSizeIo = s; queueSizeIo = s;
} }
uint32_t ConnectionImpl::getQueueSizeIo() {
return queueSizeIo;
}
void ConnectionImpl::setBackendParams(uint32_t* p) { void ConnectionImpl::setBackendParams(uint32_t* p) {
if(!connected) { if(!connected) {
coreConnPerHost = p[0]; coreConnPerHost = p[0];
......
...@@ -341,7 +341,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const ...@@ -341,7 +341,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const
} else { } else {
query.append(AggregateString[aggregate] + std::string("(value) as value")); query.append(AggregateString[aggregate] + std::string("(value) as value"));
} }
query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts >= ? AND ts <= ? ;"); query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts >= ? AND ts <= ? ;");
future = cass_session_prepare(session, query.c_str()); future = cass_session_prepare(session, query.c_str());
cass_future_wait(future); cass_future_wait(future);
...@@ -355,83 +355,93 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const ...@@ -355,83 +355,93 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, const
prepared = cass_future_get_prepared(future); prepared = cass_future_get_prepared(future);
cass_future_free(future); cass_future_free(future);
// Paged asynchronous queries require keeping track of statements
std::list<std::pair<CassStatement*, CassFuture*>> futures;
auto sidIt = sids.begin(); auto sidIt = sids.begin();
size_t sidCtr = 0; size_t sidCtr = 0;
while(sidIt != sids.end()) { // Limiting the amount of concurrent requests with small queues
CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, sids.size()); uint32_t realGroupLimit = connection->getQueueSizeIo()/10 < QUERY_GROUP_LIMIT ? connection->getQueueSizeIo()/10 : QUERY_GROUP_LIMIT;
while(sidIt != sids.end()) {
futures.clear();
sidCtr = 0; sidCtr = 0;
// Breaking up the original list of sids in chunks // Breaking up the original list of sids in chunks
while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) { while(sidIt != sids.end() && sidCtr < realGroupLimit) {
cass_collection_append_string(cassList, sidIt->getId().c_str()); statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_string(statement, 0, sidIt->getId().c_str());
cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
futures.push_back(std::pair<CassStatement*, CassFuture*>(statement, cass_session_execute(session, statement)));
++sidIt; ++sidIt;
++sidCtr; ++sidCtr;
} }
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_collection(statement, 0, cassList);
cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
bool morePages = false;
do { do {
future = cass_session_execute(session, statement); // Keeps track of outstanding pages from current queries
cass_future_wait(future); std::list<std::pair<CassStatement *, CassFuture *>> futurePages;
for (auto &fut : futures) {
if (cass_future_error_code(future) == CASS_OK) { bool morePages = false;
const CassResult *cresult = cass_future_get_result(future); cass_future_wait(fut.second);
CassIterator *rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry; if (cass_future_error_code(fut.second) == CASS_OK) {
cass_int64_t ts, value; const CassResult *cresult = cass_future_get_result(fut.second);
const char *name; CassIterator *rows = cass_iterator_from_result(cresult);
size_t name_len; SensorDataStoreReading entry;
cass_int64_t ts, value;
while (cass_iterator_next(rows)) { const char *name;
const CassRow *row = cass_iterator_get_row(rows); size_t name_len;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts); while (cass_iterator_next(rows)) {
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value); const CassRow *row = cass_iterator_get_row(rows);
entry.timeStamp = (uint64_t) ts;
entry.value = (int64_t) value; cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
if(storeSids) { entry.timeStamp = (uint64_t) ts;
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len); entry.value = (int64_t) value;
entry.sensorId = SensorId(std::string(name, name_len));
} if (storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
result.push_back(entry); entry.sensorId = SensorId(std::string(name, name_len));
}
result.push_back(entry);
#if 0 #if 0
if (localtime) { if (localtime) {
t.convertToLocal(); t.convertToLocal();
} }
if (raw) { if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl; std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
} }
else { else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl; std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
} }
#endif #endif
}
if ((morePages = cass_result_has_more_pages(cresult))) {
cass_statement_set_paging_state(fut.first, cresult);
futurePages.push_back(std::pair<CassStatement *, CassFuture *>(fut.first, cass_session_execute(session, fut.first)));
} else {
cass_statement_free(fut.first);
}
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
cass_statement_free(fut.first);
morePages = false;
} }
cass_future_free(fut.second);
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
} }
futures.clear();
cass_future_free(future); futures = futurePages;
futurePages.clear();
} } while(!futures.empty());
while(morePages);
cass_statement_free(statement);
cass_collection_free(cassList);
} }
cass_prepared_free(prepared); cass_prepared_free(prepared);
...@@ -517,9 +527,9 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, ...@@ -517,9 +527,9 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
const CassPrepared *prepared = nullptr; const CassPrepared *prepared = nullptr;
const char *queryBefore; const char *queryBefore;
if(storeSids) if(storeSids)
queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1"; queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
else else
queryBefore = "SELECT ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1"; queryBefore = "SELECT ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
future = cass_session_prepare(session, queryBefore); future = cass_session_prepare(session, queryBefore);
cass_future_wait(future); cass_future_wait(future);
...@@ -534,61 +544,64 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, ...@@ -534,61 +544,64 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
prepared = cass_future_get_prepared(future); prepared = cass_future_get_prepared(future);
cass_future_free(future); cass_future_free(future);
std::list<CassFuture*> futures;
auto sidIt = sids.begin(); auto sidIt = sids.begin();
size_t sidCtr = 0; size_t sidCtr = 0;
// Limiting the amount of concurrent requests with small queues
uint32_t realGroupLimit = connection->getQueueSizeIo()/10 < QUERY_GROUP_LIMIT ? connection->getQueueSizeIo()/10 : QUERY_GROUP_LIMIT;
while(sidIt != sids.end()) { while(sidIt != sids.end()) {
CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, QUERY_GROUP_LIMIT);
sidCtr = 0; sidCtr = 0;
futures.clear();
// Breaking up the original list of sids in chunks // Breaking up the original list of sids in chunks
while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) { while(sidIt != sids.end() && sidCtr < realGroupLimit) {
cass_collection_append_string(cassList, sidIt->getId().c_str()); statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, -1);
cass_statement_bind_string(statement, 0, sidIt->getId().c_str());
cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
cass_statement_bind_int64(statement, 2, ts.getRaw());
futures.push_back(cass_session_execute(session, statement));
cass_statement_free(statement);
++sidIt; ++sidIt;
++sidCtr; ++sidCtr;
} }
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, -1);
cass_statement_bind_collection(statement, 0, cassList);
cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
cass_statement_bind_int64(statement, 2, ts.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
SensorDataStoreReading r; SensorDataStoreReading r;
for(auto& fut : futures) {
cass_future_wait(fut);
if (cass_future_error_code(fut) == CASS_OK) {
const CassResult *cresult = cass_future_get_result(fut);
CassIterator *rows = cass_iterator_from_result(cresult);
cass_int64_t tsInt, value;
const char *name;
size_t name_len;
if (cass_future_error_code(future) == CASS_OK) { while (cass_iterator_next(rows)) {
const CassResult *cresult = cass_future_get_result(future); const CassRow *row = cass_iterator_get_row(rows);
CassIterator *rows = cass_iterator_from_result(cresult);
cass_int64_t tsInt, value;
const char *name;
size_t name_len;
while (cass_iterator_next(rows)) { cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
const CassRow *row = cass_iterator_get_row(rows); cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt); if (ts.getRaw() - (uint64_t) tsInt < tol_ns) {
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value); if(storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
if (ts.getRaw() - (uint64_t) tsInt < tol_ns) { r.sensorId = SensorId(std::string(name, name_len));
if(storeSids) { }
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len); r.timeStamp = (uint64_t) tsInt;
r.sensorId = SensorId(std::string(name, name_len)); r.value = (int64_t) value;
result.push_back(r);
} }
r.timeStamp = (uint64_t) tsInt;
r.value = (int64_t) value;
result.push_back(r);
} }
cass_iterator_free(rows);
cass_result_free(cresult);
} }
cass_iterator_free(rows); cass_future_free(fut);
cass_result_free(cresult);
} }
cass_statement_free(statement);
cass_future_free(future);
cass_collection_free(cassList);
} }
cass_prepared_free(prepared); cass_prepared_free(prepared);
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment