Commit f3c8edba authored by Alessio Netti's avatar Alessio Netti
Browse files

Added Cassandra table for misc DCDB metadata

- Uses a simple string key->value schema
- Stores the timestamp of the latest edit to the publishedsensors table
- Implemented logic to update this timestamp accordingly
parent 45fb3f25
......@@ -85,6 +85,7 @@
using namespace std;
bool newAutoPub;
int keepRunning;
int retCode = EXIT_SUCCESS;
uint64_t msgCtr;
......@@ -335,6 +336,8 @@ int mqttCallback(SimpleMQTTMessage *msg)
default:
break;
}
newAutoPub = true;
} else if (strncmp(topic, DCDB_CALIEVT, DCDB_CALIEVT_LEN) == 0) {
/*
* Special message case. This message contains a Caliper Event data
......@@ -875,6 +878,7 @@ int main(int argc, char* const argv[]) {
/*
* Run (hopefully) forever...
*/
newAutoPub = false;
keepRunning = 1;
uint64_t start, end;
float elapsed;
......@@ -896,6 +900,10 @@ int main(int argc, char* const argv[]) {
if(purged > 0)
LOG(info) << "Cache: purged " << purged << " obsolete entries";
}
if(newAutoPub) {
newAutoPub = false;
mySensorConfig->setPublishedSensorsWritetime(getTimestamp());
}
sleep(sleepInterval);
......
......@@ -320,6 +320,22 @@ public:
* @return See SCError.
*/
SCError setVirtualSensorTZero(std::string publicName, TimeStamp tZero);
/**
* @brief Get the timestamp of the most recent update to the publishedsensors table.
*
* @param ts Unsigned 64-bit integer where to store the result.
* @return See SCError.
*/
SCError getPublishedSensorsWritetime(uint64_t &ts);
/**
* @brief Update the timestamp of the most recent update to the publishedsensors table.
*
* @param ts New timestamp to insert.
* @return See SCError.
*/
SCError setPublishedSensorsWritetime(const uint64_t &ts);
/**
* @brief Constructor for the SensorConfig class.
......
......@@ -48,8 +48,11 @@
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_PUBLISHEDSENSORS "publishedsensors"
#define CF_MONITORINGMETADATA "monitoringmetadata"
#define CF_VIRTUALSENSORS "virtualsensors"
#define CF_PROPERTY_PSWRITETIME "pswritetime"
#define CED_KEYSPACE_NAME KEYSPACE_NAME "_calievtdata"
#define CF_CALIEVTDATA "calievtdata"
......
......@@ -84,6 +84,9 @@ public:
SCError setVirtualSensorExpression(std::string publicName, std::string expression);
SCError setVirtualSensorTZero(std::string publicName, TimeStamp tZero);
SCError getPublishedSensorsWritetime(uint64_t &ts);
SCError setPublishedSensorsWritetime(const uint64_t &ts);
SensorConfigImpl(Connection* conn);
virtual ~SensorConfigImpl();
};
......
......@@ -511,6 +511,16 @@ bool ConnectionImpl::initSchema() {
"CACHING = {'keys' : 'all'} "); /* Enable compact storage and maximum caching */
}
/* Creating simple key-value table for misc metadata */
if (!existsColumnFamily(CF_MONITORINGMETADATA)) {
std::cout << "Creating Column Family " CF_MONITORINGMETADATA "...\n";
createColumnFamily(CF_MONITORINGMETADATA,
"name varchar, " /* Property name */
"value varchar", /* Property value */
"name", /* Make the "name" column the primary key */
"CACHING = {'keys' : 'all'} "); /* Enable compact storage and maximum caching */
}
/* Keyspace and column family for raw and virtual sensor data */
if (!existsKeyspace(KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
......
......@@ -238,6 +238,14 @@ SCError SensorConfig::setSensorInterval(std::string publicName, uint64_t interva
return impl->setSensorInterval(publicName, interval);
}
SCError SensorConfig::getPublishedSensorsWritetime(uint64_t &ts) {
return impl->getPublishedSensorsWritetime(ts);
}
SCError SensorConfig::setPublishedSensorsWritetime(const uint64_t &ts) {
return impl->setPublishedSensorsWritetime(ts);
}
SensorConfig::SensorConfig(Connection* conn)
{
/* Allocate impl object */
......@@ -320,6 +328,112 @@ SCError SensorConfigImpl::loadCache()
}
}
SCError SensorConfigImpl::getPublishedSensorsWritetime(uint64_t &ts)
{
ts = 0;
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT value FROM " CONFIG_KEYSPACE_NAME "." CF_MONITORINGMETADATA " where name=\'" CF_PROPERTY_PSWRITETIME "\' ;";
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const char* tsStr;
size_t tsStr_len;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "value"), &tsStr, &tsStr_len) == CASS_OK) {
try {
ts = TimeStamp(std::string(tsStr, tsStr_len)).getRaw();
} catch(const std::exception &e) {
ts = 0;
}
}
}
cass_result_free(result);
cass_iterator_free(iterator);
} else {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::setPublishedSensorsWritetime(const uint64_t &ts)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Insert the entry */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_MONITORINGMETADATA " (name, value) VALUES (?,?);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", CF_PROPERTY_PSWRITETIME);
cass_statement_bind_string_by_name(statement, "value", std::to_string(ts).c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sensorPattern)
{
/* Check if the pattern matches the requirements */
......
......@@ -260,6 +260,7 @@ void SensorAction::doPublishSensor(const char* publicName, const char* sensorPat
std::cout << "Invalid dcdb session." << std::endl;
break;
default:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
}
}
......@@ -305,6 +306,7 @@ void SensorAction::doVCreateSensor(const char* publicName, const char* expressio
std::cout << "Invalid dcdb session." << std::endl;
break;
default:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
}
}
......@@ -409,6 +411,7 @@ void SensorAction::doScalingfactor(const char* publicName, const char* factor)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setSensorScalingFactor(publicName, f);
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -431,6 +434,7 @@ void SensorAction::doUnit(const char* publicName, const char* unit)
case DCDB::SC_OK:
if (DCDB::UnitConv::fromString(unit) != DCDB::Unit_None) {
sensorConfig.setSensorUnit(publicName, unit);
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
} else {
std::cout << "Unknown unit: " << unit << std::endl;
}
......@@ -468,8 +472,10 @@ void SensorAction::doSensorProperty(const char* publicName, const char* cmd)
std::cout << "Please specify a sensor property: INTEGRABLE and/or MONOTONIC" << std::endl;
}
if(mask != publicSensor.sensor_mask)
if(mask != publicSensor.sensor_mask) {
sensorConfig.setSensorMask(publicName, mask);
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
}
break;
}
......@@ -491,6 +497,7 @@ void SensorAction::doExpression(const char* publicName, const char* expression)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -519,6 +526,7 @@ void SensorAction::doTZero(const char* publicName, const char* tZero)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -551,6 +559,7 @@ void SensorAction::doInterval(const char* publicName, const char *interval)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -576,6 +585,7 @@ void SensorAction::doTTL(const char* publicName, const char *ttl) {
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -598,6 +608,7 @@ void SensorAction::doOperations(const char* publicName, const char *operations)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -617,6 +628,7 @@ void SensorAction::doClearOperations(const char* publicName)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
......@@ -636,6 +648,7 @@ void SensorAction::doClearOperationsByWildcard(const char* wildcard)
switch (err) {
case DCDB::SC_OK:
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
break;
case DCDB::SC_INVALIDSESSION:
std::cout << "Invalid session!" << std::endl;
......@@ -652,10 +665,12 @@ void SensorAction::doUnPublishSensor(const char* publicName)
{
DCDB::SensorConfig sensorConfig(connection);
sensorConfig.unPublishSensor(publicName);
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
}
void SensorAction::doUnPublishSensorsByWildcard(const char* wildcard)
{
DCDB::SensorConfig sensorConfig(connection);
sensorConfig.unPublishSensorsByWildcard(wildcard);
sensorConfig.setPublishedSensorsWritetime(getTimestamp());
}
......@@ -30,6 +30,7 @@
#include "cassandra.h"
#include "useraction.h"
#include "timestamp.h"
#ifndef SENSORACTION_H
#define SENSORACTION_H
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment