Commit 4dab64ad authored by Axel Auweter's avatar Axel Auweter
Browse files

Started some refactoring of DCDBLib.

parent 6115e159
......@@ -7,6 +7,7 @@ CXXFLAGS = -O0 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-
# List of object files to build and the derived list of corresponding source files
OBJS = src/sensordatastore.o \
src/cassandraBackend.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
......
......@@ -6,6 +6,7 @@
*/
#include <stdint.h>
#include <string>
#ifndef SENSORDATASTORE_H_
#define SENSORDATASTORE_H_
......@@ -41,11 +42,20 @@ typedef union {
#pragma pack(pop)
/* Forward-declaration of the implementation-internal classes */
class SensorDataStoreImpl;
class CassandraBackend;
/*
* SensorDataStore
*
* Use this class to initialize, write, and read sensor data.
*/
class SensorDataStore
{
private:
SensorDataStoreImpl* impl;
CassandraBackend* csBackend;
public:
void init(std::string hostname, int port);
......
/*
* cassandraBackend.h
*
* Created on: Apr 08, 2013
* Author: Axel Auweter
*/
#ifndef CASSANDRA_BACKEND_H
#define CASSANDRA_BACKEND_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <boost/lexical_cast.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include <dcdbendian.h>
#include "../cassandra/Cassandra.h"
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
using namespace std;
class CassandraBackend
{
protected:
CassandraClient *myClient;
std::vector<KsDef> keySpaces;
string clusterName;
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
bool validateName(string name);
string int64Convert(uint64_t n);
public:
/* FIXME - make currentKeySpace protected! */
KsDef currentKeySpace;
/* Database meta operations */
void connect(string hostname, int port);
void updateKeySpaces();
bool existsKeyspace(string name);
void createKeyspace(string name, int replicationFactor);
void selectKeyspace(string name);
bool existsColumnFamily(string name);
void createColumnFamilies();
/* Database data access operations */
void insert(string key, uint64_t ts, uint64_t value);
/* Class constructor / desctructor */
CassandraBackend();
virtual ~CassandraBackend();
};
#endif /* CASSANDRA_BACKEND_H */
......@@ -8,61 +8,28 @@
#ifndef DCDB_INTERNAL_H
#define DCDB_INTERNAL_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <boost/lexical_cast.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include "../cassandra/Cassandra.h"
#include <string>
#include "sensordatastore.h"
#include "cassandraBackend.h"
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
using namespace std;
#define KEYSPACE_NAME "dcdb"
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define CF_SETTINGS "settings"
class SensorDataStoreImpl
{
protected:
CassandraClient *myClient;
std::vector<KsDef> keySpaces;
KsDef currentKeySpace;
string clusterName;
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
string sidConvert(SensorId *sid);
string int64Convert(uint64_t n);
void connect(string hostname, int port);
void updateKeySpaces();
bool existsKeyspace(string name);
void createKeyspace(string name, int replicationFactor);
void selectKeyspace(string name);
bool existsColumnFamilies();
void createColumnFamilies();
bool validateName(string name);
CassandraBackend* csBackend;
std::string sidConvert(SensorId *sid);
public:
void init(string hostname, int port);
bool topicToSid(SensorId* sid, std::string topic);
void insert(SensorId* sid, uint64_t ts, uint64_t value);
SensorDataStoreImpl();
virtual ~SensorDataStoreImpl();
void init(std::string hostname, int port);
bool topicToSid(SensorId* sid, std::string topic);
void insert(SensorId* sid, uint64_t ts, uint64_t value);
SensorDataStoreImpl(CassandraBackend *csb);
virtual ~SensorDataStoreImpl();
};
#endif
\ No newline at end of file
#endif
/*
* cassandraBackend.cpp
*
* Created on: Apr 08, 2013
* Author: Axel Auweter
*/
/* FIXME - remove this include (table naming and creation should be controlled by sensordatastore) */
#include "sensordatastore_internal.h"
#include "cassandraBackend.h"
void CassandraBackend::connect(string hostname, int port)
{
sock = boost::shared_ptr<TSocket>(new TSocket(hostname, port));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
myClient = new CassandraClient(prot);
tr->open();
myClient->describe_cluster_name(clusterName);
cout << "Connected to cluster: " << clusterName << "\n";
}
void CassandraBackend::updateKeySpaces()
{
myClient->describe_keyspaces(keySpaces);
}
bool CassandraBackend::existsKeyspace(string name)
{
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
}
/*
* Create the keyspace that contains all the sensor data.
* We are using CQL3 to do this as its syntax is less likely
* to change in future versions than the native API.
*/
void CassandraBackend::createKeyspace(string name, int replicationFactor)
{
CqlResult res;
string query;
string rFact = boost::lexical_cast<string>(replicationFactor);
if(validateName(name)) {
query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
}
void CassandraBackend::selectKeyspace(string name)
{
CqlResult res;
string query;
if (validateName(name)) {
query = "USE " + name + ";";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
currentKeySpace = *it;
}
}
}
bool CassandraBackend::existsColumnFamily(string name)
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
}
void CassandraBackend::createColumnFamilies()
{
CqlResult res;
string query;
query = "CREATE TABLE " CF_SENSORDATA " ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
bool CassandraBackend::validateName(string name)
{
/*
* Make sure name only consists of alphabetical characters (super ugly)...
*/
class {
public:
static bool check(char c) {
return !isalpha(c);
}
} isNotAlpha;
if (find_if(name.begin(), name.end(), isNotAlpha.check) == name.end())
return true;
else
return false;
}
string CassandraBackend::int64Convert(uint64_t n)
{
n = Endian::hostToBE(n);
return string((char*)&n, 8);
}
void CassandraBackend::insert(string key, uint64_t ts, uint64_t value)
{
try {
ColumnParent cparent;
Column c;
string key, name, cvalue;
cparent.column_family = CF_SENSORDATA;
/*
* Convert to Cassandra formats and assign
* Cassandra-internal timestamp.
*/
name = int64Convert(ts);
cvalue = int64Convert(value);
c.name = name;
c.value = cvalue;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
/*
* Call Hector to do the insert.
*/
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
/*
* TODO: All caught exceptions should be handled more gracefully
* as we would like to keep the CollectAgent running as long as
* possible.
*/
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
}
CassandraBackend::CassandraBackend()
{
}
CassandraBackend::~CassandraBackend()
{
/* Clean up... */
if (myClient)
delete myClient;
}
......@@ -5,9 +5,6 @@
* Author: Axel Auweter
*/
#include <boost/lexical_cast.hpp>
#include <dcdbendian.h>
#include "sensordatastore_internal.h"
string SensorDataStoreImpl::sidConvert(SensorId *sid)
......@@ -18,150 +15,6 @@ string SensorDataStoreImpl::sidConvert(SensorId *sid)
return string((char*)ll, 16);
}
string SensorDataStoreImpl::int64Convert(uint64_t n)
{
n = Endian::hostToBE(n);
return string((char*)&n, 8);
}
void SensorDataStoreImpl::connect(string hostname, int port)
{
sock = boost::shared_ptr<TSocket>(new TSocket(hostname, port));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
myClient = new CassandraClient(prot);
tr->open();
myClient->describe_cluster_name(clusterName);
cout << "Connected to cluster: " << clusterName << "\n";
}
void SensorDataStoreImpl::updateKeySpaces()
{
myClient->describe_keyspaces(keySpaces);
}
bool SensorDataStoreImpl::existsKeyspace(string name)
{
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
}
/*
* Create the keyspace that contains all the sensor data.
* We are using CQL3 to do this as its syntax is less likely
* to change in future versions than the native API.
*/
void SensorDataStoreImpl::createKeyspace(string name, int replicationFactor)
{
CqlResult res;
string query;
string rFact = boost::lexical_cast<string>(replicationFactor);
if(validateName(name)) {
query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
}
void SensorDataStoreImpl::selectKeyspace(string name)
{
CqlResult res;
string query;
if (validateName(name)) {
query = "USE " + name + ";";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
currentKeySpace = *it;
}
}
}
bool SensorDataStoreImpl::existsColumnFamilies()
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare("sensordata") == 0)
return true;
}
return false;
}
void SensorDataStoreImpl::createColumnFamilies()
{
CqlResult res;
string query;
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
bool SensorDataStoreImpl::validateName(string name)
{
/*
* Make sure name only consists of alphabetical characters (super ugly)...
*/
class {
public:
static bool check(char c) {
return !isalpha(c);
}
} isNotAlpha;
if (find_if(name.begin(), name.end(), isNotAlpha.check) == name.end())
return true;
else
return false;
}
void SensorDataStoreImpl::init(string hostname, int port) {
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
connect(hostname, port);
if (!existsKeyspace(KEYSPACE_NAME)) {
cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
createKeyspace(KEYSPACE_NAME, 1);
}
selectKeyspace(KEYSPACE_NAME);
if (!(currentKeySpace.name.compare(KEYSPACE_NAME) == 0)) {
cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!existsColumnFamilies()) {
cout << "Creating Column Families...\n";
createColumnFamilies();
}
}
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
}
bool SensorDataStoreImpl::topicToSid(SensorId* sid, string topic)
{
uint64_t pos = 0;
......@@ -186,39 +39,32 @@ bool SensorDataStoreImpl::topicToSid(SensorId* sid, string topic)
return pos == 128;
}
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
try {
ColumnParent cparent;
Column c;
string key, name, cvalue;
cparent.column_family = "sensordata";
/*
* Convert to Cassandra formats and assign
* Cassandra-internal timestamp.
*/
key = sidConvert(sid);
name = int64Convert(ts);
cvalue = int64Convert(value);
c.name = name;
c.value = cvalue;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
/*
* Call Hector to do the insert.
*/
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
void SensorDataStoreImpl::init(string hostname, int port) {
/*
* TODO: All caught exceptions should be handled more gracefully
* as we would like to keep the CollectAgent running as long as
* possible.
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
csBackend->connect(hostname, port);
if (!csBackend->existsKeyspace(KEYSPACE_NAME)) {
cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(KEYSPACE_NAME, 1);
}
csBackend->selectKeyspace(KEYSPACE_NAME);
if (!(csBackend->currentKeySpace.name.compare(KEYSPACE_NAME) == 0)) {
cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!csBackend->existsColumnFamily(CF_SENSORDATA)) {
cout << "Creating Column Familiy " CF_SENSORDATA "...\n";
csBackend->createColumnFamilies();
}
}
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
......@@ -233,27 +79,28 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
}
}
SensorDataStoreImpl::SensorDataStoreImpl()
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
csBackend->insert(sidConvert(sid), ts, value);
}
SensorDataStoreImpl::SensorDataStoreImpl(CassandraBackend *csb)
{
csBackend = csb;
}
SensorDataStoreImpl::~SensorDataStoreImpl()
{
/*
* Clean up...
*/
if (myClient)
delete myClient;
}
void SensorDataStore::init(string hostname, int port)
{
/* Allocate new SensorDataStoreImpl Object if necessary */
if (!impl) {
impl = new SensorDataStoreImpl();
impl = new SensorDataStoreImpl(csBackend);
}
/* Call impl's init function */