Commit a40e9c93 authored by Alessio Netti's avatar Alessio Netti
Browse files

libdcdb: integration of fuzzy sensor queries

- If users specify a single timestamp (startTs==endTs) when performing
a sensor query through dcdbquery or libdcdb, a fuzzy sensor query is
performed, and the closest sensor reading to the target timestamp is
returned
parent 99bc166a
......@@ -197,7 +197,12 @@ public:
* underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
* false, startTs and endTs are interpreted as absolute timestamps, and the cache view is
* determined by performing binary search with O(log(n)) complexity, thus resulting in a accurate
* time range. This parameter does not affect the query method when using the Cassandra datastore.
* time range. Relative and absolute mode have different data guarantees:
*
* - Relative: returned data is guaranteed to not be stale, but it can extend outside of the
* queried range by a factor proportional to sensor's sampling rate.
* - Absolute: returned data is guaranteed to not be stale and to be strictly within the queried
* time range.
*
* @param name Name of the sensor to be queried
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
......
......@@ -155,7 +155,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 10000000000);
// Dealing with allocations that may have been performed by the cache search
if(!output)
output = (buffer==nullptr) ? new std::vector<reading_t>() : buffer;
......
......@@ -174,7 +174,7 @@ public:
int64_t endIdx = rel ? getOffset(endTs) : searchTimestamp(endTs, false);
//Managing invalid time offsets
if(startIdx < 0 || endIdx < 0)
if(startIdx < 0 || endIdx < 0 || startIdx-endIdx==1)
return buffer;
//Managing obsolete data
if(tsAbs(startTsInt, _cache[startIdx].timestamp) > staleThreshold || tsAbs(endTsInt, _cache[endIdx].timestamp) > staleThreshold)
......
......@@ -47,7 +47,7 @@ namespace DCDB {
Sensor(DCDB::Connection* connection, const std::string& publicName);
Sensor(Connection* connection, const PublicSensor& sensor);
virtual ~Sensor();
void query(std::list<SensorDataStoreReading>& reading, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate = AGGREGATE_NONE);
void query(std::list<SensorDataStoreReading>& reading, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate = AGGREGATE_NONE, uint64_t tol_ns=10000000000);
private:
Connection* connection;
......
......@@ -155,6 +155,16 @@ public:
*/
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
/**
* @brief This function performs a fuzzy query and returns the
* closest sensor reading to the input timestamp.
* @param result The list where the results will be stored.
* @param sid The SensorId to query.
* @param ts The target timestamp.
* @param tol_ns Tolerance of the fuzzy query in nanoseconds.
*/
void fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns=10000000000);
typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData);
/**
* @brief This function queries a sensor's values in
......
......@@ -123,6 +123,16 @@ public:
*/
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);
/**
* @brief This function performs a fuzzy query and returns the
* closest sensor reading to the input timestamp.
* @param result The list where the results will be stored.
* @param sid The SensorId to query.
* @param ts The target timestamp.
* @param tol_ns Tolerance of the fuzzy query in nanoseconds.
*/
void fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns=10000000000);
/**
* @brief This function queries a sensor's values in
* the given time range.
......
......@@ -65,7 +65,7 @@ namespace DCDB {
delete sensorConfig;
}
void Sensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
void Sensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate, uint64_t tol_ns) {
SensorDataStore sensorDataStore(connection);
if (publicSensor.is_virtual) {
......@@ -87,10 +87,16 @@ namespace DCDB {
}
/* Iterate over the expanded list of sensorIds and output the results in CSV format */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
sensorDataStore.query(result, *sit, start, end, aggregate);
if(start.getRaw() != end.getRaw()) {
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
sensorDataStore.query(result, *sit, start, end, aggregate);
}
} else {
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
sensorDataStore.fuzzyQuery(result, *sit, start, tol_ns);
}
}
}
}
} /* namespace DCDB */
......@@ -297,6 +297,132 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
cass_prepared_free(prepared);
}
/**
* @details
* This function performs a fuzzy query on the datastore,
* picking a single sensor readings that is closest to
* the one given as input
*/
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns) {
size_t elCtr = result.size();
/* Issue a standard query */
query(result, sid, ts, ts, AGGREGATE_NONE);
// We got a sensor reading, and return
if(elCtr < result.size())
return;
/* Find the readings before and after time t */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts > ? LIMIT 1";
future = cass_session_prepare(session, queryBefore);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, ts.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
SensorDataStoreReading r;
uint64_t distNow=tol_ns, minDist=tol_ns;
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t tsInt, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
distNow = (uint64_t)tsInt < ts.getRaw() ? ts.getRaw()-(uint64_t)tsInt : (uint64_t)tsInt-ts.getRaw();
if(distNow<minDist) {
minDist = distNow;
r.sensorId = sid;
r.timeStamp = (uint64_t) tsInt;
r.value = (int64_t) value;
}
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
/* Query after... */
future = cass_session_prepare(session, queryAfter);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string(statement, 0, sid.getId().c_str());
cass_statement_bind_int16(statement, 1, sid.getRsvd());
cass_statement_bind_int64(statement, 2, ts.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t tsInt, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
distNow = (uint64_t)tsInt < ts.getRaw() ? ts.getRaw()-(uint64_t)tsInt : (uint64_t)tsInt-ts.getRaw();
if(distNow<minDist) {
minDist = distNow;
r.sensorId = sid;
r.timeStamp = (uint64_t) tsInt;
r.value = (int64_t) value;
}
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
if(minDist < tol_ns)
result.push_back(r);
}
/**
* @details
* This function issues a regular query to the data store
......@@ -591,6 +717,16 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
impl->query(result, sid, start, end, aggregate);
}
/**
* @details
* Instead of doing the actual work, this function simply
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns) {
impl->fuzzyQuery(result, sid, ts, tol_ns);
}
/**
* @details
* Instead of doing the actual work, this function simply
......
......@@ -156,7 +156,7 @@ int main(int argc, char * const argv[])
}
/* Ensure start < end */
if(start >= end) {
if(start > end) {
std::cout << "Start time must be earlier than end time." << std::endl;
exit(EXIT_FAILURE);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment