Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 75179192 authored by Axel Auweter
Browse files

New DCDBLib takes care of connecting to Cassandra.

Moved all of CollectAgent's Cassandra related stuff to DCDBLib.
Bumped Cassandra to 1.2.13.
Minor build system improvements.
parent f8c0fd8e
......@@ -2,69 +2,25 @@ include ../config.mk
CXXFLAGS = -O0 -g --std=c++11 -Wall -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/
OBJS = collectagent.o \
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o \
sensordatastore.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread
simplemqttserver.o \
simplemqttserverthread.o \
simplemqttservermessage.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lpthread -lboost_system -lboost_thread
TARGET = collectagent
SUBTARGETS = cassandra/Cassandra.h
.PHONY : check-thrift-env clean clean-cassandra-headers install
.PHONY : clean install
P = $(shell cd $(DCDBDEPLOYPATH)/lib/ && pwd)
U = $(shell uname)
$(TARGET): $(SRC)
$(MAKE) cassandra/Cassandra.h
$(MAKE) $(OBJS)
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
check-thrift-env:
@if [ "$U" = "Darwin" ]; then \
R=`echo $$DYLD_LIBRARY_PATH | grep $P`; \
if [ "$$R" = "" ]; then \
printf "\n******************************************\nPlease type the following line before running:\n export DYLD_LIBRARY_PATH=$$DYLD_LIBRARY_PATH:$P\n******************************************\n"; \
fi; \
fi
@if [ "$U" = "Linux" ]; then \
R=`echo $$LD_LIBRARY_PATH | grep $P`; \
if [ "$$R" = "" ]; then \
printf "\n******************************************\nPlease type the following line before running:\n export LD_LIBRARY_PATH=$$LD_LIBRARY_PATH:$P\n******************************************\n"; \
fi; \
fi
cassandra/Cassandra.h:
@if [ ! -e cassandra ]; then \
printf "Building Thrift interface headers... "; \
$(DCDBDEPLOYPATH)/bin/thrift --gen cpp $(DCDBDEPLOYPATH)/cassandra/interface/cassandra.thrift; \
mv gen-cpp cassandra; \
echo "Done."; \
fi
@if [ "$U" = "Linux" ]; then \
echo "Fixing Thrift includes... "; \
grep stdint cassandra/cassandra_types.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/cassandra_types.h; fi;\
grep stdint cassandra/Cassandra.h > /dev/null; \
if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/Cassandra.h; fi;\
fi
@# This is a really bad way of adding a line of text from a Makefile, but the differences between sed on Mac OS and Linux leave me with no options...
@sed -e 's/#include "cassandra_types.h"/#include "cassandra_types.h"$$#undef VERSION/' cassandra/cassandra_constants.h > cassandra/cassandra_constants.h_newline
@cat cassandra/cassandra_constants.h_newline | tr $$ '\n' > cassandra/cassandra_constants.h
@rm cassandra/cassandra_constants.h_newline
clean: clean-cassandra-headers
rm -f $(OBJS) $(TARGET)
clean-cassandra-headers:
rm -rf cassandra
clean:
rm -f $(TARGET)
rm -f $(OBJS)
install: $(TARGET) check-thrift-env
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
......@@ -11,8 +11,9 @@
#include <boost/date_time/posix_time/posix_time.hpp>
#include <dcdb/sensordatastore.h>
#include "simplemqttserver.h"
#include "sensordatastore.h"
using namespace std;
......
/*
* sensordatastore.cpp
*
* Created on: Jul 24, 2013
* Author: Axel Auweter
*/
#include <boost/lexical_cast.hpp>
#include "sensordatastore.h"
/*
 * Check whether a name consists solely of alphabetical characters.
 *
 * Returns true if every character of name satisfies isalpha(), false as
 * soon as one does not. An empty name contains no offending character and
 * is therefore accepted (unchanged from the previous implementation).
 *
 * createKeyspace() and selectKeyspace() call this before splicing the name
 * into a CQL statement.
 */
bool SensorDataStore::validateName(string name)
{
    for (string::const_iterator it = name.begin(); it != name.end(); ++it) {
        /*
         * isalpha() is only defined for EOF and for values representable
         * as unsigned char. A plain char may be signed, so bytes >= 0x80
         * have to be converted before the call.
         */
        if (!isalpha(static_cast<unsigned char>(*it)))
            return false;
    }
    return true;
}
/*
 * Parse the hexadecimal digits contained in topic into a 128-bit SensorId.
 *
 * Every character in [0-9A-Fa-f] contributes four bits, filled in from the
 * most significant nibble of sid->raw[0] down to the least significant
 * nibble of sid->raw[1]. All other characters (e.g. separators) are
 * skipped.
 *
 * Returns true iff the topic contained exactly 32 hex digits. On a false
 * return *sid holds whatever digits were consumed so far and must not be
 * used.
 */
bool SensorDataStore::topicToSid(SensorId* sid, string topic)
{
    const uint64_t sidBits = 128;
    uint64_t pos = 0;

    sid->raw[0] = 0;
    sid->raw[1] = 0;

    for (const char* buf = topic.c_str(); *buf; buf++) {
        uint64_t nibble;

        if (*buf >= '0' && *buf <= '9')
            nibble = (uint64_t)(*buf - '0');
        else if (*buf >= 'A' && *buf <= 'F')
            nibble = (uint64_t)(*buf - 'A' + 0xa);
        else if (*buf >= 'a' && *buf <= 'f')
            nibble = (uint64_t)(*buf - 'a' + 0xa);
        else
            continue;

        /*
         * A 33rd hex digit would index sid->raw[2], i.e. write past the
         * end of the SensorId. Such a topic can never be valid, so stop
         * right here instead of corrupting memory.
         */
        if (pos >= sidBits)
            return false;

        sid->raw[pos / 64] |= nibble << (60 - (pos % 64));
        pos += 4;
    }

    return pos == sidBits;
}
/*
 * Serialize a SensorId into a 16-byte string: each 64-bit half of
 * sid->raw is byte-swapped and the two results are concatenated in order.
 * (Presumably this yields big-endian byte order on a little-endian host.)
 */
string SensorDataStore::sidConvert(SensorId *sid)
{
    uint64_t swapped[2] = {
        __builtin_bswap64(sid->raw[0]),
        __builtin_bswap64(sid->raw[1])
    };
    return string(reinterpret_cast<const char*>(swapped), sizeof(swapped));
}
/*
 * Serialize a 64-bit integer into an 8-byte string holding the
 * byte-swapped representation of n.
 */
string SensorDataStore::int64Convert(uint64_t n)
{
    const uint64_t swapped = __builtin_bswap64(n);
    return string(reinterpret_cast<const char*>(&swapped), sizeof(swapped));
}
/*
 * Store one sensor reading in the "sensordata" column family.
 *
 * sid   - sensor id, used (serialized) as the row key
 * ts    - timestamp of the reading, used (serialized) as the column name
 * value - the reading itself, stored (serialized) as the column value
 *
 * Any Thrift transport, invalid-request or not-found exception is printed
 * to stdout and terminates the process.
 */
void SensorDataStore::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
    try {
        ColumnParent parent;
        parent.column_family = "sensordata";

        /* Row key, column name and column value in serialized form. */
        const string rowKey = sidConvert(sid);

        Column col;
        col.name = int64Convert(ts);
        col.value = int64Convert(value);
        col.__isset.value = true;

        /* Cassandra-internal timestamp is derived from ts. */
        col.timestamp = ts/1000;
        col.__isset.timestamp = true;

        /* Hand the column over to the Thrift Cassandra client. */
        myClient->insert(rowKey, parent, col, ConsistencyLevel::ONE);
    }
    /*
     * TODO: None of these should be fatal in the long run; the
     * CollectAgent is supposed to keep running as long as possible.
     */
    catch (const TTransportException& te) {
        cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
        exit(EXIT_FAILURE);
    }
    catch (const InvalidRequestException& ire) {
        cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
        exit(EXIT_FAILURE);
    }
    catch (const NotFoundException& nfe) {
        cout << "NF Exception: " << nfe.what() << "\n";
        exit(EXIT_FAILURE);
    }
}
/*
 * Build the Thrift stack (socket -> framed transport -> binary protocol),
 * create the Cassandra client on top of it, open the transport and print
 * the name of the cluster we are talking to.
 */
void SensorDataStore::connect(string hostname, int port)
{
    sock.reset(new TSocket(hostname, port));
    tr.reset(new TFramedTransport(sock));
    prot.reset(new TBinaryProtocol(tr));
    myClient = new CassandraClient(prot);

    tr->open();

    myClient->describe_cluster_name(clusterName);
    cout << "Connected to cluster: " << clusterName << "\n";
}
void SensorDataStore::updateKeySpaces()
{
myClient->describe_keyspaces(keySpaces);
}
/*
 * Refresh the keyspace list and report whether a keyspace with exactly
 * the given name is among them.
 */
bool SensorDataStore::existsKeyspace(string name)
{
    updateKeySpaces();

    for (const KsDef& ks : keySpaces) {
        if (ks.name == name)
            return true;
    }
    return false;
}
/*
 * Create the keyspace that holds all sensor data, using SimpleStrategy
 * with the given replication factor.
 *
 * The statement is issued as CQL3 because its syntax is less likely to
 * change in future versions than the native API. Names that do not pass
 * validateName() are ignored without any request being sent.
 */
void SensorDataStore::createKeyspace(string name, int replicationFactor)
{
    if (!validateName(name))
        return;

    const string rFact = boost::lexical_cast<string>(replicationFactor);
    const string query =
        "CREATE KEYSPACE " + name + " WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '" + rFact + "' };";

    CqlResult res;
    myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
/*
 * Issue "USE <name>;" and remember the matching keyspace definition in
 * currentKeySpace. Names that do not pass validateName() are ignored
 * without any request being sent.
 */
void SensorDataStore::selectKeyspace(string name)
{
    if (!validateName(name))
        return;

    CqlResult res;
    const string query = "USE " + name + ";";
    myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);

    /* Re-read the keyspace list and pick up the definition we just selected. */
    updateKeySpaces();
    for (const KsDef& ks : keySpaces) {
        if (ks.name == name)
            currentKeySpace = ks;
    }
}
bool SensorDataStore::existsColumnFamilies()
{
for (std::vector<CfDef>::iterator it = currentKeySpace.cf_defs.begin(); it != currentKeySpace.cf_defs.end(); ++it) {
if ((*it).name.compare("sensordata") == 0)
return true;
}
return false;
}
void SensorDataStore::createColumnFamilies()
{
CqlResult res;
string query;
query = "CREATE TABLE sensordata ( sid blob, ts bigint, value bigint, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
/*
 * Open the connection to the Cassandra database at hostname:port and make
 * sure the keyspace KEYSPACE_NAME and the sensordata column family exist,
 * creating them (replication factor 1) if necessary.
 *
 * Terminates the process if the keyspace cannot be selected or if a
 * Thrift transport, invalid-request or not-found exception is raised.
 */
void SensorDataStore::init(string hostname, int port)
{
    try {
        connect(hostname, port);

        if (!existsKeyspace(KEYSPACE_NAME)) {
            cout << "Creating Keyspace " << KEYSPACE_NAME << "...\n";
            createKeyspace(KEYSPACE_NAME, 1);
        }

        selectKeyspace(KEYSPACE_NAME);

        /*
         * selectKeyspace() only fills currentKeySpace on success, so a
         * name mismatch means the keyspace could not be selected.
         * (Written as a plain inequality: the former "!compare() == 0"
         * gave the same result only by operator precedence and triggers
         * a logical-not-parentheses warning.)
         */
        if (currentKeySpace.name != KEYSPACE_NAME) {
            cout << "Cannot select keyspace " << KEYSPACE_NAME << "\n";
            exit(EXIT_FAILURE);
        }

        if (!existsColumnFamilies()) {
            cout << "Creating Column Families...\n";
            createColumnFamilies();
        }
    }
    catch (const TTransportException& te) {
        cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
        exit(EXIT_FAILURE);
    }
    catch (const InvalidRequestException& ire) {
        cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
        exit(EXIT_FAILURE);
    }
    catch (const NotFoundException& nfe) {
        cout << "NF Exception: " << nfe.what() << "\n";
        exit(EXIT_FAILURE);
    }
}
/*
 * Default constructor: connect to a Cassandra instance on
 * localhost, port 9160.
 */
SensorDataStore::SensorDataStore()
    : SensorDataStore("localhost", 9160)
{
}
/*
 * Construct a data store and immediately initialize it against the
 * Cassandra instance at hostname:port (see init()).
 */
SensorDataStore::SensorDataStore(string hostname, int port)
{
    this->init(hostname, port);
}
/*
 * Release the Cassandra client created in connect().
 */
SensorDataStore::~SensorDataStore()
{
    /* delete on a null pointer is a no-op, so no explicit check is needed. */
    delete myClient;
}
/*
* sensordatastore.h
*
* Created on: Jul 24, 2013
* Author: Axel Auweter
*/
#include <stdint.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <thrift/protocol/TProtocol.h>
#include <thrift/protocol/TBinaryProtocol.h>
#include "cassandra/Cassandra.h"
#ifndef SENSORDATASTORE_H_
#define SENSORDATASTORE_H_
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
using namespace std;
#define KEYSPACE_NAME "dcdb"
#pragma pack(push,1)
/*
 * Location of a device: eight one-byte fields overlaid on one 64-bit word.
 * raw and the anonymous struct share the same storage, so the location can
 * be accessed field by field or as a whole through raw.
 * NOTE(review): which field maps to the most significant byte of raw
 * depends on host byte order - confirm what callers rely on.
 */
typedef union {
uint64_t raw;
struct {
uint8_t knc_id;
uint8_t bnc_id;
uint8_t bic_id;
uint8_t chassis_id;
/* presumably the low and high byte of one 16-bit rack id - TODO confirm */
uint8_t rack_id_lsb;
uint8_t rack_id_msb;
uint8_t cluster_id;
uint8_t datacenter_id;
};
} DeviceLocation;
/*
 * Two bit-fields that together fill one 64-bit unit:
 * a 16-bit sensor id and a 48-bit device id.
 */
typedef struct {
uint64_t sensor_id : 16;
uint64_t device_id : 48;
} DeviceSensorId;
/*
 * Complete sensor identifier. The two 64-bit words in raw share their
 * storage with a DeviceLocation followed by a DeviceSensorId, so the id
 * can be handled either as two plain words or by its components.
 */
typedef union {
uint64_t raw[2];
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} SensorId;
#pragma pack(pop)
/*
 * Thin wrapper around a Thrift CassandraClient that stores sensor
 * readings in the "sensordata" column family of keyspace KEYSPACE_NAME.
 *
 * NOTE(review): the class owns myClient through a raw pointer (deleted in
 * the destructor) but declares no copy constructor or copy assignment, so
 * copying an instance would delete the client twice - consider disabling
 * copies.
 */
class SensorDataStore
{
protected:
/* Cassandra client; allocated in connect(), deleted in the destructor. */
CassandraClient *myClient;
/* Keyspace definitions as last fetched by updateKeySpaces(). */
std::vector<KsDef> keySpaces;
/* Definition of the keyspace chosen by selectKeyspace(). */
KsDef currentKeySpace;
/* Cluster name reported by the server in connect(). */
string clusterName;
/* Thrift socket, transport and protocol objects created in connect(). */
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
/* Serialize a SensorId / 64-bit integer into a byte string (byte-swapped). */
string sidConvert(SensorId *sid);
string int64Convert(uint64_t n);
/* Open the connection to hostname:port and create myClient. */
void connect(string hostname, int port);
void updateKeySpaces();
bool existsKeyspace(string name);
void createKeyspace(string name, int replicationFactor);
void selectKeyspace(string name);
/* Check for / create the "sensordata" column family. */
bool existsColumnFamilies();
void createColumnFamilies();
/* True iff name consists of alphabetical characters only. */
bool validateName(string name);
public:
/* Connect and make sure keyspace and column family exist. */
void init(string hostname, int port);
/* Store one reading (value at timestamp ts) for sensor sid. */
void insert(SensorId* sid, uint64_t ts, uint64_t value);
/* Parse the hex digits of topic into sid; true iff exactly 128 bits were read. */
bool topicToSid(SensorId* sid, string topic);
/* The constructors call init(); the default one uses localhost:9160. */
SensorDataStore();
SensorDataStore(string hostname, int port);
virtual ~SensorDataStore();
};
#endif /* SENSORDATASTORE_H_ */
include ../config.mk
# C++ Compiler Flags (use fPIC for our dynamic library)
# Note: -Werror turns every warning enabled by -Wall into a build failure.
CXXFLAGS = -O0 -g -Wall -Werror -fPIC --std=c++11 -I$(DCDBDEPLOYPATH)/include -I./include -I./include_internal -fmessage-length=0
# List of object files to build and the derived list of corresponding source files.
# The patsubst removes every cassandra/ entry from SRC, so only src/ files remain there.
OBJS = src/sensordatastore.o \
cassandra/Cassandra.o \
cassandra/cassandra_constants.o \
cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
# List of public header files necessary to use this library
PUBHEADERS = $(shell find include -type f -iname "*.h")
# External libraries to link against
LIBS = -L$(DCDBDEPLOYPATH)/lib -lthrift -lssl -lcrypto -lpthread -lboost_system -lboost_thread
# Dynamic library building differs between Linux/BSD and MacOS:
# the Darwin toolchain wants -dynamiclib and a .dylib suffix, everything
# else gets a .so with an embedded soname.
OS = $(shell uname)
ifeq ($(OS),Darwin)
TARGET = libdcdb.dylib
DLFLAGS = -dynamiclib
else
TARGET = libdcdb.so
# -shared (not -dynamic) is the option that makes the GNU toolchain emit a
# shared object; without it the link step tries to build an executable.
DLFLAGS = -shared -Wl,-soname,$(TARGET)
endif
# List of Phony Targets (targets that do not correspond to a file of the same name)
.PHONY : check-thrift-env all clean clean-cassandra-headers install
# Main Library Target
# Generates the Thrift headers first, then builds all objects through a
# recursive make, and finally links the dynamic library.
$(TARGET): $(SRC)
	$(MAKE) cassandra/Cassandra.h
	$(MAKE) $(OBJS)
	@if [ "$(OS)" = "Darwin" ]; then \
		echo "Linking library in Mac OS style: $(TARGET)"; \
	else \
		echo "Linking library in Linux style: $(TARGET)"; \
	fi
	@$(CXX) $(DLFLAGS) -o $(TARGET) $(OBJS) $(LIBS)

all: $(TARGET)
# Alert the user to put the necessary paths into LD_LIBRARY_PATH (or similar on other platforms)
#
# The check needs the platform name and the absolute library directory.
# This Makefile provides the former as $(OS); the latter is computed here.
# (The rule used to test $U and $P, which are not defined in this Makefile,
# so neither branch could ever fire.)
DCDBLIBDIR = $(shell cd $(DCDBDEPLOYPATH)/lib/ && pwd)

check-thrift-env:
	@if [ "$(OS)" = "Darwin" ]; then \
		R=`echo $$DYLD_LIBRARY_PATH | grep $(DCDBLIBDIR)`; \
		if [ "$$R" = "" ]; then \
			printf "\n******************************************\nPlease type the following line before running:\n export DYLD_LIBRARY_PATH=$$DYLD_LIBRARY_PATH:$(DCDBLIBDIR)\n******************************************\n"; \
		fi; \
	fi
	@if [ "$(OS)" = "Linux" ]; then \
		R=`echo $$LD_LIBRARY_PATH | grep $(DCDBLIBDIR)`; \
		if [ "$$R" = "" ]; then \
			printf "\n******************************************\nPlease type the following line before running:\n export LD_LIBRARY_PATH=$$LD_LIBRARY_PATH:$(DCDBLIBDIR)\n******************************************\n"; \
		fi; \
	fi
# Build the Cassandra Thrift headers
#
# 1. Run the Thrift compiler on cassandra.thrift (only if the cassandra
#    directory does not exist yet) and rename gen-cpp to cassandra.
# 2. On Linux, prepend "#include <stdint.h>" to the generated headers that
#    lack it. The platform test uses $(OS), the variable this Makefile
#    defines; the former "$U" is not set here, so the fix was never applied.
# 3. Insert "#undef VERSION" after the cassandra_types.h include in
#    cassandra_constants.h.
cassandra/Cassandra.h:
	@if [ ! -e cassandra ]; then \
		printf "Building Thrift interface headers... "; \
		$(DCDBDEPLOYPATH)/bin/thrift --gen cpp $(DCDBDEPLOYPATH)/cassandra/interface/cassandra.thrift; \
		mv gen-cpp cassandra; \
		echo "Done."; \
	fi
	@if [ "$(OS)" = "Linux" ]; then \
		echo "Fixing Thrift includes... "; \
		grep stdint cassandra/cassandra_types.h > /dev/null; \
		if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/cassandra_types.h; fi;\
		grep stdint cassandra/Cassandra.h > /dev/null; \
		if [ "$$?" -eq "1" ]; then sed -i '1i#include <stdint.h>' cassandra/Cassandra.h; fi;\
	fi
	@# This is a really bad way of adding a line of text from a Makefile, but the differences between sed on Mac OS and Linux leave me with no options...
	@# (A literal '$' is written as a placeholder and then turned into a newline by tr.)
	@sed -e 's/#include "cassandra_types.h"/#include "cassandra_types.h"$$#undef VERSION/' cassandra/cassandra_constants.h > cassandra/cassandra_constants.h_newline
	@cat cassandra/cassandra_constants.h_newline | tr $$ '\n' > cassandra/cassandra_constants.h
	@rm cassandra/cassandra_constants.h_newline
# Clean everything: the generated Cassandra sources (via the
# clean-cassandra-headers prerequisite), all object files and the library.
clean: clean-cassandra-headers
rm -f $(OBJS) $(TARGET)
# Clean the Cassandra headers only (removes the whole generated cassandra directory)
clean-cassandra-headers:
rm -rf cassandra
# Install to deploy path: the library goes to $(DCDBDEPLOYPATH)/lib, the
# public headers to $(DCDBDEPLOYPATH)/include/dcdb. Both directories are
# created first if they do not exist.
install: $(TARGET) check-thrift-env
mkdir -p $(DCDBDEPLOYPATH)/lib
mkdir -p $(DCDBDEPLOYPATH)/include/dcdb
install $(TARGET) $(DCDBDEPLOYPATH)/lib/
install $(PUBHEADERS) $(DCDBDEPLOYPATH)/include/dcdb/
/*
* sensordatastore.h
*
* Created on: Jul 24, 2013
* Author: Axel Auweter
*/
#include <stdint.h>
#include <string>
#ifndef SENSORDATASTORE_H_
#define SENSORDATASTORE_H_
#pragma pack(push,1)
/*
 * Location of a device: eight one-byte fields overlaid on one 64-bit word.
 * raw and the anonymous struct share the same storage, so the location can
 * be accessed field by field or as a whole through raw.
 * NOTE(review): which field maps to the most significant byte of raw
 * depends on host byte order - confirm what callers rely on.
 */
typedef union {
uint64_t raw;
struct {
uint8_t knc_id;
uint8_t bnc_id;
uint8_t bic_id;
uint8_t chassis_id;
/* presumably the low and high byte of one 16-bit rack id - TODO confirm */
uint8_t rack_id_lsb;
uint8_t rack_id_msb;
uint8_t cluster_id;
uint8_t datacenter_id;
};
} DeviceLocation;
/*
 * Two bit-fields that together fill one 64-bit unit:
 * a 16-bit sensor id and a 48-bit device id.
 */
typedef struct {
uint64_t sensor_id : 16;
uint64_t device_id : 48;
} DeviceSensorId;
/*
 * Complete sensor identifier. The two 64-bit words in raw share their
 * storage with a DeviceLocation followed by a DeviceSensorId, so the id
 * can be handled either as two plain words or by its components.
 */
typedef union {
uint64_t raw[2];
struct {
DeviceLocation dl;
DeviceSensorId dsid;
};
} SensorId;
#pragma pack(pop)
/* Implementation class; only forward-declared in this public header. */
class SensorDataStoreImpl;
/*
 * Public interface of the sensor data store. All state lives behind the
 * opaque impl pointer, so this header exposes none of the implementation's
 * types.
 */
class SensorDataStore
{
private:
/* Pointer to the hidden implementation object. */
SensorDataStoreImpl* impl;
public:
/* Initialize the store for the database at hostname:port. */
void init(std::string hostname, int port);
/* Convert a topic string into a SensorId; the bool result reports success. */
bool topicToSid(SensorId* sid, std::string topic);
/* Store one reading (value at timestamp ts) for sensor sid. */
void insert(SensorId* sid, uint64_t ts, uint64_t value);
SensorDataStore();
SensorDataStore(std::string hostname, int port);
virtual ~SensorDataStore();
};
#endif /* SENSORDATASTORE_H_ */
/*
* dcdb_internal.h
*
* Internal data structures and definitions for the DCDB Library
*
*/
#ifndef DCDB_INTERNAL_H
#define DCDB_INTERNAL_H
#include <sys/socket.h>
#include <netinet/in.h>
#include <boost/lexical_cast.hpp>
<