Commit f296b1db authored by Alessio Netti's avatar Alessio Netti

Selective metadata updates in libDCDB

- Implemented a (overloaded) publishSensor method in SensorConfig that
uses a SensorMetadata object to publish metadata selectively - only
columns that are set are updated, query statement is built on the fly
- Changing type of operations to set of strings still pending
parent d5beecce
......@@ -165,8 +165,7 @@ bool AnalyticsController::publishSensors() {
for (const auto &s : u->getBaseOutputs()) {
if (s->getPublish()) {
if (s->getMetadata() && s->getMetadata()->isValid()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(*s->getMetadata());
err = _dcdbCfg->publishSensor(ps);
err = _dcdbCfg->publishSensor(*s->getMetadata());
_metadataStore->store(*s->getMetadata()->getPattern(), *s->getMetadata());
} else {
err = _dcdbCfg->publishSensor(s->getName().c_str(), s->getMqtt().c_str());
......
......@@ -274,8 +274,7 @@ int mqttCallback(SimpleMQTTMessage *msg)
return 1;
}
if(sm.isValid()) {
DCDB::PublicSensor ps = Configuration::metadataToPublicSensor(sm);
err = mySensorConfig->publishSensor(ps);
err = mySensorConfig->publishSensor(sm);
metadataStore->store(*sm.getPattern(), sm);
}
} else {
......
......@@ -42,6 +42,7 @@
#include "connection.h"
#include "timestamp.h"
#include "sensorid.h"
#include "metadatastore.h"
#include "cassandra.h"
......@@ -132,6 +133,18 @@ public:
*/
SCError publishSensor(const PublicSensor& sensor);
/**
* @brief Makes a physical sensor public, and publish its metadata as well.
*
* This variant of the method takes as input a SensorMetadata object. All fields which are not
* set (null) will not be published.
*
* @param sensor SensorMetadata object containing metadata to be published.
* @return See SCError.
*/
SCError publishSensor(const SensorMetadata& sensor);
/**
* @brief Creates a new virtual sensor.
*
......
......@@ -61,11 +61,12 @@ public:
SCError loadCache();
SCError publishSensor(std::string publicName, std::string sensorPattern);
SCError publishSensor(const PublicSensor& sensor);
SCError publishSensor(const SensorMetadata& sensor);
SCError publishVirtualSensor(std::string publicName, std::string vSensorExpression, std::string vSensorId, TimeStamp tZero, uint64_t interval);
SCError unPublishSensor(std::string publicName);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<PublicSensor>& publicSensors);
SCError getPublicSensorByName(PublicSensor& sensor, const char* publicName);
SCError getPublicSensorByPattern(PublicSensor& sensor, const char* pattern);
SCError getPublicSensorsByWildcard(std::list<PublicSensor>& sensors, const char* wildcard);
......
......@@ -98,6 +98,11 @@ SCError SensorConfig::publishSensor(const PublicSensor& sensor)
return impl->publishSensor(sensor);
}
SCError SensorConfig::publishSensor(const SensorMetadata& sensor)
{
return impl->publishSensor(sensor);
}
SCError SensorConfig::publishVirtualSensor(const char* publicName, const char* vSensorExpression, const char* vSensorId, TimeStamp tZero, uint64_t interval)
{
return impl->publishVirtualSensor(publicName, vSensorExpression, vSensorId, tZero, interval);
......@@ -412,6 +417,119 @@ SCError SensorConfigImpl::publishSensor(const PublicSensor& sensor)
return SC_OK;
}
SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor)
{
/* Check if the pattern matches the requirements */
SensorId sid;
if (!sensor.getPattern() || !validateSensorPattern(sensor.getPattern()->c_str()) || !sid.mqttTopicConvert(*sensor.getPattern())) {
return SC_INVALIDPATTERN;
}
/* Check if the publicName is valid */
if (!sensor.getPublicName() || !validateSensorPublicName(sensor.getPublicName()->c_str())) {
return SC_INVALIDPUBLICNAME;
}
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Insert the entry */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
std::string queryBuf = "INSERT INTO " + std::string(CONFIG_KEYSPACE_NAME) + "." + std::string(CF_PUBLISHEDSENSORS) + " (name, pattern, virtual";
std::string valuesBuf = ") VALUES (?, ?, FALSE";
std::string closingBuf = ");";
if(sensor.getScale()) {
queryBuf += ", scaling_factor";
valuesBuf += ", ?";
}
if(sensor.getUnit()) {
queryBuf += ", unit";
valuesBuf += ", ?";
}
if(sensor.getIntegrable() || sensor.getMonotonic()) {
queryBuf += ", sensor_mask";
valuesBuf += ", ?";
}
if(sensor.getOperations()) {
queryBuf += ", operations";
valuesBuf += ", ?";
}
if(sensor.getInterval()) {
queryBuf += ", interval";
valuesBuf += ", ?";
}
if(sensor.getTTL()) {
queryBuf += ", ttl";
valuesBuf += ", ?";
}
std::string query = queryBuf + valuesBuf + closingBuf;
future = cass_session_prepare(session, query.c_str());
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", sensor.getPublicName()->c_str());
cass_statement_bind_string_by_name(statement, "pattern", sid.getId().c_str());
if(sensor.getScale())
cass_statement_bind_double_by_name(statement, "scaling_factor", *sensor.getScale());
if(sensor.getUnit())
cass_statement_bind_string_by_name(statement, "unit", sensor.getUnit()->c_str());
if(sensor.getOperations())
cass_statement_bind_string_by_name(statement, "operations", sensor.getOperations()->c_str());
if(sensor.getInterval())
cass_statement_bind_int64_by_name(statement, "interval", *sensor.getInterval());
if(sensor.getTTL())
cass_statement_bind_int64_by_name(statement, "ttl", *sensor.getTTL());
if(sensor.getIntegrable() || sensor.getMonotonic()) {
uint64_t sensorMask = 0;
if(sensor.getIntegrable())
sensorMask = sensorMask | INTEGRABLE;
if(sensor.getMonotonic())
sensorMask = sensorMask | MONOTONIC;
cass_statement_bind_int64_by_name(statement, "sensor_mask", sensorMask);
}
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::publishVirtualSensor(std::string publicName, std::string vSensorExpression, std::string vSensorId, TimeStamp tZero, uint64_t interval)
{
/* Check if the publicName is valid */
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment