Commit 5adb6dd8 authored by Michael Ott's avatar Michael Ott
Browse files

Allow for using aggregate functions in queries

parent 32eb6741
...@@ -20,7 +20,7 @@ namespace DCDB { ...@@ -20,7 +20,7 @@ namespace DCDB {
public: public:
Sensor(DCDB::Connection* connection, std::string publicName); Sensor(DCDB::Connection* connection, std::string publicName);
virtual ~Sensor(); virtual ~Sensor();
void query(std::list<SensorDataStoreReading>& reading, TimeStamp& start, TimeStamp& end); void query(std::list<SensorDataStoreReading>& reading, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate = AGGREGATE_NONE);
double getScalingFactor() const { double getScalingFactor() const {
return scalingFactor; return scalingFactor;
......
...@@ -50,6 +50,15 @@ typedef enum { ...@@ -50,6 +50,15 @@ typedef enum {
SDS_OK, SDS_OK,
SDS_EMPTYSET SDS_EMPTYSET
} SDSQueryResult; } SDSQueryResult;
typedef enum {
AGGREGATE_NONE = 0,
AGGREGATE_MIN,
AGGREGATE_MAX,
AGGREGATE_AVG,
AGGREGATE_SUM,
AGGREGATE_COUNT
} QueryAggregate;
/* Forward-declaration of the implementation-internal classes */ /* Forward-declaration of the implementation-internal classes */
class SensorDataStoreImpl; class SensorDataStoreImpl;
...@@ -107,7 +116,7 @@ public: ...@@ -107,7 +116,7 @@ public:
* @param start Start of the time series. * @param start Start of the time series.
* @param end End of the time series. * @param end End of the time series.
*/ */
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end); void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData); typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData);
/** /**
...@@ -119,7 +128,7 @@ public: ...@@ -119,7 +128,7 @@ public:
* @param start Start of the time series. * @param start Start of the time series.
* @param end End of the time series. * @param end End of the time series.
*/ */
void queryCB(QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end); void queryCB(QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
/** /**
* @brief This function queries the integrated value (val * sec) * @brief This function queries the integrated value (val * sec)
......
...@@ -40,7 +40,8 @@ ...@@ -40,7 +40,8 @@
#include "dcdb/connection.h" #include "dcdb/connection.h"
namespace DCDB { namespace DCDB {
static std::string const AggregateString[] = {"", "min", "max", "avg", "sum", "count"};
/** /**
* @brief The SensorDataStoreImpl class contains all protected * @brief The SensorDataStoreImpl class contains all protected
* functions belonging to SensorDataStore which are * functions belonging to SensorDataStore which are
...@@ -82,7 +83,7 @@ public: ...@@ -82,7 +83,7 @@ public:
* @param start Start of the time series. * @param start Start of the time series.
* @param end End of the time series. * @param end End of the time series.
*/ */
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end); void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);
/** /**
* @brief This function queries a sensor's values in * @brief This function queries a sensor's values in
...@@ -93,7 +94,7 @@ public: ...@@ -93,7 +94,7 @@ public:
* @param start Start of the time series. * @param start Start of the time series.
* @param end End of the time series. * @param end End of the time series.
*/ */
void queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end); void queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);
/** /**
* @brief This function queries the integrated value * @brief This function queries the integrated value
......
...@@ -38,7 +38,7 @@ namespace DCDB { ...@@ -38,7 +38,7 @@ namespace DCDB {
delete sensorConfig; delete sensorConfig;
} }
void Sensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end) { void Sensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate) {
SensorDataStore sensorDataStore(connection); SensorDataStore sensorDataStore(connection);
if (publicSensor.is_virtual) { if (publicSensor.is_virtual) {
...@@ -61,7 +61,7 @@ namespace DCDB { ...@@ -61,7 +61,7 @@ namespace DCDB {
/* Iterate over the expanded list of sensorIds and output the results in CSV format */ /* Iterate over the expanded list of sensorIds and output the results in CSV format */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) { for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
sensorDataStore.query(result, *sit, start, end); sensorDataStore.query(result, *sit, start, end, aggregate);
} }
if (scalingFactor != 1.0 || publicSensor.scaling_factor != 1.0) { if (scalingFactor != 1.0 || publicSensor.scaling_factor != 1.0) {
......
...@@ -145,7 +145,7 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value) ...@@ -145,7 +145,7 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value)
cass_statement_bind_bytes_by_name(statement, "sid", (cass_byte_t*)(key.c_str()), 16); cass_statement_bind_bytes_by_name(statement, "sid", (cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64_by_name(statement, "ts", ts); cass_statement_bind_int64_by_name(statement, "ts", ts);
cass_statement_bind_int64_by_name(statement, "value", value); cass_statement_bind_int64_by_name(statement, "value", value);
future = cass_session_execute(session, statement); future = cass_session_execute(session, statement);
cass_future_wait(future); cass_future_wait(future);
...@@ -174,15 +174,21 @@ void SensorDataStoreImpl::setTTL(uint64_t ttl) ...@@ -174,15 +174,21 @@ void SensorDataStoreImpl::setTTL(uint64_t ttl)
* and creates a SensorDataStoreReading object for each * and creates a SensorDataStoreReading object for each
* entry which is stored in the result list. * entry which is stored in the result list.
*/ */
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end) void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{ {
CassError rc = CASS_OK; CassError rc = CASS_OK;
CassStatement* statement = NULL; CassStatement* statement = NULL;
CassFuture *future = NULL; CassFuture *future = NULL;
const CassPrepared* prepared = nullptr; const CassPrepared* prepared = nullptr;
const char* query = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;";
std::string query = std::string("SELECT ts,");
future = cass_session_prepare(session, query); if (aggregate == AGGREGATE_NONE) {
query.append("value");
} else {
query.append(AggregateString[aggregate] + std::string("(value) as value"));
}
query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;");
future = cass_session_prepare(session, query.c_str());
cass_future_wait(future); cass_future_wait(future);
rc = cass_future_error_code(future); rc = cass_future_error_code(future);
...@@ -198,7 +204,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso ...@@ -198,7 +204,7 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
const std::string key = sid.serialize(); const std::string key = sid.serialize();
#if 0 #if 0
std::cout << "Query: " << query << std::endl << "sid: " << key << " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl; std::cout << "Query: " << query << std::endl << "sid: " << sid.toString() << " ts1: " << start.getRaw() << " ts2: " << end.getRaw() << std::endl;
#endif #endif
statement = cass_prepared_bind(prepared); statement = cass_prepared_bind(prepared);
...@@ -253,15 +259,21 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso ...@@ -253,15 +259,21 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
* This function issues a regular query to the data store * This function issues a regular query to the data store
* and calls cbFunc for every reading. * and calls cbFunc for every reading.
*/ */
void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end) void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{ {
CassError rc = CASS_OK; CassError rc = CASS_OK;
CassStatement* statement = NULL; CassStatement* statement = NULL;
CassFuture *future = NULL; CassFuture *future = NULL;
const CassPrepared* prepared = nullptr; const CassPrepared* prepared = nullptr;
const char* query = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;";
std::string query = std::string("SELECT ts,");
future = cass_session_prepare(session, query); if (aggregate == AGGREGATE_NONE) {
query.append("value");
} else {
query.append(AggregateString[aggregate] + std::string("(value) as value"));
}
query.append(" FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;");
future = cass_session_prepare(session, query.c_str());
cass_future_wait(future); cass_future_wait(future);
rc = cass_future_error_code(future); rc = cass_future_error_code(future);
...@@ -323,7 +335,7 @@ SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, Tim ...@@ -323,7 +335,7 @@ SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, Tim
std::list<SensorDataStoreReading> queryResult; std::list<SensorDataStoreReading> queryResult;
/* Issue a standard query */ /* Issue a standard query */
query(queryResult, sid, start, end); query(queryResult, sid, start, end, AGGREGATE_NONE);
/* Check if at least 2 readings in result */ /* Check if at least 2 readings in result */
if (queryResult.size() < 2) if (queryResult.size() < 2)
...@@ -514,9 +526,9 @@ void SensorDataStore::setTTL(uint64_t ttl) ...@@ -514,9 +526,9 @@ void SensorDataStore::setTTL(uint64_t ttl)
* forwards to the insert function of the SensorDataStoreImpl * forwards to the insert function of the SensorDataStoreImpl
* class. * class.
*/ */
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end) void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{ {
impl->query(result, sid, start, end); impl->query(result, sid, start, end, aggregate);
} }
/** /**
...@@ -525,9 +537,9 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId& ...@@ -525,9 +537,9 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
* forwards to the insert function of the SensorDataStoreImpl * forwards to the insert function of the SensorDataStoreImpl
* class. * class.
*/ */
void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end) void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate)
{ {
return impl->queryCB(cbFunc, userData, sid, start, end); return impl->queryCB(cbFunc, userData, sid, start, end, aggregate);
} }
/** /**
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment