Commit edc7490c authored by Michael Ott's avatar Michael Ott
Browse files

Rework DCDBQuery::doQuery() and DCDBQuery::genOutput() to leverage proper...

Rework DCDBQuery::doQuery() and DCDBQuery::genOutput() to leverage proper regular expressions for function-, sensor-, and modifier-parsing.
parent 97fa91c6
......@@ -55,330 +55,214 @@ bool DCDBQuery::getRawOutputEnabled() {
return useRawOutput;
}
void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results)
{
int64_t prev = 0;
uint64_t prevT = 0;
/* Print Header */
std::cout << "Sensor,Time";
if(valueFormat.printValue) {
std::cout << ",Value";
if(valueFormat.unit != "none")
std::cout << " (" << valueFormat.unit << ")";
bool scaleAndConvert(int64_t &value, double baseScalingFactor, double scalingFactor, DCDB::Unit baseUnit, DCDB::Unit unit) {
if(scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&value, scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
return false;
}
if(deltaFormat.printValue) {
std::cout << ",Delta";
if(deltaFormat.unit != "none")
std::cout << " (" << deltaFormat.unit << ")";
/* Convert the unit if requested */
if ((unit != DCDB::Unit_None) && (unit != baseUnit)) {
if (!DCDB::UnitConv::convert(value, baseUnit, unit)) {
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< DCDB::UnitConv::toString(unit) << ")"
<< std::endl;
return false;
}
}
if(deltaTFormat.printValue) {
std::cout << ",DeltaT";
if(deltaTFormat.unit != "none")
std::cout << " (" << deltaTFormat.unit << ")";
}
if(derivativeFormat.printValue) {
std::cout << ",Derivative";
if(derivativeFormat.unit != "none")
std::cout << " (" << derivativeFormat.unit << ")";
}
if(integralFormat.printValue) {
std::cout << ",Integral";
if(integralFormat.unit != "none")
std::cout << " (" << integralFormat.unit << ")";
return true;
}
void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results, queryMap_t::iterator start, queryMap_t::iterator stop) {
/* Print Header */
std::cout << "Sensor,Time";
for (queryMap_t::iterator it=start; it!=stop; it++) {
switch(it->second.operation) {
case DCDB_OP_NONE:
std::cout << ",Value";
break;
case DCDB_OP_DELTA:
std::cout << ",Delta";
break;
case DCDB_OP_DELTAT:
std::cout << ",Delta_t";
break;
case DCDB_OP_DERIVATIVE:
std::cout << ",Derivative";
break;
case DCDB_OP_INTEGRAL:
std::cout << ",Integral";
break;
}
if(it->second.unit != DCDB::Unit_None) {
std::cout << " (" << DCDB::UnitConv::toString(it->second.unit) << ")";
}
}
/*End of header */
std::cout << std::endl;
for (std::list<DCDB::SensorDataStoreReading>::iterator reading = results.begin(); reading != results.end(); reading++) {
/* Print the sensor's public name */
std::cout << sensorName << ",";
int64_t prevValue;
DCDB::TimeStamp prevT(0llu);
for (std::list<DCDB::SensorDataStoreReading>::iterator reading = results.begin(); reading != results.end(); reading++) {
int64_t value = (*reading).value;
DCDB::TimeStamp ts = (*reading).timeStamp;
/* Print the time stamp */
if (useLocalTime)
(*reading).timeStamp.convertToLocal();
if (useRawOutput)
std::cout << (*reading).timeStamp.getRaw();
else
std::cout << (*reading).timeStamp.getString();
/* Print the sensor's public name */
std::cout << start->first << ",";
/* Print the time stamp */
if (useLocalTime)
ts.convertToLocal();
if (useRawOutput)
std::cout << ts.getRaw();
else
std::cout << ts.getString();
/* Print the sensor value */
if(valueFormat.printValue) {
int64_t result = (*reading).value;
if(valueFormat.scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&result, valueFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
}
/* Convert the unit if requested */
if ((DCDB::UnitConv::fromString(valueFormat.unit) != DCDB::Unit_None) && (DCDB::UnitConv::fromString(valueFormat.unit) != baseUnit)) {
if (!DCDB::UnitConv::convert(result, baseUnit, DCDB::UnitConv::fromString(valueFormat.unit)))
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< valueFormat.unit << ")"
<< std::endl;
}
std::cout << "," << result;
}
/* Print Delta */
if(deltaFormat.printValue && reading != results.begin()) {
int64_t result;
int64_t current = (*reading).value;
int64_t previous = prev;
if(deltaFormat.scalingFactor != 1.0) {
if( DCDB::scale(&current, deltaFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if( DCDB::scale(&previous, deltaFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
}
if( DCDB::delta(current, previous, &result) == DCDB::DCDB_OP_OVERFLOW )
notifyOverflow = true;
if ((DCDB::UnitConv::fromString(deltaFormat.unit) != DCDB::Unit_None) && (DCDB::UnitConv::fromString(deltaFormat.unit) != baseUnit)) {
if (!DCDB::UnitConv::convert(result, baseUnit, DCDB::UnitConv::fromString(deltaFormat.unit)))
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< deltaFormat.unit << ")"
<< std::endl;
}
std::cout << "," << result;
}
/* Print Delta T */
if(deltaTFormat.printValue && reading != results.begin()) {
int64_t result;
int64_t current = (*reading).timeStamp.getRaw();
int64_t previous = prevT;
if(deltaTFormat.scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&current, deltaTFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if( DCDB::scale(&previous, deltaTFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
}
if( DCDB::delta(current, previous, &result) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if ((DCDB::UnitConv::fromString(deltaTFormat.unit) != DCDB::Unit_None) && (DCDB::UnitConv::fromString(deltaTFormat.unit) != baseUnit)) {
if (!DCDB::UnitConv::convert(result, baseUnit, DCDB::UnitConv::fromString(deltaTFormat.unit)))
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< deltaTFormat.unit << ")"
<< std::endl;
}
std::cout << "," << result;
}
/* Print Derivative */
if(derivativeFormat.printValue && reading != results.begin()) {
int64_t result;
int64_t current = (*reading).value;
int64_t previous = prev;
/* No need to check overflow for dt as the scaling only applies to dx */
if(derivativeFormat.scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&current, derivativeFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if( DCDB::scale(&previous, derivativeFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
}
if( DCDB::derivative(current, previous, (*reading).timeStamp.getRaw(), prevT, &result) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if ((DCDB::UnitConv::fromString(derivativeFormat.unit) != DCDB::Unit_None) && (DCDB::UnitConv::fromString(derivativeFormat.unit) != baseUnit)) {
if (!DCDB::UnitConv::convert(result, baseUnit, DCDB::UnitConv::fromString(derivativeFormat.unit)))
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< derivativeFormat.unit << ")"
<< std::endl;
}
std::cout << "," << result;
}
/* Print Integral */
if(integralFormat.printValue && reading != results.begin()) {
int64_t result;
int64_t current = (*reading).value;
int64_t previous = prev;
/* Very unlikely that dt > dx in case of overflow, so we just scale dx */
if(integralFormat.scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&current, integralFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if( DCDB::scale(&previous, integralFormat.scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
}
if( DCDB::integral(current, previous, (*reading).timeStamp.getRaw(), prevT, &result) == DCDB::DCDB_OP_OVERFLOW)
notifyOverflow = true;
if ((DCDB::UnitConv::fromString(integralFormat.unit) != DCDB::Unit_None) && (DCDB::UnitConv::fromString(integralFormat.unit) != baseUnit)) {
if (!DCDB::UnitConv::convert(result, baseUnit, DCDB::UnitConv::fromString(integralFormat.unit)))
std::cerr << "Warning, cannot convert units ("
<< DCDB::UnitConv::toString(baseUnit) << " -> "
<< integralFormat.unit << ")"
<< std::endl;
}
std::cout << "," << result;
}
std::cout << std::endl;
prev = (*reading).value;
prevT = (*reading).timeStamp.getRaw();
/* Print the sensor value */
for (queryMap_t::iterator it=start; it!=stop; it++) {
if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
switch(it->second.operation) {
case DCDB_OP_NONE:
std::cout << "," << value;
break;
case DCDB_OP_DELTA: {
int64_t result;
if ((prevT > 0llu) && (DCDB::delta(value, prevValue, &result) == DCDB::DCDB_OP_SUCCESS)) {
std::cout << "," << result;
} else {
std::cout << ",";
}
break;}
case DCDB_OP_DELTAT: {
int64_t result;
if ((prevT > 0llu) && (DCDB::delta(ts.getRaw(), prevT.getRaw(), &result) == DCDB::DCDB_OP_SUCCESS)) {
std::cout << "," << result;
} else {
std::cout << ",";
}
break;}
case DCDB_OP_DERIVATIVE: {
int64_t result;
if( (prevT > 0llu) && DCDB::derivative(value, prevValue, ts.getRaw(), prevT.getRaw(), &result) == DCDB::DCDB_OP_SUCCESS) {
std::cout << "," << result;
} else {
std::cout << ",";
}
break;}
case DCDB_OP_INTEGRAL: {
int64_t result;
if( (prevT > 0llu) && DCDB::integral(value, prevValue, ts.getRaw(), prevT.getRaw(), &result) == DCDB::DCDB_OP_SUCCESS) {
std::cout << "," << result;
} else {
std::cout << ",";
}
break;}
}
} else {
std::cout << ",";
}
}
prevValue = (*reading).value;
prevT = (*reading).timeStamp.getRaw();
std::cout << std::endl;
}
}
void DCDBQuery::checkModifier(std::list<std::string>::iterator it, struct outputFormat *format) {
std::string modifierStr = it->substr(it->find('/')+1, it->length());
/* Check what type of modification is requested */
//boost::regex e("\\.?[0-9]*", boost::regex::extended);
boost::regex e("[0-9]*\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e))
sscanf(modifierStr.c_str(), "%lf", &(format->scalingFactor));
else
format->unit = modifierStr;
}
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end)
{
/* Create a new connection to the database */
connection = new DCDB::Connection();
connection->setHostname(hostname);
if (!connection->connect()) {
std::cout << "Cannot connect to database." << std::endl;
exit(EXIT_FAILURE);
}
/* Initialize the SensorConfig interface */
DCDB::SensorConfig sensorConfig(connection);
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
scalingFactor = 1;
unit = "none";
notifyOverflow = false;
/* Output format initialization */
valueFormat = {false,1,"none"};
deltaFormat = {false,1,"none"};
deltaTFormat = {false,1,"none"};
derivativeFormat = {false,1,"none"};
integralFormat = {false,1,"none"};
std::string modifierStr;
boost::smatch match;
baseUnit = DCDB::Unit_None;
/* Retrieve sensor object first */
boost::regex functExp("^delta|^delta_t|^derivative|^integral", boost::regex::extended);
if(boost::regex_search(*it, match, functExp))
sensorName = it->substr(it->find('(') + 1, it->size() - it->find('(') - (it->size() - it->find(')')) - 1 );
else if (it->find('/') != std::string::npos)
sensorName = it->substr(0, it->find('/'));
else
sensorName = *it;
/* Create a new connection to the database */
connection = new DCDB::Connection();
connection->setHostname(hostname);
if (!connection->connect()) {
std::cout << "Cannot connect to database." << std::endl;
exit(EXIT_FAILURE);
}
DCDB::PublicSensor sen;
sensorConfig.getPublicSensorByName(sen, sensorName.c_str());
/* Base scaling factor and unit of the public sensor */
baseUnit = DCDB::UnitConv::fromString(sen.unit);
baseScalingFactor = sen.scaling_factor;
DCDB::Sensor sensor(connection, sen);
/* Iterate over the list to detect all instances of the sensor with name "sensorName" */
while(it != sensors.end()) {
if(it->find(sensorName) != std::string::npos) {
/* Check first if the sensor we're looking is part of a function ...*/
if(boost::regex_search(*it, match, functExp)) {
if(match[0].compare("delta")==0) {
deltaFormat.printValue = true;
/* ...and for each function, check if we need a different scaling factor or unit... */
if(it->find('/') != std::string::npos) {
checkModifier(it, &deltaFormat);
}
}
else if(match[0].compare("delta_t")==0) {
deltaTFormat.printValue = true;
if(it->find('/') != std::string::npos) {
checkModifier(it, &deltaTFormat);
}
}
else if(match[0].compare("derivative")==0) {
derivativeFormat.printValue = true;
if(it->find('/') != std::string::npos) {
checkModifier(it, &derivativeFormat);
}
}
else if(match[0].compare("integral")==0) {
integralFormat.printValue = true;
if(it->find('/') != std::string::npos) {
checkModifier(it, &integralFormat);
}
}
}
/* ...otherwise, print just the sensor values and check if they are requested in a different scaling factor or unit... */
else {
valueFormat.printValue = true;
if(it->find('/') != std::string::npos) {
checkModifier(it, &valueFormat);
}
}
/*...remove this instance of the sensor from the list */
it = sensors.erase(it);
}
else
it++;
}
std::list<DCDB::SensorDataStoreReading> results;
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
genOutput(results);
if(notifyOverflow)
std::cout << "Overflow detected." << std::endl;
}
/*
* Clean up
*/
connection->disconnect();
delete connection;
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
notifyOverflow = false;
std::string sensorName;
std::string functName;
std::string modifierStr;
double scalingFactor = 1.0;
DCDB::Unit unit = DCDB::Unit_None;
/* Retrieve sensor object first */
std::string str = *it;
boost::regex functRegex("^([^\\(\\)]+)\\(([^\\(\\)]+)\\)$", boost::regex::extended);
boost::smatch match;
if(boost::regex_search(str, match, functRegex)) {
functName = match[1].str();
str = match[2].str();
}
boost::regex sensorRegex("([^/]+)/?([^/]*)", boost::regex::extended);
if(boost::regex_search(str, match, sensorRegex)) {
sensorName = match[1].str();
modifierStr = match[2].str();
}
queryConfig_t queryCfg = { 1.0, DCDB::Unit_None, DCDB_OP_NONE};
if (functName.length() == 0) {
queryCfg.operation = DCDB_OP_NONE;
} else if (boost::iequals(functName, "delta")) {
queryCfg.operation = DCDB_OP_DELTA;
} else if (boost::iequals(functName, "delta_t")) {
queryCfg.operation = DCDB_OP_DELTAT;
} else if (boost::iequals(functName, "derivative")) {
queryCfg.operation = DCDB_OP_DERIVATIVE;
} else if (boost::iequals(functName, "integral")) {
queryCfg.operation = DCDB_OP_INTEGRAL;
} else {
queryCfg.operation = DCDB_OP_UNKNOWN;
std::cerr << "Unknown sensor operation: " << functName << std::endl;
}
if (queryCfg.operation != DCDB_OP_UNKNOWN) {
if (modifierStr.length() > 0) {
boost::regex e("[0-9]*\\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e)) {
queryCfg.scalingFactor = atof(modifierStr.c_str());
} else {
queryCfg.unit = DCDB::UnitConv::fromString(modifierStr);
}
}
queries.insert(std::pair<std::string,queryConfig_t>(sensorName, queryCfg));
}
}
/* Initialize the SensorConfig interface */
DCDB::SensorConfig sensorConfig(connection);
std::string prevSensorName;
for (auto q: queries) {
if (q.first != prevSensorName) {
std::pair<queryMap_t::iterator, queryMap_t::iterator> range = queries.equal_range(q.first);
DCDB::PublicSensor sen;
sensorConfig.getPublicSensorByName(sen, q.first.c_str());
/* Base scaling factor and unit of the public sensor */
baseUnit = DCDB::UnitConv::fromString(sen.unit);
baseScalingFactor = sen.scaling_factor;
std::list<DCDB::SensorDataStoreReading> results;
DCDB::Sensor sensor(connection, sen);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
genOutput(results, range.first, range.second);
if(notifyOverflow)
std::cout << "Overflow detected." << std::endl;
prevSensorName = q.first;
}
}
/*
* Clean up
*/
connection->disconnect();
delete connection;
}
DCDBQuery::DCDBQuery()
......
......@@ -27,6 +27,7 @@
#include <list>
#include <string>
#include <map>
#include "dcdb/connection.h"
#include "dcdb/timestamp.h"
......@@ -43,11 +44,21 @@
class DCDBQuery
{
struct outputFormat {
bool printValue;
typedef enum {
DCDB_OP_NONE,
DCDB_OP_DELTA,
DCDB_OP_DELTAT,
DCDB_OP_DERIVATIVE,
DCDB_OP_INTEGRAL,
DCDB_OP_UNKNOWN,
} DCDB_OP_TYPE;
typedef struct queryConfig {
double scalingFactor;
std::string unit;
};
DCDB::Unit unit;
DCDB_OP_TYPE operation;
} queryConfig_t;
typedef std::multimap<std::string, queryConfig_t> queryMap_t;
typedef enum {
CONVERT_OK,
......@@ -59,19 +70,10 @@ protected:
bool useLocalTime;
bool useRawOutput;
std::string sensorName;
double scalingFactor;
std::string unit;
double baseScalingFactor;
DCDB::Unit baseUnit;
outputFormat valueFormat;
outputFormat deltaFormat;
outputFormat deltaTFormat;
outputFormat derivativeFormat;
outputFormat integralFormat;
std::multimap<std::string, queryConfig_t> queries;
double baseScalingFactor;
DCDB::Unit baseUnit;
bool notifyOverflow;
public:
......@@ -80,7 +82,8 @@ public:
void setRawOutputEnabled(bool enable);
bool getRawOutputEnabled();
void genOutput(std::list<DCDB::SensorDataStoreReading> &results);
void genOutput(std::list<DCDB::SensorDataStoreReading> &results, queryMap_t::iterator start, queryMap_t::iterator stop);
void check(std::list<std::string>::iterator it , double* scalingFactor);
void checkModifier(std::list<std::string>::iterator it, struct outputFormat *format);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment