Commit adfbd3b4 authored by Axel Auweter's avatar Axel Auweter
Browse files

Implement a cache for physical sensor data to speed up virtual sensor calculations.

parent d070d6b0
......@@ -18,9 +18,12 @@
#include <string>
#include <list>
#include <unordered_set>
#include <unordered_map>
#include <map>
#include "timestamp.h"
#include "sensorid.h"
#include "sensordatastore.h"
#include "virtualsensor.h"
......@@ -28,7 +31,6 @@
#define DCDB_VIRTUAL_SENSOR_INTERNAL_H
namespace DCDB {
namespace VirtualSensor {
/**
* @brief Exception class for errors during Physical Sensor Evaluation
......@@ -43,6 +45,24 @@ public:
PhysicalSensorEvaluatorException(const std::string& msg) : runtime_error(msg) {}
};
class PhysicalSensorCache {
protected:
std::map<uint64_t, SensorDataStoreReading> cache;
PublicSensor s;
void populate(Connection* connection, SensorConfig& sc, uint64_t t);
public:
void getBefore(Connection* connection, SensorConfig& sc, SensorDataStoreReading& r, uint64_t t);
void getAfter(Connection* connection, SensorConfig& sc, SensorDataStoreReading& r, uint64_t t);
PhysicalSensorCache(PublicSensor sensor);
virtual ~PhysicalSensorCache();
};
typedef std::unordered_map<std::string, PhysicalSensorCache*> PhysicalSensorCacheContainer;
namespace VirtualSensor {
namespace qi = boost::spirit::qi;
namespace ascii = boost::spirit::ascii;
......@@ -204,13 +224,13 @@ protected:
void generateAST(std::string expr);
void dumpAST();
static int64_t physicalSensorInterpolator(Connection* conn, SensorConfig& sc, PublicSensor& sensor, TimeStamp t);
static int64_t physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t);
public:
void getInputs(std::unordered_set<std::string>& inputSet);
void getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly);
int64_t evaluateAt(TimeStamp time);
int64_t evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc);
VSensorExpressionImpl(Connection* conn, std::string expr);
virtual ~VSensorExpressionImpl();
......@@ -230,6 +250,7 @@ protected:
SensorId* vsensorid;
TimeStamp tzero;
uint64_t frequency;
static PhysicalSensorCacheContainer physicalSensorCaches;
public:
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
......
......@@ -77,111 +77,52 @@ VSensor::~VSensor()
}
}
namespace VirtualSensor {
/*
* Implementations for VSensorExpressionImpl class.
* Implementations for PhysicalSensorCache class
*/
void VSensorExpressionImpl::generateAST(std::string expr)
{
/* Try to generate AST */
typedef std::string::const_iterator StringIterator;
typedef ExpressionGrammar<StringIterator> Grammar;
ascii::space_type space;
Grammar grammar;
/* Add the list of known sensors to the grammar */
std::list<std::string> sensorNames;
SensorConfig sc(connection);
sc.getPublicSensorNames(sensorNames);
grammar.addSensorNames(sensorNames);
StringIterator it = expr.begin();
StringIterator end = expr.end();
bool success = phrase_parse(it, end, grammar, space, opseq);
if ((!success) || (it != end)) {
std::string rest(it, end);
throw VSExpressionParserException(rest);
}
/* Success - opseq now represents the top level of our AST */
}
void VSensorExpressionImpl::dumpAST()
#define PSC_READ_AHEAD "1000"
#define PSC_READ_BEHIND "1"
void PhysicalSensorCache::populate(Connection* connection, SensorConfig& sc, uint64_t t)
{
/* Declare a struct describing the action for each object in the AST when it comes to printing */
struct ASTPrinter {
typedef void result_type;
void operator()(AST::Nil) const {}
void operator()(unsigned int n) const { std::cout << n; }
void operator()(std::string s) const { std::cout << "sensor(" << s << ")"; }
void operator()(AST::Op const& x) const {
boost::apply_visitor(*this, x.oprnd);
switch (x.oprtr) {
case '+': std::cout << " add"; break;
case '-': std::cout << " sub"; break;
case '*': std::cout << " mul"; break;
case '/': std::cout << " div"; break;
}
}
void operator()(AST::Signd const& x) const {
boost::apply_visitor(*this, x.oprnd);
switch (x.sgn) {
case '-': std::cout << " neg"; break;
case '+': std::cout << " pos"; break;
}
}
void operator()(AST::Opseq const& x) const {
boost::apply_visitor(*this, x.frst);
BOOST_FOREACH(AST::Op const& o, x.rst) {
std::cout << ' ';
(*this)(o);
}
}
};
ASTPrinter printer;
printer(opseq);
std::cout << std::endl;
}
int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PublicSensor& sensor, TimeStamp t)
{
/*
* FIXME: Very naive and inefficient implementation here requiring 2 queries per request.
* In a proper implementation, we would query the full series for every physical input sensor
* and keep them in memory to be evaluated here.
/* FIXME: This function has a couple of problems:
* - Error handling should be improved (exceptions instead of simply returning to caller)
* - If reading ahead or reading behind does not return the number of readings specified
* by PSC_READ_AHEAD/PSC_READ_BEHIND, this could have two reasons of which only one is
* currently being addressed:
* 1) there is simply no more data (time series ends)
* 2) there could be more data but under another sensorId (either because the request
* window crosses a week stamp or because the pattern expansion results in multiple
* internal sensor IDs.
* -
*/
CassSession* session = connection->getSessionHandle();
/* Expand the sensor's public name into its internal SensorId */
/* FIXME: Should do proper error handling. Returning 0 is probably wrong! */
std::list<DCDB::SensorId> sensorIds;
switch (sc.getSensorListForPattern(sensorIds, sensor.pattern, t, t)) {
switch (sc.getSensorListForPattern(sensorIds, s.pattern, t, t)) {
case DCDB::SC_OK:
break;
case DCDB::SC_INVALIDPATTERN:
std::cout << "Invalid pattern." << std::endl;
return 0;
return;
default:
std::cout << "Unknown error." << std::endl;
return 0;
return;
}
/* The sensorIds list should only contain one entry */
std::list<DCDB::SensorId>::iterator sit = sensorIds.begin();
// std::cout << "Raw sensor id: " << std::hex << std::setfill('0') << std::setw(16) << sit->getRaw()[0] << " " << std::hex << std::setfill('0') << std::setw(16) << sit->getRaw()[1] << std::dec << std::endl;
/* Find the readings just before and just after time t */
/* Find the readings before and after time t */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts <= ? ORDER BY ts DESC LIMIT 1;";
const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts > ? LIMIT 1;";
const char* queryBefore = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts <= ? ORDER BY ts DESC LIMIT " PSC_READ_BEHIND;
const char* queryAfter = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts > ? LIMIT " PSC_READ_AHEAD;
SensorDataStoreReading readingBefore, readingAfter;
std::string key = sit->serialize();
/* Query before... */
......@@ -192,7 +133,7 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return 0;
return;
}
prepared = cass_future_get_prepared(future);
......@@ -200,7 +141,7 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, t.getRaw());
cass_statement_bind_int64(statement, 1, t);
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -209,25 +150,19 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
if (cass_iterator_next(rows)) {
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
readingBefore.timeStamp = (uint64_t)ts;
readingBefore.value = (int64_t)value;
}
else {
std::stringstream msg;
msg << "Cannot find reading for sensor " << sensor.name << " prior to time " << t.getString() << "(" << t.getRaw() << ")" << std::endl;
cass_iterator_free(rows);
cass_result_free(cresult);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
throw(PhysicalSensorEvaluatorException(msg.str()));
SensorDataStoreReading r;
r.sensorId = *sit;
r.timeStamp = (uint64_t)ts;
r.value = (int64_t)value;
cache.insert(std::make_pair((uint64_t)ts, r));
}
cass_iterator_free(rows);
cass_result_free(cresult);
......@@ -245,7 +180,7 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return 0;
return;
}
prepared = cass_future_get_prepared(future);
......@@ -253,7 +188,7 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, t.getRaw());
cass_statement_bind_int64(statement, 1, t);
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -262,33 +197,167 @@ int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
if (cass_iterator_next(rows)) {
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
readingAfter.timeStamp = (uint64_t)ts;
readingAfter.value = (int64_t)value;
SensorDataStoreReading r;
r.sensorId = *sit;
r.timeStamp = (uint64_t)ts;
r.value = (int64_t)value;
cache.insert(std::make_pair((uint64_t)ts, r));
}
else {
std::stringstream msg;
msg << "Cannot find reading for sensor " << sensor.name << " following time " << t.getString() << "(" << t.getRaw() << ")" << std::endl;
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
throw(PhysicalSensorEvaluatorException(msg.str()));
#if 0
std::cerr << "Cache for sensor " << s.name << " after populate:" << std::endl;
for (std::map<uint64_t, SensorDataStoreReading>::iterator i=cache.begin(); i != cache.end(); i++) {
std::cerr << i->first << " : " << i->second.timeStamp.getRaw() << " : " << i->second.value << std::endl;
}
cass_iterator_free(rows);
cass_result_free(cresult);
#endif
}
void PhysicalSensorCache::getBefore(Connection* connection, SensorConfig& sc, SensorDataStoreReading& r, uint64_t t)
{
/* Check the cache */
std::map<uint64_t, SensorDataStoreReading>::iterator i = cache.lower_bound(t);
if (i == cache.end()) {
// std::cerr << "Cache miss in getBefore(" << s.name << ", " << t << ")" << std::endl;
populate(connection, sc, t);
i = cache.lower_bound(t);
}
if(i != cache.begin() && i != cache.end()) {
--i;
}
else {
std::stringstream msg;
TimeStamp ts(t);
msg << "Cannot find reading for sensor " << s.name << " prior to time " << ts.getString() << "(" << ts.getRaw() << ")" << std::endl;
throw PhysicalSensorEvaluatorException(msg.str());
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
/* If we are here, we have a value */
r = i->second;
}
void PhysicalSensorCache::getAfter(Connection* connection, SensorConfig& sc, SensorDataStoreReading& r, uint64_t t)
{
/* Check the cache */
std::map<uint64_t, SensorDataStoreReading>::iterator i = cache.upper_bound(t);
if (i == cache.end()) {
// std::cerr << "Cache miss in getAfter(" << s.name << ", " << t << ")" << std::endl;
populate(connection, sc, t);
i = cache.lower_bound(t);
if (i == cache.end()) {
std::stringstream msg;
TimeStamp ts(t);
msg << "Cannot find reading for sensor " << s.name << " following time " << ts.getString() << "(" << ts.getRaw() << ")" << std::endl;
throw PhysicalSensorEvaluatorException(msg.str());
}
}
/* If we are here, we have a value */
r = i->second;
}
PhysicalSensorCache::PhysicalSensorCache(PublicSensor sensor)
{
s = sensor;
}
PhysicalSensorCache::~PhysicalSensorCache()
{
}
namespace VirtualSensor {
/*
* Implementations for VSensorExpressionImpl class.
*/
void VSensorExpressionImpl::generateAST(std::string expr)
{
/* Try to generate AST */
typedef std::string::const_iterator StringIterator;
typedef ExpressionGrammar<StringIterator> Grammar;
ascii::space_type space;
Grammar grammar;
/* Add the list of known sensors to the grammar */
std::list<std::string> sensorNames;
SensorConfig sc(connection);
sc.getPublicSensorNames(sensorNames);
grammar.addSensorNames(sensorNames);
StringIterator it = expr.begin();
StringIterator end = expr.end();
bool success = phrase_parse(it, end, grammar, space, opseq);
if ((!success) || (it != end)) {
std::string rest(it, end);
throw VSExpressionParserException(rest);
}
/* Success - opseq now represents the top level of our AST */
}
void VSensorExpressionImpl::dumpAST()
{
/* Declare a struct describing the action for each object in the AST when it comes to printing */
struct ASTPrinter {
typedef void result_type;
void operator()(AST::Nil) const {}
void operator()(unsigned int n) const { std::cout << n; }
void operator()(std::string s) const { std::cout << "sensor(" << s << ")"; }
void operator()(AST::Op const& x) const {
boost::apply_visitor(*this, x.oprnd);
switch (x.oprtr) {
case '+': std::cout << " add"; break;
case '-': std::cout << " sub"; break;
case '*': std::cout << " mul"; break;
case '/': std::cout << " div"; break;
}
}
void operator()(AST::Signd const& x) const {
boost::apply_visitor(*this, x.oprnd);
switch (x.sgn) {
case '-': std::cout << " neg"; break;
case '+': std::cout << " pos"; break;
}
}
void operator()(AST::Opseq const& x) const {
boost::apply_visitor(*this, x.frst);
BOOST_FOREACH(AST::Op const& o, x.rst) {
std::cout << ' ';
(*this)(o);
}
}
};
ASTPrinter printer;
printer(opseq);
std::cout << std::endl;
}
int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t)
{
SensorDataStoreReading readingBefore, readingAfter;
/* Get readingBefore and readingAfter from the sensor's cache */
pscc[sensor.name]->getBefore(connection, sc, readingBefore, t.getRaw());
pscc[sensor.name]->getAfter(connection, sc, readingAfter, t.getRaw());
// std::cerr << "For time " << t.getRaw() << ", sensor " << sensor.name << ": before=(" << readingBefore.value << "," << readingBefore.timeStamp.getRaw() << ") after=(" << readingAfter.value << "," << readingAfter.timeStamp.getRaw() << ")" << std::endl;
/*
* Linearly interpolate between the readings using the following equation:
......@@ -370,7 +439,7 @@ void VSensorExpressionImpl::getInputsRecursive(std::unordered_set<std::string>&
}
}
int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time)
int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc)
{
/* Declare a struct describing the action for each object in the AST when it comes to evaluation */
struct ASTEvaluator {
......@@ -390,11 +459,11 @@ int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time)
/* Things are easy if the sensor is virtual */
if (sen.is_virtual) {
VSensorExpressionImpl vSen(c, sen.expression);
return vSen.evaluateAt(t);
return vSen.evaluateAt(t, ps);
}
else {
/* Physical sensors need a little bit more thinkin' */
return physicalSensorInterpolator(c, sc, sen, t);
return physicalSensorInterpolator(c, sc, ps, sen, t);
}
return 0;
......@@ -425,13 +494,14 @@ int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time)
);
}
ASTEvaluator(Connection* conn, TimeStamp time) : c(conn), t(time) {}
ASTEvaluator(Connection* conn, TimeStamp time, PhysicalSensorCacheContainer& pscc) : c(conn), t(time), ps(pscc) {}
Connection* c;
TimeStamp t;
PhysicalSensorCacheContainer& ps;
};
ASTEvaluator eval(connection, time);
ASTEvaluator eval(connection, time, pscc);
return eval(opseq);
}
......@@ -451,8 +521,30 @@ VSensorExpressionImpl::~VSensorExpressionImpl()
/*
* Implementations for VSensorImpl class.
*/
PhysicalSensorCacheContainer VSensorImpl::physicalSensorCaches;
VSError VSensorImpl::query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end)
{
/* Clear physical sensor caches */
for (PhysicalSensorCacheContainer::iterator it = physicalSensorCaches.begin(); it != physicalSensorCaches.end(); it++) {
delete it->second;
}
physicalSensorCaches.clear();
/* Initialize sensor caches */
std::unordered_set<std::string> inputs;
expression->getInputsRecursive(inputs, false);
SensorConfig sc(connection);
PublicSensor psen;
for (std::unordered_set<std::string>::iterator it = inputs.begin(); it != inputs.end(); it++) {
sc.getPublicSensorByName(psen, it->c_str());
if (!psen.is_virtual) {
// std::cerr << "Adding to list of phys sensor caches: " << *it << std::endl;
physicalSensorCaches.insert(std::make_pair(*it, new PhysicalSensorCache(psen)));
}
}
/*
* Calculate first and last time stamp at which this virtual sensor fires:
* Each virtual sensor fires at t0 + n*frequency (n=0,1,2,....)
......@@ -471,7 +563,7 @@ VSError VSensorImpl::query(std::list<SensorDataStoreReading>& result, TimeStamp&
i += frequency)
{
try {
int64_t eval = expression->evaluateAt(i);
int64_t eval = expression->evaluateAt(i, physicalSensorCaches);
TimeStamp t(i);
SensorDataStoreReading r;
r.timeStamp = t;
......@@ -522,6 +614,12 @@ VSensorImpl::VSensorImpl(Connection *conn, PublicSensor sensor)
VSensorImpl::~VSensorImpl()
{
/* Clear physical sensor caches */
for (PhysicalSensorCacheContainer::iterator it = physicalSensorCaches.begin(); it != physicalSensorCaches.end(); it++) {
delete it->second;
}
physicalSensorCaches.clear();
if (expression) {
delete expression;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment