Commit f9932899 authored by Axel Auweter's avatar Axel Auweter
Browse files

Add a "snappier" callback based query API and make dcdbquery use it.

parent 596d9eeb
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
......
......@@ -89,6 +89,18 @@ public:
*/
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end);
typedef void (*QueryCbFunc)(SensorDataStoreReading& reading, void* userData);
/**
* @brief This function queries a sensor's values in
* the given time range.
* @param cbFunc A function to called for each reading.
* @param userData Pointer to user data that will be returned in the callback.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
*/
void queryCB(QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end);
/**
* @brief This function queries the integrated value (val * sec)
* of a sensor for the given time range.
......
......@@ -82,6 +82,7 @@ protected:
public:
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
VSError queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, TimeStamp& start, TimeStamp& end);
VSensor(Connection *conn, std::string name);
VSensor(Connection *conn, PublicSensor sensor);
......
......@@ -65,6 +65,17 @@ public:
*/
void query(std::list<SensorDataStoreReading>& result, SensorId& sid, TimeStamp& start, TimeStamp& end);
/**
* @brief This function queries a sensor's values in
* the given time range.
* @param cbFunc A function to called for each reading.
* @param userData Pointer to user data that will be returned in the callback.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
*/
void queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end);
/**
* @brief This function queries the integrated value
* of a sensor for the given time range.
......
......@@ -254,6 +254,7 @@ protected:
public:
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
VSError queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, TimeStamp& start, TimeStamp& end);
VSensorImpl(Connection *conn, std::string name);
VSensorImpl(Connection *conn, PublicSensor sensor);
......
......@@ -729,6 +729,7 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
return SC_OK;
}
......
......@@ -229,6 +229,70 @@ void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, Senso
cass_prepared_free(prepared);
}
/**
* @details
* This function issues a regular query to the data store
* and calls cbFunc for every reading.
*/
void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end)
{
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
std::string key = sid.serialize();
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, start.getRaw());
cass_statement_bind_int64(statement, 2, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
SensorDataStoreReading entry;
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.sensorId = sid;
entry.timeStamp = (uint64_t)ts;
entry.value = (int64_t)value;
cbFunc(entry, userData);
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
/**
* @details
* This function generates an integrated value of the time series
......@@ -436,6 +500,17 @@ void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId&
impl->query(result, sid, start, end);
}
/**
* @details
* Instead of doing the actual work, this function simply
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end)
{
return impl->queryCB(cbFunc, userData, sid, start, end);
}
/**
* @details
* Instead of doing the actual work, this function simply
......
......@@ -60,6 +60,11 @@ VSError VSensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& sta
return impl->query(result, start, end);
}
VSError VSensor::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, TimeStamp& start, TimeStamp& end)
{
return impl->queryCB(cbFunc, userData, start, end);
}
VSensor::VSensor(Connection *conn, std::string name)
{
impl = new VirtualSensor::VSensorImpl(conn, name);
......@@ -578,6 +583,58 @@ VSError VSensorImpl::query(std::list<SensorDataStoreReading>& result, TimeStamp&
return VS_OK;
}
VSError VSensorImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, TimeStamp& start, TimeStamp& end)
{
/* Clear physical sensor caches */
for (PhysicalSensorCacheContainer::iterator it = physicalSensorCaches.begin(); it != physicalSensorCaches.end(); it++) {
delete it->second;
}
physicalSensorCaches.clear();
/* Initialize sensor caches */
std::unordered_set<std::string> inputs;
expression->getInputsRecursive(inputs, false);
SensorConfig sc(connection);
PublicSensor psen;
for (std::unordered_set<std::string>::iterator it = inputs.begin(); it != inputs.end(); it++) {
sc.getPublicSensorByName(psen, it->c_str());
if (!psen.is_virtual) {
// std::cerr << "Adding to list of phys sensor caches: " << *it << std::endl;
physicalSensorCaches.insert(std::make_pair(*it, new PhysicalSensorCache(psen)));
}
}
/*
* Calculate first and last time stamp at which this virtual sensor fires:
* Each virtual sensor fires at t0 + n*frequency (n=0,1,2,....)
*/
uint64_t n_start, n_end;
n_start = (start.getRaw() - tzero.getRaw()) / frequency;
n_end = (end.getRaw() - tzero.getRaw()) / frequency;
/* Iterate over all time steps at which this sensor fires. */
for (
uint64_t i = (tzero.getRaw() + (n_start * frequency));
i <= (tzero.getRaw() + (n_end * frequency));
i += frequency)
{
try {
int64_t eval = expression->evaluateAt(i, physicalSensorCaches);
TimeStamp t(i);
SensorDataStoreReading r;
r.timeStamp = t;
r.value = eval;
cbFunc(r, userData);
}
catch (PhysicalSensorEvaluatorException& e) {
std::cerr << e.what();
}
}
return VS_OK;
}
VSensorImpl::VSensorImpl(Connection *conn, std::string name)
{
SensorConfig sc(conn);
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbconfig.o sensoraction.o dbaction.o useraction.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
# GCC 4.8 is broken
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
OBJS = dcdbquery.o query.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
TARGET = dcdbquery
......
......@@ -43,14 +43,7 @@ bool DCDBQuery::getFloatOutputEnabled() {
return useFloatOutput;
}
void DCDBQuery::genOutput(
std::string sensor,
DCDB::SensorDataStoreReading& reading,
bool scale,
double scalingFactor,
bool unitConvert,
DCDB::Unit baseUnit,
DCDB::Unit targetUnit)
void DCDBQuery::genOutput(DCDB::SensorDataStoreReading& reading)
{
double fvalue;
int64_t ivalue;
......@@ -94,7 +87,7 @@ void DCDBQuery::genOutput(
}
/* Print the sensor's public name */
std::cout << sensor << ",";
std::cout << sensorName << ",";
/* Print the time stamp */
if (useLocalTime) {
......@@ -116,6 +109,14 @@ void DCDBQuery::genOutput(
}
}
void DCDBQuery::queryCallback(DCDB::SensorDataStoreReading& reading, void* userData)
{
DCDBQuery* self = (DCDBQuery*)userData;
/* Generate output for the reading */
self->genOutput(reading);
}
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end)
{
/* Create a new connection to the database */
......@@ -135,11 +136,12 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
bool unitConvert = false;
bool scale = false;
double scalingFactor = 1;
unitConvert = false;
scale = false;
scalingFactor = 1;
std::string modifierStr;
DCDB::Unit baseUnit = DCDB::Unit_None, targetUnit = DCDB::Unit_None;
baseUnit = DCDB::Unit_None;
targetUnit = DCDB::Unit_None;
/* Check if the sensor was requested in a different unit or with scaling factor */
if (it->find('/') != std::string::npos) {
......@@ -182,16 +184,9 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
/* If this is a virtual sensor... */
if (publicSensor.is_virtual) {
sensorName = *it;
DCDB::VSensor vSen(connection, publicSensor);
std::list<DCDB::SensorDataStoreReading> readings;
vSen.query(readings, start, end);
/* Iterate over the readings */
for (std::list<DCDB::SensorDataStoreReading>::iterator rit = readings.begin(); rit != readings.end(); rit++) {
DCDB::SensorDataStoreReading reading = *rit;
genOutput(*it, reading, scale, scalingFactor, unitConvert, baseUnit, targetUnit);
}
vSen.queryCB(queryCallback, this, start, end);
}
else {
/* Expand the pattern into a list of existing sensors in the time range */
......@@ -209,14 +204,8 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
/* Iterate over the expanded list of sensorIds and output the results in CSV format */
for (std::list<DCDB::SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
std::list <DCDB::SensorDataStoreReading> readings;
sensorDataStore.query(readings, *sit, start, end);
/* Iterate over the readings */
for (std::list<DCDB::SensorDataStoreReading>::iterator rit = readings.begin(); rit != readings.end(); rit++) {
DCDB::SensorDataStoreReading reading = *rit;
genOutput(*it, reading, scale, scalingFactor, unitConvert, baseUnit, targetUnit);
}
sensorName = *it;
sensorDataStore.queryCB(queryCallback, this, *sit, start, end);
}
}
}
......
......@@ -27,6 +27,13 @@ protected:
bool useRawOutput;
bool useFloatOutput;
std::string sensorName;
bool scale;
double scalingFactor;
bool unitConvert;
DCDB::Unit baseUnit;
DCDB::Unit targetUnit;
public:
void setLocalTimeEnabled(bool enable);
bool getLocalTimeEnabled();
......@@ -35,15 +42,9 @@ public:
void setFloatOutputEnabled(bool enable);
bool getFloatOutputEnabled();
void genOutput(
std::string sensor,
DCDB::SensorDataStoreReading& reading,
bool scale,
double scalingFactor,
bool unitConvert,
DCDB::Unit baseUnit,
DCDB::Unit targetUnit
);
void genOutput(DCDB::SensorDataStoreReading& reading);
static void queryCallback(DCDB::SensorDataStoreReading& reading, void* userData);
void doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment