Commit e14c491c authored by Axel Auweter's avatar Axel Auweter
Browse files

Refactoring stage 4:

* Extended DCDBConfig and SensorDataStore to provide functions for querying data
* Adapted dcdbquery accordingly
* Project now compiles again but dcdbquery likely to be broken
parent 1d5f1bfb
......@@ -17,6 +17,9 @@
#include <list>
#include "connection.h"
#include "timestamp.h"
#include "sensorid.h"
#include "cassandra.h"
#ifndef SENSORCONFIG_H
......@@ -40,6 +43,7 @@ typedef enum {
SC_OK,
SC_INVALIDSESSION,
SC_INVALIDPATTERN,
SC_UNKNOWNSENSOR,
SC_UNKNOWNERROR
} SCError;
......@@ -52,6 +56,10 @@ public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end);
SensorConfig(DCDBConnection* conn);
virtual ~SensorConfig();
};
......
......@@ -13,10 +13,12 @@
* DeviceLocation, and DeviceSensorId.
*/
#include <stdint.h>
#include <string>
#include <list>
#include <cstdint>
#include "sensorid.h"
#include "timestamp.h"
#include "connection.h"
#ifndef SENSORDATASTORE_H_
......@@ -25,6 +27,18 @@
/* Forward-declaration of the implementation-internal classes */
class SensorDataStoreImpl;
/**
* @brief SensorDataStoreReading is the class for a single
* sensor-timestamp-value entry in the database.
*/
class SensorDataStoreReading
{
public:
SensorId sensorId;
DCDBTimeStamp timeStamp;
uint64_t value;
};
/**
* @brief SensorDataStore is the class of the DCDBLib library
* to write and read sensor data.
......@@ -50,6 +64,16 @@ public:
*/
void setTTL(uint64_t ttl);
/**
* @brief This function querie a sensor's values in
* the given time range.
* @param result The list where the results will be stored.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
*/
void query(std::list<SensorDataStoreReading>& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end);
/**
* @brief A shortcut constructor for a SensorDataStore object
* that allows accessing the data store through a
......
......@@ -102,6 +102,13 @@ public:
*/
std::string serialize();
/**
* @brief This function matches the sensor against a
* sensor pattern string.
* @return Returns true if the sensor name matches the pattern, false otherwise.
*/
bool patternMatch(std::string pattern);
SensorId();
SensorId(std::string mqttTopic);
virtual ~SensorId();
......
......@@ -6,8 +6,14 @@
*/
#include <list>
#include <string>
#include "cassandra.h"
#include "connection.h"
#include "timestamp.h"
#include "sensorid.h"
#include "sensorconfig.h"
#ifndef SENSORCONFIG_INTERNAL_H
......@@ -22,9 +28,13 @@ protected:
bool validateSensorPattern(const char* sensorPattern);
public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError publishSensor(std::string publicName, std::string sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SCError getSensorPattern(std::string& pattern, std::string publicName);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern);
SCError getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end);
SensorConfigImpl(DCDBConnection* conn);
virtual ~SensorConfigImpl();
};
......
......@@ -53,6 +53,16 @@ public:
*/
void setTTL(uint64_t ttl);
/**
* @brief This function querie a sensor's values in
* the given time range.
* @param result The list where the results will be stored.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
*/
void query(std::list<SensorDataStoreReading>& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end);
/**
* @brief This is the standard constructor of the SensorDataStoreImpl class.
* @param csb A CassandraBackend object to do the raw database access.
......
......@@ -7,11 +7,13 @@
#include <cstring>
#include <iostream>
#include <algorithm>
#include "cassandra.h"
#include "sensorconfig_internal.h"
#include "dcdbglobals.h"
#include "dcdbendian.h"
/*
* DCDBPublicSensor functions.
......@@ -46,6 +48,22 @@ SCError SensorConfig::getPublicSensors(std::list<DCDBPublicSensor>& publicSensor
return impl->getPublicSensors(publicSensors);
}
SCError SensorConfig::getSensorPattern(std::string& pattern, std::string publicName)
{
return impl->getSensorPattern(pattern, publicName);
}
SCError SensorConfig::getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern)
{
return impl->getSensorListForPattern(sensorIds, pattern);
}
SCError SensorConfig::getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end)
{
return impl->getSensorListForPattern(sensorIds, pattern, start, end);
}
SensorConfig::SensorConfig(DCDBConnection* conn)
{
/* Allocate impl object */
......@@ -141,10 +159,10 @@ bool SensorConfigImpl::validateSensorPattern(const char* sensorPattern)
/*
* SensorConfigImpl public functions
*/
SCError SensorConfigImpl::publishSensor(const char* publicName, const char* sensorPattern)
SCError SensorConfigImpl::publishSensor(std::string publicName, std::string sensorPattern)
{
/* Check if the pattern matches the requirements */
if (!validateSensorPattern(sensorPattern)) {
if (!validateSensorPattern(sensorPattern.c_str())) {
return SC_INVALIDPATTERN;
}
......@@ -176,8 +194,8 @@ SCError SensorConfigImpl::publishSensor(const char* publicName, const char* sens
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName);
cass_statement_bind_string_by_name(statement, "pattern", sensorPattern);
cass_statement_bind_string_by_name(statement, "name", publicName.c_str());
cass_statement_bind_string_by_name(statement, "pattern", sensorPattern.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -207,11 +225,14 @@ SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSe
return SC_INVALIDSESSION;
}
/* Clear the list */
publicSensors.clear();
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM dcdb_config.publishedsensors;";
const char* query = "SELECT * FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " ;";
statement = cass_statement_new(query, 0);
......@@ -269,6 +290,169 @@ SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSe
return SC_OK;
}
SCError SensorConfigImpl::getSensorPattern(std::string& pattern, std::string publicName)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Read the Pattern string from the database */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT pattern FROM " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " WHERE name = ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName.c_str());
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const char* pattern_cstr;
size_t pattern_len;
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr, &pattern_len);
pattern = std::string(pattern_cstr, pattern_len);
}
else {
return SC_UNKNOWNSENSOR;
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
return SC_OK;
}
SCError SensorConfigImpl::getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern)
{
/* Tiny hack to call the long version of this function */
DCDBTimeStamp start(0x0);
DCDBTimeStamp end(0x260DD31906D70000);
return getSensorListForPattern(sensorIds, pattern, start, end);
}
SCError SensorConfigImpl::getSensorListForPattern(std::list<SensorId>& sensorIds, std::string pattern, DCDBTimeStamp start, DCDBTimeStamp end)
{
/* Clear the list of sensorIds */
sensorIds.clear();
/* Strip all slashes from publishedSensorName */
pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end());
/* Calculate lower and upper boundaries for the expansion of the pattern */
std::string low = pattern;
std::string high = pattern;
if (pattern.find("*") != std::string::npos) {
low.replace(pattern.find("*"), 1, 33-pattern.length(), '0');
high.replace(pattern.find("*"), 1, 33-pattern.length(), 'F');
}
SensorId lowId, highId;
if (!lowId.mqttTopicConvert(low)) {
return SC_INVALIDPATTERN;
}
if (!highId.mqttTopicConvert(high)) {
return SC_INVALIDPATTERN;
}
lowId.setRsvd(start.getWeekstamp());
highId.setRsvd(end.getWeekstamp());
low = lowId.serialize();
high = highId.serialize();
// std::cout << "Lower boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[1] << std::endl;
// std::cout << "Upper boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[1] << std::endl;
/* Query the database to see which raw sensors actually exist in the interval between low and high */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT DISTINCT sid FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE TOKEN(sid) >= ? and TOKEN(sid) <= ?;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(low.c_str()), 16);
cass_statement_bind_bytes(statement, 1, (const cass_byte_t*)(high.c_str()), 16);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return SC_UNKNOWNERROR;
}
const CassResult* result = cass_future_get_result(future);
cass_future_free(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
const cass_byte_t* res;
size_t res_len;
cass_value_get_bytes(cass_row_get_column_by_name(row, "sid"), &res, &res_len);
uint64_t raw[2];
raw[0] = Endian::BEToHost(((uint64_t*)res)[0]);
raw[1] = Endian::BEToHost(((uint64_t*)res)[1]);
SensorId sensor;
sensor.setRaw(raw);
/* Check if the sensorId matches the pattern and append to result */
if (sensor.patternMatch(pattern)) {
sensorIds.push_back(sensor);
}
}
cass_result_free(result);
cass_iterator_free(iterator);
cass_statement_free(statement);
cass_prepared_free(prepared);
return SC_OK;
}
SensorConfigImpl::SensorConfigImpl(DCDBConnection* conn)
{
connection = conn;
......
......@@ -126,7 +126,6 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
cass_statement_bind_int64_by_name(statement, "value", value);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
......@@ -148,6 +147,82 @@ void SensorDataStoreImpl::setTTL(uint64_t ttl)
prepareInsert(ttl);
}
/**
* @details
* This function issues a regular query to the data store
* and creates a SensorDataStoreReading object for each
* entry which is stores in the result list.
*/
void SensorDataStoreImpl::query(std::list<SensorDataStoreReading>& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end)
{
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* query = "SELECT * FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ? AND ts >= ? AND ts <= ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
std::string key = sid.serialize();
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_int64(statement, 1, start.getRaw());
cass_statement_bind_int64(statement, 2, end.getRaw());
future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(cresult);
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
SensorDataStoreReading entry;
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
entry.sensorId = sid;
entry.timeStamp = DCDBTimeStamp((uint64_t)ts);
entry.value = (uint64_t)value;
result.push_back(entry);
#if 0
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
#endif
}
cass_iterator_free(rows);
cass_result_free(cresult);
}
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
/**
* @details
* This constructor sets the internal connection variable to
......@@ -198,6 +273,17 @@ void SensorDataStore::setTTL(uint64_t ttl)
impl->setTTL(ttl);
}
/**
* @details
* Instead of doing the actual work, this function simply
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::query(std::list<SensorDataStoreReading>& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end)
{
impl->query(result, sid, start, end);
}
/**
* @details
* This constructor allocates the implementation class which
......
......@@ -5,10 +5,18 @@
* Author: Axel Auweter
*/
#include "sensorid.h"
#include <string>
#include <algorithm>
#include <cstdint>
#include <cinttypes>
#include <boost/algorithm/string.hpp>
#include "dcdbendian.h"
#include "sensorid.h"
/*
* Public Functions
*/
......@@ -122,6 +130,53 @@ std::string SensorId::serialize()
return std::string((char*)ll, 16);
}
/**
* @details
* This function strips all slashes from the pattern string and
* then compares the sensor and the pattern character by
* character, skipping over the possible wildcard in the pattern.
*/
bool SensorId::patternMatch(std::string pattern)
{
/* Strip all slashes from pattern */
pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end());
/* Convert to lower case */
boost::algorithm::to_lower(pattern);
/* Calculate the wildcard length */
int wl = (pattern.find("*") != std::string::npos) ? 33 - pattern.length() : 0;
/* Do a character by character comparison */
int posP = 0;
int posS = 0;
while (posS < 32) {
char p, cs[2];
uint64_t s;
p = pattern.c_str()[posP];
s = data.raw[posS / 16];
s >>= (60-((4*posS)%64));
s &= 0xf;
snprintf(cs, 2, "%" PRIx64, s);
if (p == '*') {
/* Jump over the wildcard */
posS += wl;
posP++;
continue;
}
else if ((posS >= 24 && posS <= 27) || (p==cs[0])) {
posS++;
posP++;
continue;
}
else {
return false;
}
}
return true;
}
SensorId::SensorId()
{
/* Initialize to zeros */
......
include ../../config.mk
CXXFLAGS = -O0 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
OBJS = dcdbquery.o query.o casshelper.o
OBJS = dcdbquery.o query.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lssl -lcrypto
TARGET = dcdbquery
......
/*
* casshelper.cpp
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "casshelper.h"
void CassHelper::print_error(CassFuture* future) {
CassString message = cass_future_error_message(future);
fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
CassCluster* CassHelper::create_cluster(const char* hostname) {
/* Set loglevel to error only (otherwise we'll get a warning on the TOKEN() queries) */
cass_log_set_level(CASS_LOG_ERROR);
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hostname);
/* Force protocol version 1 */
cass_cluster_set_protocol_version(cluster, 1);
return cluster;
}
CassError CassHelper::connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);