Commit fc4c08de authored by Axel Auweter's avatar Axel Auweter
Browse files

First (naive, inefficient, ...) working implementatino for querying virtual sensors.

parent 1fa730c7
......@@ -53,6 +53,7 @@ typedef enum {
SC_INVALIDPATTERN,
SC_INVALIDPUBLICNAME,
SC_INVALIDEXPRESSION,
SC_EXPRESSIONSELFREF,
SC_INVALIDVSENSORID,
SC_WRONGTYPE,
SC_UNKNOWNSENSOR,
......
......@@ -90,19 +90,19 @@ public:
* @brief Returns the raw time stamp value.
* @return The object's value as uint64_t.
*/
uint64_t getRaw(void);
uint64_t getRaw(void) const;
/**
* @brief Returns the time stamp's value as human readable string
* @return The object's value as std::string.
*/
std::string getString(void);
std::string getString(void) const;
/**
* @brief Returns the "weekstamp" corresponding to the object's value
* @return The week number of the timestamp.
*/
uint16_t getWeekstamp(void);
uint16_t getWeekstamp(void) const;
/* Overloaded operators (compare raw values) */
inline bool operator == (const TimeStamp& rhs) const {return raw == rhs.raw;}
......
......@@ -12,6 +12,11 @@
#include <stdexcept>
#include <string>
#include <unordered_set>
#include "connection.h"
#include "sensorconfig.h"
#include "sensordatastore.h"
#ifndef DCDB_VIRTUAL_SENSOR_H
#define DCDB_VIRTUAL_SENSOR_H
......@@ -36,15 +41,39 @@ private:
std::string where_;
};
/* Forward declare VirtualSensor::VSensorImpl */
/* Forward declare VirtualSensor::VSensorExpressionImpl and VirtualSensor::VSensorImpl */
namespace VirtualSensor{
class VSensorExpressionImpl;
class VSensorImpl;
}
/**
* @brief Class for evaluating Virtual Sensors
* @brief Public class for evaluating Virtual Sensors expressions
*
* TODO: Implement this :)
* This class forwards to the library internal VSensorExpressionImpl class.
*/
class VSensorExpression
{
protected:
VirtualSensor::VSensorExpressionImpl *impl;
public:
void getInputs(std::unordered_set<std::string>& inputSet);
void getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly = true);
VSensorExpression(Connection* conn, std::string expr);
virtual ~VSensorExpression();
};
typedef enum {
VS_OK,
VS_UNKNOWNERROR
} VSError;
/**
* @brief Class for querying virtual sensors
*
* TODO: Implement this...
*/
class VSensor
{
......@@ -52,7 +81,10 @@ protected:
VirtualSensor::VSensorImpl *impl;
public:
VSensor(std::string expr);
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
VSensor(Connection *conn, std::string name);
VSensor(Connection *conn, PublicSensor sensor);
virtual ~VSensor();
};
......
......@@ -14,6 +14,16 @@
#include <boost/variant/recursive_variant.hpp>
#include <boost/fusion/include/adapt_struct.hpp>
#include <cstdint>
#include <string>
#include <list>
#include <unordered_set>
#include "timestamp.h"
#include "sensorid.h"
#include "virtualsensor.h"
#ifndef DCDB_VIRTUAL_SENSOR_INTERNAL_H
#define DCDB_VIRTUAL_SENSOR_INTERNAL_H
......@@ -59,6 +69,7 @@ struct Op;
typedef boost::variant<
Nil,
unsigned int,
std::string,
boost::recursive_wrapper<Signd>,
boost::recursive_wrapper<Opseq>
> Operand;
......@@ -140,24 +151,78 @@ struct ExpressionGrammar : qi::grammar<Iterator, AST::Opseq(), ascii::space_type
| '(' >> expression >> ')'
| (qi::char_('-') >> factor)
| (qi::char_('+') >> factor)
| sensor
;
}
qi::rule<Iterator, AST::Opseq(), ascii::space_type> expression;
qi::rule<Iterator, AST::Opseq(), ascii::space_type> term;
qi::rule<Iterator, AST::Operand(), ascii::space_type> factor;
qi::symbols<char, std::string> sensor;
/**
* @brief Populates the parser grammar with a symbol table of available sensor names.
*
* Since it eases the implementation, references to sensors in the virtual sensor
* expressions are parsed as symbols by the Qi parser. Therefore, we have to populate
* the list of sensors from the public sensors table during runtime. This should be
* done always after instantiating objects of the ExpressionGrammar.
*/
void addSensorNames(std::list<std::string> sensorNames) {
for (std::list<std::string>::iterator it = sensorNames.begin(); it != sensorNames.end(); it++) {
sensor.add (it->c_str(), *it);
}
}
};
class VSensorImpl
/**
* @brief Private implementation class for evaluating Virtual Sensors expressions
*
* This class implements the expression parser and provides functions that create an
* unordered set of inputs required to evaluate the expression.
*/
class VSensorExpressionImpl
{
protected:
Connection* connection;
VirtualSensor::AST::Opseq opseq;
void validateExpression(std::string expr);
void generateAST(std::string expr);
void dumpAST();
static int64_t physicalSensorInterpolator(Connection* conn, SensorConfig& sc, PublicSensor& sensor, TimeStamp t);
public:
VSensorImpl(std::string expr);
void getInputs(std::unordered_set<std::string>& inputSet);
void getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly);
int64_t evaluateAt(TimeStamp time);
VSensorExpressionImpl(Connection* conn, std::string expr);
virtual ~VSensorExpressionImpl();
};
/**
* @brief Private implementation class for querying virtual sensors
*
* TODO: Implement this...
*/
class VSensorImpl
{
protected:
Connection* connection;
std::string name;
VSensorExpressionImpl* expression;
SensorId* vsensorid;
TimeStamp tzero;
uint64_t frequency;
public:
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
VSensorImpl(Connection *conn, std::string name);
VSensorImpl(Connection *conn, PublicSensor sensor);
virtual ~VSensorImpl();
};
......
......@@ -8,6 +8,9 @@
#include <cstring>
#include <iostream>
#include <algorithm>
#include <string>
#include <list>
#include <unordered_set>
#include "cassandra.h"
......@@ -314,7 +317,15 @@ SCError SensorConfigImpl::publishVirtualSensor(std::string publicName, std::stri
/* Validate vSensorExpression */
try {
VSensor vsensor(vSensorExpression);
VSensorExpression vsExp(connection, vSensorExpression);
/* Check that it is not recursive-pointing to itself */
std::unordered_set<std::string> inputSet;
vsExp.getInputsRecursive(inputSet);
std::unordered_set<std::string>::const_iterator found = inputSet.find(publicName);
if (found != inputSet.end()) {
return SC_EXPRESSIONSELFREF;
}
}
catch (std::exception& e) {
std::cout << e.what();
......
......@@ -201,7 +201,7 @@ void TimeStamp::convertToLocal()
/**
*
*/
uint64_t TimeStamp::getRaw(void)
uint64_t TimeStamp::getRaw(void) const
{
return raw;
}
......@@ -209,7 +209,7 @@ uint64_t TimeStamp::getRaw(void)
/**
*
*/
std::string TimeStamp::getString(void)
std::string TimeStamp::getString(void) const
{
#ifndef BOOST_DATE_TIME_HAS_NANOSECONDS
#error Needs nanoseconds support in boost.
......@@ -223,7 +223,7 @@ std::string TimeStamp::getString(void)
/**
*
*/
uint16_t TimeStamp::getWeekstamp(void)
uint16_t TimeStamp::getWeekstamp(void) const
{
uint16_t week = raw / 604800000000000;
return week;
......
......@@ -7,6 +7,13 @@
#include "virtualsensor.h"
#include "virtualsensor_internal.h"
#include "sensorconfig.h"
#include "dcdbglobals.h"
#include <iostream>
#include <iomanip>
#include <numeric>
#include <cstdlib>
namespace DCDB {
......@@ -21,14 +28,49 @@ const char* VSExpressionParserException::what() const throw() {
return msg_.c_str();
}
/*
* Implementations for VSensorExpression class.
*/
void VSensorExpression::getInputs(std::unordered_set<std::string>& inputSet)
{
impl->getInputs(inputSet);
}
void VSensorExpression::getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly)
{
return impl->getInputsRecursive(inputSet, virtualOnly);
}
VSensorExpression::VSensorExpression(Connection* conn, std::string expr) {
impl = new VirtualSensor::VSensorExpressionImpl(conn, expr);
}
VSensorExpression::~VSensorExpression() {
if (impl) {
delete impl;
}
}
/*
* Implementations for VSensor class.
*/
VSensor::VSensor(std::string expr) {
impl = new VirtualSensor::VSensorImpl(expr);
VSError VSensor::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end)
{
return impl->query(result, start, end);
}
VSensor::VSensor(Connection *conn, std::string name)
{
impl = new VirtualSensor::VSensorImpl(conn, name);
}
VSensor::~VSensor() {
VSensor::VSensor(Connection *conn, PublicSensor sensor)
{
impl = new VirtualSensor::VSensorImpl(conn, sensor);
}
VSensor::~VSensor()
{
if (impl) {
delete impl;
}
......@@ -36,9 +78,9 @@ VSensor::~VSensor() {
namespace VirtualSensor {
/*
* Implementations for VSensorImpl class.
* Implementations for VSensorExpressionImpl class.
*/
void VSensorImpl::validateExpression(std::string expr)
void VSensorExpressionImpl::generateAST(std::string expr)
{
/* Try to generate AST */
typedef std::string::const_iterator StringIterator;
......@@ -47,6 +89,12 @@ void VSensorImpl::validateExpression(std::string expr)
ascii::space_type space;
Grammar grammar;
/* Add the list of known sensors to the grammar */
std::list<std::string> sensorNames;
SensorConfig sc(connection);
sc.getPublicSensorNames(sensorNames);
grammar.addSensorNames(sensorNames);
StringIterator it = expr.begin();
StringIterator end = expr.end();
bool success = phrase_parse(it, end, grammar, space, opseq);
......@@ -59,13 +107,14 @@ void VSensorImpl::validateExpression(std::string expr)
/* Success - opseq now represents the top level of our AST */
}
void VSensorImpl::dumpAST()
void VSensorExpressionImpl::dumpAST()
{
/* Declare a struct describing the action for each object in the AST when it comes to printing */
struct ASTPrinter {
typedef void result_type;
void operator()(AST::Nil) const {}
void operator()(unsigned int n) const { std::cout << n; }
void operator()(std::string s) const { std::cout << "sensor(" << s << ")"; }
void operator()(AST::Op const& x) const {
boost::apply_visitor(*this, x.oprnd);
switch (x.oprtr) {
......@@ -96,17 +145,369 @@ void VSensorImpl::dumpAST()
std::cout << std::endl;
}
VSensorImpl::VSensorImpl(std::string expr)
int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PublicSensor& sensor, TimeStamp t)
{
/* Check if the expression is valid */
validateExpression(expr);
/*
* FIXME: Very naive and inefficient implementation here requiring 2 queries per request.
* In a proper implementation, we would query the full series for every physical input sensor
* and keep them in memory to be evaluated here.
*/
CassSession* session = connection->getSessionHandle();
/* Expand the sensor's public name into its internal SensorId */
/* FIXME: Should do proper error handling. Returning 0 is probably wrong! */
std::list<DCDB::SensorId> sensorIds;
switch (sc.getSensorListForPattern(sensorIds, sensor.pattern, t, t)) {
case DCDB::SC_OK:
break;
case DCDB::SC_INVALIDPATTERN:
std::cout << "Invalid pattern." << std::endl;
return 0;
default:
std::cout << "Unknown error." << std::endl;
return 0;
}
/* The sensorIds list should only contain one entry */
std::list<DCDB::SensorId>::iterator sit = sensorIds.begin();
// std::cout << "Raw sensor id: " << std::hex << std::setfill('0') << std::setw(16) << sit->getRaw()[0] << " " << std::hex << std::setfill('0') << std::setw(16) << sit->getRaw()[1] << std::dec << std::endl;
/* Find the readings just before and just after time t */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts <= ? ORDER BY ts DESC LIMIT 1;";
const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts > ? LIMIT 1;";
SensorDataStoreReading readingBefore, readingAfter;
std::string key = sit->serialize();
/* Query before... */
future = cass_session_prepare(session, queryBefore);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return 0;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, t.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
/* Dump AST */
dumpAST();
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
if (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
readingBefore.timeStamp = (uint64_t)ts;
readingBefore.value = (int64_t)value;
}
else {
std::cout << "Cannot find reading for sensor " << sensor.name << " prior to time " << t.getString() << "(" << t.getRaw() << ")" << std::endl;
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
/* Query after... */
future = cass_session_prepare(session, queryAfter);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return 0;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, t.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
if (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
readingAfter.timeStamp = (uint64_t)ts;
readingAfter.value = (int64_t)value;
}
else {
std::cout << "Cannot find reading for sensor " << sensor.name << " following time " << t.getString() << "(" << t.getRaw() << ")" << std::endl;
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
/*
* Linearly interpolate between the readings using the following equation:
*
* y2 - y1 x2y1 - x1y2
* y = ------- * x + -----------
* x2 - x1 x2 - x1
*/
typedef double eval_type;
eval_type x1 = readingBefore.timeStamp.getRaw();
eval_type x2 = readingAfter.timeStamp.getRaw();
eval_type y1 = readingBefore.value;
eval_type y2 = readingAfter.value;
eval_type x = t.getRaw();
return (((y2 - y1) / (x2 - x1)) * x) + (((x2 * y1) - (x1 * y2)) / (x2 - x1));
}
void VSensorExpressionImpl::getInputs(std::unordered_set<std::string>& inputSet)
{
/* Declare a struct describing the action for each object in the AST when it comes to collecting the sensor inputs */
struct ASTInputCollector {
typedef void result_type;
void operator()(AST::Nil) const {}
void operator()(unsigned int n) const { }
void operator()(std::string s) const { is.insert(s); }
void operator()(AST::Op const& x) const {
boost::apply_visitor(*this, x.oprnd);
}
void operator()(AST::Signd const& x) const {
boost::apply_visitor(*this, x.oprnd);
}
void operator()(AST::Opseq const& x) const {
boost::apply_visitor(*this, x.frst);
BOOST_FOREACH(AST::Op const& o, x.rst) {
(*this)(o);
}
}
ASTInputCollector(std::unordered_set<std::string>& inputSet) : is(inputSet) {}
std::unordered_set<std::string>& is;
};
ASTInputCollector inputCollector(inputSet);
inputCollector(opseq);
}
void VSensorExpressionImpl::getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly)
{
/* Get our own inputs */
std::unordered_set<std::string> myInputs;
getInputs(myInputs);
/* Iterate over inputs and append to set */
for (std::unordered_set<std::string>::iterator it = myInputs.begin(); it != myInputs.end(); it++ ) {
/* Get information for this sensor */
SensorConfig sc(connection);
PublicSensor psen;
sc.getPublicSensorByName(psen, it->c_str());
/* Check if the sensor is physical and whether we only append virtual sensors */
if (!psen.is_virtual && virtualOnly) {
continue;
}
/* Append the sensor to the set */
inputSet.insert(*it);
/* If the sensor is physical, we're done */
if (!psen.is_virtual) {
continue;
}
/* Recurse into this sensor's list of inputs */
VSensorExpressionImpl vsen(connection, psen.expression);
vsen.getInputsRecursive(inputSet, virtualOnly);
}
}
int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time)
{
/* Declare a struct describing the action for each object in the AST when it comes to evaluation */
struct ASTEvaluator {
typedef int64_t result_type;
int64_t operator()(AST::Nil) const { return 0; }
int64_t operator()(unsigned int n) const { return n; }
int64_t operator()(std::string s) const {
/* Evaluate sensor s at time t */
SensorConfig sc(c);
PublicSensor sen;
if (sc.getPublicSensorByName(sen, s.c_str()) != SC_OK) {
std::cout << "Internal error on getPublicSensorByName while trying to evaluate " << s << " at " << t.getString() << std::endl;
return 0;
}
/* Things are easy if the sensor is virtual */
if (sen.is_virtual) {
VSensorExpressionImpl vSen(c, sen.expression);
return vSen.evaluateAt(t);
}
else {
/* Physical sensors need a little bit more thinkin' */
return physicalSensorInterpolator(c, sc, sen, t);
}