Commit 70e07df2 authored by Alessio Netti's avatar Alessio Netti

libDCDB: paging for SensorDataStore and SensorConfig queries

parent 39db19f7
......@@ -44,6 +44,7 @@
#define CF_SENSORDATA "sensordata"
#define SENSORDATA_GC_GRACE_SECONDS "600"
#define SENSORDATA_COMPACTION "{'class' : 'TimeWindowCompactionStrategy', 'compaction_window_unit' : 'DAYS', 'compaction_window_size' : 1 }"
#define PAGING_SIZE 50000
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_PUBLISHEDSENSORS "publishedsensors"
......
......@@ -372,8 +372,7 @@ bool ConnectionImpl::connect() {
cass_cluster_set_num_threads_io(cluster, numThreadsIo);
cass_cluster_set_queue_size_io(cluster, queueSizeIo);
cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost);
//TODO: avoid this and use actual paging in queries
cass_cluster_set_request_timeout(cluster, 300000);
cass_cluster_set_request_timeout(cluster, 60000);
/* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
......
......@@ -691,37 +691,48 @@ SCError SensorConfigImpl::getPublicSensorNames(std::list<std::string>& publicSen
const char* query = "SELECT name FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;";
statement = cass_statement_new(query, 0);
cass_statement_set_paging_size(statement, PAGING_SIZE);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
publicSensors.push_back(std::string(name, name_len));
}
publicSensors.push_back(std::string(name, name_len));
if((morePages = cass_result_has_more_pages(result)))
cass_statement_set_paging_state(statement, result);
cass_result_free(result);
cass_iterator_free(iterator);
} else {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_result_free(result);
cass_iterator_free(iterator);
cass_future_free(future);
}
while(morePages);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
......@@ -743,113 +754,124 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
const char* query = "SELECT * FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;";
statement = cass_statement_new(query, 0);
cass_statement_set_paging_size(statement, PAGING_SIZE);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
cass_bool_t is_virtual;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
int64_t sensor_mask;
const char* expression;
size_t expression_len;
const char* vsensorid;
size_t vsensorid_len;
int64_t tzero;
int64_t interval;
int64_t ttl;
set<string> operations;
PublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) {
is_virtual = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) {
scaling_factor = 1.0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) {
unit = ""; unit_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "sensor_mask"), &sensor_mask) != CASS_OK) {
sensor_mask = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) {
expression = ""; expression_len = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "vsensorid"), &vsensorid, &vsensorid_len) != CASS_OK) {
vsensorid = ""; vsensorid_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) {
tzero = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "interval"), &interval) != CASS_OK) {
interval = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) {
ttl = 0;
}
const CassValue* opSet = nullptr;
CassIterator *opSetIt = nullptr;
if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) {
const char *opString;
size_t opLen;
while (cass_iterator_next(opSetIt)) {
if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) {
operations.clear();
break;
} else
operations.insert(std::string(opString, opLen));
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
cass_bool_t is_virtual;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
int64_t sensor_mask;
const char* expression;
size_t expression_len;
const char* vsensorid;
size_t vsensorid_len;
int64_t tzero;
int64_t interval;
int64_t ttl;
set<string> operations;
PublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
cass_iterator_free(opSetIt);
if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) {
is_virtual = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) {
scaling_factor = 1.0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) {
unit = ""; unit_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "sensor_mask"), &sensor_mask) != CASS_OK) {
sensor_mask = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) {
expression = ""; expression_len = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "vsensorid"), &vsensorid, &vsensorid_len) != CASS_OK) {
vsensorid = ""; vsensorid_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) {
tzero = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "interval"), &interval) != CASS_OK) {
interval = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) {
ttl = 0;
}
const CassValue* opSet = nullptr;
CassIterator *opSetIt = nullptr;
if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) {
const char *opString;
size_t opLen;
while (cass_iterator_next(opSetIt)) {
if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) {
operations.clear();
break;
} else
operations.insert(std::string(opString, opLen));
}
cass_iterator_free(opSetIt);
}
sensor.name = std::string(name, name_len);
sensor.is_virtual = is_virtual == cass_true ? true : false;
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.sensor_mask = sensor_mask;
sensor.expression = std::string(expression, expression_len);
sensor.v_sensorid = std::string(vsensorid, vsensorid_len);
sensor.t_zero = tzero;
sensor.interval = interval;
sensor.ttl = ttl;
sensor.operations = operations;
publicSensors.push_back(sensor);
}
sensor.name = std::string(name, name_len);
sensor.is_virtual = is_virtual == cass_true ? true : false;
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.sensor_mask = sensor_mask;
sensor.expression = std::string(expression, expression_len);
sensor.v_sensorid = std::string(vsensorid, vsensorid_len);
sensor.t_zero = tzero;
sensor.interval = interval;
sensor.ttl = ttl;
sensor.operations = operations;
publicSensors.push_back(sensor);
if((morePages = cass_result_has_more_pages(result)))
cass_statement_set_paging_state(statement, result);
cass_result_free(result);
cass_iterator_free(iterator);
} else {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_future_free(future);
}
while(morePages);
cass_statement_free(statement);
return SC_OK;
}
......
......@@ -262,50 +262,62 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
#endif
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult *cresult = cass_future_get_result(future);
CassIterator *rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
SensorDataStoreReading entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
while (cass_iterator_next(rows)) {
const CassRow *row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.sensorId = sid;
entry.timeStamp = (uint64_t)ts;
entry.value = (int64_t)value;
entry.sensorId = sid;
entry.timeStamp = (uint64_t) ts;
entry.value = (int64_t) value;
result.push_back(entry);
result.push_back(entry);
#if 0
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
#endif
}
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_future_free(future);
}
while(morePages);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
......@@ -360,55 +372,67 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::
}
statement = cass_prepared_bind(prepared);
cass_statement_set_paging_size(statement, -1);
cass_statement_set_paging_size(statement, PAGING_SIZE);
cass_statement_bind_collection(statement, 0, cassList);
cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
cass_statement_bind_int64(statement, 2, start.getRaw());
cass_statement_bind_int64(statement, 3, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult *cresult = cass_future_get_result(future);
CassIterator *rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
cass_int64_t ts, value;
const char *name;
size_t name_len;
while (cass_iterator_next(rows)) {
const CassRow *row = cass_iterator_get_row(rows);
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.timeStamp = (uint64_t) ts;
entry.value = (int64_t) value;
if(storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
entry.sensorId = SensorId(std::string(name, name_len));
}
result.push_back(entry);
bool morePages = false;
do {
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult *cresult = cass_future_get_result(future);
CassIterator *rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
cass_int64_t ts, value;
const char *name;
size_t name_len;
while (cass_iterator_next(rows)) {
const CassRow *row = cass_iterator_get_row(rows);
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.timeStamp = (uint64_t) ts;
entry.value = (int64_t) value;
if(storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
entry.sensorId = SensorId(std::string(name, name_len));
}
result.push_back(entry);
#if 0
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
#endif
}
if((morePages = cass_result_has_more_pages(cresult)))
cass_statement_set_paging_state(statement, cresult);
cass_iterator_free(rows);
cass_result_free(cresult);
} else {
morePages = false;
}
cass_iterator_free(rows);
cass_result_free(cresult);
cass_future_free(future);
}
while(morePages);
cass_statement_free(statement);
cass_future_free(future);
cass_collection_free(cassList);
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment