Commit 3ed2bd1d authored by Daniele Tafani's avatar Daniele Tafani
Browse files

Added functions like Deltas, Integrals and Derivatives to queries. Still need...

Added functions like Deltas, Integrals and Derivatives to queries. Still need to fix unit conversions and scaling factors for all functions.
parent 53421384
......@@ -66,7 +66,7 @@ public:
std::string v_sensorid; /**< For virtual sensors, this field holds a SensorID used for storing cached values in the database. (FIXME: Cache to be implemented) */
uint64_t t_zero; /**< For virtual sensors, this field holds the first point in time at which the sensor carries a value. */
uint64_t frequency; /**< For virtual sensors, this field holds the interval at which the sensor evaluates (in nanoseconds). */
PublicSensor();
PublicSensor(const PublicSensor &copy);
};
......
......@@ -48,6 +48,7 @@
#ifndef DCDB_VIRTUAL_SENSOR_INTERNAL_H
#define DCDB_VIRTUAL_SENSOR_INTERNAL_H
#define BOOST_SPIRIT_DEBUG
namespace DCDB {
......@@ -104,6 +105,14 @@ struct Nil {};
*/
struct Signd;
/**
* @brief The FUNCTION object.
*
* The FUNCTION object represents a generic function that could be applied to an operand (expression) in the AST.
* Currently supported functions: delta_(sensor).
*/
//struct Function;
/**
* @brief The OPSEQ object.
*
......@@ -203,6 +212,7 @@ struct ExpressionGrammar : qi::grammar<Iterator, AST::Opseq(), ascii::space_type
| '(' >> expression >> ')'
| (qi::char_('-') >> factor)
| (qi::char_('+') >> factor)
| delta
| sensor
;
......@@ -212,6 +222,9 @@ struct ExpressionGrammar : qi::grammar<Iterator, AST::Opseq(), ascii::space_type
qi::rule<Iterator, AST::Opseq(), ascii::space_type> term;
qi::rule<Iterator, AST::Operand(), ascii::space_type> factor;
qi::symbols<char, std::string> sensor;
qi::symbols<char, std::string> delta;
std::string deltaStr = "delta_";
/**
* @brief Populates the parser grammar with a symbol table of available sensor names.
......@@ -224,6 +237,7 @@ struct ExpressionGrammar : qi::grammar<Iterator, AST::Opseq(), ascii::space_type
void addSensorNames(std::list<std::string> sensorNames) {
for (std::list<std::string>::iterator it = sensorNames.begin(); it != sensorNames.end(); it++) {
sensor.add (it->c_str(), *it);
delta.add(deltaStr + it->c_str(), deltaStr + *it);
}
}
};
......@@ -244,12 +258,13 @@ protected:
void dumpAST();
static int64_t physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t);
static int64_t physicalSensorDelta(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t, TimeStamp tzero, uint64_t frequency);
public:
void getInputs(std::unordered_set<std::string>& inputSet);
void getInputsRecursive(std::unordered_set<std::string>& inputSet, bool virtualOnly);
int64_t evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc);
int64_t evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc, TimeStamp tzero, uint64_t frequency);
VSensorExpressionImpl(Connection* conn, std::string expr);
virtual ~VSensorExpressionImpl();
......@@ -272,6 +287,11 @@ protected:
static PhysicalSensorCacheContainer physicalSensorCaches;
public:
TimeStamp getTZero();
uint64_t getFrequency();
void setTZero(TimeStamp tzero);
void setFrequency(uint64_t frequency);
VSError query(std::list<SensorDataStoreReading>& result, TimeStamp& start, TimeStamp& end);
VSError queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, TimeStamp& start, TimeStamp& end);
......
......@@ -315,7 +315,7 @@ void VSensorExpressionImpl::generateAST(std::string expr)
typedef ExpressionGrammar<StringIterator> Grammar;
ascii::space_type space;
Grammar grammar;
Grammar grammar;
/* Add the list of known sensors to the grammar */
std::list<std::string> sensorNames;
......@@ -359,6 +359,8 @@ void VSensorExpressionImpl::dumpAST()
case '+': std::cout << " pos"; break;
}
}
// void operator()(AST::Function const& x) const {std::cout << " " << x.fnctn;}
void operator()(AST::Opseq const& x) const {
boost::apply_visitor(*this, x.frst);
BOOST_FOREACH(AST::Op const& o, x.rst) {
......@@ -373,6 +375,19 @@ void VSensorExpressionImpl::dumpAST()
std::cout << std::endl;
}
int64_t VSensorExpressionImpl::physicalSensorDelta(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t, TimeStamp tzero, uint64_t frequency)
{
int64_t currentSensorReading = physicalSensorInterpolator(connection, sc, pscc, sensor, t);
if( t.getRaw() == tzero.getRaw())
return currentSensorReading;
else {
TimeStamp previousT(t.getRaw() - frequency);
int64_t previousSensorReading = physicalSensorInterpolator(connection, sc, pscc, sensor, previousT);
return currentSensorReading - previousSensorReading;
}
}
int64_t VSensorExpressionImpl::physicalSensorInterpolator(Connection* connection, SensorConfig& sc, PhysicalSensorCacheContainer& pscc, PublicSensor& sensor, TimeStamp t)
{
SensorDataStoreReading readingBefore, readingAfter;
......@@ -407,13 +422,19 @@ void VSensorExpressionImpl::getInputs(std::unordered_set<std::string>& inputSet)
typedef void result_type;
void operator()(AST::Nil) const {}
void operator()(unsigned int n) const { }
void operator()(std::string s) const { is.insert(s); }
void operator()(std::string s) const {
if(s.find("delta_")!=std::string::npos)
is.insert(s.substr(6, (std::size_t) s.length() - 6));
else
is.insert(s);
}
void operator()(AST::Op const& x) const {
boost::apply_visitor(*this, x.oprnd);
}
void operator()(AST::Signd const& x) const {
boost::apply_visitor(*this, x.oprnd);
}
void operator()(AST::Opseq const& x) const {
boost::apply_visitor(*this, x.frst);
BOOST_FOREACH(AST::Op const& o, x.rst) {
......@@ -463,7 +484,7 @@ void VSensorExpressionImpl::getInputsRecursive(std::unordered_set<std::string>&
}
}
int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc)
int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheContainer& pscc, TimeStamp tzero, uint64_t frequency)
{
/* Declare a struct describing the action for each object in the AST when it comes to evaluation */
struct ASTEvaluator {
......@@ -472,28 +493,62 @@ int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheCon
int64_t operator()(unsigned int n) const { return n; }
int64_t operator()(std::string s) const {
/* Evaluate sensor s at time t */
SensorConfig sc(c);
PublicSensor sen;
if (sc.getPublicSensorByName(sen, s.c_str()) != SC_OK) {
std::cout << "Internal error on getPublicSensorByName while trying to evaluate " << s << " at " << t.getString() << std::endl;
return 0;
if(s.find("delta_")!=std::string::npos) {
/* Evaluate sensor s at time t */
SensorConfig sc(c);
PublicSensor sen;
std::string sensorName = s.substr(6, (std::size_t) s.length() - 6);
if (sc.getPublicSensorByName(sen, sensorName.c_str()) != SC_OK) {
std::cout << "Internal error on getPublicSensorByName while trying to evaluate " << sensorName << " at " << t.getString() << std::endl;
return 0;
}
/* Things are easy if the sensor is virtual */
if (sen.is_virtual) {
VSensorExpressionImpl vSen(c, sen.expression);
int64_t currentSensorReading = vSen.evaluateAt(t, ps, tz, f);
if(t.getRaw() == tz.getRaw())
return currentSensorReading;
else {
TimeStamp previousT(t.getRaw() - f);
int64_t previousSensorReading = vSen.evaluateAt(previousT, ps, tz, f);
return currentSensorReading - previousSensorReading;
}
}
else {
/* Physical sensors need a little bit more thinkin' */
return physicalSensorDelta(c, sc, ps, sen, t, tz, f);
}
}
else {
/* Evaluate sensor s at time t */
SensorConfig sc(c);
PublicSensor sen;
if (sc.getPublicSensorByName(sen, s.c_str()) != SC_OK) {
std::cout << "Internal error on getPublicSensorByName while trying to evaluate " << s << " at " << t.getString() << std::endl;
return 0;
}
/* Things are easy if the sensor is virtual */
if (sen.is_virtual) {
VSensorExpressionImpl vSen(c, sen.expression);
return vSen.evaluateAt(t, ps, tz, f);
}
else {
/* Physical sensors need a little bit more thinkin' */
return physicalSensorInterpolator(c, sc, ps, sen, t);
}
return 0;
}
/* Things are easy if the sensor is virtual */
if (sen.is_virtual) {
VSensorExpressionImpl vSen(c, sen.expression);
return vSen.evaluateAt(t, ps);
}
else {
/* Physical sensors need a little bit more thinkin' */
return physicalSensorInterpolator(c, sc, ps, sen, t);
}
return 0;
}
int64_t operator()(int64_t lhs, AST::Op const& x) const {
int64_t rhs = boost::apply_visitor(*this, x.oprnd);
switch (x.oprtr) {
case '+': return lhs + rhs; break;
case '-': return lhs - rhs; break;
......@@ -510,6 +565,22 @@ int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheCon
}
return 0;
}
// int64_t operator()(AST::Function const& x) const {
// switch (x.fnctn) {
//
// /* Switch-case is here for future function definitions...*/
// case "delta_":
//
//
// /* case 'functionX_':
// * case 'functionY_':
// * ...
// */
// break;
// }
// return 0;
// }
int64_t operator()(AST::Opseq const& x) const {
return std::accumulate(
x.rst.begin(), x.rst.end(),
......@@ -518,14 +589,16 @@ int64_t VSensorExpressionImpl::evaluateAt(TimeStamp time, PhysicalSensorCacheCon
);
}
ASTEvaluator(Connection* conn, TimeStamp time, PhysicalSensorCacheContainer& pscc) : c(conn), t(time), ps(pscc) {}
ASTEvaluator(Connection* conn, TimeStamp time, PhysicalSensorCacheContainer& pscc, TimeStamp tzero, uint64_t frequency) : c(conn), t(time), ps(pscc), tz(tzero), f(frequency) {}
Connection* c;
TimeStamp t;
PhysicalSensorCacheContainer& ps;
TimeStamp tz;
uint64_t f;
};
ASTEvaluator eval(connection, time, pscc);
ASTEvaluator eval(connection, time, pscc, tzero, frequency);
return eval(opseq);
}
......@@ -536,6 +609,8 @@ VSensorExpressionImpl::VSensorExpressionImpl(Connection* conn, std::string expr)
/* Generate the AST for the expression */
generateAST(expr);
dumpAST();
}
VSensorExpressionImpl::~VSensorExpressionImpl()
......@@ -587,7 +662,7 @@ VSError VSensorImpl::query(std::list<SensorDataStoreReading>& result, TimeStamp&
i += frequency)
{
try {
int64_t eval = expression->evaluateAt(i, physicalSensorCaches);
int64_t eval = expression->evaluateAt(i, physicalSensorCaches, this->getTZero(), this->getFrequency());
TimeStamp t(i);
SensorDataStoreReading r;
r.timeStamp = t;
......@@ -639,7 +714,7 @@ VSError VSensorImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData
i += frequency)
{
try {
int64_t eval = expression->evaluateAt(i, physicalSensorCaches);
int64_t eval = expression->evaluateAt(i, physicalSensorCaches, this->tzero, this->frequency);
TimeStamp t(i);
SensorDataStoreReading r;
r.timeStamp = t;
......@@ -654,6 +729,22 @@ VSError VSensorImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData
return VS_OK;
}
TimeStamp VSensorImpl::getTZero() {
return tzero;
}
uint64_t VSensorImpl::getFrequency() {
return frequency;
}
void VSensorImpl::setTZero(TimeStamp t) {
tzero = t;
}
void VSensorImpl::setFrequency(uint64_t f) {
frequency = f;
}
VSensorImpl::VSensorImpl(Connection *conn, std::string name)
{
SensorConfig sc(conn);
......
......@@ -65,6 +65,26 @@ bool DCDBQuery::getFloatOutputEnabled() {
void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results)
{
int64_t prev = 0;
uint64_t prevT = 0;
int64_t dx = 0;
uint64_t dt = 0;
/* Print Header */
std::cout << "Sensor,Time";
if(printValue)
std::cout << ",Value";
if(calculateDelta)
std::cout << ",Delta";
if(calculateDeltaT)
std::cout << ",DeltaT";
if(calculateDerivative)
std::cout << ",Derivative";
if(calculateIntegral)
std::cout << ",Integral";
/*End of header */
std::cout << std::endl;
for (std::list<DCDB::SensorDataStoreReading>::iterator reading =
results.begin(); reading != results.end(); reading++) {
double fvalue;
......@@ -76,7 +96,7 @@ void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results)
} else {
ivalue = (*reading).value;
}
/* Convert the unit if requested */
if (unitConvert) {
if (useFloatOutput) {
......@@ -106,27 +126,70 @@ void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results)
(*reading).timeStamp.convertToLocal();
}
if (useRawOutput) {
std::cout << (*reading).timeStamp.getRaw() << ",";
std::cout << (*reading).timeStamp.getRaw();
} else {
std::cout << (*reading).timeStamp.getString() << ",";
std::cout << (*reading).timeStamp.getString();
}
/* Print the sensor value */
if (useFloatOutput) {
std::cout << fvalue << ",";
} else {
std::cout << ivalue << ",";
}
if(printValue) {
if (useFloatOutput) {
std::cout << "," << fvalue;
} else {
std::cout << "," << ivalue;
}
}
/* Print delta */
if (reading != results.begin()) {
std::cout << (*reading).value - prev;
}
prev = (*reading).value;
/* Print Delta */
if(calculateDelta)
{
if (reading != results.begin()) {
dx = (*reading).value - prev;
std::cout << "," << dx;
}
}
/* Print Delta T*/
if(calculateDeltaT)
{
if (reading != results.begin()) {
dt = (*reading).timeStamp.getRaw() - prevT;
std::cout << "," << dt;
}
}
/* Print Derivative*/
if(calculateDerivative)
{
if(reading != results.begin()) {
if(!calculateDelta)
dx = (*reading).value - prev;
if(!calculateDeltaT)
dt = (*reading).timeStamp.getRaw() - prevT;
if(reading != results.begin())
std::cout << "," << dx / dt;
}
}
/* Print Integral*/
if(calculateIntegral)
{
if(reading != results.begin()) {
if(!calculateDelta)
dx = (*reading).value - prev;
if(!calculateDeltaT)
dt = (*reading).timeStamp.getRaw() - prevT;
std::cout << "," << dx*dt;
}
}
std::cout << std::endl;
prev = (*reading).value;
prevT = (*reading).timeStamp.getRaw();
}
}
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end)
{
/* Create a new connection to the database */
......@@ -141,42 +204,90 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
DCDB::SensorConfig sensorConfig(connection);
/* Print the CSV header */
std::cout << "Sensor,Time,Value,Delta" << std::endl;
//std::cout << "Sensor,Time,Value,Delta,DeltaT,Derivative,Integral" << std::endl;
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
unitConvert = false;
printValue = false;
calculateDelta = false;
calculateDeltaT = false;
calculateDerivative = false;
calculateIntegral = false;
scale = false;
scalingFactor = 1;
std::string modifierStr;
baseUnit = DCDB::Unit_None;
targetUnit = DCDB::Unit_None;
/* Check if the sensor was requested in a different unit or with scaling factor */
if (it->find('/') != std::string::npos) {
modifierStr = it->substr(it->find('/')+1, it->length());
/* Remove the modifier from the string */
*it = it->substr(0, it->find('/'));
/* Check what type of modificatino is requested */
boost::regex e("\\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e)) {
scale = true;
sscanf(modifierStr.c_str(), "%lf", &scalingFactor);
}
else {
unitConvert = true;
targetUnit = DCDB::UnitConv::fromString(modifierStr);
DCDB::PublicSensor sen;
sensorConfig.getPublicSensorByName(sen, it->c_str());
baseUnit = DCDB::UnitConv::fromString(sen.unit);
sensorName = *it;
while(it != sensors.end()) {
if(it->find(sensorName) != std::string::npos) {
/* Check if the sensor was requested in a different unit or with scaling factor */
if (it->find('/') != std::string::npos) {
modifierStr = it->substr(it->find('/')+1, it->length());
/* Remove the modifier from the string */
*it = it->substr(0, it->find('/'));
/* Check what type of modification is requested */
boost::regex e("\\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e)) {
scale = true;
sscanf(modifierStr.c_str(), "%lf", &scalingFactor);
}
else {
unitConvert = true;
targetUnit = DCDB::UnitConv::fromString(modifierStr);
DCDB::PublicSensor sen;
sensorConfig.getPublicSensorByName(sen, it->c_str());
baseUnit = DCDB::UnitConv::fromString(sen.unit);
}
}
/* Check if there are functions defined on sensors (e.g., delta, integral, etc.) */
if (it->find("#delta") != std::string::npos) {
/* Remove the modifier from the string */
*it = it->substr(0, it->find("#delta"));
calculateDelta = true;
sensorName = *it;
}
else if (it->find("#delta_t") != std::string::npos) {
/* Remove the modifier from the string */
calculateDeltaT = true;
sensorName = *it;
}
else if (it->find("#integ") != std::string::npos) {
/* Remove the modifier from the string */
*it = it->substr(0, it->find("#integ"));
calculateIntegral = true;
sensorName = *it;
}
else if (it->find("#deriv") != std::string::npos) {
/* Remove the modifier from the string */
*it = it->substr(0, it->find("#deriv"));
calculateDerivative = true;
sensorName = *it;
}
else {
printValue = true;
sensorName = *it;
}
it = sensors.erase(it);
}
else
it++;
}
std::list<DCDB::SensorDataStoreReading> results;
sensorName = *it;
DCDB::Sensor sensor(connection, sensorName);
if (scale) {
sensor.setScalingFactor(scalingFactor);
......
......@@ -52,6 +52,11 @@ protected:
bool scale;
double scalingFactor;
bool unitConvert;
bool printValue;
bool calculateDelta;
bool calculateDeltaT;
bool calculateDerivative;
bool calculateIntegral;
DCDB::Unit baseUnit;
DCDB::Unit targetUnit;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment