Commit b8d8561d authored by Axel Auweter's avatar Axel Auweter
Browse files

Created SensorDataStore class which acts as the interface to Cassandra.

This results in a much cleaner collectagent.cpp file and main function.
However, further cleanups are necessary and the respective TODOs can be
found in the code.
parent 6646b3d0
include ../config.mk
#CXXFLAGS = $(shell ./cxx11flags.sh $(CXX) -O0 -g -Wall -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ | head -1)
CXXFLAGS = -O0 -g -Wall -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/
OBJS = collectagent.o simplemqttserver.o simplemqttserverthread.o simplemqttservermessage.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o \
sensordatastore.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
#LIBS = $(shell ./cxx11flags.sh $(CXX) -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread | tail -1)
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread
TARGET = collectagent
......
......@@ -9,112 +9,25 @@
#include <cstdlib>
#include <sstream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/time.h>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include "cassandra/Cassandra.h"
#include "simplemqttserver.h"
#include "mosquitto.h"
#include "sensordatastore.h"
using namespace std;
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
CassandraClient *myClient;
int keepRunning;
uint64_t msgCtr;
uint64_t pmsgCtr;
#pragma pack(push,1)
typedef union {
uint64_t raw;
struct {
uint8_t knc_id;
uint8_t bnc_id;
uint8_t bic_id;
uint8_t chassis_id;
uint8_t rack_id_lsb;
uint8_t rack_id_msb;
uint8_t cluster_id;
uint8_t datacenter_id;
};
} DeviceLocation;
typedef struct {
uint64_t sensor_id : 16;
uint64_t device_id : 48;
} DeviceSensorId;
typedef union {
uint64_t raw[2];
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} SensorId;
#pragma pack(pop)
SensorDataStore *mySensorDataStore;
void sigHandler(int sig)
{
keepRunning = 0;
}
bool topicToSid(SensorId* sid, string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
sid->raw[0] = 0;
sid->raw[1] = 0;
while (*buf) {
if (*buf >= '0' && *buf <= '9') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'A' && *buf <= 'F') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'a' && *buf <= 'f') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
pos += 4;
}
buf++;
}
return pos == 128;
}
string sidConvert(SensorId *sid)
{
uint64_t ll[2];
ll[0] = __builtin_bswap64(sid->raw[0]);
ll[1]= __builtin_bswap64(sid->raw[1]);
return string((char*)ll, 16);
}
string int64Convert(uint64_t n)
{
n = __builtin_bswap64(n);
return string((char*)&n, 8);
}
void mqttCallback(SimpleMQTTMessage *msg)
{
/*
......@@ -127,189 +40,78 @@ void mqttCallback(SimpleMQTTMessage *msg)
/*
* Decode the message and put into the database.
*/
try {
#if 0
msg->dump();
msg->dump();
#endif
if (msg->isPublish()) {
ColumnParent cparent;
Column c;
string key, name, value;
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
cparent.column_family = "sensordata";
uint64_t ts = diff.total_nanoseconds();
SensorId sid;
if (topicToSid(&sid,msg->getTopic())) {
if (msg->isPublish()) {
/*
* Calculate Time Stamp.
*/
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
uint64_t ts = diff.total_nanoseconds();
/*
* Check if we can decode the message topic
* into a valid SensorId. If successful, store
* the record in the database.
*/
SensorId sid;
if (mySensorDataStore->topicToSid(&sid,msg->getTopic())) {
#if 0
cout << "Topic decode successful:"
<< "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
<< "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
<< "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
<< "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
<< "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
<< "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
<< "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
<< "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
<< "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
<< "\ndevice_id: " << hex << sid.dsid.device_id
<< "\nsensor_id: " << hex << sid.dsid.sensor_id
<< "\n";
cout << "Topic decode successful:"
<< "\nRaw: " << sid.raw[0] << " " << sid.raw[1]
<< "\ndatacenter_id: " << hex << (uint32_t)sid.dl.datacenter_id
<< "\ncluster_id: " << hex << (uint32_t)sid.dl.cluster_id
<< "\nrack_id_msb: " << hex << (uint32_t)sid.dl.rack_id_msb
<< "\nrack_id_lsb: " << hex << (uint32_t)sid.dl.rack_id_lsb
<< "\nchassis_id: " << hex << (uint32_t)sid.dl.chassis_id
<< "\nbic_id: " << hex << (uint32_t)sid.dl.bic_id
<< "\nbnc_id: " << hex << (uint32_t)sid.dl.bnc_id
<< "\nknc_id: " << hex << (uint32_t)sid.dl.knc_id
<< "\ndevice_id: " << hex << sid.dsid.device_id
<< "\nsensor_id: " << hex << sid.dsid.sensor_id
<< "\n";
#endif
key = sidConvert(&sid);
name = int64Convert(ts);
value = int64Convert(*((uint64_t*)msg->getPayload()));
c.name = name;
c.value = value;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
#if 0
key = convert(&sid, sizeof(msgCtr));
name = convert(&ts, sizeof(ts));
float tmp = 3.141;
value = convert(&tmp, 4);
c.name = name;
c.value = value;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
#endif
mySensorDataStore->insert(&sid, ts, *((uint64_t*)msg->getPayload()));
}
}
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
}
delete msg;
}
int main(void) {
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
std::string clusterName;
sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
myClient = new CassandraClient(prot);
tr->open();
start:
myClient->describe_cluster_name(clusterName);
cout << "Cluster name: " << clusterName << "\n";
int dcdbKeyspace = -1;
cout << "Keyspaces:\n";
std::vector<KsDef> keySpaces;
myClient->describe_keyspaces(keySpaces);
for (unsigned int i=0; i<keySpaces.size(); i++) {
cout << " [" << i << "]: " << keySpaces[i].name << "\n";
if(keySpaces[i].name == "dcdb") {
dcdbKeyspace = i;
}
}
CqlResult res;
std::string query;
if (dcdbKeyspace<0) {
cout << "Creating dcdb keyspace...\n";
query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
cout << "Sending CQL statement: " << query.c_str();
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
}
else {
cout << "Using existing keyspace dcdb...\n";
}
query = "USE dcdb;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
int sensordataCf = -1;
cout << "Column families in dcdb:\n";
for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
cout << " [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
sensordataCf = i;
}
}
if (sensordataCf<0) {
cout << "Creating sensordata column familiy...\n";
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
}
else {
cout << "Using existing sensordata column familiy.\n";
}
}
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
/* Catch SIGINT signals */
signal(SIGINT, sigHandler);
try{
keepRunning = 1;
timeval start, end;
double elapsed;
/*
* Allocate and initialize sensor data store.
*/
mySensorDataStore = new SensorDataStore();
mySensorDataStore->init();
/*
* Catch SIGINT signals to allow for proper server shutdowns.
*/
signal(SIGINT, sigHandler);
/*
* Start the MQTT Message Server.
*/
SimpleMQTTServer ms;
ms.setMessageCallback(mqttCallback);
ms.start();
cout << "Server running...\n";
/*
* Run (hopefully) forever...
*/
keepRunning = 1;
timeval start, end;
double elapsed;
while(keepRunning) {
gettimeofday(&start, NULL);
sleep(60);
......@@ -326,10 +128,12 @@ int main(void) {
cout << "Stopping...\n";
ms.stop();
delete mySensorDataStore;
}
catch (const exception& e) {
cout << "Exception: " << e.what() << "\n";
exit(EXIT_FAILURE);
}
return 0;
return EXIT_SUCCESS;
}
/*
* sensordatastore.cpp
*
* Created on: Jul 24, 2013
* Author: Axel Auweter
*/
#include "sensordatastore.h"
bool SensorDataStore::topicToSid(SensorId* sid, string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
sid->raw[0] = 0;
sid->raw[1] = 0;
while (*buf) {
if (*buf >= '0' && *buf <= '9') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'A' && *buf <= 'F') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'a' && *buf <= 'f') {
sid->raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
pos += 4;
}
buf++;
}
return pos == 128;
}
string SensorDataStore::sidConvert(SensorId *sid)
{
uint64_t ll[2];
ll[0] = __builtin_bswap64(sid->raw[0]);
ll[1]= __builtin_bswap64(sid->raw[1]);
return string((char*)ll, 16);
}
string SensorDataStore::int64Convert(uint64_t n)
{
n = __builtin_bswap64(n);
return string((char*)&n, 8);
}
void SensorDataStore::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
try {
ColumnParent cparent;
Column c;
string key, name, cvalue;
cparent.column_family = "sensordata";
/*
* Convert to Cassandra formats and assign
* Cassandra-internal timestamp.
*/
key = sidConvert(sid);
name = int64Convert(ts);
cvalue = int64Convert(value);
c.name = name;
c.value = cvalue;
c.__isset.value = true;
c.timestamp = ts/1000;
c.__isset.timestamp = true;
/*
* Call Hector to do the insert.
*/
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
}
/*
* TODO: All caught exceptions should be handled more gracefully
* as we would like to keep the CollectAgent running as long as
* possible.
*/
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
}
/*
* TODO: Split the following function up into
* multiple functions and parameterize the process
* a bit (e.g. make hostname and port configurable)!
*/
void SensorDataStore::init()
{
sock = boost::shared_ptr<TSocket>(new TSocket("localhost", 9160));
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
myClient = new CassandraClient(prot);
tr->open();
start:
myClient->describe_cluster_name(clusterName);
cout << "Cluster name: " << clusterName << "\n";
int dcdbKeyspace = -1;
cout << "Keyspaces:\n";
std::vector<KsDef> keySpaces;
myClient->describe_keyspaces(keySpaces);
for (unsigned int i=0; i<keySpaces.size(); i++) {
cout << " [" << i << "]: " << keySpaces[i].name << "\n";
if(keySpaces[i].name == "dcdb") {
dcdbKeyspace = i;
}
}
CqlResult res;
std::string query;
if (dcdbKeyspace<0) {
cout << "Creating dcdb keyspace...\n";
query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
cout << "Sending CQL statement: " << query.c_str();
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
}
else {
cout << "Using existing keyspace dcdb...\n";
}
query = "USE dcdb;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
int sensordataCf = -1;
cout << "Column families in dcdb:\n";
for (unsigned int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
cout << " [" << i << "]: " << keySpaces[dcdbKeyspace].cf_defs[i].name << "\n";
if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
sensordataCf = i;
}
}
if (sensordataCf<0) {
cout << "Creating sensordata column familiy...\n";
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
cout << "Sending CQL statement: " << query;
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
cout << " --> Success!\n";
cout << "Starting over...\n\n";
goto start;
}
else {
cout << "Using existing sensordata column familiy.\n";
}
}
catch(const TTransportException& te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
exit(EXIT_FAILURE);
}
catch(const InvalidRequestException& ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
exit(EXIT_FAILURE);
}
catch(const NotFoundException& nfe){
cout << "NF Exception: " << nfe.what() << "\n";
exit(EXIT_FAILURE);
}
}
SensorDataStore::SensorDataStore()
{
myClient = NULL;
}
SensorDataStore::~SensorDataStore()
{
/*
* Clean up...
*/
if (myClient)
delete myClient;
}
/*
* sensordatastore.h
*
* Created on: Jul 24, 2013
* Author: Axel Auweter
*/
#include <stdint.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include "cassandra/Cassandra.h"
#ifndef SENSORDATASTORE_H_
#define SENSORDATASTORE_H_