Commit aa6fd6cf authored by Alessio Netti's avatar Alessio Netti
Browse files

libdcdb: adding group queries in SensorDataStore

- overloaded query() and fuzzyQuery() methods that allow to query a set
of sensor IDs at the same time
- also changed the behavior of fuzzuQuery(): only one query is performed
now, retrieving the most recent value BEFORE the specified timestamp
parent 150e5b1f
Loading
Loading
Loading
Loading
+24 −0
Original line number Diff line number Diff line
@@ -158,6 +158,18 @@ public:
     */
    void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);

    /**
     * @brief This function queries a the values of
     *        a set of sensors in the given time range.
     *        The weekstamp being used is in this case that of the sensor ID 
     *        at the front of the input list.
     * @param result   The list where the results will be stored.
     * @param sids     The list of SensorIds to query.
     * @param start    Start of the time series.
     * @param end      End of the time series.
     */
    void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
    
    /**
     * @brief This function performs a fuzzy query and returns the 
     *        closest sensor reading to the input timestamp.
@@ -168,6 +180,18 @@ public:
     */
    void fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns=10000000000);

    /**
     * @brief This function performs a fuzzy query and returns the closest 
     *        sensor reading to the input timestamp, one per queried sensor.
     *        The weekstamp being used is in this case that of the sensor ID 
     *        at the front of the input list.
     * @param result   The list where the results will be stored.
     * @param sids     The list of SensorIds to query.
     * @param ts       The target timestamp.
     * @param tol_ns   Tolerance of the fuzzy query in nanoseconds.
     */
    void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000);

    typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData);
    /**
     * @brief This function queries a sensor's values in
+29 −3
Original line number Diff line number Diff line
@@ -40,6 +40,8 @@
#include "dcdb/sensordatastore.h"
#include "dcdb/connection.h"

#define QUERY_GROUP_LIMIT 1000

namespace DCDB {
static std::string const AggregateString[] = {"", "min", "max", "avg", "sum", "count"};

@@ -119,7 +121,7 @@ public:
  void setDebugLog(bool dl);

  /**
   * @brief This function querie a sensor's values in
   * @brief This function queries a sensor's values in
   *        the given time range.
   * @param result   The list where the results will be stored.
   * @param sid      The SensorId to query.
@@ -128,6 +130,18 @@ public:
   */
  void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);

  /**
   * @brief This function queries a the values of
   *        a set of sensors in the given time range.
   *        The weekstamp being used is in this case that 
   *        of the sensor ID at the front of the input list.
   * @param result   The list where the results will be stored.
   * @param sids     The list of SensorIds to query.
   * @param start    Start of the time series.
   * @param end      End of the time series.
   */
  void query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);

  /**
   * @brief This function performs a fuzzy query and returns the 
   *        closest sensor reading to the input timestamp.
@@ -138,6 +152,18 @@ public:
   */
  void fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns=10000000000);
   
  /**
   * @brief This function performs a fuzzy query and returns the closest 
   *        sensor reading to the input timestamp, one per queried sensor.
   *        The weekstamp being used is in this case that of the sensor ID 
   *        at the front of the input list.
   * @param result   The list where the results will be stored.
   * @param sids     The list of SensorIds to query.
   * @param ts       The target timestamp.
   * @param tol_ns   Tolerance of the fuzzy query in nanoseconds.
   */
  void fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns=10000000000);

  /**
   * @brief This function queries a sensor's values in
   *        the given time range.
+199 −51
Original line number Diff line number Diff line
@@ -231,8 +231,7 @@ void SensorDataStoreImpl::setDebugLog(bool dl)
 * and creates a SensorDataStoreReading object for each
 * entry which is stored in the result list.
 */
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
  CassError rc = CASS_OK;
  CassStatement* statement = NULL;
  CassFuture *future = NULL;
@@ -310,6 +309,110 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
  cass_prepared_free(prepared);
}

/**
 * @details
 * This function issues a regular query to the data store,
 * queries an arbitrary number of sensors simultaneously
 * and creates a SensorDataStoreReading object for each
 * entry which is stored in the result list.
 */
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
    if(sids.empty())
        return;
    
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;

    std::string query = std::string("SELECT sid,ts,");
    if (aggregate == AGGREGATE_NONE) {
        query.append("value");
    } else {
        query.append(AggregateString[aggregate] + std::string("(value) as value"));
    }
    query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts >= ? AND ts <= ? ;");
    future = cass_session_prepare(session, query.c_str());
    cass_future_wait(future);

    rc = cass_future_error_code(future);
    if (rc != CASS_OK) {
        connection->printError(future);
        cass_future_free(future);
        return;
    }

    prepared = cass_future_get_prepared(future);
    cass_future_free(future);

    auto sidIt = sids.begin();
    size_t sidCtr = 0;

    while(sidIt != sids.end()) {
        CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, sids.size());

        sidCtr = 0;
        // Breaking up the original list of sids in chunks
        while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) {
            cass_collection_append_string(cassList, sidIt->getId().c_str());
            ++sidIt;
            ++sidCtr;
        }

        statement = cass_prepared_bind(prepared);
        cass_statement_set_paging_size(statement, -1);
        cass_statement_bind_collection(statement, 0, cassList);
        cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
        cass_statement_bind_int64(statement, 2, start.getRaw());
        cass_statement_bind_int64(statement, 3, end.getRaw());

        future = cass_session_execute(session, statement);
        cass_future_wait(future);

        if (cass_future_error_code(future) == CASS_OK) {
            const CassResult *cresult = cass_future_get_result(future);
            CassIterator *rows = cass_iterator_from_result(cresult);
            SensorDataStoreReading entry;
            cass_int64_t ts, value;
            const char *name;
            size_t name_len;

            while (cass_iterator_next(rows)) {
                const CassRow *row = cass_iterator_get_row(rows);

                cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
                cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
                cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);

                entry.sensorId = SensorId(std::string(name, name_len));
                entry.timeStamp = (uint64_t) ts;
                entry.value = (int64_t) value;

                result.push_back(entry);
#if 0
                if (localtime) {
                  t.convertToLocal();
              }
              if (raw) {
                  std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
              }
              else {
                  std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
              }
#endif
            }
            cass_iterator_free(rows);
            cass_result_free(cresult);
        }

        cass_statement_free(statement);
        cass_future_free(future);
        cass_collection_free(cassList);
    }
    
    cass_prepared_free(prepared);
}

/**
 * @details
 * This function performs a fuzzy query on the datastore,
@@ -317,20 +420,12 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
 * the one given as input
 */
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& ts, uint64_t tol_ns) {
    size_t elCtr = result.size();
    /* Issue a standard query */
    query(result, sid, ts, ts, AGGREGATE_NONE);
    // We got a sensor reading, and return
    if(elCtr < result.size())
        return;

    /* Find the readings before and after time t */
    /* Find the readings before time t */
    CassError rc = CASS_OK;
    CassStatement* statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared* prepared = nullptr;
    const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";
    const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts > ? LIMIT 1";
    const char* queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC LIMIT 1";

    future = cass_session_prepare(session, queryBefore);
    cass_future_wait(future);
@@ -353,7 +448,6 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
    future = cass_session_execute(session, statement);
    cass_future_wait(future);
    SensorDataStoreReading r;
    uint64_t distNow=tol_ns, minDist=tol_ns;

    if (cass_future_error_code(future) == CASS_OK) {
        const CassResult* cresult = cass_future_get_result(future);
@@ -366,12 +460,11 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
            cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
            cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);

            distNow = (uint64_t)tsInt < ts.getRaw() ? ts.getRaw()-(uint64_t)tsInt : (uint64_t)tsInt-ts.getRaw();
            if(distNow<minDist) {
                minDist = distNow;
            if(ts.getRaw() - (uint64_t)tsInt < tol_ns) {
                r.sensorId = sid;
                r.timeStamp = (uint64_t) tsInt;
                r.value = (int64_t) value;
                result.push_back(r);
            }
        }
        cass_iterator_free(rows);
@@ -381,9 +474,26 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
    cass_statement_free(statement);
    cass_future_free(future);
    cass_prepared_free(prepared);
}

/**
 * @details
 * This function performs a fuzzy query on the datastore,
 * picking readings from a set of sensors that are closest to
 * the timestamp given as input
 */
void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns) {
    if(sids.empty())
        return;

    /* Find the readings before time t */
    CassError rc = CASS_OK;
    CassStatement *statement = NULL;
    CassFuture *future = NULL;
    const CassPrepared *prepared = nullptr;
    const char *queryBefore = "SELECT sid,ts,value FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid in ? AND ws = ? AND ts <= ? ORDER BY ws DESC, ts DESC PER PARTITION LIMIT 1";

    /* Query after... */
    future = cass_session_prepare(session, queryAfter);
    future = cass_session_prepare(session, queryBefore);
    cass_future_wait(future);

    rc = cass_future_error_code(future);
@@ -396,46 +506,63 @@ void SensorDataStoreImpl::fuzzyQuery(std::list<SensorDataStoreReading>& result,
    prepared = cass_future_get_prepared(future);
    cass_future_free(future);
    
    auto sidIt = sids.begin();
    size_t sidCtr = 0;
    
    while(sidIt != sids.end()) {
        CassCollection *cassList = cass_collection_new(CASS_COLLECTION_TYPE_LIST, QUERY_GROUP_LIMIT);
        sidCtr = 0;
        // Breaking up the original list of sids in chunks
        while(sidIt != sids.end() && sidCtr < QUERY_GROUP_LIMIT) {
            cass_collection_append_string(cassList, sidIt->getId().c_str());
            ++sidIt;
            ++sidCtr;
        }

        statement = cass_prepared_bind(prepared);
    cass_statement_bind_string(statement, 0, sid.getId().c_str());
    cass_statement_bind_int16(statement, 1, sid.getRsvd());
        cass_statement_set_paging_size(statement, -1);
        cass_statement_bind_collection(statement, 0, cassList);
        cass_statement_bind_int16(statement, 1, sids.front().getRsvd());
        cass_statement_bind_int64(statement, 2, ts.getRaw());

        future = cass_session_execute(session, statement);
        cass_future_wait(future);
        SensorDataStoreReading r;

        if (cass_future_error_code(future) == CASS_OK) {
            const CassResult *cresult = cass_future_get_result(future);
            CassIterator *rows = cass_iterator_from_result(cresult);
            cass_int64_t tsInt, value;
            const char *name;
            size_t name_len;

            while (cass_iterator_next(rows)) {
                const CassRow *row = cass_iterator_get_row(rows);

            cass_int64_t tsInt, value;
                cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &tsInt);
                cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
                cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &name, &name_len);

            distNow = (uint64_t)tsInt < ts.getRaw() ? ts.getRaw()-(uint64_t)tsInt : (uint64_t)tsInt-ts.getRaw();
            if(distNow<minDist) {
                minDist = distNow;
                r.sensorId = sid;
                if (ts.getRaw() - (uint64_t) tsInt < tol_ns) {
                    r.sensorId = SensorId(std::string(name, name_len));
                    r.timeStamp = (uint64_t) tsInt;
                    r.value = (int64_t) value;
                    result.push_back(r);
                }
            }

            cass_iterator_free(rows);
            cass_result_free(cresult);
        }

        cass_statement_free(statement);
        cass_future_free(future);
    cass_prepared_free(prepared);
        cass_collection_free(cassList);
    }
    
    if(minDist < tol_ns)
        result.push_back(r);
    cass_prepared_free(prepared);
}


/**
 * @details
 * This function issues a regular query to the data store
@@ -735,6 +862,17 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
    impl->query(result, sid, start, end, aggregate);
}

/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{
    impl->query(result, sids, start, end, aggregate);
}

/**
 * @details
 * Instead of doing the actual work, this function simply
@@ -745,6 +883,16 @@ void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, Sens
    impl->fuzzyQuery(result, sid, ts, tol_ns);
}

/**
 * @details
 * Instead of doing the actual work, this function simply
 * forwards to the insert function of the SensorDataStoreImpl
 * class.
 */
void SensorDataStore::fuzzyQuery(std::list<SensorDataStoreReading>& result, std::list<SensorId>& sids, TimeStamp& ts, uint64_t tol_ns) {
    impl->fuzzyQuery(result, sids, ts, tol_ns);
}

/**
 * @details
 * Instead of doing the actual work, this function simply