Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 5abfb2dd authored by Axel Auweter's avatar Axel Auweter
Browse files

Implemented the ability to set a callback function that will be called from

the SimpleMQTTServer after receiving an MQTT message.
parent 370035c1
......@@ -307,6 +307,13 @@ int main(void) {
}
#else
void mqttCallback(SimpleMQTTMessage *msg)
{
cout << "Hello from mqttCallback!\n";
msg->dump();
delete msg;
}
int main(void) {
MQTTFixedHeader mh;
......@@ -323,6 +330,7 @@ int main(void) {
try{
SimpleMQTTServer ms;
ms.setMessageCallback(mqttCallback);
ms.start();
sleep(60);
ms.stop();
......
......@@ -118,7 +118,7 @@ void SimpleMQTTServer::start()
* Start one accept thread per socket.
*/
for (unsigned int i=0; i<listenSockets.size(); i++)
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i]));
acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback));
}
void SimpleMQTTServer::stop()
......@@ -129,6 +129,19 @@ void SimpleMQTTServer::stop()
acceptThreads.clear();
}
void SimpleMQTTServer::setMessageCallback(SimpleMQTTMessageCallback callback)
{
/*
* Set the function that will be called for each received
* MQTT message and propagate to all accept threads.
*/
messageCallback = callback;
for (boost::ptr_list<SimpleMQTTServerAcceptThread>::iterator i = acceptThreads.begin(); i != acceptThreads.end(); i++) {
(*i).setMessageCallback(callback);
}
}
void SimpleMQTTServer::init(string addr, string port)
{
/*
......
......@@ -67,6 +67,7 @@
#define SimpleMQTTVerbose
#include "simplemqttservermessage.h"
typedef void (*SimpleMQTTMessageCallback)(SimpleMQTTMessage*);
#include "simplemqttserverthread.h"
/*
......@@ -89,10 +90,9 @@ class SimpleMQTTServer
protected:
std::string listenAddress;
std::string listenPort;
boost::ptr_vector<int> listenSockets;
boost::ptr_list<SimpleMQTTServerAcceptThread> acceptThreads;
SimpleMQTTMessageCallback messageCallback = NULL;
void init(std::string addr, std::string port);
void initSockets(void);
......@@ -100,6 +100,7 @@ protected:
public:
void start();
void stop();
void setMessageCallback(SimpleMQTTMessageCallback callback);
SimpleMQTTServer();
SimpleMQTTServer(std::string addr, std::string port);
......
......@@ -85,7 +85,7 @@ ssize_t SimpleMQTTMessage::decodeFixedHeader(void* buf, size_t len)
/*
* If this message has no variable length part,
* we're alreay done. Otherwise, we need to
* we're already done. Otherwise, we need to
* receive a little more.
*/
if (remainingLength == 0)
......
......@@ -128,7 +128,7 @@ void SimpleMQTTServerAcceptThread::run()
sleep(1);
continue;
}
messageThreads.push_back(new SimpleMQTTServerMessageThread());
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
continue;
}
......@@ -154,9 +154,26 @@ void SimpleMQTTServerAcceptThread::run()
}
}
SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock)
void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
messageCallback = callback;
/*
* Set the function that will be called for each received
* MQTT message and propagate to all message threads.
*/
messageCallback = callback;
for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
(*i).setMessageCallback(callback);
}
}
SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback)
{
/*
* Assign socket and message callback.
*/
socket = listenSock;
messageCallback = callback;
/*
* Release the lock to indicate that the constructor has
......@@ -250,19 +267,22 @@ void SimpleMQTTServerMessageThread::run()
*/
if (msg[connectionId]->complete()) {
/*
* TODO: Forward message upstream!
* For now, to test things, simply discard the message!
* Forward message upstream!
* If there is a callback function, it is responsible
* for freeing the message using delete. Otherwise, we
* have to take care of this, here.
*/
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock();
msg[connectionId]->dump();
coutMtx.lock();
cout << "Now freeing SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock();
#endif
delete msg[connectionId];
if (messageCallback) {
messageCallback(msg[connectionId]);
}
else {
delete msg[connectionId];
}
msg[connectionId] = NULL;
}
}
......@@ -326,7 +346,12 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
#endif
}
SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread()
void SimpleMQTTServerMessageThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
messageCallback = callback;
}
SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback)
{
/*
* Clear the fds array. Warning: This will only work when the
......@@ -335,9 +360,11 @@ SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread()
memset(fds, -1, sizeof(fds));
/*
* Initialize the number of active connections to 0.
* Initialize the number of active connections to 0 and set
* the messageCallback function.
*/
numConnections = 0;
messageCallback = callback;
/*
* Release the lock to indicate that the constructor has
......
......@@ -32,6 +32,7 @@ protected:
struct pollfd fds[SimpleMQTTConnectionsPerThread];
SimpleMQTTMessage* msg[SimpleMQTTConnectionsPerThread];
boost::mutex fdsMtx;
SimpleMQTTMessageCallback messageCallback;
void run();
......@@ -39,8 +40,9 @@ public:
void assignConnection(int newsock);
void releaseConnection(int connectionId);
bool hasCapacity();
void setMessageCallback(SimpleMQTTMessageCallback callback);
SimpleMQTTServerMessageThread();
SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback);
virtual
~SimpleMQTTServerMessageThread();
};
......@@ -49,12 +51,15 @@ class SimpleMQTTServerAcceptThread : SimpleMQTTServerThread
{
protected:
int socket;
void run();
boost::ptr_list<SimpleMQTTServerMessageThread> messageThreads;
SimpleMQTTMessageCallback messageCallback;
void run();
public:
SimpleMQTTServerAcceptThread(int listenSock);
void setMessageCallback(SimpleMQTTMessageCallback callback);
SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback);
virtual
~SimpleMQTTServerAcceptThread();
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment