Commit 68986e44 authored by Axel Auweter's avatar Axel Auweter
Browse files

Add support for unit, scaling factor, and integrable flag in dcdbtools and dcdblib for ticket #39

Still pending: honor scaling_factor during query().
parent e3fdca61
......@@ -34,6 +34,7 @@ public:
std::string pattern;
double scaling_factor;
std::string unit;
bool integrable;
DCDBPublicSensor();
DCDBPublicSensor(const DCDBPublicSensor &copy);
......@@ -54,12 +55,19 @@ protected:
public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<DCDBPublicSensor>& publicSensors);
SCError getPublicSensorByName(DCDBPublicSensor& sensor, const char* publicName);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end);
SCError setSensorScalingFactor(std::string publicName, double scalingFactor);
SCError setSensorUnit(std::string publicName, std::string unit);
SCError setSensorIntegrable(std::string publicName, bool integrable);
SensorConfig(DCDBConnection* conn);
virtual ~SensorConfig();
};
......
......@@ -29,12 +29,19 @@ protected:
public:
SCError publishSensor(std::string publicName, std::string sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SCError getPublicSensorNames(std::list<std::string>& publicSensors);
SCError getPublicSensorsVerbose(std::list<DCDBPublicSensor>& publicSensors);
SCError getPublicSensorByName(DCDBPublicSensor& sensor, const char* publicName);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end);
SCError setSensorScalingFactor(std::string publicName, double scalingFactor);
SCError setSensorUnit(std::string publicName, std::string unit);
SCError setSensorIntegrable(std::string publicName, bool integrable);
SensorConfigImpl(DCDBConnection* conn);
virtual ~SensorConfigImpl();
};
......
......@@ -348,7 +348,7 @@ bool DCDBConnectionImpl::initSchema() {
if (!existsColumnFamily(CF_PUBLISHEDSENSORS)) {
std::cout << "Creating Column Familiy " CF_PUBLISHEDSENSORS "...\n";
createColumnFamily(CF_PUBLISHEDSENSORS,
"name varchar, pattern varchar, scaling_factor double, unit varchar",
"name varchar, pattern varchar, scaling_factor double, unit varchar, integrable boolean",
"name",
"COMPACT STORAGE AND CACHING = all");
}
......
......@@ -43,9 +43,19 @@ SCError SensorConfig::publishSensor(const char* publicName, const char* sensorPa
return impl->publishSensor(publicName, sensorPattern);
}
SCError SensorConfig::getPublicSensors(std::list<DCDBPublicSensor>& publicSensors)
SCError SensorConfig::getPublicSensorNames(std::list<std::string>& publicSensors)
{
return impl->getPublicSensors(publicSensors);
return impl->getPublicSensorNames(publicSensors);
}
SCError SensorConfig::getPublicSensorsVerbose(std::list<DCDBPublicSensor>& publicSensors)
{
return impl->getPublicSensorsVerbose(publicSensors);
}
SCError SensorConfig::getPublicSensorByName(DCDBPublicSensor& sensor, const char* publicName)
{
return impl->getPublicSensorByName(sensor, publicName);
}
SCError SensorConfig::getSensorPattern(std::string& pattern, std::string publicName)
......@@ -63,6 +73,20 @@ SCError SensorConfig::getSensorListForPattern(std::list<SensorId>& sensorIds, st
return impl->getSensorListForPattern(sensorIds, pattern, start, end);
}
SCError SensorConfig::setSensorScalingFactor(std::string publicName, double scalingFactor) {
return impl->setSensorScalingFactor(publicName, scalingFactor);
}
SCError SensorConfig::setSensorUnit(std::string publicName, std::string unit)
{
return impl->setSensorUnit(publicName, unit);
}
SCError SensorConfig::setSensorIntegrable(std::string publicName, bool integrable)
{
return impl->setSensorIntegrable(publicName, integrable);
}
SensorConfig::SensorConfig(DCDBConnection* conn)
{
......@@ -218,7 +242,59 @@ SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sens
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSensors)
SCError SensorConfigImpl::getPublicSensorNames(std::list<std::string>& publicSensors)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Clear the list */
publicSensors.clear();
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT name FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;";
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
publicSensors.push_back(std::string(name, name_len));
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensorsVerbose(std::list<DCDBPublicSensor>& publicSensors)
{
/* Check if the session is valid */
if (!session) {
......@@ -257,6 +333,7 @@ SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSe
double scaling_factor;
const char* unit;
size_t unit_len;
cass_bool_t integrable;
DCDBPublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
......@@ -273,11 +350,15 @@ SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSe
if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) {
unit = ""; unit_len = 0;
}
if (cass_value_get_bool(cass_row_get_column_by_name(row, "integrable"), &integrable) != CASS_OK) {
integrable = cass_false;
}
sensor.name = std::string(name, name_len);
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.integrable = integrable == cass_true ? true: false;
publicSensors.push_back(sensor);
}
......@@ -290,6 +371,100 @@ SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSe
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensorByName(DCDBPublicSensor& sensor, const char* publicName)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT * FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ?;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
cass_bool_t integrable;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) {
scaling_factor = 1.0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) {
unit = ""; unit_len = 0;
}
if (cass_value_get_bool(cass_row_get_column_by_name(row, "integrable"), &integrable) != CASS_OK) {
integrable = cass_false;
}
sensor.name = std::string(name, name_len);
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
sensor.integrable = integrable == cass_true ? true: false;
}
else {
cass_result_free(result);
cass_iterator_free(iterator);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNSENSOR;
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::getSensorPattern(std::string& pattern, std::string publicName)
{
/* Check if the session is valid */
......@@ -454,6 +629,143 @@ SCError SensorConfigImpl::getSensorListForPattern(std::list<SensorId>& sensorIds
return SC_OK;
}
SCError SensorConfigImpl::setSensorScalingFactor(std::string publicName, double scalingFactor)
{
SCError error = SC_UNKNOWNERROR;
/* Check the sensorconfig whether the given public sensor has the integrable flag set to true */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET scaling_factor = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_double(statement, 0, scalingFactor);
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = SC_UNKNOWNERROR;
}
else {
error = SC_OK;
}
cass_statement_free(statement);
cass_prepared_free(prepared);
return error;
}
SCError SensorConfigImpl::setSensorUnit(std::string publicName, std::string unit)
{
SCError error = SC_UNKNOWNERROR;
/* Check the sensorconfig whether the given public sensor has the integrable flag set to true */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET unit = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string(statement, 0, unit.c_str());
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = SC_UNKNOWNERROR;
}
else {
error = SC_OK;
}
cass_statement_free(statement);
cass_prepared_free(prepared);
return error;
}
SCError SensorConfigImpl::setSensorIntegrable(std::string publicName, bool integrable)
{
SCError error = SC_UNKNOWNERROR;
/* Check the sensorconfig whether the given public sensor has the integrable flag set to true */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET integrable = ? WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_bool(statement, 0, integrable == true ? cass_true : cass_false);
cass_statement_bind_string(statement, 1, publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = SC_UNKNOWNERROR;
}
else {
error = SC_OK;
}
cass_statement_free(statement);
cass_prepared_free(prepared);
return error;
}
SensorConfigImpl::SensorConfigImpl(DCDBConnection* conn)
{
......
......@@ -24,10 +24,15 @@ void SensorAction::printHelp(int argc, char* argv[])
/* 01234567890123456789012345678901234567890123456789012345678901234567890123456789 */
std::cout << "SENSOR command help" << std::endl;
std::cout << "The SENSOR command has the following options:" << std::endl;
std::cout << " PUBLISH <public name> <pattern> - Make a sensor publicly available under the " << std::endl;
std::cout << " <public name> comprising of all internal" << std::endl;
std::cout << " sensors matching the <pattern>." << std::endl;
std::cout << " LISTPUBLIC - List all public sensors and their patterns." << std::endl;
std::cout << " PUBLISH <public name> <pattern> - Make a sensor publicly available under " << std::endl;
std::cout << " <public name> comprising of all internal" << std::endl;
std::cout << " sensors matching the <pattern>." << std::endl;
std::cout << " LIST - List all public sensors." << std::endl;
std::cout << " LISTPUBLIC - List all public sensors and patterns." << std::endl;
std::cout << " SHOW <public name> - Show details for a given sensor." << std::endl;
std::cout << " SCALINGFACTOR <public name> <fac> - Set scaling factor to <fac>." << std::endl;
std::cout << " UNIT <public name> <unit> - Set unit to <unit>." << std::endl;
std::cout << " INTEGRABLE <public name> [ON|OFF] - Mark sensor integrable." << std::endl;
}
/*
......@@ -57,12 +62,45 @@ int SensorAction::executeCommand(int argc, char* argv[], int argvidx, const char
std::cout << "PUBLISH needs two more parameters!" << std::endl;
goto executeCommandError;
}
doPublishSensor(argv[argvidx+1], argv[argvidx+2]);
}
else if (strcasecmp(argv[argvidx], "LIST") == 0) {
doList();
}
else if (strcasecmp(argv[argvidx], "LISTPUBLIC") == 0) {
doListPublicSensors();
}
else if (strcasecmp(argv[argvidx], "SHOW") == 0) {
if (argvidx+1 >= argc) {
std::cout << "SHOW needs a sensor name as parameter!" << std::endl;
goto executeCommandError;
}
doShow(argv[argvidx+1]);
}
else if (strcasecmp(argv[argvidx], "SCALINGFACTOR") == 0) {
/* SCALINGFACTOR needs two more parameters */
if (argvidx+2 >= argc) {
std::cout << "SCALINGFACTOR needs two more parameters!" << std::endl;
goto executeCommandError;
}
doScalingfactor(argv[argvidx+1], argv[argvidx+2]);
}
else if (strcasecmp(argv[argvidx], "UNIT") == 0) {
/* UNIT needs two more parameters */
if (argvidx+2 >= argc) {
std::cout << "UNIT needs two more parameters!" << std::endl;
goto executeCommandError;
}
doUnit(argv[argvidx+1], argv[argvidx+2]);
}
else if (strcasecmp(argv[argvidx], "INTEGRABLE") == 0) {
/* INTEGRABLE needs two more parameters */
if (argvidx+2 >= argc) {
std::cout << "INTEGRABLE needs two more parameters!" << std::endl;
goto executeCommandError;
}
doIntegrable(argv[argvidx+1], argv[argvidx+2]);
}
else {
std::cout << "Invalid SENSOR command: " << argv[argvidx] << std::endl;
goto executeCommandError;
......@@ -90,15 +128,146 @@ void SensorAction::doPublishSensor(const char* publicName, const char* sensorPat
}
/*
* List all published sensor entries from the database
* List all published sensors by name
*/
void SensorAction::doList()
{
SensorConfig sensorConfig(connection);
std::list<std::string> publicSensors;
sensorConfig.getPublicSensorNames(publicSensors);
for (std::list<std::string>::iterator it=publicSensors.begin(); it != publicSensors.end(); it++) {
std::cout << (*it) << std::endl;
}
}
/*
* List all published sensors with name and pattern
*/
void SensorAction::doListPublicSensors()
{
SensorConfig sensorConfig(connection);
std::list<DCDBPublicSensor> publicSensors;
sensorConfig.getPublicSensors(publicSensors);
sensorConfig.getPublicSensorsVerbose(publicSensors);
for (std::list<DCDBPublicSensor>::iterator it=publicSensors.begin(); it != publicSensors.end(); it++) {
std::cout << (*it).name << " : " << (*it).pattern << std::endl;
}
}
/*
* Show the details for a given sensor
*/
void SensorAction::doShow(const char* publicName)
{
SensorConfig sensorConfig(connection);
DCDBPublicSensor publicSensor;
SCError err = sensorConfig.getPublicSensorByName(publicSensor, publicName);
switch (err) {
case SC_OK:
std::cout << "Details for sensor " << publicSensor.name << ":" << std::endl;
std::cout << "Pattern: " << publicSensor.pattern << std::endl;
std::cout << "Unit: " << publicSensor.unit << std::endl;
std::cout << "Scaling factor: " << publicSensor.scaling_factor << std::endl;
std::cout << "Integrable: " << (publicSensor.integrable ? "true" : "false") << std::endl;
break;
case SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
break;
default:
std::cout << "Internal error." << std::endl;
}
}
/*
* Set the scaling factor for a sensor
*/
void SensorAction::doScalingfactor(const char* publicName, const char* factor)
{
double f;
std::string fact(factor);
try {
f = std::stod(fact);
}
catch(std::exception& e) {
std::cout << factor << " is not a number." << std::endl;
return;
}
SensorConfig sensorConfig(connection);
DCDBPublicSensor publicSensor;
SCError err = sensorConfig.getPublicSensorByName(publicSensor, publicName);
switch (err) {
case SC_OK:
sensorConfig.setSensorScalingFactor(publicName, f);
break;
case SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
break;
default:
std::cout << "Internal error." << std::endl;
}
}
/*
* Set the unit for a sensor.
*/
void SensorAction::doUnit(const char* publicName, const char* unit)
{
SensorConfig sensorConfig(connection);
DCDBPublicSensor publicSensor;
SCError err = sensorConfig.getPublicSensorByName(publicSensor, publicName);
switch (err) {
case SC_OK:
sensorConfig.setSensorUnit(publicName, unit);
break;
case SC_UNKNOWNSENSOR:
std::cout << "Unknown sensor name: " << publicName << std::endl;
break;
default:
std::cout << "Internal error." << std::endl;
}
}