Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit a65bde4b authored by Axel Auweter's avatar Axel Auweter
Browse files

Removed mosquitto dependency entirely. Started reworking collectagent main...

Removed mosquitto dependency entirely. Started reworking collectagent main routines and message handling.
parent 5abfb2dd
include ../config.mk
CXXFLAGS = $(shell ./cxx11flags.sh $(CXX) -O0 -g -Wall -Wno-null-conversion -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ | head -1)
CXXFLAGS = $(shell ./cxx11flags.sh $(CXX) -O0 -g -Wall -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ | head -1)
OBJS = collectagent.o simplemqttserver.o simplemqttserverthread.o simplemqttservermessage.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
LIBS = $(shell ./cxx11flags.sh $(CXX) -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lmosquitto -lssl -lcrypto -lpthread -lboost_system | tail -1)
LIBS = $(shell ./cxx11flags.sh $(CXX) -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lssl -lcrypto -lpthread -lboost_system | tail -1)
TARGET = collectagent
SUBTARGETS = cassandra/Cassandra.h
......
......@@ -3,11 +3,10 @@
// Author : Axel Auweter
// Version :
// Copyright : Leibniz Supercomputing Centre
// Description : As of now this is some rudimentary bad code to talk to the DB
// Description : Main code of the CollectAgent
//============================================================================
#include <cstdlib>
#include <sstream>
#include <sys/socket.h>
......@@ -38,60 +37,40 @@ using namespace org::apache::cassandra;
CassandraClient *myClient;
int keepRunning;
int msgCtr;
uint64_t msgCtr;
uint64_t pmsgCtr;
void sigHandler(int sig) {
void sigHandler(int sig)
{
keepRunning = 0;
}
static inline string convert(void* ptr, size_t len) {
switch (len) {
case 4:
// *((uint32_t*)ptr) = __builtin_bswap32(*((uint32_t*)ptr));
break;
case 8:
// *((uint64_t*)ptr) = __builtin_bswap64(*((uint64_t*)ptr));
break;
default:
// Do nothing and hope for the best...
break;
}
string res((char*)ptr, len);
return res;
}
void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
void mqttCallback(SimpleMQTTMessage *msg)
{
/*
* Increment the msgCtr/vmsgCtr for statistics.
*/
msgCtr++;
if (msg->isPublish())
pmsgCtr++;
/*
* Decode the message and put into the database.
*/
try {
CqlResult res;
std::string query;
std::stringstream datestr, sidstr;
#if 0
time_t t = time(NULL);
struct tm *now = localtime(&t);
datestr << "'" << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
<< "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
<< (now->tm_min) << ":" << (now->tm_sec) << "'";
#else
ColumnParent cparent;
Column c;
string key, name, value;
boost::posix_time::ptime epoch(boost::gregorian::date(1970,1,1));
boost::posix_time::ptime now = boost::posix_time::microsec_clock::universal_time();
boost::posix_time::time_duration diff = now - epoch;
//datestr << diff.total_nanoseconds();
#endif
//sidstr << msgCtr;
#if 0
query = "INSERT INTO sensordata (sid, ts, value) VALUES ( " + sidstr.str() + ", " + datestr.str() + ", 3.141 );";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
#else
ColumnParent cparent;
cparent.column_family = "sensordata";
Column c;
string key, name, value;
uint64_t ts = diff.total_nanoseconds();
uint32_t sid = msgCtr;
#if 0
key = convert(&sid, sizeof(msgCtr));
name = convert(&ts, sizeof(ts));
......@@ -105,19 +84,22 @@ void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_mes
myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
#endif
}
catch(TTransportException te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
}catch(InvalidRequestException ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
}catch(NotFoundException nfe){
cout << "NF Exception: " << nfe.what() << "\n";
catch(TTransportException *te){
cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
}
catch(InvalidRequestException *ire){
cout << "IRE Exception: " << ire->what() << "[" << ire->why << "]\n";
}
catch(NotFoundException *nfe){
cout << "NF Exception: " << nfe->what() << "\n";
}
delete msg;
}
#if 0
int main(void) {
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
boost::shared_ptr<TProtocol> prot;
......@@ -128,6 +110,10 @@ int main(void) {
tr = boost::shared_ptr<TFramedTransport>(new TFramedTransport(sock));
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
/*
* Open the connection to the Cassandra database and
* create the necessary keyspace and column family.
*/
try {
myClient = new CassandraClient(prot);
tr->open();
......@@ -136,7 +122,6 @@ int main(void) {
myClient->describe_cluster_name(clusterName);
cout << "Cluster name: " << clusterName << "\n";
int dcdbKeyspace = -1;
cout << "Keyspaces:\n";
std::vector<KsDef> keySpaces;
......@@ -190,157 +175,52 @@ int main(void) {
else {
cout << "Using existing sensordata column familiy.\n";
}
}
catch(TTransportException *te){
cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
}
catch(InvalidRequestException *ire){
cout << "IRE Exception: " << ire->what() << "[" << ire->why << "]\n";
}
catch(NotFoundException *nfe){
cout << "NF Exception: " << nfe->what() << "\n";
}
/* Should have the keyspace and the column familiy in the system now, subscribe to local mqtt broker */
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
cout << "Initializing Mosquitto Library Version " << mosqMajor << "." << mosqMinor << "." << mosqRevision << "\n";
mosquitto_lib_init();
/* Init mosquitto struct */
struct mosquitto* mosq;
mosq = mosquitto_new("CollectAgent", false, NULL);
if (!mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
/* Connect to the broker */
cout << "Connecting to broker...";
cout.flush();
if (mosquitto_connect(mosq, "localhost", 1883, 1000) != MOSQ_ERR_SUCCESS) {
perror("\nCould not connect to host");
exit(EXIT_FAILURE);
}
cout << " Done.\n";
/* Catch SIGINT signals */
signal(SIGINT, sigHandler);
/* Catch SIGINT signals */
signal(SIGINT, sigHandler);
/* Subscribe to anything */
cout << "Subscribing to anything on the broker (Pattern #)...";
cout.flush();
if (mosquitto_subscribe(mosq, NULL, "#", 0) != MOSQ_ERR_SUCCESS) {
perror("\nCould not subscribe");
exit(EXIT_FAILURE);
}
cout << " Done.\n";
/* Set the callback for mosquitto */
cout << "Configuring message callback...";
cout.flush();
mosquitto_message_callback_set(mosq, mqttOnMessage);
cout << " Done.\n";
/* Here comes the main loop */
cout << "Starting mqtt loop thread...";
cout.flush();
if (mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS) {
perror("\nCould not start mosquitto thread");
exit(EXIT_FAILURE);
}
cout << " Done.\n";
try{
keepRunning = 1;
timeval start, end;
double elapsed;
SimpleMQTTServer ms;
ms.setMessageCallback(mqttCallback);
ms.start();
cout << "Server running...\n";
while(keepRunning) {
gettimeofday(&start, NULL);
sleep(10);
sleep(60);
/* not really thread safe but will do the job */
gettimeofday(&end, NULL);
elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second\n";
float publish = msgCtr?(pmsgCtr*100.0)/msgCtr:0;
cout << "Message rate: " << (msgCtr/elapsed)*1000.0 << " messages/second (" << publish << "% PUBLISH)\n";
msgCtr = 0;
pmsgCtr = 0;
}
cout << "Cleaning up...";
/* Stop the mosquitto loop */
mosquitto_loop_stop(mosq, true);
mosquitto_disconnect(mosq);
/* Disconnect from Cassandra */
tr->close();
cout << " Done.\n";
#if 0
int resB = 0;
while (resB < keySpaces[0].cf_defs.size()) {
query = "SELECT * FROM " + keySpaces[0].cf_defs[resB].name + ";";
printf("Sending CQL statement: %s...", query.c_str());
myClient.execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
printf(" Done.\nResult:\n");
int resC = 0;
while (resC < res.rows.size()) {
int resD = 0;
while (resD < res.rows[resC].columns.size()) {
printf("%s [%lu]:", res.rows[resC].columns[resD].name.c_str(),
res.rows[resC].columns[resD].value.length());
int resE = 0;
while (resE < res.rows[resC].columns[resD].value.length()) {
printf(" %02x", res.rows[resC].columns[resD].value.c_str()[resE] & 0xff);
resE++;
}
printf("\n");
resD++;
}
resC++;
}
resB++;
}
#endif
}
catch(TTransportException te){
cout << "TP Exception: " << te.what() << "[" << te.getType() << "]\n";
}
catch(InvalidRequestException ire){
cout << "IRE Exception: " << ire.what() << "[" << ire.why << "]\n";
}
catch(NotFoundException nfe){
cout << "NF Exception: " << nfe.what() << "\n";
}
return EXIT_SUCCESS;
}
#else
void mqttCallback(SimpleMQTTMessage *msg)
{
cout << "Hello from mqttCallback!\n";
msg->dump();
delete msg;
}
int main(void) {
cout << "Stopping...\n";
MQTTFixedHeader mh;
mh.bits.type = MQTT_CONNECT;
mh.bits.qos = 2;
mh.bits.dup = 1;
mh.bits.retain = 1;
mh.bits.remaining_length[0] = 0xff;
mh.bits.remaining_length[1] = 0xee;
mh.bits.remaining_length[2] = 0xdd;
mh.bits.remaining_length[3] = 0xcc;
printf("mh.raw: 0x%02x 0x%02x 0x%02x 0x%02x 0x%02x\n", mh.raw[0], mh.raw[1], mh.raw[2], mh.raw[3], mh.raw[4]);
try{
SimpleMQTTServer ms;
ms.setMessageCallback(mqttCallback);
ms.start();
sleep(60);
ms.stop();
}
catch (std::exception e) {
cout << "Exception: " << e.what() << "\n";
catch (exception *e) {
cout << "Exception: " << e->what() << "\n";
}
return 0;
}
#endif
......@@ -64,7 +64,7 @@
/*
* Enable verbose output of the MQTT server.
*/
#define SimpleMQTTVerbose
//#define SimpleMQTTVerbose
#include "simplemqttservermessage.h"
typedef void (*SimpleMQTTMessageCallback)(SimpleMQTTMessage*);
......
......@@ -190,7 +190,9 @@ ssize_t SimpleMQTTMessage::appendRawData(void* buf, size_t len)
void SimpleMQTTMessage::dump()
{
#if SimpleMQTTVerbose
coutMtx.lock();
#endif
cout << "Dump of SimpleMQTTMessage (" << this << "):\n";
cout << " State: ";
switch (state) {
......@@ -225,10 +227,17 @@ void SimpleMQTTMessage::dump()
<< ", RETAIN=" << hex << (int)fixedHeader.bits.retain << "\n";
cout << " Bytes Processed: " << bytesProcessed << "\n";
cout << " Remaining Length: " << remainingLength << "\n";
#if SimpleMQTTVerbose
coutMtx.unlock();
#endif
}
bool SimpleMQTTMessage::complete()
{
return state == Complete;
}
bool SimpleMQTTMessage::isPublish()
{
return complete() && fixedHeader.bits.type == MQTT_PUBLISH;
}
......@@ -63,6 +63,7 @@ protected:
public:
ssize_t appendRawData(void* buf, size_t len);
bool complete();
bool isPublish();
void dump();
......
......@@ -224,7 +224,6 @@ void SimpleMQTTServerMessageThread::run()
char* readPtr;
ssize_t rbytes, lbytes, bytes;
cout << "Data ready on fd: " << fds[connectionId].fd << "\n";
rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);
readPtr = inbuf;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment