Commit 8cacfb11 authored by Daniele Tafani's avatar Daniele Tafani
Browse files

Merge branch 'master' of ssh://deep-ras.srv.lrz.de/git/dcdb

parents a9532500 9615bb79
......@@ -5,7 +5,7 @@ OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lboost_system -lboost_thread -lthrift
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lcassandra -luv -lboost_system -lboost_random -lboost_thread
TARGET = collectagent
.PHONY : clean install
......
......@@ -79,26 +79,29 @@ void mqttCallback(SimpleMQTTMessage *msg)
* the record in the database.
*/
SensorId sid;
mySensorDataStore->topicToSid(&sid,msg->getTopic());
if (mySensorDataStore->topicToSid(&sid,msg->getTopic())) {
#if 0
cout << "Topic decode successful:"
<< "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
<< "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
<< "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
<< "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
<< "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
<< "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
<< "\nbmc_id: " << hex << (uint32_t)sid.dl.bmc_id
<< "\nsdb_id: " << hex << (uint32_t)sid.dl.sdb_id
<< "\ndevice_id: " << hex << sid.dsid.device_id
<< "\nsensor_id: " << hex << sid.dsid.sensor_id
<< "\nRaw: " << hex << setw(16) << setfill('0') << sid.raw[0] << " " << hex << setw(16) << setfill('0') << sid.raw[1]
<< "\ndatacenter_id: " << hex << ((sid.dl & 0xFF00000000000000) >> 56)
<< "\ncluster_id: " << hex << ((sid.dl & 0x00FF000000000000) >> 48)
<< "\nrack_id: " << hex << ((sid.dl & 0x0000FFFF00000000) >> 32)
<< "\nchassis_id: " << hex << ((sid.dl & 0x00000000FF000000) >> 24)
<< "\nbic_id: " << hex << ((sid.dl & 0x0000000000FF0000) >> 16)
<< "\nbmc_id: " << hex << ((sid.dl & 0x000000000000FF00) >> 8)
<< "\nknc_id: " << hex << ((sid.dl & 0x00000000000000FF))
<< "\ndevice_id: " << hex << sid.dsid.device_id
<< "\nreserved: " << hex << sid.dsid.rsvd
<< "\nsensor_number: " << hex << sid.dsid.sensor_number
<< "\n";
#endif
//mySensorDataStore->insert(&sid, ts, *((uint64_t*)msg->getPayload()));
mySensorDataStore->insert(&sid, ts, val);
}
#if 0
else {
cout << "Wrong topic format: " << msg->getTopic() << "\n";
}
#endif
}
delete msg;
}
......@@ -139,7 +142,7 @@ int main(int argc, char* const argv[]) {
/*
* Allocate and initialize sensor data store.
*/
mySensorDataStore = new SensorDataStore();
mySensorDataStore = new SensorDataStore("127.0.0.1", 9042);
/*
* Start the MQTT Message Server.
......
......@@ -7,11 +7,7 @@ CXXFLAGS = -O0 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-
# List of object files to build and the derived list of corresponding source files
OBJS = src/sensordatastore.o \
src/cassandraBackend.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
src/cassandraBackend.o
# List of public header files necessary to use this library
PUBHEADERS = $(shell find include -type f -iname "*.h")
......@@ -20,7 +16,7 @@ PUBHEADERS = $(shell find include -type f -iname "*.h")
PRIVHEADERS = $(shell find include_internal -type f -iname "*.h")
# External libraries to link against
LIBS = -L$(DCDBDEPLOYPATH)/lib -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lcassandra -lboost_random -lboost_system -luv
# Dynamic library building differs between Linux/BSD and MacOS
OS = $(shell uname)
......@@ -36,12 +32,10 @@ endif
P = $(shell cd $(DCDBDEPLOYPATH)/lib/ && pwd)
# List of Phony Targets
.PHONY : check-thrift-env all clean clean-cassandra-headers install
.PHONY : check-target-env all clean install
# Main Library Target
$(TARGET): $(SRC)
$(MAKE) cassandra/Cassandra.h
$(MAKE) $(OBJS)
$(TARGET): $(OBJS)
@if [ "$(OS)" = "Darwin" ]; then \
echo "Linking library in Mac OS style: $(TARGET)"; \
$(CXX) $(CXXFLAGS) $(DLFLAGS) -o $(TARGET) $(OBJS) $(LIBS); \
......@@ -50,10 +44,10 @@ $(TARGET): $(SRC)
$(CXX) $(CXXFLAGS) $(DLFLAGS) -o $(TARGET) $(OBJS) $(LIBS); \
fi
all: $(TARGET)
all: $(TARGET) check-target-env
# Alert the user to put the necessary paths into LD_LIBRARY_PATH (or similar on other platforms)
check-thrift-env:
check-target-env:
@if [ "$(OS)" = "Darwin" ]; then \
R=`echo $$DYLD_LIBRARY_PATH | grep $P`; \
if [ "$$R" = "" ]; then \
......@@ -67,47 +61,21 @@ check-thrift-env:
fi; \
fi
# Build the Cassandra Thrift headers
cassandra/Cassandra.h:
@if [ ! -e cassandra ]; then \
printf "Building Thrift interface headers... "; \
$(DCDBDEPLOYPATH)/bin/thrift --gen cpp $(DCDBDEPLOYPATH)/cassandra/interface/cassandra.thrift; \
mv gen-cpp cassandra; \
echo "Done."; \
fi
@if [ "$U" = "Linux" ]; then \
echo "Fixing Thrift includes... "; \
grep stdint cassandra/cassandra_types.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/cassandra_types.h; fi;\
grep stdint cassandra/Cassandra.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/Cassandra.h; fi;\
fi
@# This is a really bad way of adding a line of text from a Makefile, but the differences between sed on Mac OS and Linux leave me with no options...
@sed -e 's/#include "cassandra_types.h"/#include "cassandra_types.h"$$#undef VERSION/' cassandra/cassandra_constants.h > cassandra/cassandra_constants.h_newline
@cat cassandra/cassandra_constants.h_newline | tr $$ '\n' > cassandra/cassandra_constants.h
@rm cassandra/cassandra_constants.h_newline
@# And more fixes needed since clang complains about the thrift generated code's use of namespaces
@sed -i -e 's/using apache/using ::apache/g' cassandra/cassandra_types.cpp
# Build the documentation
docs: $(PUBHEADERS) $(PRIVHEADERS) $(SRC)
@echo "Creating documentation..."
doxygen
# Clean everything
clean: clean-cassandra-headers clean-docs
clean: clean-docs
rm -f $(OBJS) $(TARGET)
# Clean the Cassandra headers only
clean-cassandra-headers:
rm -rf cassandra
# Clean the documentation
clean-docs:
rm -rf docs
# Install to deploy path
install: $(TARGET) check-thrift-env
install: $(TARGET) check-target-env
mkdir -p $(DCDBDEPLOYPATH)/lib
mkdir -p $(DCDBDEPLOYPATH)/include/dcdb
install $(TARGET) $(DCDBDEPLOYPATH)/lib/
......
......@@ -15,49 +15,30 @@
#ifndef CASSANDRA_BACKEND_H
#define CASSANDRA_BACKEND_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <string>
#include <boost/lexical_cast.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <dcdbendian.h>
#include "../cassandra/Cassandra.h"
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
#include "cassandra.h"
/**
* @brief The CassandraBackend class provides low-level
* access to the Cassandra key-value store using
* the THRIFT API.
* the CQL API.
*/
class CassandraBackend
{
protected:
CassandraClient *myClient; /**< The main object provided by the Thrift API for us */
std::vector<KsDef> keySpaces; /**< A vector containing a local copy of the keyspace definitions */
std::string clusterName; /**< The name of the cluster that we're connecting to */
boost::shared_ptr<TSocket> sock; /**< A boost TSocket object for our connection */
boost::shared_ptr<TTransport> tr; /**< A boost TTransport object sitting on top of the TSocket sock */
boost::shared_ptr<TProtocol> prot; /**< A boost TProtocol object sitting on top of the TTransport tr */
CassCluster* cluster; /**< The Cassandra Cluster object (contains hostname, port, etc) */
CassSession* session; /**< The session object through which we communicate with C* once the connection is established */
const CassSchema* schema; /**< The schema object containing the current database schema information */
std::string currentKeyspace; /**< The name of the active keyspace */
/**
* @brief This function validates a name to ensure that
* it only consists of alphanumeric characters.
* @param name A string to check
* @return True if the name is alphanumeric, false otherwise
* @return True if the name is alphanumeric, false otherwise
*/
bool validateName(std::string name);
......@@ -65,27 +46,39 @@ protected:
* @brief This function converts a uint64_t into a big-endian
* byte array represented as std::string.
* @param n The value to convert
* @return The string containing the big-endian byte array.
* @return The string containing the big-endian byte array.
*/
std::string int64Convert(uint64_t n);
public:
/* FIXME - make currentKeySpace protected! */
KsDef currentKeySpace; /**< The keyspace defniition of the currently selected keyspace */
/**
* @brief This function prints Cassandra CQL-specific
* error messages from a CassFuture object.
* @param future The future object which caused the error.
*/
void printError(CassFuture* future);
/**
* @brief This function executes a simple raw CQL query.
* @param query The CQL query string.
* @return Returns a CassError type to indicate success or failure
*/
CassError executeSimpleQuery(std::string query);
public:
/* Database meta operations */
/**
* @brief Starts a connection to a Cassandra front-end node.
* @param hostname The hostname to connect to.
* @param port The TCP port number on which Cassandra is listening for Thrift clients.
* @param port The TCP port number on which Cassandra is listening for CQL clients.
* @return Returns true if a connection was established, false otherwise.
*/
void connect(std::string hostname, int port);
bool connect(std::string hostname, int port);
/**
* @brief Fetch the current schema information (including the keyspaces) from the Cassandra server.
*/
void updateKeySpaces();
void updateSchema();
/**
* @brief Check if a keyspace with a given name exists.
......@@ -107,6 +100,12 @@ public:
*/
void selectKeyspace(std::string name);
/**
* @brief Get the active keyspace's name
* @return string object containing the keyspace name
*/
std::string getActiveKeyspace();
/**
* @brief Check if a column family with a given name
* exists in the currently selected keyspace.
......@@ -130,12 +129,11 @@ public:
/**
* @brief Insert data into the database
* @param columnFamily The name of the column family
* @param key The name of the row key
* @param ts The data time stamp (used for column name and time information in Cassandra)
* @param value The data itself
*/
void insert(std::string columnFamily, std::string key, uint64_t ts, uint64_t value);
void insert(std::string key, uint64_t ts, uint64_t value);
/* Class constructor / destructor */
......
......@@ -6,75 +6,134 @@
*/
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include "cassandraBackend.h"
/**
 * @details
 * Every request sent to Cassandra yields a CassFuture object that
 * carries the outcome of that request. When the outcome is an error,
 * this helper pulls the error text out of the future and writes it
 * to standard error in human-readable form.
 */
void CassandraBackend::printError(CassFuture* future)
{
  const CassString errorText = cass_future_error_message(future);
  const std::string readable(errorText.data, errorText.length);

  std::cerr << "Cassandra Backend Error: " << readable << std::endl;
}
/**
 * @details
 * Wraps the query text in a CQL statement without bound parameters,
 * submits it through the current session and blocks until the
 * request has completed. A failed request is reported through
 * printError() before its error code is handed back to the caller.
 */
CassError CassandraBackend::executeSimpleQuery(std::string query)
{
  /* Parameter count 0: the query string is sent exactly as given */
  CassStatement* stmt = cass_statement_new(cass_string_init(query.c_str()), 0);
  CassFuture* pending = cass_session_execute(session, stmt);

  /* Synchronous operation: wait here for the reply */
  cass_future_wait(pending);

  const CassError result = cass_future_error_code(pending);
  if (result != CASS_OK) {
    printError(pending);
  }

  /* Release the driver objects in the same order as before */
  cass_future_free(pending);
  cass_statement_free(stmt);

  return result;
}
/**
* @details
* This function connects to a Cassandra front end node
* using the Thrift API and Boost's TBinaryProtocol over
* TFramedTransport over TSocket.
*
* It also fetches some information about the database
* cluster.
* using the CQL API.
*/
void CassandraBackend::connect(std::string hostname, int port)
bool CassandraBackend::connect(std::string hostname, int port)
{
sock = boost::shared_ptr<TSocket>(new TSocket(hostname, port));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
/* Create a new cluster object */
if (!cluster)
cluster = cass_cluster_new();
if (!session)
session = cass_session_new();
/* Set hostname and port */
cass_cluster_set_contact_points(cluster, hostname.c_str());
cass_cluster_set_port(cluster, port);
/* Force protocol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
/* Connect to the server */
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
myClient = new CassandraClient(prot);
tr->open();
/* Wait for successful connection */
cass_future_wait(future);
myClient->describe_cluster_name(clusterName);
std::cout << "Connected to cluster: " << clusterName << "\n";
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
printError(future);
cass_future_free(future);
return false;
}
cass_future_free(future);
std::cout << "Connected to cluster.\n";
return true;
}
/**
* @details
* This function updates the local copy of the list
* of available keyspaces. It will be called any time
* a existsKeyspace query is performed to take note
* This function updates the local copy of the data
* store schema information. It is called whenever
* an existsKeyspace query is performed to take note
* of recently added keyspaces.
*/
void CassandraBackend::updateKeySpaces()
void CassandraBackend::updateSchema()
{
myClient->describe_keyspaces(keySpaces);
schema = cass_session_get_schema(session);
}
/**
* @details
* This function iterates all known keyspaces to check
* if a keyspace with a given name can be found.
* This function tries to retrieve schema information
* about a given keyspace to determine if the keyspace
* exists
*/
bool CassandraBackend::existsKeyspace(std::string name)
{
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
updateSchema();
const CassSchemaMeta* keyspaceMeta = cass_schema_get_keyspace(schema, name.c_str());
if (keyspaceMeta != NULL)
return true;
else
return false;
}
/**
* @details
* This function creates a new keyspace with a given name and
* replication factor.
*
* We are using CQL3 to do this as its syntax is less likely
* to change in future versions than the native API.
*/
void CassandraBackend::createKeyspace(std::string name, int replicationFactor)
{
CqlResult res;
std::string query;
std::string rFact = boost::lexical_cast<std::string>(replicationFactor);
if(validateName(name)) {
query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
if(executeSimpleQuery(query) != CASS_OK) {
std::cerr << "Failed to create keyspace " << name << "!" << std::endl;
}
}
}
......@@ -86,34 +145,49 @@ void CassandraBackend::createKeyspace(std::string name, int replicationFactor)
*/
void CassandraBackend::selectKeyspace(std::string name)
{
CqlResult res;
std::string query;
if (validateName(name)) {
if (validateName(name) && existsKeyspace(name)) {
query = "USE " + name + ";";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
currentKeySpace = *it;
if (executeSimpleQuery(query) != CASS_OK) {
std::cerr << "Error selecting keyspace " << name << "!" << std::endl;
}
currentKeyspace = name;
}
}
/**
* @details
* This function iterates over all column families in the key
* space that was selected by selectKeyspace to check if a
* column family with the given name exists.
* If no keyspace was selected the returned string is empty.
*/
std::string CassandraBackend::getActiveKeyspace()
{
/* currentKeyspace is assigned in selectKeyspace(); it is returned by value, so the caller gets its own copy */
return currentKeyspace;
}
/**
* @details
* This function tries to query meta information for a given
* column family (table) to check whether the column family
* exists.
*/
bool CassandraBackend::existsColumnFamily(std::string name)
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
updateSchema();
const CassSchemaMeta* keyspaceMeta = cass_schema_get_keyspace(schema, currentKeyspace.c_str());
if (keyspaceMeta == NULL) {
/* It is a bit misleading to return false if the keyspace doesn't even exist... */
return false;
}
return false;
const CassSchemaMeta* tableMeta = cass_schema_meta_get_entry(keyspaceMeta, name.c_str());
if (tableMeta == NULL) {
return false;
}
return true;
}
/**
......@@ -123,7 +197,6 @@ bool CassandraBackend::existsColumnFamily(std::string name)
*/
void CassandraBackend::createColumnFamily(std::string name, std::string fields, std::string primaryKey, std::string options)
{
CqlResult res;
std::stringstream query;
/* FIXME: Secure this and use proper types for fields, primaryKey, and options. */
......@@ -131,7 +204,7 @@ void CassandraBackend::createColumnFamily(std::string name, std::string fields,
<< " ( " << fields << ", PRIMARY KEY (" << primaryKey << "))"
<< " WITH " << options << ";";
myClient->execute_cql3_query(res, query.str(), Compression::NONE, ConsistencyLevel::ONE);
executeSimpleQuery(query.str());
}
/**
......@@ -171,63 +244,66 @@ std::string CassandraBackend::int64Convert(uint64_t n)
/**
* @details
* Since Cassandra uses manly strings to communicate in Thrift,
* this function converts the ts and value parameters to big-endian
* Since Cassandra uses strings to communicate in CQL, this
* function converts the ts and value parameters to big-endian
* byte arrays using the int64convert function.
*
* FIXME: Do performance optimizations, e.g. the prepared
* statement should not be created on every insert
*/
void CassandraBackend::insert(std::string columnFamily, std::string key, uint64_t ts, uint64_t value)
void CassandraBackend::insert(std::string key, uint64_t ts, uint64_t value)
{
try {
ColumnParent cparent;
Column c;
std::string name, cvalue;
cparent.column_family = columnFamily;
/*
* Convert to Cassandra formats and assign
* Cassandra-internal timestamp.
*/
name = int64Convert(ts);
cvalue = int64Convert(value);
c.name = name;
c.value = cvalue;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
/*
* Call Hector to do the insert.
*/
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
/*
* TODO: All caught exceptions should be handled more gracefully
* as we would like to keep the CollectAgent running as long as
* possible.
*/
catch(const TTransportException& te){
std::cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
std::cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
#if 0
std::cout << "Inserting@CassandraBackend ( <<KEY>>" << ", " << ts << ", " << value << ")" << std::endl;
#endif
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture* future = NULL;
const CassPrepared* prepared = NULL;
CassString query = cass_string_init("INSERT INTO dcdb.sensordata (sid, ts, value) VALUES (?, ?, ?);");
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
printError(future);
} else {
prepared = cass_future_get_prepared(future);
}
catch(const NotFoundException& nfe){
std::cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
statement = cass_prepared_bind(prepared);
CassBytes sid = cass_bytes_init((cass_byte_t*)(key.c_str()), 16);
cass_statement_bind_bytes_by_name(statement, "sid", sid);
cass_statement_bind_int64_by_name(statement, "ts", ts);
cass_statement_bind_int64_by_name(statement, "value", value);
future = cass_session_execute(session, statement