Commit 40c489d1 authored by Axel Auweter's avatar Axel Auweter
Browse files

Work in progress on CollectAgent/SimpleMQTTServer. Listens and accepts new

connections and does thread management.
parent e3489c91
/*
* simplemqttmessage.h
*
* Created on: May 3, 2013
* Author: Axel Auweter
*/
#ifndef SIMPLEMQTTMESSAGE_H_
#define SIMPLEMQTTMESSAGE_H_
#define MQTT_CONNECT 0x1
#define MQTT_CONNACK 0x2
#define MQTT_PUBLISH 0x3
#define MQTT_PUBACK 0x4
#define MQTT_PUBREC 0x5
#define MQTT_PUBREL 0x6
#define MQTT_PUBCOMP 0x7
#define MQTT_SUBSCRIBE 0x8
#define MQTT_SUBACK 0x9
#define MQTT_UNSUBSCRIBE 0xa
#define MQTT_UNSUBACK 0xb
#define MQTT_PINGREQ 0xc
#define MQTT_PINGRESP 0xd
#define MQTT_DISCONNECT 0xe
#pragma pack(push,1)
typedef union {
uint8_t raw[5];
struct {
uint8_t retain :1;
uint8_t qos :2;
uint8_t dup :1;
uint8_t type :4;
uint8_t remaining_length[4];
} bits;
} MQTTFixedHeader;
#pragma pack(pop)
#endif /* SIMPLEMQTTMESSAGE_H_ */
/*
* simplemqttserverthread.cpp
*
* Created on: May 3, 2013
* Author: Axel Auweter
*/
#include "simplemqttserver.h"
using namespace std;
static boost::mutex threadMtx;
#ifdef SimpleMQTTVerbose
static boost::mutex coutMtx;
#endif
SimpleMQTTServerThread::SimpleMQTTServerThread()
{
/*
* Start the thread at the 'launch' function. The 'launch'
* function will take care of calling the child-specific
* 'run' function which contains the thread's main loop.
*
* Special care must be taken not to call the run function
* before the child has gone through its constructor. Thus,
* we lock a mutex that will cause the 'launch' function to
* wait until the mutex is released by the child's
* constructor.
*/
threadMtx.lock();
terminate = false;
if (pthread_create(&t, NULL, launch, this) != 0) {
cout << "Error creating new MQTT server thread.\n";
exit(EXIT_FAILURE);
}
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
coutMtx.unlock();
#endif
}
SimpleMQTTServerThread::~SimpleMQTTServerThread()
{
/*
* Terminate the thread and join it to ensure proper
* thread termination before the class is destroyed.
*/
terminate = true;
if (pthread_join(t, NULL) != 0) {
cout << "Error joining thread.\n";
exit(EXIT_FAILURE);
}
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n";
coutMtx.unlock();
#endif
}
void* SimpleMQTTServerThread::launch(void *selfPtr)
{
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Running launcher for class (" << selfPtr << ")...\n";
coutMtx.unlock();
#endif
/*
* The following lines guard the run function to be called
* before the constructor of the child class has finished.
* The lock is released at the end of the child's constructor.
*/
threadMtx.lock();
threadMtx.unlock();
/*
* Call the child's run function...
*/
((SimpleMQTTServerThread*)selfPtr)->run();
return NULL;
}
void SimpleMQTTServerAcceptThread::run()
{
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n";
coutMtx.unlock();
#endif
int newsock = -1;
struct pollfd fd;
while (!terminate) {
/*
* Wait for something to happen on the socket...
*/
fd.fd = socket;
fd.events = POLLIN | POLLPRI;
fd.revents = 0;
if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {
/*
* Find a free message thread to take over
* the next incoming connection.
*/
SimpleMQTTServerMessageThread *mt = NULL;
for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
if (i->hasCapacity())
mt = &*i;
}
/*
* In case no free message thread was found,
* try to create a new one as long as we do
* not exceed the maximum.
*/
if (!mt) {
if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
cout << "Warning: socket " << socket << " cannot accept more connections.\n";
// FIXME: There must be nicer ways to handle such situations...
sleep(1);
continue;
}
messageThreads.push_back(new SimpleMQTTServerMessageThread());
continue;
}
/*
* Take the next incoming connection.
*/
newsock = accept(socket, NULL, 0);
if (newsock != -1)
mt->assignConnection(newsock);
}
}
}
SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock)
{
socket = listenSock;
/*
* Release the lock to indicate that the constructor has
* finished. This causes the launcher to call the run function.
*/
threadMtx.unlock();
}
SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
}
void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n";
coutMtx.unlock();
#endif
int numfds = -1;
while (!terminate) {
/*
* Check for activity on our sockets...
*/
numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
if (numfds == -1)
throw new system_error(errno, system_category(), "Error in poll().");
/*
* Apparently, there is work to do...
*/
if (numfds > 0) {
for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
if (fds[connectionId].fd) {
if (fds[connectionId].revents & POLLIN) {
int nbytes;
char buf[8];
cout << "Data ready on fd: " << fds[connectionId].fd << "\n";
nbytes = read(fds[connectionId].fd, buf, 7);
buf[7] = 0;
/*
* If read() returns 0, the conncetion was closed on the
* remote side. In this case, release it from our list.
*/
if (nbytes == 0) {
releaseConnection(connectionId);
}
}
}
}
}
}
}
bool SimpleMQTTServerMessageThread::hasCapacity()
{
return numConnections <= SimpleMQTTConnectionsPerThread;
}
void SimpleMQTTServerMessageThread::assignConnection(int newsock)
{
/*
* Find the first free slot in fds and assign connection.
*/
fdsMtx.lock();
for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
if (fds[i].fd == -1) {
fds[i].fd = newsock;
fds[i].events = POLLIN;
fds[i].revents = 0;
numConnections++;
break;
}
}
fdsMtx.unlock();
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Assigned connection (" << this << ")...\n";
coutMtx.unlock();
#endif
}
void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
{
/*
* Close the connection an clean up.
*/
fdsMtx.lock();
close(fds[connectionId].fd);
fds[connectionId].fd = -1;
fds[connectionId].events = 0;
fds[connectionId].revents = 0;
numConnections--;
fdsMtx.unlock();
}
SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread()
{
/*
* Clear the fds array. Warning: This will only work when the
* size of the fds array is determined at compile time.
*/
memset(fds, -1, sizeof(fds));
/*
* Initialize the number of active connections to 0.
*/
numConnections = 0;
/*
* Release the lock to indicate that the constructor has
* finished. This causes the launcher to call the run function.
*/
threadMtx.unlock();
}
SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
{
}
/*
* simplemqttserverthread.h
*
* Created on: May 3, 2013
* Author: Axel Auweter
*/
#ifndef SIMPLEMQTTSERVERTHREAD_H_
#define SIMPLEMQTTSERVERTHREAD_H_
#define MQTT_INVALID_SOCKET -1
class SimpleMQTTServerThread
{
protected:
pthread_t t;
bool terminate;
static void* launch(void* thisPtr);
virtual void run() = 0;
public:
SimpleMQTTServerThread();
virtual
~SimpleMQTTServerThread();
};
class SimpleMQTTServerMessageThread : SimpleMQTTServerThread
{
protected:
int numConnections;
struct pollfd fds[SimpleMQTTConnectionsPerThread];
boost::mutex fdsMtx;
void run();
public:
void assignConnection(int newsock);
void releaseConnection(int connectionId);
bool hasCapacity();
SimpleMQTTServerMessageThread();
virtual
~SimpleMQTTServerMessageThread();
};
class SimpleMQTTServerAcceptThread : SimpleMQTTServerThread
{
protected:
int socket;
void run();
boost::ptr_list<SimpleMQTTServerMessageThread> messageThreads;
public:
SimpleMQTTServerAcceptThread(int listenSock);
virtual
~SimpleMQTTServerAcceptThread();
};
#endif /* SIMPLEMQTTSERVERTHREAD_H_ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment