Commit f19bdab4 authored by Axel Auweter's avatar Axel Auweter
Browse files

Getting rid of Thrift. SensorDataStore::insert however is still #if 0'ed out.

parent 9569fef2
......@@ -5,7 +5,7 @@ OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lboost_system -lboost_thread -lthrift
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread
TARGET = collectagent
.PHONY : clean install
......
......@@ -7,10 +7,7 @@ CXXFLAGS = -O0 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-
# List of object files to build and the derived list of corresponding source files
OBJS = src/sensordatastore.o \
src/cassandraBackend.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
src/cassandraBackend.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
# List of public header files necessary to use this libray
......@@ -20,7 +17,7 @@ PUBHEADERS = $(shell find include -type f -iname "*.h")
PRIVHEADERS = $(shell find include_internal -type f -iname "*.h")
# External libraries to link against
LIBS = -L$(DCDBDEPLOYPATH)/lib -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lcassandra -lboost_random -lboost_system -luv
# Dynamic library building differs between Linux/BSD and MacOS
OS = $(shell uname)
......@@ -36,11 +33,10 @@ endif
P = $(shell cd $(DCDBDEPLOYPATH)/lib/ && pwd)
# List of Phony Targets
.PHONY : check-thrift-env all clean clean-cassandra-headers install
.PHONY : check-target-env all clean install
# Main Library Target
$(TARGET): $(SRC)
$(MAKE) cassandra/Cassandra.h
$(MAKE) $(OBJS)
@if [ "$(OS)" = "Darwin" ]; then \
echo "Linking library in Mac OS style: $(TARGET)"; \
......@@ -53,7 +49,7 @@ $(TARGET): $(SRC)
all: $(TARGET)
# Alert the user to put the necessary paths into LD_LIBRARY_PATH (or similar on other platforms)
check-thrift-env:
check-target-env:
@if [ "$(OS)" = "Darwin" ]; then \
R=`echo $$DYLD_LIBRARY_PATH | grep $P`; \
if [ "$$R" = "" ]; then \
......@@ -67,47 +63,21 @@ check-thrift-env:
fi; \
fi
# Build the Cassandra Thrift headers
cassandra/Cassandra.h:
@if [ ! -e cassandra ]; then \
printf "Building Thrift interface headers... "; \
$(DCDBDEPLOYPATH)/bin/thrift --gen cpp $(DCDBDEPLOYPATH)/cassandra/interface/cassandra.thrift; \
mv gen-cpp cassandra; \
echo "Done."; \
fi
@if [ "$U" = "Linux" ]; then \
echo "Fixing Thrift includes... "; \
grep stdint cassandra/cassandra_types.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/cassandra_types.h; fi;\
grep stdint cassandra/Cassandra.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/Cassandra.h; fi;\
fi
@# This is a really bad way of adding a line of text from a Makefile, but the differences between sed on Mac OS and Linux leave me with no options...
@sed -e 's/#include "cassandra_types.h"/#include "cassandra_types.h"$$#undef VERSION/' cassandra/cassandra_constants.h > cassandra/cassandra_constants.h_newline
@cat cassandra/cassandra_constants.h_newline | tr $$ '\n' > cassandra/cassandra_constants.h
@rm cassandra/cassandra_constants.h_newline
@# And more fixes needed since clang complains about the thrift generated code's use of namespaces
@sed -i -e 's/using apache/using ::apache/g' cassandra/cassandra_types.cpp
# Build the documentation
docs: $(PUBHEADERS) $(PRIVHEADERS) $(SRC)
@echo "Creating documentation..."
doxygen
# Clean everything
clean: clean-cassandra-headers clean-docs
clean: clean-docs
rm -f $(OBJS) $(TARGET)
# Clean the Cassandra headers only
clean-cassandra-headers:
rm -rf cassandra
# Clean the documentation
clean-docs:
rm -rf docs
# Install to deploy path
install: $(TARGET) check-thrift-env
install: $(TARGET) check-target-env
mkdir -p $(DCDBDEPLOYPATH)/lib
mkdir -p $(DCDBDEPLOYPATH)/include/dcdb
install $(TARGET) $(DCDBDEPLOYPATH)/lib/
......
......@@ -15,49 +15,30 @@
#ifndef CASSANDRA_BACKEND_H
#define CASSANDRA_BACKEND_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <string>
#include <boost/lexical_cast.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <dcdbendian.h>
#include "../cassandra/Cassandra.h"
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
#include "cassandra.h"
/**
* @brief The CassandraBackend class provides low-level
* access to the Cassandra key-value store using
* the THRIFT API.
* the CQL API.
*/
class CassandraBackend
{
protected:
CassandraClient *myClient; /**< The main object provided by the Thrift API for us */
std::vector<KsDef> keySpaces; /**< A vector containing a local copy of the keyspace definitions */
std::string clusterName; /**< The name of the cluster that we're connecting to */
boost::shared_ptr<TSocket> sock; /**< A boost TSocket object for our connection */
boost::shared_ptr<TTransport> tr; /**< A boost TTransport object sitting on top of the TSocket sock */
boost::shared_ptr<TProtocol> prot; /**< A boost TProtocol object sitting on top of the TTransport tr */
CassCluster* cluster; /**< The Cassandra Cluster object (contains hostname, port, etc) */
CassSession* session; /**< The session object through which we communicate with C* once the connection is established */
const CassSchema* schema; /**< The schema object containing the current database schema information */
std::string currentKeyspace; /**< The name of the active keyspace */
/**
* @brief This function validates a name to ensure that
* it only consists of alphanumeric characters.
* @param name A string to check
* @return True if the name is alphanumeric, false otherwise
* @return True if the name is alphanumeric, false otherwise
*/
bool validateName(std::string name);
......@@ -65,27 +46,39 @@ protected:
* @brief This function converts a uint64_t into a big-endian
* byte array represented as std::string.
* @param n The value to convert
* @return The string containing the big-endian byte array.
* @return The string containing the big-endian byte array.
*/
std::string int64Convert(uint64_t n);
public:
/* FIXME - make currentKeySpace protected! */
KsDef currentKeySpace; /**< The keyspace defniition of the currently selected keyspace */
/**
* @brief This function prints Cassandra CQL-specific
* error messages from a CassFuture object.
* @param future The future object which caused the error.
*/
void printError(CassFuture* future);
/**
* @brief This function executes a simple raw CQL query.
* @param query The CQL query string.
* @return Returns a CassError type to indicate success or failure
*/
CassError executeSimpleQuery(std::string query);
public:
/* Database meta operations */
/**
* @brief Starts a connection to a Cassandra front-end node.
* @param hostname The hostname to connect to.
* @param port The TCP port number on which Cassandra is listening for Thrift clients.
* @return Returns true if a connection was established, false otherwise.
*/
void connect(std::string hostname, int port);
bool connect(std::string hostname, int port);
/**
* @brief Fetch the list of Key Spaces from the Cassandra server.
*/
void updateKeySpaces();
void updateSchema();
/**
* @brief Check if a keyspace with a given name exists.
......@@ -107,6 +100,12 @@ public:
*/
void selectKeyspace(std::string name);
/**
* @brief Get the active keyspace's name
* @return string object containing the keyspace name
*/
std::string getActiveKeyspace();
/**
* @brief Check if a column family with a given name
* exists in the currently selected keyspace.
......
......@@ -6,75 +6,140 @@
*/
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include "cassandraBackend.h"
/**
* @details
* Once a request to Cassandra completes, we get a CassFuture
* object which holds the return to our request. In case the
* request caused an error, this function prints a human-readable
* error message.
*/
void CassandraBackend::printError(CassFuture* future)
{
CassString message = cass_future_error_message(future);
std::cerr << "Cassandra Backend Error: " << std::string(message.data, message.length) << std::endl;
}
/**
* @details
* The request is issued as a simple synchronous CQL statement.
*/
CassError CassandraBackend::executeSimpleQuery(std::string query)
{
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(cass_string_init(query.c_str()), 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
printError(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
/**
* @details
* This function connects to a Cassandra front end node
* using the Thrift API and Boost's TBinaryProtocol over
* TFramedTransport over TSocket.
*
* It also fetches some information about the database
* cluster.
* using the CQL API.
*/
void CassandraBackend::connect(std::string hostname, int port)
bool CassandraBackend::connect(std::string hostname, int port)
{
sock = boost::shared_ptr<TSocket>(new TSocket(hostname, port));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
/* Create a new cluster object */
if (!cluster)
cluster = cass_cluster_new();
if (!session)
session = cass_session_new();
printf("Allocated cluster object: %p\n", cluster);
/* Set hostname and port */
cass_cluster_set_contact_points(cluster, hostname.c_str());
cass_cluster_set_port(cluster, port);
/* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
myClient = new CassandraClient(prot);
tr->open();
/* Connect to the server */
CassError rc = CASS_OK;
printf("Connecting session...\n");
CassFuture* future = cass_session_connect(session, cluster);
myClient->describe_cluster_name(clusterName);
std::cout << "Connected to cluster: " << clusterName << "\n";
printf("Waiting for connection...\n");
/* Wait for successful connection */
cass_future_wait(future);
printf("Checking for error code...\n");
rc = cass_future_error_code(future);
printf("Error code: %d\n", rc);
if (rc != CASS_OK) {
printError(future);
cass_future_free(future);
return false;
}
cass_future_free(future);
std::cout << "Connected to cluster.\n";
return true;
}
/**
* @details
* This function updates the local copy of the list
* of available keyspaces. It will be called any time
* a existsKeyspace query is performed to take note
* This function updates the local copy of the data
* store schema information. It is called whenever
* an existsKeyspace query is performed to take note
* of recently added keyspaces.
*/
void CassandraBackend::updateKeySpaces()
void CassandraBackend::updateSchema()
{
myClient->describe_keyspaces(keySpaces);
schema = cass_session_get_schema(session);
}
/**
* @details
* This function iterates all known keyspaces to check
* if a keyspace with a given name can be found.
* This function tries to retrieve schema information
* about a given keyspace to determine if the keyspace
* exists
*/
bool CassandraBackend::existsKeyspace(std::string name)
{
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
updateSchema();
const CassSchemaMeta* keyspaceMeta = cass_schema_get_keyspace(schema, name.c_str());
if (keyspaceMeta != NULL)
return true;
else
return false;
}
/**
* @details
* This function creates a new keyspace with a given name and
* replication factor.
*
* We are using CQL3 to do this as its syntax is less likely
* to change in future versions than the native API.
*/
void CassandraBackend::createKeyspace(std::string name, int replicationFactor)
{
CqlResult res;
std::string query;
std::string rFact = boost::lexical_cast<std::string>(replicationFactor);
if(validateName(name)) {
query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
if(executeSimpleQuery(query) != CASS_OK) {
std::cerr << "Failed to create keyspace " << name << "!" << std::endl;
}
}
}
......@@ -86,34 +151,49 @@ void CassandraBackend::createKeyspace(std::string name, int replicationFactor)
*/
void CassandraBackend::selectKeyspace(std::string name)
{
CqlResult res;
std::string query;
if (validateName(name)) {
if (validateName(name) && existsKeyspace(name)) {
query = "USE " + name + ";";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
currentKeySpace = *it;
if (executeSimpleQuery(query) != CASS_OK) {
std::cerr << "Error selecting keyspace " << name << "!" << std::endl;
}
currentKeyspace = name;
}
}
/**
* @details
* This function iterates over all column families in the key
* space that was selected by selectKeyspace to check if a
* column family with the given name exists.
* If no keyspace was selected the returned string is empty.
*/
std::string CassandraBackend::getActiveKeyspace()
{
return currentKeyspace;
}
/**
* @details
* This function tries to query meta information for a given
* column family (table) to check whether the column family
* exists.
*/
bool CassandraBackend::existsColumnFamily(std::string name)
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
updateSchema();
const CassSchemaMeta* keyspaceMeta = cass_schema_get_keyspace(schema, currentKeyspace.c_str());
if (keyspaceMeta == NULL) {
/* It is a bit misleading to return false if the keyspace doesn't even exist... */
return false;
}
return false;
const CassSchemaMeta* tableMeta = cass_schema_meta_get_entry(keyspaceMeta, name.c_str());
if (tableMeta == NULL) {
return false;
}
return true;
}
/**
......@@ -123,7 +203,6 @@ bool CassandraBackend::existsColumnFamily(std::string name)
*/
void CassandraBackend::createColumnFamily(std::string name, std::string fields, std::string primaryKey, std::string options)
{
CqlResult res;
std::stringstream query;
/* FIXME: Secure this and use proper types for fields, primaryKey, and options. */
......@@ -131,7 +210,7 @@ void CassandraBackend::createColumnFamily(std::string name, std::string fields,
<< " ( " << fields << ", PRIMARY KEY (" << primaryKey << "))"
<< " WITH " << options << ";";
myClient->execute_cql3_query(res, query.str(), Compression::NONE, ConsistencyLevel::ONE);
executeSimpleQuery(query.str());
}
/**
......@@ -177,6 +256,7 @@ std::string CassandraBackend::int64Convert(uint64_t n)
*/
void CassandraBackend::insert(std::string columnFamily, std::string key, uint64_t ts, uint64_t value)
{
#if 0
try {
ColumnParent cparent;
Column c;
......@@ -219,15 +299,22 @@ void CassandraBackend::insert(std::string columnFamily, std::string key, uint64_
std::cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
#else
std::cerr << "Insert currently unimplemented!\n";
#endif
}
CassandraBackend::CassandraBackend()
{
cluster = nullptr;
session = nullptr;
}
CassandraBackend::~CassandraBackend()
{
/* Clean up... */
if (myClient)
delete myClient;
if (session)
cass_session_free(session);
if (cluster)
cass_cluster_free(cluster);
}
......@@ -100,68 +100,54 @@ bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
*/
void SensorDataStoreImpl::init(std::string hostname, int port) {
/*
* Open the connection to the Cassandra database and
* create the necessary keyspaces and column families.
*/
try {
csBackend->connect(hostname, port);
/*
* Open the connection to the Cassandra database and
* create the necessary keyspaces and column families.
*/
csBackend->connect(hostname, port);
/* Keyspace and column family for sensor aliases */
/* FIXME: We should have a good way to determine the number of replicas here */
if (!csBackend->existsKeyspace(CONFIG_KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << CONFIG_KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(CONFIG_KEYSPACE_NAME, 1);
}
/* Keyspace and column family for sensor aliases */
/* FIXME: We should have a good way to determine the number of replicas here */
if (!csBackend->existsKeyspace(CONFIG_KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << CONFIG_KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(CONFIG_KEYSPACE_NAME, 1);
}
csBackend->selectKeyspace(CONFIG_KEYSPACE_NAME);
csBackend->selectKeyspace(CONFIG_KEYSPACE_NAME);
if (!(csBackend->currentKeySpace.name.compare(CONFIG_KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << CONFIG_KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!(csBackend->getActiveKeyspace().compare(CONFIG_KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << CONFIG_KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!csBackend->existsColumnFamily(CF_SENSORALIASES)) {
std::cout << "Creating Column Familiy " CF_SENSORALIASES "...\n";
csBackend->createColumnFamily(CF_SENSORALIASES,
"name varchar, pattern varchar",
"name",
"COMPACT STORAGE AND CACHING = all");
}
if (!csBackend->existsColumnFamily(CF_SENSORALIASES)) {
std::cout << "Creating Column Familiy " CF_SENSORALIASES "...\n";
csBackend->createColumnFamily(CF_SENSORALIASES,
"name varchar, pattern varchar",
"name",
"COMPACT STORAGE AND CACHING = all");
}
/* Keyspace and column family for raw sensor data */
if (!csBackend->existsKeyspace(KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(KEYSPACE_NAME, 1);
}
/* Keyspace and column family for raw sensor data */
if (!csBackend->existsKeyspace(KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(KEYSPACE_NAME, 1);
}
csBackend->selectKeyspace(KEYSPACE_NAME);
csBackend->selectKeyspace(KEYSPACE_NAME);
if (!(csBackend->currentKeySpace.name.compare(KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!(csBackend->getActiveKeyspace().compare(KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!csBackend->existsColumnFamily(CF_SENSORDATA)) {
std::cout << "Creating Column Familiy " CF_SENSORDATA "...\n";
csBackend->createColumnFamily(CF_SENSORDATA,
"sid blob, ts bigint, value bigint",
"sid, ts",
"COMPACT STORAGE");
}
}
catch(const TTransportException& te){
std::cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
std::cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
std::cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
if (!csBackend->existsColumnFamily(CF_SENSORDATA)) {
std::cout << "Creating Column Familiy " CF_SENSORDATA "...\n";
csBackend->createColumnFamily(CF_SENSORDATA,
"sid blob, ts bigint, value bigint",
"sid, ts",
"COMPACT STORAGE");
}
}
/**
......@@ -257,7 +243,7 @@ SensorDataStore::SensorDataStore()
{
csBackend = new CassandraBackend();
impl = nullptr;
init("localhost", 9160);
init("localhost", 9042);
}
/**
......