Commit 60481e0b authored by Axel Auweter's avatar Axel Auweter
Browse files

First round of refactoring of DCDBLib:

* Bumped Cassandra to 2.1.5
* Bumped cpp-driver to 2.0
* Eliminated cassandraBackend in favor of DCDBConnection

In the future, a DCDBConnection handles the low-level connection
to Cassandra. On top of an established DCDBConnection, you can
create a SensorDataStore object which gives you access to raw
sensor readings.
parent 64e696df
......@@ -13,6 +13,7 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include "simplemqttserver.h"
......@@ -175,7 +176,28 @@ int main(int argc, char* const argv[]) {
}
/*
* Allocate and initialize sensor data store.
* Allocate and initialize connection to Cassandra.
*/
std::string sdHost(cassandraHost);
DCDBConnection dcdbConn(sdHost, 9042);
if (!dcdbConn.connect()) {
std::cout << "Cannot connect to Cassandra!" << std::endl;
exit(EXIT_FAILURE);
}
/*
* Legacy behavior: Initialize the DCDB schema in Cassandra.
*/
dcdbConn.initSchema();
/*
* Allocate the SensorDataStore.
*/
mySensorDataStore = new SensorDataStore(&dcdbConn);
/*
* Set TTL for data store inserts if TTL > 0.
*/
uint64_t ttlInt;
std::istringstream ttlParser(ttl);
......@@ -183,8 +205,9 @@ int main(int argc, char* const argv[]) {
std::cout << "Invalid TTL!" << std::endl;
exit(EXIT_FAILURE);
}
std::string sdHost(cassandraHost);
mySensorDataStore = new SensorDataStore(sdHost, 9042, ttlInt);
if (ttlInt) {
mySensorDataStore->setTTL(ttlInt);
}
/*
* Start the MQTT Message Server.
......@@ -220,6 +243,7 @@ int main(int argc, char* const argv[]) {
ms.stop();
delete mySensorDataStore;
dcdbConn.disconnect();
}
catch (const exception& e) {
cout << "Exception: " << e.what() << "\n";
......
......@@ -6,8 +6,8 @@ CXXFLAGS = -O2 -ggdb -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warni
-I$(DCDBBASEPATH)/include/ -fmessage-length=0 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
# List of object files to build and the derived list of corresponding source files
OBJS = src/sensordatastore.o \
src/cassandraBackend.o \
OBJS = src/connection.o \
src/sensordatastore.o \
src/timestamp.o
# List of public header files necessary to use this libray
......
/*
* connection.h
*
* Created on: May 18, 2015
* Author: Axel Auweter
*/
/**
* @file
* @brief This file contains parts of the public API for the
* DCDBLib library.
* It contains the class definition of the DCDBConnection class,
* that handles connections to the data store and schema
* initialization.
*/
#include <string>
#include <cstdint>
#include "cassandra.h"
#ifndef CONNECTION_H
#define CONNECTION_H
/* Forward-declaration of the implementation-internal classes */
class DCDBConnectionImpl;
class DCDBConnection
{
private:
DCDBConnectionImpl* impl; /**< The object which implements the core functionality of this class */
public:
/**
* @brief This function prints Cassandra CQL-specific
* error messages from a CassFuture object.
* @param future The future object which caused the error.
*
* @details
* Once a request to Cassandra completes, we get a CassFuture
* object which holds the return to our request. In case the
* request caused an error, this function prints a human-readable
* error message.
*/
void printError(CassFuture* future);
/**
* @brief Set the hostname for the connection.
* @param hostname Hostname of a Cassandra front end node.
*/
void setHostname(std::string hostname);
/**
* @brief Return the current hostname of the connection.
* @return The hostname of a Cassandra front end node to which this object will connect.
*/
std::string getHostname();
/**
* @brief Set the port for the connection.
* @param port Port on which the Cassandra front end node accepts CQL native protocol clients.
*/
void setPort(uint16_t port);
/**
* @brief Return the current port of the connection.
* @return The port on which the Cassandra front end node accepts CQL native protocol clients.
*/
uint16_t getPort();
/**
* @brief Establish a connection to the Cassandra database.
* @return True if the connection was successfully established, false otherwise.
*/
bool connect();
/**
* @brief Disconnect an existing connection to the Cassandra database.
*/
void disconnect();
/**
* @brief Get the session handle of the connection.
* @return Pointer to this connection's session object.
*/
CassSession* getSessionHandle();
/**
* @brief This function executes a simple raw CQL query.
* @param query The CQL query string.
* @return Returns a CassError type to indicate success or failure
*/
CassError executeSimpleQuery(std::string query);
/**
* @brief Initialize the database schema for DCDB.
* @return True if the schema was successfully initialized.
*/
bool initSchema();
DCDBConnection();
DCDBConnection(std::string hostname, uint16_t port);
virtual ~DCDBConnection();
};
#endif /* CONNECTION_H */
......@@ -16,6 +16,8 @@
#include <stdint.h>
#include <string>
#include "connection.h"
#ifndef SENSORDATASTORE_H_
#define SENSORDATASTORE_H_
......@@ -97,16 +99,6 @@ private:
CassandraBackend* csBackend;
public:
/**
* @brief This function initializes the SensorDataStore object,
* sets the connection parameters and established a
* connection to the database.
* @param hostname The hostname or IP address of the database node to connect to.
* @param port The port on which the database node is listening.
* @param ttl The TTL for data inserted into the data store (set to 0 for unlimited)
*/
void init(std::string hostname, int port, uint64_t ttl);
/**
* @brief This function populates a preallocated SensorId object
* from a MQTT topic string.
......@@ -126,19 +118,17 @@ public:
void insert(SensorId* sid, uint64_t ts, uint64_t value);
/**
* @brief A shortcut constructor for a SensorDataStore object
* that automatically initializes with default parameters
* for hostname and port.
* @brief Set the TTL for newly inserted sensor data.
* @param ttl The TTL for the sensor data in seconds.
*/
SensorDataStore();
void setTTL(uint64_t ttl);
/**
* @brief The standard constructor for a SensorDataStore object.
* @param hostname A string containing the hostname or IP address of the database server.
* @param port A integer containing the port number on which the database server is listening.
* @param ttl A uint64_t containing the TTL for inserted data items.
* @brief A shortcut constructor for a SensorDataStore object
* that allows accessing the data store through a
* connection that is already established.
*/
SensorDataStore(std::string hostname, int port, uint64_t ttl);
SensorDataStore(DCDBConnection* conn);
/**
* @brief The standard destructor for a SensorDatStore object.
......
/*
* cassandraBackend.h
* connection_internal.h
*
* Created on: Apr 08, 2013
* Created on: May 18, 2015
* Author: Axel Auweter
*/
/*
* @file
* @brief This file defines the CassandraBackend class and related
* things that provide a low-level abstraction to the
* Cassandra key-value store.
* @brief This file contains the internal functions of the
* DCDBConnection which are provided by the
* DCDBConnectionImpl class.
*/
#ifndef CASSANDRA_BACKEND_H
#define CASSANDRA_BACKEND_H
#include <string>
#include <cstdint>
#include <dcdbendian.h>
#include "connection.h"
#include "cassandra.h"
#ifndef CONNECTION_INTERNAL_H
#define CONNECTION_INTERNAL_H
/**
* @brief The CassandraBackend class provides low-level
* access to the Cassandra key-value store using
* the CQL API.
*/
class CassandraBackend
class DCDBConnectionImpl
{
protected:
std::string hostname_; /**< The hostname of a DB front-end node. */
uint16_t port_; /**< The port of the DB front-end node. */
bool connected; /**< Indicates whether a connection has been established. */
CassCluster* cluster; /**< The Cassandra Cluster object (contains hostname, port, etc) */
CassSession* session; /**< The session object through which we communicate with C* once the connection is established */
const CassSchema* schema; /**< The schema object containing the current database schema information */
const CassSchema* schema; /**< The schema object containing the current database schema information */
std::string currentKeyspace; /**< The name of the active keyspace */
const CassPrepared* preparedInsert; /**< The prepared statement for insertions */
/**
* @brief This function validates a name to ensure that
* it only consists of alphanumeric characters.
......@@ -44,39 +40,6 @@ protected:
*/
bool validateName(std::string name);
/**
* @brief This function converts a uint64_t into a big-endian
* byte array represented as std::string.
* @param n The value to convert
* @return The string containing the big-endian byte array.
*/
std::string int64Convert(uint64_t n);
/**
* @brief This function prints Cassandra CQL-specific
* error messages from a CassFuture object.
* @param future The future object which caused the error.
*/
void printError(CassFuture* future);
/**
* @brief This function executes a simple raw CQL query.
* @param query The CQL query string.
* @return Returns a CassError type to indicate success or failure
*/
CassError executeSimpleQuery(std::string query);
public:
/* Database meta operations */
/**
* @brief Starts a connection to a Cassandra front-end node.
* @param hostname The hostname to connect to.
* @param port The TCP port number on which Cassandra is listening for CQL clients.
* @return Returns true if a connection was established, false otherwise.
*/
bool connect(std::string hostname, int port);
/**
* @brief Fetch the list of Key Spaces from the Cassandra server.
*/
......@@ -127,34 +90,59 @@ public:
void createColumnFamily(std::string name, std::string fields, std::string primaryKey, std::string options);
/* Database data access operations */
public:
/**
* @brief The implementation function of DCDBConnection::printError().
*/
void printError(CassFuture* future);
/**
* @brief Insert a data into the database
* @param key The name of the row key
* @param ts The data time stamp (used for column name and time information in Cassandra
* @param value The data itself
* @brief The implementation function of DCDBConnection::setHostname().
*/
void insert(std::string key, uint64_t ts, uint64_t value);
void setHostname(std::string hostname);
/**
* @brief Prepare for insertions
* @brief The implementation function of DCDBConnection::getHostname().
*/
void prepareInsert(uint64_t ttl);
std::string getHostname();
/**
* @brief The implementation function of DCDBConnection::setPort().
*/
void setPort(uint16_t port);
/* Class constructor / desctructor */
/**
* @brief The implementation function of DCDBConnection::getPort().
*/
uint16_t getPort();
/**
* @brief The standard constructor for CassandraBackend
* @brief The implementation function of DCDBConnection::connect().
*/
CassandraBackend();
bool connect();
/**
* @brief The standard desctructor for CassandraBackend
* @brief The implementation function of DCDBConnection::disconnect().
*/
virtual ~CassandraBackend();
};
void disconnect();
/**
* @brief The implementation function of DCDBConnection::getSessionHandle().
*/
CassSession* getSessionHandle();
/**
* @brief The implementation function of DCDBConnection::executeSimpleQuery().
*/
CassError executeSimpleQuery(std::string query);
/**
* @brief The implementation function of DCDBConnection::initSchema().
*/
bool initSchema();
DCDBConnectionImpl();
virtual ~DCDBConnectionImpl();
};
#endif /* CASSANDRA_BACKEND_H */
#endif /* CONNECTION_INTERNAL_H */
/*
* dcdbglobals.h
*
* Created on: May 18, 2015
* Author: Axel Auweter
*/
/**
* @file
* @brief This file contains some global definitions and names
* use by the DCDBLib library.
*/
#ifndef DCDB_GLOBALS_H
#define DCDB_GLOBALS_H
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_PUBLISHEDSENSORS "publishedsensors"
#define CF_VIRTUALSENSORS "virtualsensors"
#endif /* DCDB_GLOBALS_H */
......@@ -18,14 +18,7 @@
#include <string>
#include "sensordatastore.h"
#include "cassandraBackend.h"
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_PUBLISHEDSENSORS "publishedsensors"
#define CF_VIRTUALSENSORS "virtualsensors"
#include "connection.h"
/**
* @brief The SensorDataStoreImpl class contains all protected
......@@ -35,7 +28,9 @@
class SensorDataStoreImpl
{
protected:
CassandraBackend* csBackend; /**< The CassandraBackend object that does the low-level stuff for us. */
DCDBConnection* connection; /**< The DCDBCOnnection object that does the low-level stuff for us. */
CassSession* session; /**< The CassSession object given by the connection. */
const CassPrepared* preparedInsert; /**< The prepared statement for fast insertions. */
/**
* @brief This function returns a "key" string which
......@@ -45,15 +40,13 @@ protected:
*/
std::string sidConvert(SensorId *sid);
public:
/**
* @brief This function connects to the database and initializes keyspaces and column families
* @param hostname The hostname of a Cassandra front-end node
* @param port The port number of the Cassandra front-end node
* @param ttl The TTL for inserted data (set to 0 to insert data without TTL)
* @brief Prepare for insertions.
* @param ttl A TTL that will be set for newly inserted values. Set to 0 to insert without TTL.
*/
void init(std::string hostname, int port, uint64_t ttl);
void prepareInsert(uint64_t ttl);
public:
/**
* @brief This function converts a MQTT topic string to a SensorId object
* @param topic The MQTT topic string to convert
......@@ -69,11 +62,17 @@ public:
*/
void insert(SensorId* sid, uint64_t ts, uint64_t value);
/**
* @brief This function sets the TTL of newly inserted readings.
* @param ttl The TTL to be used for new inserts in seconds.
*/
void setTTL(uint64_t ttl);
/**
* @brief This is the standard constructor of the SensorDataStoreImpl class.
* @param csb A CassandraBackend object to do the raw database access.
*/
SensorDataStoreImpl(CassandraBackend *csb);
SensorDataStoreImpl(DCDBConnection* conn);
/**
* @brief The standard desctructor of SensorDataStoreImpl.
......
/*
* cassandraBackend.cpp
* connection.cpp
*
* Created on: Apr 08, 2013
* Created on: May 18, 2015
* Author: Axel Auweter
*/
#include <algorithm>
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cinttypes>
#include "cassandra.h"
#include "cassandraBackend.h"
#include "connection.h"
#include "connection_internal.h"
#include "dcdbglobals.h"
/**
* @details
* Once a request to Cassandra completes, we get a CassFuture
* object which holds the return to our request. In case the
* request caused an error, this function prints a human-readable
* error message.
/*
* Definition of the public DCDBConnection functions.
* All calls forward calls of all DCDBConnection functions
* to their counterparts in the DCDBConnectionImpl class.
*/
void CassandraBackend::printError(CassFuture* future)
{
CassString message = cass_future_error_message(future);
std::cerr << "Cassandra Backend Error: " << std::string(message.data, message.length) << std::endl;
void DCDBConnection::printError(CassFuture* future) {
impl->printError(future);
}
/**
* @details
* The request is issued as a simple synchronous CQL statement.
*/
CassError CassandraBackend::executeSimpleQuery(std::string query)
{
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(cass_string_init(query.c_str()), 0);
void DCDBConnection::setHostname(std::string hostname) {
impl->setHostname(hostname);
}
future = cass_session_execute(session, statement);
cass_future_wait(future);
std::string DCDBConnection::getHostname() {
return impl->getHostname();
}
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
printError(future);
}
void DCDBConnection::setPort(uint16_t port) {
impl->setPort(port);
}
cass_future_free(future);
cass_statement_free(statement);
uint16_t DCDBConnection::getPort() {
return impl->getPort();
}
return rc;
bool DCDBConnection::connect() {
return impl->connect();
}
void DCDBConnection::disconnect() {
impl->disconnect();
}
/**
* @details
* This function connects to a Cassandra front end node
* using the CQL API.
bool DCDBConnection::initSchema() {
return impl->initSchema();
}
CassSession* DCDBConnection::getSessionHandle() {
return impl->getSessionHandle();
}
/*
* DCDBConnection constructors & destructor.
* Upon object creation, allocate a corresponding impl class
* and free it on object deallocation.
*/
bool CassandraBackend::connect(std::string hostname, int port)
{
/* Create a new cluster object */
if (!cluster)
cluster = cass_cluster_new();
DCDBConnection::DCDBConnection() {
if (!impl)
impl = new DCDBConnectionImpl();
}
DCDBConnection::DCDBConnection(std::string hostname, uint16_t port) {
if (!impl)
impl = new DCDBConnectionImpl();
if (!session)
session = cass_session_new();
impl->setHostname(hostname);
impl->setPort(port);
}
/* Set hostname and port */
cass_cluster_set_contact_points(cluster, hostname.c_str());
cass_cluster_set_port(cluster, port);
DCDBConnection::~DCDBConnection() {
if (impl)
delete impl;
}
/* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
/* Connect to the server */