Commit 1d5f1bfb authored by Axel Auweter's avatar Axel Auweter
Browse files

Refactoring stage 3:

* Added SensorConfig and DCDBPublicSensor classes as interface for dcdbconfig
* Fixed bugs and tested:
  --> CollectAgent fully working
  --> dcdbconfig fully working
parent 6fd96fd3
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/ -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG
OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
......
......@@ -178,10 +178,11 @@ int main(int argc, char* const argv[]) {
/*
* Allocate and initialize connection to Cassandra.
*/
std::string sdHost(cassandraHost);
DCDBConnection dcdbConn(sdHost, 9042);
std::string sdHost = cassandraHost;
DCDBConnection* dcdbConn;
dcdbConn = new DCDBConnection(sdHost, 9042);
if (!dcdbConn.connect()) {
if (!dcdbConn->connect()) {
std::cout << "Cannot connect to Cassandra!" << std::endl;
exit(EXIT_FAILURE);
}
......@@ -189,12 +190,12 @@ int main(int argc, char* const argv[]) {
/*
* Legacy behavior: Initialize the DCDB schema in Cassandra.
*/
dcdbConn.initSchema();
dcdbConn->initSchema();
/*
* Allocate the SensorDataStore.
*/
mySensorDataStore = new SensorDataStore(&dcdbConn);
mySensorDataStore = new SensorDataStore(dcdbConn);
/*
* Set TTL for data store inserts if TTL > 0.
......@@ -243,7 +244,8 @@ int main(int argc, char* const argv[]) {
ms.stop();
delete mySensorDataStore;
dcdbConn.disconnect();
dcdbConn->disconnect();
delete dcdbConn;
}
catch (const exception& e) {
cout << "Exception: " << e.what() << "\n";
......
......@@ -9,6 +9,7 @@ CXXFLAGS = -O0 -ggdb -Wall -Werror \
OBJS = src/connection.o \
src/sensordatastore.o \
src/timestamp.o \
src/sensorconfig.o \
src/sensorid.o
# List of public header files necessary to use this libray
......
/*
* sensorconfig.h
*
* Created on: May 19, 2015
* Author: Axel Auweter
*/
/**
* @file
* @brief This file contains parts of the public API for the
* DCDBLib library.
* It contains the class definition of the SensorConfig class,
* that handles sensor configuration and initialization.
*/
#include <string>
#include <list>
#include "connection.h"
#include "cassandra.h"
#ifndef SENSORCONFIG_H
#define SENSORCONFIG_H
class SensorConfigImpl;
class DCDBPublicSensor
{
public:
std::string name;
std::string pattern;
double scaling_factor;
std::string unit;
DCDBPublicSensor();
DCDBPublicSensor(const DCDBPublicSensor &copy);
};
typedef enum {
SC_OK,
SC_INVALIDSESSION,
SC_INVALIDPATTERN,
SC_UNKNOWNERROR
} SCError;
class SensorConfig
{
protected:
SensorConfigImpl* impl;
public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SensorConfig(DCDBConnection* conn);
virtual ~SensorConfig();
};
#endif /* CONFIG_H */
/*
* sensorconfig_internal.h
*
* Created on: May 19, 2015
* Author: Axel Auweter
*/
#include <list>
#include "cassandra.h"
#include "sensorconfig.h"
#ifndef SENSORCONFIG_INTERNAL_H
#define SENSORCONFIG_INTERNAL_H
class SensorConfigImpl
{
protected:
DCDBConnection* connection;
CassSession* session;
bool validateSensorPattern(const char* sensorPattern);
public:
SCError publishSensor(const char* publicName, const char* sensorPattern);
SCError getPublicSensors(std::list<DCDBPublicSensor>& publicSensors);
SensorConfigImpl(DCDBConnection* conn);
virtual ~SensorConfigImpl();
};
#endif /* SENSORCONFIG_INTERNAL_H */
......@@ -63,21 +63,17 @@ CassSession* DCDBConnection::getSessionHandle() {
* and free it on object deallocation.
*/
DCDBConnection::DCDBConnection() {
if (!impl)
impl = new DCDBConnectionImpl();
impl = new DCDBConnectionImpl();
}
DCDBConnection::DCDBConnection(std::string hostname, uint16_t port) {
if (!impl)
impl = new DCDBConnectionImpl();
impl = new DCDBConnectionImpl();
impl->setHostname(hostname);
impl->setPort(port);
}
DCDBConnection::~DCDBConnection() {
if (impl)
delete impl;
delete impl;
}
......
/*
* sensorconfig.cpp
*
* Created on: May 19, 2015
* Author: Axel Auweter
*/
#include <cstring>
#include <iostream>
#include "cassandra.h"
#include "sensorconfig_internal.h"
#include "dcdbglobals.h"
/*
* DCDBPublicSensor functions.
*/
DCDBPublicSensor::DCDBPublicSensor()
{
name = "";
pattern = "";
scaling_factor = 1.0;
unit = "";
}
DCDBPublicSensor::DCDBPublicSensor (const DCDBPublicSensor &copy)
{
name = copy.name;
pattern = copy.pattern;
scaling_factor = copy.scaling_factor;
unit = copy.unit;
}
/*
* SensorConfig functions
*/
SCError SensorConfig::publishSensor(const char* publicName, const char* sensorPattern)
{
return impl->publishSensor(publicName, sensorPattern);
}
SCError SensorConfig::getPublicSensors(std::list<DCDBPublicSensor>& publicSensors)
{
return impl->getPublicSensors(publicSensors);
}
SensorConfig::SensorConfig(DCDBConnection* conn)
{
/* Allocate impl object */
impl = new SensorConfigImpl(conn);
}
SensorConfig::~SensorConfig()
{
if (impl) {
delete impl;
}
}
/*
* SensorConfigImpl protected functions
*/
/*
* Validate the pattern for a Sensor to be published
* Patterns may only consist of hex numbers and forward slashes
* and at most one wildcard character (*). Also, the total number of
* bits may not exceed 128.
*/
bool SensorConfigImpl::validateSensorPattern(const char* sensorPattern)
{
unsigned int wildcards, bits;
/* 128 bits in HEX is at most 32 characters. Considering forward-slash separators, 64 is a reasonable maxmimum string length */
if (strlen(sensorPattern) > 64) {
return false;
}
/* Iterate through the string and validate/count the characters */
bits = 0;
wildcards = 0;
for (unsigned int c = 0; c < strlen(sensorPattern); c++) {
switch (sensorPattern[c]) {
/* Regular HEX digit */
case '0':
case '1':
case '2':
case '3':
case '4':
case '5':
case '6':
case '7':
case '8':
case '9':
case 'a':
case 'A':
case 'b':
case 'B':
case 'c':
case 'C':
case 'd':
case 'D':
case 'e':
case 'E':
case 'f':
case 'F':
bits += 4;
break;
/* Wildcard */
case '*':
wildcards++;
break;
/* Forward slash */
case '/':
break;
/* Everything else is not allowed */
default:
return false;
}
}
/* More than one wildcard is not allowed */
if (wildcards > 1)
return false;
/* In case of a wildcard, there should be some bits left for the wildcard to fill in */
if (wildcards == 1 && bits >= 128)
return false;
/* Looks good */
return true;
}
/*
* SensorConfigImpl public functions
*/
SCError SensorConfigImpl::publishSensor(const char* publicName, const char* sensorPattern)
{
/* Check if the pattern matches the requirements */
if (!validateSensorPattern(sensorPattern)) {
return SC_INVALIDPATTERN;
}
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Insert the entry */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
const char* query = "INSERT INTO " CONFIG_KEYSPACE_NAME "." CF_PUBLISHEDSENSORS " (name, pattern) VALUES (?,?);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return SC_UNKNOWNERROR;
} else {
prepared = cass_future_get_prepared(future);
}
cass_future_free(future);
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", publicName);
cass_statement_bind_string_by_name(statement, "pattern", sensorPattern);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
}
cass_prepared_free(prepared);
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SCError SensorConfigImpl::getPublicSensors(std::list<DCDBPublicSensor>& publicSensors)
{
/* Check if the session is valid */
if (!session) {
return SC_INVALIDSESSION;
}
/* Fill the list with all public sensors */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM dcdb_config.publishedsensors;";
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return SC_UNKNOWNERROR;
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const char* name;
size_t name_len;
const char* pattern;
size_t pattern_len;
double scaling_factor;
const char* unit;
size_t unit_len;
DCDBPublicSensor sensor;
const CassRow* row = cass_iterator_get_row(iterator);
if (cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name, &name_len) != CASS_OK) {
name = ""; name_len = 0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern, &pattern_len) != CASS_OK) {
pattern = ""; pattern_len = 0;
}
if (cass_value_get_double(cass_row_get_column_by_name(row, "scaling_factor"), &scaling_factor) != CASS_OK) {
scaling_factor = 1.0;
}
if (cass_value_get_string(cass_row_get_column_by_name(row, "unit"), &unit, &unit_len) != CASS_OK) {
unit = ""; unit_len = 0;
}
sensor.name = std::string(name, name_len);
sensor.pattern = std::string(pattern, pattern_len);
sensor.scaling_factor = scaling_factor;
sensor.unit = std::string(unit, unit_len);
publicSensors.push_back(sensor);
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
return SC_OK;
}
SensorConfigImpl::SensorConfigImpl(DCDBConnection* conn)
{
connection = conn;
session = connection->getSessionHandle();
}
SensorConfigImpl::~SensorConfigImpl()
{
connection = nullptr;
session = nullptr;
}
......@@ -158,6 +158,9 @@ SensorDataStoreImpl::SensorDataStoreImpl(DCDBConnection* conn)
{
connection = conn;
session = connection->getSessionHandle();
preparedInsert = nullptr;
prepareInsert(0);
}
/**
......@@ -167,6 +170,10 @@ SensorDataStoreImpl::SensorDataStoreImpl(DCDBConnection* conn)
SensorDataStoreImpl::~SensorDataStoreImpl()
{
connection = nullptr;
session = nullptr;
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
}
/**
......
include ../../config.mk
CXXFLAGS = -O0 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbconfig.o sensoraction.o useraction.o casshelper.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lssl -lcrypto
OBJS = dcdbconfig.o sensoraction.o useraction.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lssl -lcrypto
# GCC 4.8 is broken
ifeq ($(findstring 4.8, $(shell $(CXX) --version)), 4.8)
SLIBS = $(DCDBDEPLOYPATH)/lib/libboost_random.a $(DCDBDEPLOYPATH)/lib/libboost_system.a
......
/*
* casshelper.cpp
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "casshelper.h"
void CassHelper::print_error(CassFuture* future) {
CassString message = cass_future_error_message(future);
fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
CassCluster* CassHelper::create_cluster(const char* hostname) {
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hostname);
/* Force protocol version 1 */
cass_cluster_set_protocol_version(cluster, 1);
return cluster;
}
CassError CassHelper::connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
return rc;
}
CassError CassHelper::execute_query(CassSession* session, const char* query) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(cass_string_init(query), 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
/*
* casshelper.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
/*
* FIXME: These routines should be put into DCDBLib with nice interfaces.
* As of now, it is mostly a rip of the DataStax cpp-driver examples.
*/
#include "cassandra.h"
#include <cstdio>
#ifndef CASSHELPER_H
#define CASSHELPER_H
class CassHelper
{
public:
static void print_error(CassFuture* future);
static CassCluster* create_cluster(const char* hostname);
static CassError connect_session(CassSession* session, const CassCluster* cluster);
static CassError execute_query(CassSession* session, const char* query);
};
#endif /* CASSHELPER_H */
......@@ -20,8 +20,8 @@ void usage(int argc, char* argv[])
{
std::cout << "Usage: " << argv[0] << " [-h host] <command> [<arguments> ... ]" << std::endl << std::endl;
std::cout << "Valid commands are: " << std::endl;
std::cout << " help <command name> - print help for given command" << std::endl;
std::cout << " sensor - list and configure sensors" << std::endl;
std::cout << " HELP <command name> - print help for given command" << std::endl;
std::cout << " SENSOR - list and configure sensors" << std::endl;
}
int main(int argc, char* argv[])
......@@ -52,14 +52,13 @@ int main(int argc, char* argv[])
}
/* Process user command */
UserAction *action;
if (strcasecmp(argv[optind], "help") == 0) {
/* Help is special: either we do general usage or we trigger the class factory and run the printHelp() function */
if (optind + 1 >= argc) {
usage (argc, argv);
}
else {
action = UserActionFactory::getAction(argv[optind+1]);
auto action = UserActionFactory::getAction(argv[optind+1]);
if (action) {
action->printHelp(argc, argv);
}
......@@ -71,7 +70,7 @@ int main(int argc, char* argv[])
}
else {
/* If the command is not help, we try to instantiate the respective class through the factory and process the command */
action = UserActionFactory::getAction(argv[optind]);
auto action = UserActionFactory::getAction(argv[optind]);
if (action) {
return action->executeCommand(argc, argv, optind, host);
}
......
......@@ -11,10 +11,10 @@
#include <cstdlib>
#include <cstring>