//================================================================================ // Name : sensorconfig.cpp // Author : Axel Auweter, Daniele Tafani // Contact : info@dcdb.it // Copyright : Leibniz Supercomputing Centre // Description : C++ API implementation for configuring libdcdb public sensors. //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2019 Leibniz Supercomputing Centre // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA //================================================================================ #include #include #include #include #include #include #include #include "cassandra.h" #include #include "sensorconfig_internal.h" #include "dcdbglobals.h" #include "dcdbendian.h" #include "dcdb/virtualsensor.h" using namespace DCDB; /* * PublicSensor functions. */ PublicSensor::PublicSensor() { name = ""; is_virtual = false; pattern = ""; scaling_factor = 1.0; unit = ""; sensor_mask = 0; expression = ""; v_sensorid = ""; t_zero = 0; interval = 0; ttl = 0; } PublicSensor::PublicSensor (const PublicSensor ©) { name = copy.name; is_virtual = copy.is_virtual; pattern = copy.pattern; scaling_factor = copy.scaling_factor; unit = copy.unit; sensor_mask = copy.sensor_mask; expression = copy.expression; v_sensorid = copy.v_sensorid; t_zero = copy.t_zero; interval = copy.interval; operations = copy.operations; ttl = copy.ttl; } /* * SensorConfig functions */ SCError SensorConfig::loadCache() { return impl->loadCache(); } SCError SensorConfig::publishSensor(const char* publicName, const char* sensorPattern) { return impl->publishSensor(publicName, sensorPattern); } SCError SensorConfig::publishSensor(const PublicSensor& sensor) { return impl->publishSensor(sensor); } SCError SensorConfig::publishSensor(const SensorMetadata& sensor) { return impl->publishSensor(sensor); } SCError SensorConfig::publishVirtualSensor(const char* publicName, const char* vSensorExpression, const char* vSensorId, TimeStamp tZero, uint64_t interval) { return impl->publishVirtualSensor(publicName, vSensorExpression, vSensorId, tZero, interval); } SCError SensorConfig::unPublishSensor(const char* publicName) { return impl->unPublishSensor(publicName); } SCError SensorConfig::getPublicSensorNames(std::list& publicSensors) { return impl->getPublicSensorNames(publicSensors); } SCError SensorConfig::getPublicSensorsVerbose(std::list& publicSensors) { return impl->getPublicSensorsVerbose(publicSensors); } SCError SensorConfig::getPublicSensorByName(PublicSensor& sensor, const char* publicName) { return impl->getPublicSensorByName(sensor, publicName); } SCError SensorConfig::getPublicSensorByPattern(PublicSensor& sensor, const char* pattern) { return impl->getPublicSensorByPattern(sensor, pattern); } SCError SensorConfig::getPublicSensorsByWildcard(std::list& sensors, const char* wildcard) { return impl->getPublicSensorsByWildcard(sensors, wildcard); } SCError SensorConfig::isVirtual(bool& isVirtual, std::string publicName) { return impl->isVirtual(isVirtual, publicName); } SCError SensorConfig::getSensorPattern(std::string& pattern, std::string publicName) { return impl->getSensorPattern(pattern, publicName); } SCError SensorConfig::getSensorListForPattern(std::list& sensorIds, std::string pattern) { return impl->getSensorListForPattern(sensorIds, pattern); } SCError SensorConfig::getSensorListForPattern(std::list& sensorIds, std::string pattern, TimeStamp start, TimeStamp end) { return impl->getSensorListForPattern(sensorIds, pattern, start, end); } SCError SensorConfig::setSensorScalingFactor(std::string publicName, double scalingFactor) { return impl->setSensorScalingFactor(publicName, scalingFactor); } SCError SensorConfig::setSensorUnit(std::string publicName, std::string unit) { return impl->setSensorUnit(publicName, unit); } SCError SensorConfig::setSensorMask(std::string publicName, uint64_t mask) { return impl->setSensorMask(publicName, mask); } SCError SensorConfig::setOperations(std::string publicName, std::set operations) { return impl->setOperations(publicName,operations); } SCError SensorConfig::setTimeToLive(std::string publicName, uint64_t ttl) { return impl->setTimeToLive(publicName, ttl); } SCError SensorConfig::setVirtualSensorExpression(std::string publicName, std::string expression) { return impl->setVirtualSensorExpression(publicName, expression); } SCError SensorConfig::setVirtualSensorTZero(std::string publicName, TimeStamp tZero) { return impl->setVirtualSensorTZero(publicName, tZero); } SCError SensorConfig::setSensorInterval(std::string publicName, uint64_t interval) { return impl->setSensorInterval(publicName, interval); } SensorConfig::SensorConfig(Connection* conn) { /* Allocate impl object */ impl = new SensorConfigImpl(conn); } SensorConfig::~SensorConfig() { if (impl) { delete impl; } } /* * SensorConfigImpl protected members and functions */ /* * Validate the pattern for a Sensor to be published * Patterns may only consist of hex numbers and forward slashes * and at most one wildcard character (*). Also, the total number of * bits may not exceed 128. */ bool SensorConfigImpl::validateSensorPattern(const char* sensorPattern) { unsigned int wildcards, seps; /* Iterate through the string and validate/count the characters */ wildcards = 0, seps = 0; for (unsigned int c = 0; c < strlen(sensorPattern); c++) { switch (sensorPattern[c]) { case '*': wildcards++; break; case '/': seps++; break; /* Everything else is allowed */ default: break; } } /* More than one wildcard is not allowed */ if (wildcards > 1) return false; if (strlen(sensorPattern) - seps > MAX_PATTERN_LENGTH) return false; /* Looks good */ return true; } /* * Validate the public name of a sensor. */ bool SensorConfigImpl::validateSensorPublicName(std::string publicName) { return true; } /* * SensorConfigImpl public functions */ SCError SensorConfigImpl::loadCache() { sensorList.clear(); sensorMapByName.clear(); sensorMapByPattern.clear(); SCError rc = getPublicSensorsVerbose(sensorList); if (rc != SC_OK) { return rc; } else { for (auto& s: sensorList) { sensorMapByName.insert(std::make_pair(s.name, std::ref(s))); sensorMapByPattern.insert(std::make_pair(s.pattern, std::ref(s))); } return SC_OK; } } SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sensorPattern) { /* Check if the pattern matches the requirements */ SensorId sid; if (!validateSensorPattern(sensorPattern.c_str()) || !sid.mqttTopicConvert(sensorPattern)) { return SC_INVALIDPATTERN; } /* Check if the publicName is valid */ if (!validateSensorPublicName(publicName.c_str())) { return SC_INVALIDPUBLICNAME; } /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Insert the entry */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern, virtual) VALUES (?,?, FALSE);"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName.c_str()); cass_statement_bind_string_by_name(statement, "pattern", sid.getId().c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_OK; } SCError SensorConfigImpl::publishSensor(const PublicSensor& sensor) { /* Check if the pattern matches the requirements */ SensorId sid; if (!validateSensorPattern(sensor.pattern.c_str()) || !sid.mqttTopicConvert(sensor.pattern)) { return SC_INVALIDPATTERN; } /* Check if the publicName is valid */ if (!validateSensorPublicName(sensor.name.c_str())) { return SC_INVALIDPUBLICNAME; } /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Insert the entry */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern, virtual, scaling_factor, unit, sensor_mask, interval, ttl) VALUES (?,?, FALSE, ?, ?, ?, ?, ?);"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", sensor.name.c_str()); cass_statement_bind_string_by_name(statement, "pattern", sid.getId().c_str()); cass_statement_bind_double_by_name(statement, "scaling_factor", sensor.scaling_factor); cass_statement_bind_string_by_name(statement, "unit", sensor.unit.c_str()); cass_statement_bind_int64_by_name(statement, "sensor_mask", sensor.sensor_mask); cass_statement_bind_int64_by_name(statement, "interval", sensor.interval); cass_statement_bind_int64_by_name(statement, "ttl", sensor.ttl); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); // Operations are inserted as an update statement, if required if(sensor.operations.size() > 0) return setOperations(sensor.name, sensor.operations); else return SC_OK; } SCError SensorConfigImpl::publishSensor(const SensorMetadata& sensor) { /* Check if the pattern matches the requirements */ SensorId sid; if (!sensor.getPattern() || !validateSensorPattern(sensor.getPattern()->c_str()) || !sid.mqttTopicConvert(*sensor.getPattern())) { return SC_INVALIDPATTERN; } /* Check if the publicName is valid */ if (!sensor.getPublicName() || !validateSensorPublicName(sensor.getPublicName()->c_str())) { return SC_INVALIDPUBLICNAME; } /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Insert the entry */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; std::string queryBuf = "INSERT INTO " + std::string(CONFIG_KEYSPACE_NAME) + "." + std::string(CF_PUBLISHEDSENSORS) + " (name, pattern, virtual"; std::string valuesBuf = ") VALUES (?, ?, FALSE"; std::string closingBuf = ");"; if(sensor.getScale()) { queryBuf += ", scaling_factor"; valuesBuf += ", ?"; } if(sensor.getUnit()) { queryBuf += ", unit"; valuesBuf += ", ?"; } if(sensor.getIntegrable() || sensor.getMonotonic()) { queryBuf += ", sensor_mask"; valuesBuf += ", ?"; } if(sensor.getInterval()) { queryBuf += ", interval"; valuesBuf += ", ?"; } if(sensor.getTTL()) { queryBuf += ", ttl"; valuesBuf += ", ?"; } std::string query = queryBuf + valuesBuf + closingBuf; future = cass_session_prepare(session, query.c_str()); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", sensor.getPublicName()->c_str()); cass_statement_bind_string_by_name(statement, "pattern", sid.getId().c_str()); if(sensor.getScale()) cass_statement_bind_double_by_name(statement, "scaling_factor", *sensor.getScale()); if(sensor.getUnit()) cass_statement_bind_string_by_name(statement, "unit", sensor.getUnit()->c_str()); if(sensor.getInterval()) cass_statement_bind_int64_by_name(statement, "interval", *sensor.getInterval()); if(sensor.getTTL()) cass_statement_bind_int64_by_name(statement, "ttl", *sensor.getTTL()); if(sensor.getIntegrable() || sensor.getMonotonic()) { uint64_t sensorMask = 0; if(sensor.getIntegrable()) sensorMask = sensorMask | INTEGRABLE; if(sensor.getMonotonic()) sensorMask = sensorMask | MONOTONIC; cass_statement_bind_int64_by_name(statement, "sensor_mask", sensorMask); } future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); // Operations are inserted as an update statement, if required if(sensor.getOperations() && sensor.getOperations()->size()>0) return setOperations(*sensor.getPublicName(), *sensor.getOperations()); else return SC_OK; } SCError SensorConfigImpl::publishVirtualSensor(std::string publicName, std::string vSensorExpression, std::string vSensorId, TimeStamp tZero, uint64_t interval) { /* Check if the publicName is valid */ if (!validateSensorPublicName(publicName.c_str())) { return SC_INVALIDPUBLICNAME; } /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Validate vSensorExpression */ try { VSensorExpression vsExp(connection, vSensorExpression); /* Check that it is not recursive-pointing to itself */ std::unordered_set inputSet; vsExp.getInputsRecursive(inputSet); std::unordered_set::const_iterator found = inputSet.find(publicName); if (found != inputSet.end()) { return SC_EXPRESSIONSELFREF; } } catch (std::exception& e) { std::cout << e.what(); return SC_INVALIDEXPRESSION; } /* Check if the vSensorId is valid */ SensorId vSensor; if (!vSensor.mqttTopicConvert(vSensorId)) { return SC_INVALIDVSENSORID; } /* Insert the entry */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, expression, vsensorid, tzero, interval, virtual) VALUES (?,?,?,?,?,TRUE);"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName.c_str()); cass_statement_bind_string_by_name(statement, "expression", vSensorExpression.c_str()); cass_statement_bind_string_by_name(statement, "vsensorid", vSensorId.c_str()); cass_statement_bind_int64_by_name(statement, "tzero", tZero.getRaw()); cass_statement_bind_int64_by_name(statement, "interval", interval); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_OK; } SCError SensorConfigImpl::unPublishSensor(std::string publicName) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Remove the entry */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "DELETE FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } cass_prepared_free(prepared); cass_future_free(future); cass_statement_free(statement); return SC_OK; } SCError SensorConfigImpl::getPublicSensorNames(std::list& publicSensors) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Clear the list */ publicSensors.clear(); /* Fill the list with all public sensors */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT name FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;"; statement = cass_statement_new(query, 0); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } else { const CassResult* result = cass_future_get_result(future); CassIterator* iterator = cass_iterator_from_result(result); while (cass_iterator_next(iterator)) { const char* name; size_t name_len; const CassRow* row = cass_iterator_get_row(iterator); if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) { name = ""; name_len = 0; } publicSensors.push_back(std::string(name, name_len)); } cass_result_free(result); cass_iterator_free(iterator); } cass_future_free(future); cass_statement_free(statement); return SC_OK; } SCError SensorConfigImpl::getPublicSensorsVerbose(std::list& publicSensors) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Clear the list */ publicSensors.clear(); /* Fill the list with all public sensors */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT * FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;"; statement = cass_statement_new(query, 0); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } else { const CassResult* result = cass_future_get_result(future); CassIterator* iterator = cass_iterator_from_result(result); while (cass_iterator_next(iterator)) { const char* name; size_t name_len; cass_bool_t is_virtual; const char* pattern; size_t pattern_len; double scaling_factor; const char* unit; size_t unit_len; int64_t sensor_mask; const char* expression; size_t expression_len; const char* vsensorid; size_t vsensorid_len; int64_t tzero; int64_t interval; int64_t ttl; set operations; PublicSensor sensor; const CassRow* row = cass_iterator_get_row(iterator); if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) { name = ""; name_len = 0; } if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) { is_virtual = cass_false; } if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) { pattern = ""; pattern_len = 0; } if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) { scaling_factor = 1.0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) { unit = ""; unit_len = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "sensor_mask"), &sensor_mask) != CASS_OK) { sensor_mask = 0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) { expression = ""; expression_len = 0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "vsensorid"), &vsensorid, &vsensorid_len) != CASS_OK) { vsensorid = ""; vsensorid_len = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) { tzero = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "interval"), &interval) != CASS_OK) { interval = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) { ttl = 0; } const CassValue* opSet = nullptr; CassIterator *opSetIt = nullptr; if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) { const char *opString; size_t opLen; while (cass_iterator_next(opSetIt)) { if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) { operations.clear(); break; } else operations.insert(std::string(opString, opLen)); } cass_iterator_free(opSetIt); } sensor.name = std::string(name, name_len); sensor.is_virtual = is_virtual == cass_true ? true : false; sensor.pattern = std::string(pattern, pattern_len); sensor.scaling_factor = scaling_factor; sensor.unit = std::string(unit, unit_len); sensor.sensor_mask = sensor_mask; sensor.expression = std::string(expression, expression_len); sensor.v_sensorid = std::string(vsensorid, vsensorid_len); sensor.t_zero = tzero; sensor.interval = interval; sensor.ttl = ttl; sensor.operations = operations; publicSensors.push_back(sensor); } cass_result_free(result); cass_iterator_free(iterator); } cass_future_free(future); cass_statement_free(statement); return SC_OK; } SCError SensorConfigImpl::getPublicSensorByName(PublicSensor& sensor, const char* publicName) { /* Check if the sensor definition is already in the cache */ SensorMap_t::const_iterator got = sensorMapByName.find(publicName); if (got != sensorMapByName.end()) { sensor = got->second; return SC_OK; } /* Not in cache - query the data base */ /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Fill the list with all public sensors */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "SELECT * FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ?;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNERROR; } else { const CassResult* result = cass_future_get_result(future); CassIterator* iterator = cass_iterator_from_result(result); if (cass_iterator_next(iterator)) { const char* name; size_t name_len; cass_bool_t is_virtual; const char* pattern; size_t pattern_len; double scaling_factor; const char* unit; size_t unit_len; int64_t sensor_mask; const char* expression; size_t expression_len; const char* vsensorid; size_t vsensorid_len; int64_t tzero; int64_t interval; int64_t ttl; set operations; const CassRow* row = cass_iterator_get_row(iterator); if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) { name = ""; name_len = 0; } if (cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &is_virtual) != CASS_OK) { is_virtual = cass_false; } if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) { pattern = ""; pattern_len = 0; } if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) { scaling_factor = 1.0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) { unit = ""; unit_len = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "sensor_mask"), &sensor_mask) != CASS_OK) { sensor_mask = 0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "expression"), &expression, &expression_len) != CASS_OK) { expression = ""; expression_len = 0; } if (cass_value_get_string(cass_row_get_column_by_name(row, "vsensorid"), &vsensorid, &vsensorid_len) != CASS_OK) { vsensorid = ""; vsensorid_len = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "tzero"), &tzero) != CASS_OK) { tzero = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "interval"), &interval) != CASS_OK) { interval = 0; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "ttl"), &ttl) != CASS_OK) { ttl = 0; } const CassValue* opSet = nullptr; CassIterator *opSetIt = nullptr; if((opSet=cass_row_get_column_by_name(row, "operations")) && (opSetIt=cass_iterator_from_collection(opSet))) { CassIterator *opSetIt = cass_iterator_from_collection(opSet); const char *opString; size_t opLen; while (cass_iterator_next(opSetIt)) { if (cass_value_get_string(cass_iterator_get_value(opSetIt), &opString, &opLen) != CASS_OK) { operations.clear(); break; } else operations.insert(std::string(opString, opLen)); } cass_iterator_free(opSetIt); } sensor.name = std::string(name, name_len); sensor.is_virtual = is_virtual == cass_true ? true : false; sensor.pattern = std::string(pattern, pattern_len); sensor.scaling_factor = scaling_factor; sensor.unit = std::string(unit, unit_len); sensor.sensor_mask = sensor_mask; sensor.expression = std::string(expression, expression_len); sensor.v_sensorid = std::string(vsensorid, vsensorid_len); sensor.t_zero = tzero; sensor.interval = interval; sensor.ttl = ttl; sensor.operations = operations; /* Add to sensorPropertyCache for later use */ sensorList.push_back(sensor); sensorMapByName.insert(std::make_pair(publicName, std::ref(sensorList.back()))); sensorMapByPattern.insert(std::make_pair(sensor.pattern, std::ref(sensorList.back()))); } else { cass_result_free(result); cass_iterator_free(iterator); cass_future_free(future); cass_statement_free(statement); return SC_UNKNOWNSENSOR; } cass_result_free(result); cass_iterator_free(iterator); } cass_future_free(future); cass_statement_free(statement); cass_prepared_free(prepared); return SC_OK; } SCError SensorConfigImpl::getPublicSensorByPattern(PublicSensor& sensor, const char* pattern) { if (sensorList.size() == 0) { loadCache(); } SensorMap_t::const_iterator got = sensorMapByPattern.find(pattern); if (got != sensorMapByPattern.end()) { sensor = got->second; return SC_OK; } else { return SC_UNKNOWNSENSOR; } } SCError SensorConfigImpl::getPublicSensorsByWildcard(std::list& sensors, const char* wildcard) { SCError err = SC_OK; if (strpbrk(wildcard, "*?") == NULL) { PublicSensor sen; if ((err = getPublicSensorByName(sen, wildcard)) == SC_OK) { sensors.push_back(sen); } return err; } else { if ((err = getPublicSensorsVerbose(sensors)) == SC_OK) { std::string w("^"); while (*wildcard != 0) { switch (*wildcard) { case '.': case '[': case '\\': case '^': case '$': w.append("\\"); w.append(wildcard, 1); break; case '?': w.append("."); break; case '*': w.append(".*"); break; default: w.append(wildcard, 1); break; } wildcard++; } boost::regex r(w, boost::regex::basic); auto it = sensors.begin(); while(it != sensors.end()) { if (!boost::regex_match(it->name, r)) { it = sensors.erase(it); } else { it++; } } if (sensors.size() > 0) { return SC_OK; } else { return SC_UNKNOWNSENSOR; } } else { return err; } } } SCError SensorConfigImpl::isVirtual(bool& isVirtual, std::string publicName) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Read the virtual field from the database */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "SELECT virtual FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } else { const CassResult* result = cass_future_get_result(future); CassIterator* iterator = cass_iterator_from_result(result); if (cass_iterator_next(iterator)) { cass_bool_t isVirtual_; const CassRow* row = cass_iterator_get_row(iterator); cass_value_get_bool(cass_row_get_column_by_name(row, "virtual"), &isVirtual_); isVirtual = isVirtual_ ? true : false; } else { return SC_UNKNOWNSENSOR; } cass_result_free(result); cass_iterator_free(iterator); } cass_future_free(future); cass_statement_free(statement); cass_prepared_free(prepared); return SC_OK; } SCError SensorConfigImpl::getSensorPattern(std::string& pattern, std::string publicName) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Ensure that the public sensor is not virtual */ bool virt; SCError err = isVirtual(virt, publicName); if (err != SC_OK) { return err; } if (virt) { return SC_WRONGTYPE; } /* Read the Pattern string from the database */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "SELECT pattern FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); return SC_UNKNOWNERROR; } else { prepared = cass_future_get_prepared(future); } cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string_by_name(statement, "name", publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } else { const CassResult* result = cass_future_get_result(future); CassIterator* iterator = cass_iterator_from_result(result); if (cass_iterator_next(iterator)) { const char* pattern_cstr; size_t pattern_len; const CassRow* row = cass_iterator_get_row(iterator); cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr, &pattern_len); pattern = std::string(pattern_cstr, pattern_len); } else { return SC_UNKNOWNSENSOR; } cass_result_free(result); cass_iterator_free(iterator); } cass_future_free(future); cass_statement_free(statement); cass_prepared_free(prepared); return SC_OK; } SCError SensorConfigImpl::getSensorListForPattern(std::list& sensorIds, std::string pattern) { /* Tiny hack to call the long version of this function */ TimeStamp start((uint64_t)0x0); TimeStamp end((uint64_t)0x260DD31906D70000); return getSensorListForPattern(sensorIds, pattern, start, end); } SCError SensorConfigImpl::getSensorListForPattern(std::list& sensorIds, std::string pattern, TimeStamp start, TimeStamp end) { /* Clear the list of sensorIds */ sensorIds.clear(); /* Strip all slashes from publishedSensorName */ //pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end()); uint16_t wsStart = start.getWeekstamp(); uint16_t wsEnd = end.getWeekstamp(); for(uint16_t ws=wsStart; ws<=wsEnd; ws++) { SensorId sensor; sensor.setId(pattern); sensor.setRsvd(ws); sensorIds.push_back(sensor); } return SC_OK; // /* Clear the list of sensorIds */ // sensorIds.clear(); // // /* Strip all slashes from publishedSensorName */ // pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end()); // // /* Calculate lower and upper boundaries for the expansion of the pattern */ // std::string low = pattern; // std::string high = pattern; // if (pattern.find("*") != std::string::npos) { // low.replace(pattern.find("*"), 1, MAX_PATTERN_LENGTH-pattern.length(), 0); // high.replace(pattern.find("*"), 1, MAX_PATTERN_LENGTH-pattern.length(), 127); // } // // SensorId lowId, highId; // if (!lowId.mqttTopicConvert(low)) { // return SC_INVALIDPATTERN; // } // if (!highId.mqttTopicConvert(high)) { // return SC_INVALIDPATTERN; // } // // std::cout << "Lower boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[1] << std::endl; // std::cout << "Upper boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[1] << std::endl; // // /* Query the database to see which raw sensors actually exist in the interval between low and high */ // CassError rc = CASS_OK; // CassStatement* statement = nullptr; // CassFuture* future = nullptr; // const CassPrepared* prepared = nullptr; // const char* query = "SELECT DISTINCT sid,ws FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE TOKEN(sid) >= TOKEN(?) and TOKEN(sid) <= TOKEN(?) and ws >= ? and ws <= ?;"; // // future = cass_session_prepare(session, query); // cass_future_wait(future); // // rc = cass_future_error_code(future); // if (rc != CASS_OK) { // connection->printError(future); // return SC_UNKNOWNERROR; // } // // prepared = cass_future_get_prepared(future); // cass_future_free(future); // // statement = cass_prepared_bind(prepared); // // cass_statement_bind_string(statement, 0, low.c_str()); // cass_statement_bind_string(statement, 1, high.c_str()); // cass_statement_bind_int16(statement, 2, (cass_int16_t)start.getWeekstamp()); // cass_statement_bind_int16(statement, 3, (cass_int16_t)end.getWeekstamp()); // // future = cass_session_execute(session, statement); // cass_future_wait(future); // // rc = cass_future_error_code(future); // if (rc != CASS_OK) { // connection->printError(future); // return SC_UNKNOWNERROR; // } // // const CassResult* result = cass_future_get_result(future); // cass_future_free(future); // // CassIterator* iterator = cass_iterator_from_result(result); // while (cass_iterator_next(iterator)) { // const CassRow* row = cass_iterator_get_row(iterator); // const char* res; // size_t res_len; // cass_int16_t res_ws; // cass_value_get_string(cass_row_get_column_by_name(row, "sid"), &res, &res_len); // cass_value_get_int16(cass_row_get_column_by_name(row, "ws"), &res_ws); // // SensorId sensor; // std::string id(res, res_len); // sensor.setId(id); // // /* Check if the sensorId matches the pattern and append to result */ // if (sensor.patternMatch(pattern)) { // /* Only append if within the weekstamp window */ // if ((res_ws >= start.getWeekstamp()) && (res_ws <= end.getWeekstamp())) { // sensorIds.push_back(sensor); // } // } // } // cass_result_free(result); // cass_iterator_free(iterator); // cass_statement_free(statement); // cass_prepared_free(prepared); // // return SC_OK; } SCError SensorConfigImpl::setSensorScalingFactor(std::string publicName, double scalingFactor) { SCError error = SC_UNKNOWNERROR; /* Check the sensorconfig whether the given public sensor has the integrable flag set to true */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET scaling_factor = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_double(statement, 0, scalingFactor); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setSensorUnit(std::string publicName, std::string unit) { SCError error = SC_UNKNOWNERROR; /* Check the sensorconfig whether the given public sensor has the integrable flag set to true */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET unit = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string(statement, 0, unit.c_str()); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setSensorMask(std::string publicName, uint64_t mask) { SCError error = SC_UNKNOWNERROR; /* Check the sensorconfig whether the given public sensor has the integrable flag set to true */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET sensor_mask = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_int64(statement, 0, mask); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setOperations(std::string publicName, std::set operations) { SCError error = SC_UNKNOWNERROR; CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET operations = operations + ? WHERE name = ? ;"; CassCollection* cassSet = cass_collection_new(CASS_COLLECTION_TYPE_SET, operations.size()); for(const auto& op : operations) cass_collection_append_string(cassSet, op.c_str()); future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_collection(statement, 0, cassSet); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); cass_collection_free(cassSet); return error; } SCError SensorConfigImpl::setTimeToLive(std::string publicName, uint64_t ttl) { SCError error = SC_UNKNOWNERROR; CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET ttl = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_int64(statement, 0, ttl); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setVirtualSensorExpression(std::string publicName, std::string expression) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Ensure that the public sensor is virtual */ bool virt; SCError err = isVirtual(virt, publicName); if (err != SC_OK) { return err; } if (!virt) { return SC_WRONGTYPE; } /* TODO: Validate expression! */ /* Update the database with the new expression */ SCError error = SC_UNKNOWNERROR; CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET expression = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_string(statement, 0, expression.c_str()); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setVirtualSensorTZero(std::string publicName, TimeStamp tZero) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Ensure that the public sensor is virtual */ bool virt; SCError err = isVirtual(virt, publicName); if (err != SC_OK) { return err; } if (!virt) { return SC_WRONGTYPE; } /* Update the database with the new expression */ SCError error = SC_UNKNOWNERROR; CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET tzero = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_int64(statement, 0, tZero.getRaw()); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SCError SensorConfigImpl::setSensorInterval(std::string publicName, uint64_t interval) { /* Check if the session is valid */ if (!session) { return SC_INVALIDSESSION; } /* Ensure that the public sensor is virtual */ //bool virt; //SCError err = isVirtual(virt, publicName); //if (err != SC_OK) { // return err; //} //if (!virt) { // return SC_WRONGTYPE; //} /* Update the database with the new expression */ SCError error = SC_UNKNOWNERROR; CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const CassPrepared* prepared = nullptr; const char* query = "UPDATE " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " SET interval = ? WHERE name = ? ;"; future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); return SC_UNKNOWNERROR; } prepared = cass_future_get_prepared(future); cass_future_free(future); statement = cass_prepared_bind(prepared); cass_statement_bind_int64(statement, 0, interval); cass_statement_bind_string(statement, 1, publicName.c_str()); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = SC_UNKNOWNERROR; } else { error = SC_OK; } cass_statement_free(statement); cass_prepared_free(prepared); return error; } SensorConfigImpl::SensorConfigImpl(Connection* conn) { connection = conn; session = connection->getSessionHandle(); } SensorConfigImpl::~SensorConfigImpl() { connection = nullptr; session = nullptr; }