Commit 79fb6f20 authored by Axel Auweter's avatar Axel Auweter
Browse files

DCDBQuery w.i.p.

parent 515f7196
...@@ -5,7 +5,7 @@ OBJS = collectagent.o \ ...@@ -5,7 +5,7 @@ OBJS = collectagent.o \
simplemqttserver.o \ simplemqttserver.o \
simplemqttserverthread.o \ simplemqttserverthread.o \
simplemqttservermessage.o simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread -lboost_date_time
TARGET = collectagent TARGET = collectagent
.PHONY : clean install .PHONY : clean install
......
include ../config.mk include ../config.mk
# C++ Compiler Flags (use fPIC for our dynamic library) # C++ Compiler Flags (use fPIC for our dynamic library)
CXXFLAGS = -O0 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option\ CXXFLAGS = -O0 -ggdb -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option\
-fPIC --std=c++11 -I$(DCDBDEPLOYPATH)/include -I./include -I./include_internal\ -fPIC --std=c++11 -I$(DCDBDEPLOYPATH)/include -I./include -I./include_internal\
-I$(DCDBBASEPATH)/include/ -fmessage-length=0 -I$(DCDBBASEPATH)/include/ -fmessage-length=0
# List of object files to build and the derived list of corresponding source files # List of object files to build and the derived list of corresponding source files
OBJS = src/sensordatastore.o \ OBJS = src/sensordatastore.o \
src/cassandraBackend.o src/cassandraBackend.o \
src/timestamp.o
# List of public header files necessary to use this libray # List of public header files necessary to use this libray
PUBHEADERS = $(shell find include -type f -iname "*.h") PUBHEADERS = $(shell find include -type f -iname "*.h")
...@@ -16,7 +17,7 @@ PUBHEADERS = $(shell find include -type f -iname "*.h") ...@@ -16,7 +17,7 @@ PUBHEADERS = $(shell find include -type f -iname "*.h")
PRIVHEADERS = $(shell find include_internal -type f -iname "*.h") PRIVHEADERS = $(shell find include_internal -type f -iname "*.h")
# External libraries to link against # External libraries to link against
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lcassandra -lboost_random -lboost_system -luv LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lcassandra -lboost_random -lboost_system -lboost_date_time -luv
# Dynamic library building differs between Linux/BSD and MacOS # Dynamic library building differs between Linux/BSD and MacOS
OS = $(shell uname) OS = $(shell uname)
......
/*
* dcdbtimestamp.h
*
* Created on: Feb 18, 2015
* Author: Axel Auweter
*/
/**
* @file
* @brief This file is a companion to the sensordatastore API.
* It contains the DCDBTimeStamp class definition that helps in
* creating and modifying timestamps in the SensorDataStore.
*/
#include <cstdint>
#include <string>
#include <stdexcept>
#ifndef DCDBTIMESTAMP_H
#define DCDBTIMESTAMP_H
class DCDBTimeStampConversionException : public std::runtime_error
{
public:
DCDBTimeStampConversionException():runtime_error("Time stamp conversion error.") {}
};
/**
* @brief The DCDBTimeStamp class contains a single TimeStamp.
*/
class DCDBTimeStamp
{
protected:
uint64_t raw; /**< The raw timestamp data (nanoseconds since Unix Epoch) */
/**
* @brief Parses a string and tries to derive the time from it by
* guessing the format. Throws DCDBTimeStampException on failure.
* @param timestr A string containing a representation of time
*/
void guessFromString(std::string timestr);
public:
/**
* @brief Standard constructor. Initializes the object with the current time.
*/
DCDBTimeStamp();
/**
* @brief Raw constructor. Initializes the object with an existing raw time.
*/
DCDBTimeStamp(uint64_t ts);
/**
* @brief String constructor. Initializes the object by best guess from a time string.
*/
DCDBTimeStamp(std::string ts);
/**
* @brief Standard destructor.
*/
virtual ~DCDBTimeStamp();
/**
* @brief Sets the object's value to the current time.
*/
void setNow(void);
/**
* @brief Returns the raw time stamp value.
* @return The object's value as uint64_t.
*/
uint64_t getRaw(void);
/* Overloaded operators (compare raw values) */
inline bool operator == (const DCDBTimeStamp& rhs) const {return raw == rhs.raw;}
inline bool operator != (const DCDBTimeStamp& rhs) const {return raw != rhs.raw;}
inline bool operator < (const DCDBTimeStamp& rhs) const {return raw < rhs.raw;}
inline bool operator > (const DCDBTimeStamp& rhs) const {return raw > rhs.raw;}
inline bool operator <= (const DCDBTimeStamp& rhs) const {return raw <= rhs.raw;}
inline bool operator >= (const DCDBTimeStamp& rhs) const {return raw >= rhs.raw;}
};
#endif /* DCDBTIMESTAMP_H */
...@@ -125,7 +125,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) { ...@@ -125,7 +125,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
if (!csBackend->existsColumnFamily(CF_SENSORALIASES)) { if (!csBackend->existsColumnFamily(CF_SENSORALIASES)) {
std::cout << "Creating Column Familiy " CF_SENSORALIASES "...\n"; std::cout << "Creating Column Familiy " CF_SENSORALIASES "...\n";
csBackend->createColumnFamily(CF_SENSORALIASES, csBackend->createColumnFamily(CF_SENSORALIASES,
"name varchar, pattern varchar", "name varchar, pattern varchar, scaling_factor double, unit varchar",
"name", "name",
"COMPACT STORAGE AND CACHING = all"); "COMPACT STORAGE AND CACHING = all");
} }
...@@ -168,7 +168,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) { ...@@ -168,7 +168,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
*/ */
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value) void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{ {
#if 0 #if 1
std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl; std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif #endif
......
/*
* dcdbtimestamp.cpp
*
* Created on: Feb 18, 2015
* Author: Axel Auweter
*/
#include <cstring>
#include <cinttypes>
#include "boost/date_time/posix_time/posix_time.hpp"
#include "timestamp.h"
/**
* This function parses a string and tries to do a best guess at the contained
* time information. Currently, it detects strings in the format "yyyy-mm-dd hh:mm:ss.000"
* and posix time.
*/
void DCDBTimeStamp::guessFromString(std::string timestr)
{
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
/* First try to match it against a time string */
try {
boost::posix_time::ptime ts(boost::posix_time::time_from_string(timestr));
if (ts != boost::posix_time::not_a_date_time) {
boost::posix_time::time_duration diff = ts - epoch;
raw = diff.total_nanoseconds();
return;
}
}
catch (std::exception& e) {
/* Ignore on error */
}
/* Try to match it against a POSIX time */
uint64_t tmp;
if (sscanf(timestr.c_str(), "%" PRIu64, &tmp) == 1) {
raw = tmp * 1000 * 1000 * 1000;
return;
}
/* Not successful - throw exception */
throw DCDBTimeStampConversionException();
}
/**
* This constructor is implemented by calling setNow().
*/
DCDBTimeStamp::DCDBTimeStamp()
{
setNow();
}
/**
* This constructor sets the internal raw value directly to the supplied argument.
*/
DCDBTimeStamp::DCDBTimeStamp(uint64_t ts)
{
raw = ts;
}
/**
* This constructor sets the time using the magic implemented in guessFromString.
*/
DCDBTimeStamp::DCDBTimeStamp(std::string ts)
{
guessFromString(ts);
}
/**
* Currently, the destructor doesn't need to do anything.
*/
DCDBTimeStamp::~DCDBTimeStamp()
{
}
/**
* This function sets the value of raw to the nanoseconds since epoch
* using some magic help of boost::posix_time.
*/
void DCDBTimeStamp::setNow(void)
{
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
raw = diff.total_nanoseconds();
}
/**
*
*/
uint64_t DCDBTimeStamp::getRaw(void)
{
return raw;
}
include ../../config.mk include ../../config.mk
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ CXXFLAGS = -O0 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbquery.o OBJS = dcdbquery.o query.o casshelper.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time
TARGET = dcdbquery TARGET = dcdbquery
.PHONY : clean install .PHONY : clean install
......
/*
* casshelper.cpp
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "casshelper.h"
void CassHelper::print_error(CassFuture* future) {
CassString message = cass_future_error_message(future);
fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
CassCluster* CassHelper::create_cluster(const char* hostname) {
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hostname);
/* Force protocol version 1 */
cass_cluster_set_protocol_version(cluster, 1);
return cluster;
}
CassError CassHelper::connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
return rc;
}
CassError CassHelper::execute_query(CassSession* session, const char* query) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(cass_string_init(query), 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
/*
* casshelper.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
/*
* FIXME: These routines should be put into DCDBLib with nice interfaces.
* As of now, it is mostly a rip of the DataStax cpp-driver examples.
*/
#include "cassandra.h"
#include <cstdio>
#ifndef CASSHELPER_H
#define CASSHELPER_H
class CassHelper
{
public:
static void print_error(CassFuture* future);
static CassCluster* create_cluster(const char* hostname);
static CassError connect_session(CassSession* session, const CassCluster* cluster);
static CassError execute_query(CassSession* session, const char* query);
};
#endif /* CASSHELPER_H */
...@@ -5,11 +5,79 @@ ...@@ -5,11 +5,79 @@
* Author: Axel Auweter * Author: Axel Auweter
*/ */
/* C++ standard headers */
#include <iostream> #include <iostream>
#include <stdexcept>
int main(void) /* C standard headers */
#include <cstdlib>
#include <unistd.h>
/* Custom headers */
#include "dcdb/timestamp.h"
#include "query.h"
void usage(void)
{
/* 0---------1---------2---------3---------4---------5---------6---------7--------- */
std::cout << "Usage:" << std::endl
<< "dcdbquery [-h <hostname>] <Sensor 1> [<Sensor 2> ...] <Start> <End>" << std::endl
<< "where" << std::endl
<< " <hostname> - the name of the DB server"
<< " <Sensor n> - a sensor name alias" << std::endl
<< " <Start> - start of time series" << std::endl
<< " <End> - end of time series" << std::endl
<< "Return the readings for the sensors in the interval from <Start> to <End>." << std::endl
<< "<Start> and <End> times should be supplied as 'yyyy-mm-dd hh:mm:ss' or unix" << std::endl
<< "timestamps." << std::endl;
exit(EXIT_SUCCESS);
}
int main(int argc, char* argv[])
{ {
std::cout << "Hello, world\n"; /* Check command line arguments */
if (argc <= 3) {
usage();
}
/* Get the hostname */
char ret;
const char *host = "localhost";
while ((ret=getopt(argc, argv, "+h:"))!=EOF) {
switch(ret) {
case 'h':
host = optarg;
break;
default:
usage();
exit(EXIT_FAILURE);
}
}
/* Try to create DCDBTimeStamp objects from the arguments */
DCDBTimeStamp start, end;
try {
start = DCDBTimeStamp(argv[argc-2]);
end = DCDBTimeStamp(argv[argc-1]);
}
catch (std::exception& e) {
std::cout << "Wrong time format." << std::endl;
exit(EXIT_FAILURE);
}
/* Ensure start < end */
if(start >= end) {
std::cout << "Start time must be earlier than end time." << std::endl;
exit(EXIT_FAILURE);
}
/* Build a list of sensornames */
std::list<std::string> sensors;
for (int arg = optind; arg < argc-2; arg++) {
sensors.push_back(argv[arg]);
}
DCDBQuery::doQuery(host, sensors, start, end);
return 0; return 0;
} }
/*
* query.cpp
*
* Created on: Feb 19, 2015
* Author: Axel Auweter
*/
#include <iostream>
#include <list>
#include <string>
#include <algorithm>
#include <cstdlib>
#include "query.h"
#include "casshelper.h"
/* Lovely spaghetti code coming up next. Be aware... */
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDBTimeStamp start, DCDBTimeStamp end)
{
/* Connect to db */
CassCluster *cluster = CassHelper::create_cluster(hostname);
CassSession* session = cass_session_new();
if (CassHelper::connect_session(session, cluster) != CASS_OK) {
cass_session_free(session);
cass_cluster_free(cluster);
std::cout << "Cannot connect to Cassandra database." << std::endl;
exit(EXIT_FAILURE);
}
/* Iterate over sensors */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
/* Lookup the sensor in the alias table */
std::string aliasPattern;
lookupAlias(session, *it, aliasPattern);
std::cout << "Found alias: " << aliasPattern << std::endl;
std::list<SensorId> sidList;
expandAlias(session, aliasPattern, sidList);
}
/* Clean up */
cass_session_free(session);
cass_cluster_free(cluster);
}
void DCDBQuery::lookupAlias(CassSession* session, std::string name, std::string& pattern)
{
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("SELECT pattern FROM dcdb_config.sensoralias WHERE name = ? ;");
CassString pattern_cstr;
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
prepared = cass_future_get_prepared(future);
}
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", cass_string_init(name.c_str()));
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr);
pattern = std::string(pattern_cstr.data, pattern_cstr.length);
}
else {
std::cout << "Unknown sensor: " << name << std::endl;
exit(EXIT_FAILURE);
}
cass_result_free(result);
cass_iterator_free(iterator);
}
}
void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, std::list<SensorId>& sensorIds)
{
/* Clear the list of sensorIds */
sensorIds.clear();
/* Strip all slashes from aliasName */
aliasName.erase(std::remove(aliasName.begin(), aliasName.end(), '/'), aliasName.end());
/* Calculate lower and upper boundaries for the expansion of the alias */
std::string low = aliasName;
std::string high = aliasName;
if (aliasName.find("*") != std::string::npos) {
low.replace(aliasName.find("*"), 1, 33-aliasName.length(), '0');
high.replace(aliasName.find("*"), 1, 33-aliasName.length(), 'F');
}
/* Query the database to see which raw sensors actually exist in the interval between low and high */
/*
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("SELECT DISTINCT sid FROM dcdb.sensordata WHERE key(sid) >= ? and key(sid) <= ?;");
CassString pattern_cstr;
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
prepared = cass_future_get_prepared(future);
}
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", cass_string_init(name.c_str()));
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr);
pattern = std::string(pattern_cstr.data, pattern_cstr.length);
}
else {
std::cout << "Unknown sensor: " << name << std::endl;
exit(EXIT_FAILURE);
}
cass_result_free(result);
cass_iterator_free(iterator);
}
*/
}