Commit 90db284c authored by Axel Auweter's avatar Axel Auweter
Browse files

Prepare DCDBLib and dcdbconfig for virtual sensor support.

parent 071df605
include ../config.mk
# C++ Compiler Flags (use fPIC for our dynamic library)
CXXFLAGS = -O2 -ggdb -Wall -Werror \
CXXFLAGS = -O0 -ggdb -Wall -Werror \
-fPIC --std=c++11 -I$(DCDBDEPLOYPATH)/include -I./include -I./include_internal\
-I$(DCDBBASEPATH)/include/ -fmessage-length=0 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG \
-Wno-unused-local-typedef -Wno-unknown-warning-option -Wno-unknown-warning \
......
......@@ -33,10 +33,14 @@ class PublicSensor
{
public:
std::string name;
bool is_virtual;
std::string pattern;
double scaling_factor;
std::string unit;
bool integrable;
std::string expression;
uint64_t t_zero;
uint64_t frequency;
PublicSensor();
PublicSensor(const PublicSensor &copy);
......@@ -47,6 +51,7 @@ typedef enum {
SC_INVALIDSESSION,
SC_INVALIDPATTERN,
SC_INVALIDPUBLICNAME,
SC_WRONGTYPE,
SC_UNKNOWNSENSOR,
SC_UNKNOWNERROR
} SCError;
......@@ -58,12 +63,15 @@ protected:
public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError publishVirtualSensor(const char* publicName, const char* vSensorExpression, TimeStamp tZero, uint64_t frequency);
SCError unPublishSensor(const char* publicName);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<PublicSensor>& publicSensors);
SCError getPublicSensorByName(PublicSensor& sensor, const char* publicName);
SCError isVirtual(bool& isVirtual, std::string publicName);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, TimeStamp start, TimeStamp end);
......@@ -72,6 +80,10 @@ public:
SCError setSensorUnit(std::string publicName, std::string unit);
SCError setSensorIntegrable(std::string publicName, bool integrable);
SCError setVirtualSensorExpression(std::string publicName, std::string expression);
SCError setVirtualSensorTZero(std::string publicName, TimeStamp tZero);
SCError setVirtualSensorFrequency(std::string publicName, uint64_t frequency);
SensorConfig(Connection* conn);
virtual ~SensorConfig();
};
......
......@@ -32,12 +32,15 @@ protected:
public:
SCError publishSensor(std::string publicName, std::string sensorPattern);
SCError publishVirtualSensor(std::string publicName, std::string vSensorExpression, TimeStamp tZero, uint64_t frequency);
SCError unPublishSensor(std::string publicName);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<PublicSensor>& publicSensors);
SCError getPublicSensorByName(PublicSensor& sensor, const char* publicName);
SCError isVirtual(bool& isVirtual, std::string publicName);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, TimeStamp start, TimeStamp end);
......@@ -46,6 +49,10 @@ public:
SCError setSensorUnit(std::string publicName, std::string unit);
SCError setSensorIntegrable(std::string publicName, bool integrable);
SCError setVirtualSensorExpression(std::string publicName, std::string expression);
SCError setVirtualSensorTZero(std::string publicName, TimeStamp tZero);
SCError setVirtualSensorFrequency(std::string publicName, uint64_t frequency);
SensorConfigImpl(Connection* conn);
virtual ~SensorConfigImpl();
};
......
......@@ -350,9 +350,18 @@ bool ConnectionImpl::initSchema() {
if (!existsColumnFamily(CF_PUBLISHEDSENSORS)) {
std::cout << "Creating Column Familiy " CF_PUBLISHEDSENSORS "...\n";
createColumnFamily(CF_PUBLISHEDSENSORS,
"name varchar, virtual bool, pattern varchar, scaling_factor double, unit varchar, integrable boolean, vdef varchar, tzero bigint, frequency bigint",
"name",
"COMPACT STORAGE AND CACHING = all");
"name varchar, " /* Public name */
"virtual boolean, " /* Whether it is a published physical sensor or a virtual sensor */
"pattern varchar, " /* In case of physical sensors: pattern for MQTT topics that this sensor matches against */
"scaling_factor double, " /* Unused */
"unit varchar, " /* Unit of the sensor (e.g. W for Watts) */
"integrable boolean, " /* Indicates whether the sensor is integrable over time */
"expression varchar, " /* For virtual sensors: arithmetic expression to derive the virtual sensor's value */
"tzero bigint, " /* For virtual sensors: time of the first reading */
"frequency bigint", /* For virtual sensors: frequency at which this virtual sensor provides readings */
"name", /* Make the "name" column the primary key */
"COMPACT STORAGE AND CACHING = all"); /* Enable compact storage and maximum caching */
}
/* Keyspace and column family for raw and virtual sensor data */
......
......@@ -23,17 +23,27 @@ using namespace DCDB;
PublicSensor::PublicSensor()
{
name = "";
is_virtual = false;
pattern = "";
scaling_factor = 1.0;
unit = "";
integrable = false;
expression = "";
t_zero = 0;
frequency = 0;
}
PublicSensor::PublicSensor (const PublicSensor &copy)
{
name = copy.name;
is_virtual = copy.is_virtual;
pattern = copy.pattern;
scaling_factor = copy.scaling_factor;
unit = copy.unit;
integrable = copy.integrable;
expression = copy.expression;
t_zero = copy.t_zero;
frequency = copy.frequency;
}
......@@ -45,6 +55,11 @@ SCError SensorConfig::publishSensor(const char* publicName, const char* sensorPa
return impl->publishSensor(publicName, sensorPattern);
}
SCError SensorConfig::publishVirtualSensor(const char* publicName, const char* vSensorExpression, TimeStamp tZero, uint64_t frequency)
{
return impl->publishVirtualSensor(publicName, vSensorExpression, tZero, frequency);
}
SCError SensorConfig::unPublishSensor(const char* publicName)
{
return impl->unPublishSensor(publicName);
......@@ -65,6 +80,11 @@ SCError SensorConfig::getPublicSensorByName(PublicSensor& sensor, const char* pu
return impl->getPublicSensorByName(sensor, publicName);
}
SCError SensorConfig::isVirtual(bool& isVirtual, std::string publicName)
{
return impl->isVirtual(isVirtual, publicName);
}
SCError SensorConfig::getSensorPattern(std::string& pattern, std::string publicName)
{
return impl->getSensorPattern(pattern, publicName);
......@@ -94,6 +114,21 @@ SCError SensorConfig::setSensorIntegrable(std::string publicName, bool integrabl
return impl->setSensorIntegrable(publicName, integrable);
}
SCError SensorConfig::setVirtualSensorExpression(std::string publicName, std::string expression)
{
return impl->setVirtualSensorExpression(publicName, expression);
}
SCError SensorConfig::setVirtualSensorTZero(std::string publicName, TimeStamp tZero)
{
return impl->setVirtualSensorTZero(publicName, tZero);
}
SCError SensorConfig::setVirtualSensorFrequency(std::string publicName, uint64_t frequency)
{
return impl->setVirtualSensorFrequency(publicName, frequency);
}
SensorConfig::SensorConfig(Connection* conn)
{
......@@ -220,7 +255,7 @@ SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sens
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern) VALUES (?,?);";
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern, virtual) VALUES (?,?, FALSE);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -262,6 +297,70 @@ SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sens
return SC_OK;
}
SCError SensorConfigImpl::publishVirtualSensor(std::string publicName, std::string vSensorExpression, TimeStamp tZero, uint64_t frequency)
{
/* Check if the publicName is valid */
if (!validateSensorPublicName(publicName.c_str())) {
return SC_INVALIDPUBLICNAME;
}
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* TODO: Validate vSesnroExpression */
/* Insert the entry */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, expression, tzero, frequency, virtual) VALUES (?,?,?,?,TRUE);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName.c_str());
cass_statement_bind_string_by_name(statement, "expression", vSensorExpression.c_str());
cass_statement_bind_int64_by_name(statement, "tzero", tZero.getRaw());
cass_statement_bind_int64_by_name(statement, "frequency", frequency);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::unPublishSensor(std::string publicName)
{
/* Check if the session is valid */
......@@ -401,12 +500,17 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
cass_bool_t is_virtual;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
cass_bool_t integrable;
const char* expression;
size_t expression_len;
int64_t tzero;
int64_t frequency;
PublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
......@@ -414,6 +518,9 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) {
is_virtual = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
......@@ -426,12 +533,25 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
if (cass_value_get_bool(cass_row_get_column_by_name(row, "integrable"), &integrable) != CASS_OK) {
integrable = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) {
expression = ""; expression_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) {
tzero = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "frequency"), &frequency) != CASS_OK) {
frequency = 0;
}
sensor.name = std::string(name, name_len);
sensor.is_virtual = is_virtual == cass_true ? true : false;
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.integrable = integrable == cass_true ? true: false;
sensor.expression = std::string(expression, expression_len);
sensor.t_zero = tzero;
sensor.frequency = frequency;
publicSensors.push_back(sensor);
}
......@@ -490,18 +610,26 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
if (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
cass_bool_t is_virtual;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
cass_bool_t integrable;
const char* expression;
size_t expression_len;
int64_t tzero;
int64_t frequency;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) {
is_virtual = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
......@@ -514,12 +642,26 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
if (cass_value_get_bool(cass_row_get_column_by_name(row, "integrable"), &integrable) != CASS_OK) {
integrable = cass_false;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) {
expression = ""; expression_len = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) {
tzero = 0;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "frequency"), &frequency) != CASS_OK) {
frequency = 0;
}
sensor.name = std::string(name, name_len);
sensor.is_virtual = is_virtual == cass_true ? true : false;
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.integrable = integrable == cass_true ? true: false;
sensor.expression = std::string(expression, expression_len);
sensor.t_zero = tzero;
sensor.frequency = frequency;
}
else {
cass_result_free(result);
......@@ -538,6 +680,66 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
return SC_OK;
}
SCError SensorConfigImpl::isVirtual(bool& isVirtual, std::string publicName)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Read the virtual field from the database */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT virtual FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
cass_bool_t isVirtual_;
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &isVirtual_);
isVirtual = isVirtual_ ? true : false;
}
else {
return SC_UNKNOWNSENSOR;
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
return SC_OK;
}
SCError SensorConfigImpl::getSensorPattern(std::string& pattern, std::string publicName)
{
/* Check if the session is valid */
......@@ -545,6 +747,16 @@ SCError SensorConfigImpl::getSensorPattern(std::string& pattern, std::string pub
return SC_INVALIDSESSION;
}
/* Ensure that the public sensor is not virtual */
bool virt;
SCError err = isVirtual(virt, publicName);
if (err != SC_OK) {
return err;
}
if (virt) {
return SC_WRONGTYPE;
}
/* Read the Pattern string from the database */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
......@@ -843,6 +1055,191 @@ SCError SensorConfigImpl::setSensorIntegrable(std::string publicName, bool integ
return error;
}
SCError SensorConfigImpl::setVirtualSensorExpression(std::string publicName, std::string expression)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Ensure that the public sensor is virtual */
bool virt;
SCError err = isVirtual(virt, publicName);
if (err != SC_OK) {
return err;
}
if (!virt) {
return SC_WRONGTYPE;
}
/* TODO: Validate expression! */
/* Update the database with the new expression */
SCError error = SC_UNKNOWNERROR;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET expression = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string(statement, 0, expression.c_str());
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = SC_UNKNOWNERROR;
}
else {
error = SC_OK;
}
cass_statement_free(statement);
cass_prepared_free(prepared);
return error;
}
SCError SensorConfigImpl::setVirtualSensorTZero(std::string publicName, TimeStamp tZero)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Ensure that the public sensor is virtual */
bool virt;
SCError err = isVirtual(virt, publicName);
if (err != SC_OK) {
return err;
}
if (!virt) {
return SC_WRONGTYPE;
}
/* Update the database with the new expression */
SCError error = SC_UNKNOWNERROR;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET tzero = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_int64(statement, 0, tZero.getRaw());
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = SC_UNKNOWNERROR;
}
else {
error = SC_OK;
}
cass_statement_free(statement);
cass_prepared_free(prepared);
return error;
}
SCError SensorConfigImpl::setVirtualSensorFrequency(std::string publicName, uint64_t frequency)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Ensure that the public sensor is virtual */
bool virt;
SCError err = isVirtual(virt, publicName);
if (err != SC_OK) {
return err;
}
if (!virt) {
return SC_WRONGTYPE;
}
/* Update the database with the new expression */
SCError error = SC_UNKNOWNERROR;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET frequency = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);