Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit e1fffa21 authored by Axel Auweter's avatar Axel Auweter
Browse files

Implemented functionality to subscribe to mqtt broker and push some

values into the data store. Also shows the achieved message rate. Still
pretty dirty and naive.
parent 917005a6
......@@ -36,6 +36,7 @@
<option id="gnu.cpp.compiler.option.include.paths.596958677" name="Include paths (-I)" superClass="gnu.cpp.compiler.option.include.paths" valueType="includePath">
<listOptionValue builtIn="false" value="../../../thrift/lib/cpp/src"/>
<listOptionValue builtIn="false" value="/opt/local/include"/>
<listOptionValue builtIn="false" value="../../../mosquitto-1.1/lib/"/>
</option>
<inputType id="cdt.managedbuild.tool.gnu.cpp.compiler.input.1562330115" superClass="cdt.managedbuild.tool.gnu.cpp.compiler.input"/>
</tool>
......@@ -53,7 +54,11 @@
<project id="CollectAgent.null.1626518544" name="CollectAgent"/>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.core.LanguageSettingsProviders"/>
<storageModule moduleId="refreshScope"/>
<storageModule moduleId="refreshScope" versionNumber="2">
<configuration configurationName="Default">
<resource resourceType="PROJECT" workspacePath="/CollectAgent"/>
</configuration>
</storageModule>
<storageModule moduleId="scannerConfiguration">
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
<scannerConfigBuildInfo instanceId="cdt.managedbuild.toolchain.gnu.macosx.base.182916731;cdt.managedbuild.toolchain.gnu.macosx.base.182916731.802739244;cdt.managedbuild.tool.gnu.c.compiler.macosx.base.1988907322;cdt.managedbuild.tool.gnu.c.compiler.input.660553692">
......@@ -63,4 +68,5 @@
<autodiscovery enabled="true" problemReportingEnabled="true" selectedProfileId=""/>
</scannerConfigBuildInfo>
</storageModule>
<storageModule moduleId="org.eclipse.cdt.internal.ui.text.commentOwnerProjectMappings"/>
</cproject>
......@@ -6,12 +6,16 @@
// Description : As of now this is some rudimentary bad code to talk to the DB
//============================================================================
#include <stdio.h>
#include <stdlib.h>
#include <cstdio>
#include <cstdlib>
#include <sstream>
#include <sys/socket.h>
#include <netinet/in.h>
#include <sys/time.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransport.h>
#include <thrift/transport/TBufferTransports.h>
......@@ -21,11 +25,49 @@
#include "cassandra/cassandra.h"
#include "mosquitto.h"
using namespace apache::thrift;
using namespace apache::thrift::transport;
using namespace apache::thrift::protocol;
using namespace org::apache::cassandra;
CassandraClient *myClient;
int keepRunning;
int msgCtr;
void sigHandler(int sig) {
keepRunning = 0;
}
void mqttOnMessage(struct mosquitto *mosq, void *obj, const struct mosquitto_message *message) {
msgCtr++;
try {
CqlResult res;
std::string query;
std::stringstream datestr, sidstr;
time_t t = time(NULL);
struct tm *now = localtime(&t);
datestr << "'" << (now->tm_year + 1900) << "-" << (now->tm_mon + 1)
<< "-" << (now->tm_mday) << " " << (now->tm_hour) << ":"
<< (now->tm_min) << ":" << (now->tm_sec) << "'";
sidstr << msgCtr;
query = "INSERT INTO sensordata (sid, ts, value) VALUES ( " + sidstr.str() + ", " + datestr.str() + ", 3.141 );";
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
}
catch(TTransportException te){
printf("TP Exception: %s [%d]\n", te.what(), te.getType());
}catch(InvalidRequestException ire){
printf("IRE Exception: %s [%s]\n", ire.what(), ire.why.c_str());
}catch(NotFoundException nfe){
printf("NF Exception: %s\n", nfe.what());
}
}
int main(void) {
boost::shared_ptr<TSocket> sock;
boost::shared_ptr<TTransport> tr;
......@@ -38,24 +80,145 @@ int main(void) {
prot = boost::shared_ptr<TBinaryProtocol>(new TBinaryProtocol(tr));
try {
CassandraClient myClient(prot);
myClient = new CassandraClient(prot);
tr->open();
myClient.describe_cluster_name(clusterName);
start:
myClient->describe_cluster_name(clusterName);
printf("Cluster name: %s\n", clusterName.c_str());
int dcdbKeyspace = -1;
printf("Keyspaces:\n");
std::vector<KsDef> keySpaces;
myClient.describe_keyspaces(keySpaces);
myClient->describe_keyspaces(keySpaces);
for (int i=0; i<keySpaces.size(); i++) {
printf(" [%d]: %s\n", i, keySpaces[i].name.c_str());
if(keySpaces[i].name == "dcdb") {
dcdbKeyspace = i;
}
}
CqlResult res;
std::string query = "USE " + keySpaces[0].name;
printf("Sending CQL statement: %s...", query.c_str());
myClient.execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
std::string query;
if (dcdbKeyspace<0) {
printf("Creating dcdb keyspace...\n");
query = "CREATE KEYSPACE dcdb WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': '1'};";
printf("Sending CQL statement: %s", query.c_str());
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
printf(" --> Success!\n");
printf("Starting over...\n\n");
goto start;
}
else {
printf("Using existing keyspace dcdb...\n");
}
query = "USE dcdb;";
printf("Sending CQL statement: %s", query.c_str());
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
printf(" --> Success!\n");
int sensordataCf = -1;
printf("Column families in dcdb:\n");
for (int i=0; i<keySpaces[dcdbKeyspace].cf_defs.size(); i++) {
printf(" [%d]: %s\n", i, keySpaces[dcdbKeyspace].cf_defs[i].name.c_str());
if (keySpaces[dcdbKeyspace].cf_defs[i].name == "sensordata") {
sensordataCf = i;
}
}
if (sensordataCf<0) {
printf("Creating sensordata column familiy...\n");
query = "CREATE TABLE sensordata ( sid int, ts timestamp, value float, PRIMARY KEY (sid, ts)) WITH COMPACT STORAGE;";
printf("Sending CQL statement: %s", query.c_str());
myClient->execute_cql3_query(res, query, Compression::NONE, ConsistencyLevel::ONE);
printf(" --> Success!\n");
printf("Starting over...\n\n");
goto start;
}
else {
printf("Using existing sensordata column familiy.\n");
}
/* Should have the keyspace and the column familiy in the system now, subscribe to local mqtt broker */
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
printf("Initializing Mosquitto Library Version %d.%d.%d\n", mosqMajor, mosqMinor, mosqRevision);
mosquitto_lib_init();
/* Init mosquitto struct */
struct mosquitto* mosq;
mosq = mosquitto_new("CollectAgent", false, NULL);
if (!mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
/* Connect to the broker */
printf("Connecting to broker...");
fflush(stdout);
if (mosquitto_connect(mosq, "localhost", 1883, 1000) != MOSQ_ERR_SUCCESS) {
perror("\nCould not connect to host");
exit(EXIT_FAILURE);
}
printf(" Done.\n");
/* Catch SIGINT signals */
signal(SIGINT, sigHandler);
/* Subscribe to anything */
printf("Subscribing to anything on the broker (Pattern #)...");
fflush(stdout);
if (mosquitto_subscribe(mosq, NULL, "#", 0) != MOSQ_ERR_SUCCESS) {
perror("\nCould not subscribe");
exit(EXIT_FAILURE);
}
printf(" Done.\n");
/* Set the callback for mosquitto */
printf("Configuring message callback...");
fflush(stdout);
mosquitto_message_callback_set(mosq, mqttOnMessage);
printf(" Done.\n");
/* Here comes the main loop */
printf("Starting mqtt loop thread...");
fflush(stdout);
if (mosquitto_loop_start(mosq) != MOSQ_ERR_SUCCESS) {
perror("\nCould not start mosquitto thread");
exit(EXIT_FAILURE);
}
printf(" Done.\n");
keepRunning = 1;
timeval start, end;
double elapsed;
while(keepRunning) {
gettimeofday(&start, NULL);
sleep(10);
/* not really thread safe but will do the job */
gettimeofday(&end, NULL);
elapsed = (end.tv_sec - start.tv_sec) * 1000.0;
elapsed += (end.tv_usec - start.tv_usec) / 1000.0;
printf("Message rate: %f messages/second\n", (msgCtr/elapsed)*1000.0);
msgCtr = 0;
}
printf("Cleaning up...");
/* Stop the mosquitto loop */
mosquitto_loop_stop(mosq, true);
mosquitto_disconnect(mosq);
/* Disconnect from Cassandra */
tr->close();
printf(" Done.\n");
#if 0
int resB = 0;
while (resB < keySpaces[0].cf_defs.size()) {
query = "SELECT * FROM " + keySpaces[0].cf_defs[resB].name + ";";
......@@ -80,6 +243,7 @@ int main(void) {
}
resB++;
}
#endif
}
catch(TTransportException te){
printf("TP Exception: %s [%d]\n", te.what(), te.getType());
......
CXXFLAGS = -O0 -g -Wall -fmessage-length=0 -I../../thrift/lib/cpp/src/ -I/opt/local/include/
CXXFLAGS = -O0 -g -Wall -fmessage-length=0 -I../../thrift/lib/cpp/src/ -I/opt/local/include/ -I../../mosquitto-1.1/lib/
OBJS = CollectAgent.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
LIBS = -L../../thrift/lib/cpp/.libs/ -lthrift
LIBS = -L../../thrift/lib/cpp/.libs/ -lthrift -L../../mosquitto-1.1/lib/ -lmosquitto -lssl -lcrypto -lpthread
TARGET = CollectAgent
SUBTARGETS = cassandra-lib
SUBTARGETS = cassandra-lib check-thrift-env
P = $(shell cd ../../thrift/lib/cpp/.libs/;pwd)
U = $(shell uname)
$(TARGET): $(SUBTARGETS) $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
check-thrift-env:
@if [ "$U" = "Darwin" ]; then \
R=`echo $$DYLD_LIBRARY_PATH | grep $P`; \
if [ "$$R" = "" ]; then \
echo "******************************************"; echo "Please type: "; echo ' export DYLD_LIBRARY_PATH=$$DYLD_LIBRARY_PATH:$P'; echo "******************************************"; \
fi; \
fi
cassandra-lib: cassandra-headers
cassandra-headers:
......@@ -27,4 +38,4 @@ clean: clean-cassandra-headers
rm -f $(OBJS) $(TARGET)
clean-cassandra-headers:
rm -rf cassandra
\ No newline at end of file
rm -rf cassandra
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment