Commit b5d54e0c authored by Axel Auweter's avatar Axel Auweter
Browse files

Cleanups (mainly in sensordatastore).

parent 99b8140a
......@@ -7,8 +7,6 @@
//============================================================================
#include <cstdlib>
#include <sstream>
#include <signal.h>
#include <boost/date_time/posix_time/posix_time.hpp>
......@@ -40,9 +38,6 @@ void mqttCallback(SimpleMQTTMessage *msg)
/*
* Decode the message and put into the database.
*/
#if 0
msg->dump();
#endif
if (msg->isPublish()) {
/*
* Calculate Time Stamp.
......@@ -86,15 +81,14 @@ int main(void) {
try{
/*
* Allocate and initialize sensor data store.
* Catch SIGINT signals to allow for proper server shutdowns.
*/
mySensorDataStore = new SensorDataStore();
mySensorDataStore->init();
signal(SIGINT, sigHandler);
/*
* Catch SIGINT signals to allow for proper server shutdowns.
* Allocate and initialize sensor data store.
*/
signal(SIGINT, sigHandler);
mySensorDataStore = new SensorDataStore();
/*
* Start the MQTT Message Server.
......
......@@ -5,8 +5,28 @@
* Author: Axel Auweter
*/
#include <boost/lexical_cast.hpp>
#include "sensordatastore.h"
bool SensorDataStore::validateName(string name)
{
/*
* Make sure name only consists of alphabetical characters (super ugly)...
*/
class {
public:
static bool check(char c) {
return !isalpha(c);
}
} isNotAlpha;
if (find_if(name.begin(), name.end(), isNotAlpha.check) == name.end())
return true;
else
return false;
}
bool SensorDataStore::topicToSid(SensorId* sid, string topic)
{
uint64_t pos = 0;
......@@ -92,81 +112,110 @@ void SensorDataStore::insert(SensorId* sid, uint64_t ts, uint64_t value)
}
}
/*
* TODO: Split the following function up into
* multiple functions and parameterize the process
* a bit (e.g. make hostname and port configurable)!
*/
void SensorDataStore::init()
void SensorDataStore::connect(string hostname, int port)
{
sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
sock = boost::shared_ptr<TSocket>(new TSocket(hostname, port));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
myClient = new CassandraClient(prot);
tr->open();
myClient->describe_cluster_name(clusterName);
cout << "Connected to cluster: " << clusterName << "\n";
}
void SensorDataStore::updateKeySpaces()
{
myClient->describe_keyspaces(keySpaces);
}
bool SensorDataStore::existsKeyspace(string name)
{
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
return true;
}
return false;
}
/*
* Create the keyspace that contains all the sensor data.
* We are using CQL3 to do this as its syntax is less likely
* to change in future versions than the native API.
*/
void SensorDataStore::createKeyspace(string name, int replicationFactor)
{
CqlResult res;
string query;
string rFact = boost::lexical_cast<string>(replicationFactor);
if(validateName(name)) {
query = "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
}
void SensorDataStore::selectKeyspace(string name)
{
CqlResult res;
string query;
if (validateName(name)) {
query = "USE " + name + ";";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
updateKeySpaces();
for (std::vector<KsDef>::iterator it = keySpaces.begin(); it != keySpaces.end(); ++it) {
if ((*it).name.compare(name) == 0)
currentKeySpace = *it;
}
}
}
bool SensorDataStore::existsColumnFamilies()
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare("sensordata") == 0)
return true;
}
return false;
}
void SensorDataStore::createColumnFamilies()
{
CqlResult res;
string query;
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
void SensorDataStore::init(string hostname, int port)
{
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
myClient = new CassandraClient(prot);
tr->open();
start:
myClient->describe_cluster_name(clusterName);
cout << "Cluster name: " << clusterName << "\n";
int dcdbKeyspace = -1;
cout << "Keyspaces:\n";
std::vector<KsDef> keySpaces;
myClient->describe_keyspaces(keySpaces);
for (unsigned int i=0; i<keySpaces.size(); i++) {
cout << " [" << i << "]: " << keySpaces[i].name << "\n";
if(keySpaces[i].name == "dcdb") {
dcdbKeyspace = i;
}
}
connect(hostname, port);
CqlResult res;
std::string query;
if (dcdbKeyspace<0) {
cout << "Creating dcdb keyspace...\n";
query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
cout << "Sending CQL statement: " << query.c_str();
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
}
else {
cout << "Using existing keyspace dcdb...\n";
if (!existsKeyspace(KEYSPACE_NAME)) {
cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
createKeyspace(KEYSPACE_NAME, 1);
}
query = "USE dcdb;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
int sensordataCf = -1;
cout << "Column families in dcdb:\n";
for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
cout << " [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
sensordataCf = i;
}
}
selectKeyspace(KEYSPACE_NAME);
if (sensordataCf<0) {
cout << "Creating sensordata column familiy...\n";
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
if (!currentKeySpace.name.compare(KEYSPACE_NAME) == 0) {
cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
else {
cout << "Using existing sensordata column familiy.\n";
if (!existsColumnFamilies()) {
cout << "Creating Column Families...\n";
createColumnFamilies();
}
}
catch(const TTransportException& te){
......@@ -185,7 +234,12 @@ void SensorDataStore::init()
SensorDataStore::SensorDataStore()
{
myClient = NULL;
init("localhost", 9160);
}
SensorDataStore::SensorDataStore(string hostname, int port)
{
init(hostname, port);
}
SensorDataStore::~SensorDataStore()
......
......@@ -27,6 +27,8 @@ using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
using namespace std;
#define KEYSPACE_NAME "dcdb"
#pragma pack(push,1)
typedef union {
......@@ -62,20 +64,33 @@ class SensorDataStore
{
protected:
CassandraClient *myClient;
std::vector<KsDef> keySpaces;
KsDef currentKeySpace;
string clusterName;
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
std::string clusterName;
string sidConvert(SensorId *sid);
string int64Convert(uint64_t n);
void connect(string hostname, int port);
void updateKeySpaces();
bool existsKeyspace(string name);
void createKeyspace(string name, int replicationFactor);
void selectKeyspace(string name);
bool existsColumnFamilies();
void createColumnFamilies();
bool validateName(string name);
public:
void init();
void init(string hostname, int port);
void insert(SensorId* sid, uint64_t ts, uint64_t value);
bool topicToSid(SensorId* sid, string topic);
SensorDataStore();
SensorDataStore(string hostname, int port);
virtual ~SensorDataStore();
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment