Commit 59a7c6db authored by Axel Auweter's avatar Axel Auweter
Browse files

Add skeleton for two new tools: dcdbconfig & dcdbquery.

The tools currently use the new CQL API for Cassandra (while DCDBLib is still using Thrift). It is intended to move DCDBLib ober to the new API so that many of the functions of the two tools will actually be handled via DCDBLib calls.
parent 873c5ef5
......@@ -23,31 +23,26 @@
#pragma pack(push,1)
/**
* @brief The DeviceLocation type describes the location of a sensor.
* In the current implementation, DeviceLocation consists of 64 bits
* which can either be accessed trough the raw member or through the
* DEEP project specific members:
* - datacenter_id
* - cluster_id
* - rack_id_msb
* - rack_id_lsb
* - chassis_id
* - bic_id
* - bnc_id
* - knc_id
* @brief The DeviceLocation type describes the location of a device. A
* device is the smallest piece of hardware containing sensors.
*
* The location of a device is highly specific to the system architecture
* and thus, it is only treated as unsigned 64 bit integer internally.
* Since these bits, however, make up for the location of the data within
* the distributed database, it is recommended to assign a globally used
* schema in advance leaving the higher-order bits to higher level
* entities.
*
* Example:
* ----------------------------------------------
* | 8 Bits | 8 Bits | 16 Bits | 8 Bits | ...
* ----------------------------------------------
* | Data | | | |
* | Center | Cluster | Rack | Chassis | ...
* | ID | ID | ID | ID |
* ----------------------------------------------
*/
typedef union {
uint64_t raw; /**< The raw bit-field representing the DeviceLocation */
struct {
uint16_t sdb_id;
uint8_t bmc_id;
uint8_t chassis_id;
uint8_t rack_id_lsb;
uint8_t rack_id_msb;
uint8_t cluster_id;
uint8_t datacenter_id;
};
} DeviceLocation;
typedef uint64_t DeviceLocation;
/**
* @brief The DeviceSensorId type describes the tuple of the sensor
......@@ -60,8 +55,7 @@ typedef union {
* approach is, for example, to use MAC addresses as the device_id.
*/
typedef struct {
//uint64_t sensor_number : 16; /**< The sensor_number of the sensor */
uint64_t sensor_id : 16; /**< The sensor_number of the sensor */
uint64_t sensor_number : 16; /**< The sensor_number of the sensor */
uint64_t rsvd : 16; /**< Reserved */
uint64_t device_id : 32; /**< The location-independent device_id */
} DeviceSensorId;
......@@ -120,7 +114,6 @@ public:
* @return Returns true if the topic string was valid and the SensorId object was populated.
*/
bool topicToSid(SensorId* sid, std::string topic);
//template <typename T> bool topicToSid(T* sid, uint64_t ts, uint64_t value)
/**
* @brief This function inserts a single sensor reading into
......@@ -130,7 +123,6 @@ public:
* @param value The value of the sensor reading.
*/
void insert(SensorId* sid, uint64_t ts, uint64_t value);
//template <typename T> void insert(T* sid, uint64_t ts, uint64_t value)
/**
* @brief A shortcut constructor for a SensorDataStore object
......
......@@ -20,9 +20,12 @@
#include "sensordatastore.h"
#include "cassandraBackend.h"
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define CF_SETTINGS "settings"
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define CONFIG_KEYSPACE_NAME KEYSPACE_NAME "_config"
#define CF_SENSORALIASES "sensoralias"
#define CF_VIRTUALSENSORS "virtualsensors"
/**
* @brief The SensorDataStoreImpl class contains all protected
......
......@@ -142,12 +142,12 @@ void CassandraBackend::createColumnFamily(std::string name, std::string fields,
bool CassandraBackend::validateName(std::string name)
{
/*
* Make sure name only consists of alphabetical characters (super ugly)...
* Make sure name only consists of alphabetical characters or underscores (super ugly)...
*/
class {
public:
static bool check(char c) {
return !isalpha(c);
return !isalpha(c) && !(c == '_');
}
} isNotAlpha;
......
......@@ -64,7 +64,6 @@ std::string SensorDataStoreImpl::sidConvert(SensorId *sid)
* SensorDataStore class.
*/
bool SensorDataStoreImpl::topicToSid(SensorId* sid, std::string topic)
//template <typename T> bool SensorDataStoreImpl::topicToSid(T* sid, string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
......@@ -103,11 +102,34 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
* create the necessary keyspaces and column families.
*/
try {
csBackend->connect(hostname, port);
/* Keyspace and column family for sensor aliases */
/* FIXME: We should have a good way to determine the number of replicas here */
if (!csBackend->existsKeyspace(CONFIG_KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << CONFIG_KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(CONFIG_KEYSPACE_NAME, 1);
}
csBackend->selectKeyspace(CONFIG_KEYSPACE_NAME);
if (!(csBackend->currentKeySpace.name.compare(CONFIG_KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << CONFIG_KEYSPACE_NAME << "\n";
exit(EXIT_FAILURE);
}
if (!csBackend->existsColumnFamily(CF_SENSORALIASES)) {
std::cout << "Creating Column Familiy " CF_SENSORALIASES "...\n";
csBackend->createColumnFamily(CF_SENSORALIASES,
"name varchar, pattern varchar",
"name",
"CACHING = all");
}
/* Keyspace and column family for raw sensor data */
if (!csBackend->existsKeyspace(KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
csBackend->createKeyspace(KEYSPACE_NAME, 1);
......
include ../config.mk
PROJECTS = dcdbconfig dcdbquery
.PHONY : clean install $(PROJECTS)
all: $(PROJECTS)
$(PROJECTS):
@$(MAKE) -j $(MAKETHREADS) -C $@
clean:
@$(foreach p,$(PROJECTS),$(MAKE) -C $(p) clean;)
install:
@$(foreach p,$(PROJECTS),$(MAKE) -C $(p) install;)
include ../../config.mk
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbconfig.o sensoraction.o useraction.o casshelper.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra
#-lpthread -lboost_system -lboost_thread -lthrift
TARGET = dcdbconfig
.PHONY : clean install
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
clean:
rm -f $(TARGET)
rm -f $(OBJS)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
/*
* casshelper.cpp
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "casshelper.h"
void CassHelper::print_error(CassFuture* future) {
CassString message = cass_future_error_message(future);
fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
CassCluster* CassHelper::create_cluster(const char* hostname) {
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hostname);
/* Force protocol version 1 */
cass_cluster_set_protocol_version(cluster, 1);
return cluster;
}
CassError CassHelper::connect_session(CassSession* session, const CassCluster* cluster) {
CassError rc = CASS_OK;
CassFuture* future = cass_session_connect(session, cluster);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
return rc;
}
CassError CassHelper::execute_query(CassSession* session, const char* query) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
CassStatement* statement = cass_statement_new(cass_string_init(query), 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
return rc;
}
/*
* casshelper.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
/*
* FIXME: These routines should be put into DCDBLib with nice interfaces.
* As of now, it is mostly a rip of the DataStax cpp-driver examples.
*/
#include "cassandra.h"
#include <cstdio>
#ifndef CASSHELPER_H
#define CASSHELPER_H
class CassHelper
{
public:
static void print_error(CassFuture* future);
static CassCluster* create_cluster(const char* hostname);
static CassError connect_session(CassSession* session, const CassCluster* cluster);
static CassError execute_query(CassSession* session, const char* query);
};
#endif /* CASSHELPER_H */
/*
* dcdbconfig.cpp
*
* Created on: Jan 06, 2015
* Author: Axel Auweter
*/
#include <iostream>
#include <cstring>
#include <unistd.h>
#include "cassandra.h"
#include "useraction.h"
void usage(int argc, char* argv[])
{
std::cout << "Usage: " << argv[0] << " [-h host] <command> [<arguments> ... ]" << std::endl << std::endl;
std::cout << "Valid commands are: " << std::endl;
std::cout << " help <command name> - print help for given command" << std::endl;
std::cout << " sensors - list and configure sensors" << std::endl;
}
int main(int argc, char* argv[])
{
/* Check command line parameters */
if (argc < 2) {
usage(argc, argv);
exit(EXIT_FAILURE);
}
char ret;
const char *host = "localhost";
while ((ret=getopt(argc, argv, "+h:"))!=EOF) {
switch(ret) {
case 'h':
host = optarg;
break;
default:
usage(argc, argv);
exit(EXIT_FAILURE);
}
}
if (optind >= argc) {
std::cout << "Missing command!" << std::endl;
usage(argc, argv);
exit(EXIT_FAILURE);
}
/* Process user command */
UserAction *action;
if (strcasecmp(argv[optind], "help") == 0) {
/* Help is special: either we do general usage or we trigger the class factory and run the printHelp() function */
if (optind + 1 >= argc) {
usage (argc, argv);
}
else {
action = UserActionFactory::getAction(argv[optind+1]);
if (action) {
action->printHelp(argc, argv);
}
else {
std::cout << "Cannot provide help for unknown command: " << argv[optind+1] << std::endl;
exit(EXIT_FAILURE);
}
}
}
else {
/* If the command is not help, we try to instantiate the respective class through the factory and process the command */
action = UserActionFactory::getAction(argv[optind]);
if (action) {
return action->executeCommand(argc, argv, optind, host);
}
else {
std::cout << "Unknwon command: " << argv[1] << std::endl;
usage(argc, argv);
exit(EXIT_FAILURE);
}
}
/* Shouldn't fall through here */
return 0;
}
/*
* sensoraction.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include <iostream>
#include <cstdlib>
#include <cstring>
#include "cassandra.h"
#include "sensoraction.h"
#include "casshelper.h"
void SensorAction::printHelp(int argc, char* argv[])
{
std::cout << "SENSOR command help" << std::endl;
std::cout << "The SENSOR command has the following options:" << std::endl;
std::cout << " aliasadd <alias name> <pattern> - Add a sensor alias matching a given MQTT" << std::endl;
std::cout << " topic pattern." << std::endl;
std::cout << " aliaslist - List all configured aliases." << std::endl;
}
int SensorAction::executeCommand(int argc, char* argv[], int argvidx, const char* hostname)
{
/* Independent from the command, we need to connect to the server */
CassCluster* cluster = CassHelper::create_cluster(hostname);
CassSession* session = cass_session_new();
if (CassHelper::connect_session(session, cluster) != CASS_OK) {
cass_cluster_free(cluster);
cass_session_free(session);
std::cout << "Cannot connect to Cassandra database." << std::endl;
return EXIT_FAILURE;
}
/* Check what we need to do (argv[argvidx] contains "SENSOR") */
argvidx++;
if (argvidx >= argc) {
std::cout << "The SENSOR command needs at least one parameter." << std::endl;
std::cout << "Run with 'HELP SENSOR' to see the list of possible SENSOR commands." << std::endl;
goto executeCommandError;
}
if (strcasecmp(argv[argvidx], "aliasadd") == 0) {
/* aliasadd needs two more parameters */
if (argvidx+2 >= argc) {
std::cout << "ALIASADD needs two more parameters!" << std::endl;
goto executeCommandError;
}
doAddAlias(session, argv[argvidx+1], argv[argvidx+2]);
}
else if (strcasecmp(argv[argvidx], "aliaslist") == 0) {
doListAliases(session);
}
else {
std::cout << "Invalid SENSOR command: " << argv[argvidx] << std::endl;
goto executeCommandError;
}
/* Clean up */
cass_cluster_free(cluster);
cass_session_free(session);
return EXIT_SUCCESS;
executeCommandError:
cass_cluster_free(cluster);
cass_session_free(session);
return EXIT_FAILURE;
}
void SensorAction::doAddAlias(CassSession* session, const char* aliasName, const char* aliasPattern)
{
/*
* FIXME: Should do some sanity checks before allowing this into the DB.
* FIXME: Merge the core functionality into DCDBLib and stick to the
* CONFIG_KEYSPACE_NAME/CF_SENSORALIASES instead of hard coding to
* dcdb_config.sensoralias
*/
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("INSERT INTO dcdb_config.sensoralias (name, pattern) VALUES (?,?);");
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
prepared = cass_future_get_prepared(future);
}
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", cass_string_init(aliasName));
cass_statement_bind_string_by_name(statement, "pattern", cass_string_init(aliasPattern));
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
}
cass_future_free(future);
cass_statement_free(statement);
}
void SensorAction::doListAliases(CassSession* session)
{
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
CassString query = cass_string_init("SELECT * FROM dcdb_config.sensoralias;");
CassString name, pattern;
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "name"), &name);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern);
std::cout << name.data << " : " << pattern.data << std::endl;
}
cass_result_free(result);
cass_iterator_free(iterator);
}
}
/*
* sensoraction.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "cassandra.h"
#include "useraction.h"
#ifndef SENSORACTION_H
#define SENSORACTION_H
class SensorAction : public UserAction
{
public:
void printHelp(int argc, char* argv[]);
int executeCommand(int argc, char* argv[], int argvidx, const char* hostname);
protected:
void doListAliases(CassSession* session);
void doAddAlias(CassSession* session, const char* aliasName, const char* aliasPattern);
};
#endif
/*
* useraction.cpp
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include "useraction.h"
#include "sensoraction.h"
#include <cstring>
/*
* This function acts as a class factory to return the
* appropriate class handling a given type of action
* (e.g. "sensors")
*/
UserAction* UserActionFactory::getAction(const char *actionStr)
{
if (strcasecmp(actionStr, "sensor") == 0) {
return new SensorAction();
}
else {
return nullptr;
}
}
/*
* useraction.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#ifndef USERACTION_H
#define USERACTION_H
class UserAction
{
public:
virtual void printHelp(int argc, char* argv[]) = 0;
virtual int executeCommand(int argc, char* argv[], int argvidx, const char* hostname) = 0;
};
class UserActionFactory
{
public:
static UserAction* getAction(const char *actionStr);
};
#endif // USERACTION_H
include ../../config.mk
CXXFLAGS = -O0 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbquery.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb
#-lpthread -lboost_system -lboost_thread -lthrift
TARGET = dcdbquery
.PHONY : clean install
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)