Commit 40353098 authored by Alessio Netti's avatar Alessio Netti

libdcdb: added option to drop sensor IDs when performing group queries

- Allows to reduce the size of the query results considerably
parent 95139a6b
......@@ -173,13 +173,13 @@ bool sensorGroupQueryCallback(const std::vector<string>& names, const uint64_t s
// If timestamps are equal we perform a fuzzy query
if(startTsInt == endTsInt) {
topics.front().setRsvd(startWs);
mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000);
mySensorDataStore->fuzzyQuery(results, topics, start, 3600000000000, false);
}
// Else, we iterate over the weekstamps (if more than one) and perform range queries
else {
for(uint16_t currWs=startWs; currWs<=endWs; currWs++) {
topics.front().setRsvd(currWs);
mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE);
mySensorDataStore->query(results, topics, start, end, DCDB::AGGREGATE_NONE, false);
}
}
......
......@@ -167,8 +167,9 @@ public:
* @param sids The list of SensorIds to query.
* @param start Start of the time series.
* @param end End of the time series.
* @param storeSid If true, Sensor IDs will be retrieved for each reading.
*/
void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE, bool storeSids=true);
/**
* @brief This function performs a fuzzy query and returns the
......@@ -189,8 +190,9 @@ public:
* @param sids The list of SensorIds to query.
* @param ts The target timestamp.
* @param tol_ns Tolerance of the fuzzy query in nanoseconds.
* @param storeSid If true, Sensor IDs will be retrieved for each reading.
*/
void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000);
void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000, bool storeSids=true);
typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData);
/**
......
......@@ -139,8 +139,9 @@ public:
* @param sids The list of SensorIds to query.
* @param start Start of the time series.
* @param end End of the time series.
* @param storeSid If true, Sensor IDs will be retrieved for each reading.
*/
void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);
void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, bool storeSids);
/**
* @brief This function performs a fuzzy query and returns the
......@@ -161,8 +162,9 @@ public:
* @param sids The list of SensorIds to query.
* @param ts The target timestamp.
* @param tol_ns Tolerance of the fuzzy query in nanoseconds.
* @param storeSid If true, Sensor IDs will be retrieved for each reading.
*/
void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000);
void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000, bool storeSids);
/**
* @brief This function queries a sensor's values in
......
......@@ -316,7 +316,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
* and creates a SensorDataStoreReading object for each
* entry which is stored in the result list.
*/
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, bool storeSids) {
if(sids.empty())
return;
......@@ -325,7 +325,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
std::string query = std::string("SELECT sid,ts,");
std::string query = std::string(storeSids ? "SELECT sid,ts," : "SELECT ts,");
if (aggregate == AGGREGATE_NONE) {
query.append("value");
} else {
......@@ -382,12 +382,14 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
entry.sensorId = SensorId(std::string(name, name_len));
entry.timeStamp = (uint64_t) ts;
entry.value = (int64_t) value;
if(storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
entry.sensorId = SensorId(std::string(name, name_len));
}
result.push_back(entry);
#if 0
if (localtime) {
......@@ -482,7 +484,7 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
* picking readings from a set of sensors that are closest to
* the timestamp given as input
*/
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns) {
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns, bool storeSids) {
if(sids.empty())
return;
......@@ -491,7 +493,8 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
CassStatement *statement = NULL;
CassFuture *future = NULL;
const CassPrepared *prepared = nullptr;
const char *queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1";
const char *queryBefore = (storeSids ? "SELECT sid,ts,value FROM " : "SELECT ts,value FROM ") KEYSPACE_NAME
"." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1";
future = cass_session_prepare(session, queryBefore);
cass_future_wait(future);
......@@ -541,10 +544,12 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
if (ts.getRaw() - (uint64_t) tsInt < tol_ns) {
r.sensorId = SensorId(std::string(name, name_len));
if(storeSids) {
cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);
r.sensorId = SensorId(std::string(name, name_len));
}
r.timeStamp = (uint64_t) tsInt;
r.value = (int64_t) value;
result.push_back(r);
......@@ -868,9 +873,9 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, bool storeSids)
{
impl->query(result, sids, start, end, aggregate);
impl->query(result, sids, start, end, aggregate, storeSids);
}
/**
......@@ -889,8 +894,8 @@ void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, Sens
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns) {
impl->fuzzyQuery(result, sids, ts, tol_ns);
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns, bool storeSids) {
impl->fuzzyQuery(result, sids, ts, tol_ns, storeSids);
}
/**
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment