Commit 835657e6 authored by Alessio Netti's avatar Alessio Netti

Operations metadata field now uses set<string>

- The new implementation allows to publish multiple operations
concurrently from different sources
parent 3debc5c1
This diff is collapsed.
......@@ -38,6 +38,7 @@
#include <string>
#include <list>
#include <set>
#include "connection.h"
#include "timestamp.h"
......@@ -62,18 +63,18 @@ class SensorConfigImpl;
class PublicSensor
{
public:
std::string name; /**< The public sensor's (public) name. */
bool is_virtual; /**< Denotes whether the sensor is a virtual sensor. */
std::string pattern; /**< For non-virtual sensors, this holds a pattern describing the (internal) sensor IDs to which this public sensor matches. */
double scaling_factor; /**< Scaling factor for every sensor reading */
std::string unit; /**< Describes the unit of the sensor. See unitconv.h for known units. */
uint64_t sensor_mask; /**< Determines the properties of the sensor. Currently defined are: integrable, monotonic. */
std::string expression; /**< For virtual sensors, this field holds the expression through which the virtual sensor's value is calculated. */
std::string v_sensorid; /**< For virtual sensors, this field holds a SensorID used for storing cached values in the database. (FIXME: Cache to be implemented) */
uint64_t t_zero; /**< For virtual sensors, this field holds the first point in time at which the sensor carries a value. */
uint64_t interval; /**< This field holds the interval at which the sensor evaluates (in nanoseconds). */
std::string operations; /**< Defines the operations on the sensor, e.g. avg, std deviation, etc. */
uint64_t ttl; /**< Defines the time to live (in nanoseconds) for the readings of this sensor. */
std::string name; /**< The public sensor's (public) name. */
bool is_virtual; /**< Denotes whether the sensor is a virtual sensor. */
std::string pattern; /**< For non-virtual sensors, this holds a pattern describing the (internal) sensor IDs to which this public sensor matches. */
double scaling_factor; /**< Scaling factor for every sensor reading */
std::string unit; /**< Describes the unit of the sensor. See unitconv.h for known units. */
uint64_t sensor_mask; /**< Determines the properties of the sensor. Currently defined are: integrable, monotonic. */
std::string expression; /**< For virtual sensors, this field holds the expression through which the virtual sensor's value is calculated. */
std::string v_sensorid; /**< For virtual sensors, this field holds a SensorID used for storing cached values in the database. (FIXME: Cache to be implemented) */
uint64_t t_zero; /**< For virtual sensors, this field holds the first point in time at which the sensor carries a value. */
uint64_t interval; /**< This field holds the interval at which the sensor evaluates (in nanoseconds). */
std::set<std::string> operations; /**< Defines the operations on the sensor, e.g. avg, std deviation, etc. */
uint64_t ttl; /**< Defines the time to live (in nanoseconds) for the readings of this sensor. */
PublicSensor();
PublicSensor(const PublicSensor &copy);
......@@ -282,10 +283,10 @@ public:
* @brief Set an operation for the sensor.
*
* @param publicName Name of the sensor.
* @param operstion New operation for the sensor.
* @param operation Set of operations for the sensor.
* @return See SCError.
*/
SCError setOperations(std::string publicName, std::string operations);
SCError setOperations(std::string publicName, std::set<std::string> operations);
/**
* @brief Set a new sensor expression for a virtual sensor.
......
......@@ -80,7 +80,7 @@ public:
SCError setSensorScalingFactor(std::string publicName, double scalingFactor);
SCError setSensorUnit(std::string publicName, std::string unit);
SCError setSensorMask(std::string publicName, uint64_t mask);
SCError setOperations(std::string publicName, std::string operations);
SCError setOperations(std::string publicName, std::set<std::string> operations);
SCError setTimeToLive(std::string publicName, uint64_t ttl);
SCError setSensorInterval(std::string publicName, uint64_t interval);
......
......@@ -460,7 +460,7 @@ bool ConnectionImpl::initSchema() {
"sensor_mask bigint, " /* Bit mask that specifies sensor properties. Currently defined ones are:
Integrable: indicates whether the sensor is integrable over time;
Monotonic : indicates whether the collected sensor data is monotonic. */
"operations varchar, " /* Operations for the sensor (e.g., avg, stdev,...). */
"operations set<varchar>, " /* Operations for the sensor (e.g., avg, stdev,...). */
"expression varchar, " /* For virtual sensors: arithmetic expression to derive the virtual sensor's value */
"vsensorid varchar, " /* For virtual sensors: Unique sensorId for the sensor in the virtualsensors table */
"tzero bigint, " /* For virtual sensors: time of the first reading */
......@@ -468,7 +468,7 @@ bool ConnectionImpl::initSchema() {
"ttl bigint", /* Time to live in nanoseconds for readings of this sensor */
"name", /* Make the "name" column the primary key */
"COMPACT STORAGE AND CACHING = {'keys' : 'all'} "); /* Enable compact storage and maximum caching */
"CACHING = {'keys' : 'all'} "); /* Enable compact storage and maximum caching */
}
/* Keyspace and column family for raw and virtual sensor data */
......
......@@ -58,7 +58,6 @@ PublicSensor::PublicSensor()
v_sensorid = "";
t_zero = 0;
interval = 0;
operations = "";
ttl = 0;
}
......@@ -172,7 +171,7 @@ SCError SensorConfig::setSensorMask(std::string publicName, uint64_t mask)
return impl->setSensorMask(publicName, mask);
}
SCError SensorConfig::setOperations(std::string publicName, std::string operations)
SCError SensorConfig::setOperations(std::string publicName, std::set<std::string> operations)
{
return impl->setOperations(publicName,operations);
}
......@@ -369,7 +368,7 @@ SCError SensorConfigImpl::publishSensor(const PublicSensor& sensor)
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern, virtual, scaling_factor, unit, sensor_mask, operations, interval, ttl) VALUES (?,?, FALSE, ?, ?, ?, ?, ?, ?);";
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern, virtual, scaling_factor, unit, sensor_mask, interval, ttl) VALUES (?,?, FALSE, ?, ?, ?, ?, ?);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -392,10 +391,9 @@ SCError SensorConfigImpl::publishSensor(const PublicSensor& sensor)
cass_statement_bind_double_by_name(statement, "scaling_factor", sensor.scaling_factor);
cass_statement_bind_string_by_name(statement, "unit", sensor.unit.c_str());
cass_statement_bind_int64_by_name(statement, "sensor_mask", sensor.sensor_mask);
cass_statement_bind_string_by_name(statement, "operations", sensor.operations.c_str());
cass_statement_bind_int64_by_name(statement, "interval", sensor.interval);
cass_statement_bind_int64_by_name(statement, "ttl", sensor.ttl);
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -414,7 +412,11 @@ SCError SensorConfigImpl::publishSensor(const PublicSensor& sensor)
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
// Operations are inserted as an update statement, if required
if(sensor.operations.size() > 0)
return setOperations(sensor.name, sensor.operations);
else
return SC_OK;
}
SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
......@@ -457,10 +459,6 @@ SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
queryBuf += ", sensor_mask";
valuesBuf += ", ?";
}
if(sensor.getOperations()) {
queryBuf += ", operations";
valuesBuf += ", ?";
}
if(sensor.getInterval()) {
queryBuf += ", interval";
valuesBuf += ", ?";
......@@ -494,8 +492,6 @@ SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
cass_statement_bind_double_by_name(statement, "scaling_factor", *sensor.getScale());
if(sensor.getUnit())
cass_statement_bind_string_by_name(statement, "unit", sensor.getUnit()->c_str());
if(sensor.getOperations())
cass_statement_bind_string_by_name(statement, "operations", sensor.getOperations()->c_str());
if(sensor.getInterval())
cass_statement_bind_int64_by_name(statement, "interval", *sensor.getInterval());
if(sensor.getTTL())
......@@ -508,7 +504,7 @@ SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
sensorMask = sensorMask | MONOTONIC;
cass_statement_bind_int64_by_name(statement, "sensor_mask", sensorMask);
}
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -526,8 +522,12 @@ SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
// Operations are inserted as an update statement, if required
if(sensor.getOperations() && sensor.getOperations()->size()>0)
return setOperations(*sensor.getPublicName(), *sensor.getOperations());
else
return SC_OK;
}
SCError SensorConfigImpl::publishVirtualSensor(std::string publicName, std::string vSensorExpression, std::string vSensorId, TimeStamp tZero, uint64_t interval)
......@@ -769,8 +769,7 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
int64_t tzero;
int64_t interval;
int64_t ttl;
const char* operations;
size_t operations_len;
set<string> operations;
PublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
......@@ -808,10 +807,24 @@ SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<PublicSensor>& publi
if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) {
ttl = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "operations"), &operations, &operations_len) != CASS_OK) {
operations = ""; operations_len = 0;
}
const CassValue* opSet = nullptr;
CassIterator *opSetIt = nullptr;
if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) {
const char *opString;
size_t opLen;
while (cass_iterator_next(opSetIt)) {
if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) {
operations.clear();
break;
} else
operations.insert(std::string(opString, opLen));
}
cass_iterator_free(opSetIt);
}
sensor.name = std::string(name, name_len);
sensor.is_virtual = is_virtual == cass_true ? true : false;
sensor.pattern = std::string(pattern, pattern_len);
......@@ -904,8 +917,7 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
int64_t tzero;
int64_t interval;
int64_t ttl;
const char* operations;
size_t operations_len;
set<string> operations;
const CassRow* row = cass_iterator_get_row(iterator);
......@@ -942,8 +954,23 @@ SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char
if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) {
ttl = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "operations"), &operations, &operations_len) != CASS_OK) {
operations = ""; operations_len = 0;
const CassValue* opSet = nullptr;
CassIterator *opSetIt = nullptr;
if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) {
CassIterator *opSetIt = cass_iterator_from_collection(opSet);
const char *opString;
size_t opLen;
while (cass_iterator_next(opSetIt)) {
if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) {
operations.clear();
break;
} else
operations.insert(std::string(opString, opLen));
}
cass_iterator_free(opSetIt);
}
sensor.name = std::string(name, name_len);
......@@ -1443,7 +1470,7 @@ SCError SensorConfigImpl::setSensorMask(std::string publicName, uint64_t mask)
return error;
}
SCError SensorConfigImpl::setOperations(std::string publicName, std::string operations)
SCError SensorConfigImpl::setOperations(std::string publicName, std::set<std::string> operations)
{
SCError error = SC_UNKNOWNERROR;
......@@ -1451,7 +1478,11 @@ SCError SensorConfigImpl::setOperations(std::string publicName, std::string oper
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET operations = ? WHERE name = ? ;";
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET operations = operations + ? WHERE name = ? ;";
CassCollection* cassSet = cass_collection_new(CASS_COLLECTION_TYPE_SET, operations.size());
for(const auto& op : operations)
cass_collection_append_string(cassSet, op.c_str());
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -1467,7 +1498,7 @@ SCError SensorConfigImpl::setOperations(std::string publicName, std::string oper
statement = cass_prepared_bind(prepared);
cass_statement_bind_string(statement, 0, operations.c_str());
cass_statement_bind_collection(statement, 0, cassSet);
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
......@@ -1484,6 +1515,7 @@ SCError SensorConfigImpl::setOperations(std::string publicName, std::string oper
cass_statement_free(statement);
cass_prepared_free(prepared);
cass_collection_free(cassSet);
return error;
}
......
......@@ -35,6 +35,7 @@
#include <dcdb/sensorconfig.h>
#include <dcdb/unitconv.h>
#include "cassandra.h"
#include "metadatastore.h"
#include "sensoraction.h"
......@@ -323,6 +324,9 @@ void SensorAction::doShow(const char* publicName)
DCDB::PublicSensor publicSensor;
DCDB::SCError err = sensorConfig.getPublicSensorByName(publicSensor, publicName);
SensorMetadata sm;
sm.setOperations(publicSensor.operations);
switch (err) {
case DCDB::SC_OK:
if (!publicSensor.is_virtual) {
......@@ -338,7 +342,7 @@ void SensorAction::doShow(const char* publicName)
}
std::cout << "Unit: " << publicSensor.unit << std::endl;
std::cout << "Scaling factor: " << publicSensor.scaling_factor << std::endl;
std::cout << "Operations: " << publicSensor.operations << std::endl;
std::cout << "Operations: " << sm.getOperationsString() << std::endl;
std::cout << "Interval: " << publicSensor.interval << std::endl;
std::cout << "TTL: " << publicSensor.ttl << std::endl;
std::cout << "Sensor Properties: ";
......@@ -559,9 +563,11 @@ void SensorAction::doTTL(const char* publicName, const char *ttl) {
void SensorAction::doOperations(const char* publicName, const char *operations)
{
SensorMetadata sm;
sm.setOperations(std::string(operations));
DCDB::SensorConfig sensorConfig(connection);
DCDB::SCError err = sensorConfig.setOperations(publicName, operations);
DCDB::SCError err = sensorConfig.setOperations(publicName, *sm.getOperations());
switch (err) {
case DCDB::SC_OK:
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment