Commit 5c2b02fd authored by Michael Ott's avatar Michael Ott
Browse files

Make SimpleMQTTServerThread lock-free

parent 2fca5c61
......@@ -55,6 +55,10 @@
#define SimpleMQTTConnectionsPerThread 16
#endif
#ifndef SimpleMQTTConnectionsQueueLength
#define SimpleMQTTConnectionsQueueLength 4
#endif
/*
* Define the maximum number of threads that will be spawned per socket.
*/
......
......@@ -26,6 +26,7 @@
#include "simplemqttserver.h"
#include "abrt.h"
#include "messaging.h"
using namespace std;
using namespace boost::system;
......@@ -69,7 +70,7 @@ SimpleMQTTServerThread::SimpleMQTTServerThread()
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
cout << "Started Thread (" << hex << t << ") of class (" << this << ")...\n";
coutMtx.unlock();
#endif
}
......@@ -155,46 +156,6 @@ void SimpleMQTTServerAcceptThread::run()
fd.events = POLLIN | POLLPRI;
fd.revents = 0;
if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {
/*
* Find a free message thread to take over
* the next incoming connection.
*/
SimpleMQTTServerMessageThread *mt = NULL;
for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
if (i->hasCapacity()) {
mt = &*i;
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Found a message thread with capacity: " << mt << "...\n";
coutMtx.unlock();
#endif
}
else {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "No capacity in message thread: " << mt << "...\n";
coutMtx.unlock();
#endif
}
}
/*
* In case no free message thread was found,
* try to create a new one as long as we do
* not exceed the maximum.
*/
if (!mt) {
if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
cout << "Warning: socket " << socket << " cannot accept more connections.\n";
// FIXME: There must be nicer ways to handle such situations...
sleep(1);
continue;
}
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
continue;
}
/*
* Take the next incoming connection.
*/
......@@ -208,28 +169,57 @@ void SimpleMQTTServerAcceptThread::run()
int opt = fcntl(newsock, F_GETFL, 0);
if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Setting socket to non-blocking in thread (" << this << ") failed for socket " << newsock << "...\n";
coutMtx.unlock();
coutMtx.lock();
cout << "Setting socket to non-blocking in thread (" << this << ") failed for socket " << newsock << "...\n";
coutMtx.unlock();
#endif
close(newsock);
}
else {
close(newsock);
} else {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Successfully set socket " << newsock << " nonblocing in thread (" << this << ")...\n";
coutMtx.unlock();
coutMtx.lock();
cout << "Successfully set socket " << newsock << " non-blocking in thread (" << this << ")...\n";
coutMtx.unlock();
#endif
/*
* Find a free message thread to take over
* the next incoming connection.
*/
boost::ptr_list<SimpleMQTTServerMessageThread>::iterator mt = messageThreads.begin();
while ((mt != messageThreads.end()) && mt->queueConnection(newsock)) {
mt++;
}
if (mt == messageThreads.end()) {
/*
* In case no free message thread was found,
* try to create a new one as long as we do
* not exceed the maximum.
*/
if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
cout << "Warning: socket " << socket << " cannot accept more connections.\n";
// FIXME: There must be nicer ways to handle such situations...
close(newsock);
break;
}
messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
messageThreads.back().queueConnection(newsock);
}
#ifdef SimpleMQTTVerbose
else {
coutMtx.lock();
cout << "Found a message thread with capacity: " << &*mt << "\n";
coutMtx.unlock();
}
#endif
mt->assignConnection(newsock);
}
}
else {
}
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Accept() in thread (" << this << ") returned -1...\n";
coutMtx.unlock();
else {
coutMtx.lock();
cout << "Accept() in thread (" << this << ") returned -1...\n";
coutMtx.unlock();
}
#endif
}
}
}
}
......@@ -278,19 +268,14 @@ void SimpleMQTTServerMessageThread::run()
while (!terminate) {
/*
* Check for activity on our sockets...
* Check for pending connections
*/
fdsMtx.lock();
numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
fdsMtx.unlock();
assignConnections();
/*
* Allow other threads to run (avoid instantaneous
* re-acquisition of the fdsMtx lock in the next
* iteration while connections are pending in the
* acceptThread).
* Check for activity on our sockets...
*/
yield();
numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
if (numfds == -1)
throw new boost::system::system_error(errno, boost::system::system_category(), "Error in poll().");
......@@ -303,7 +288,7 @@ void SimpleMQTTServerMessageThread::run()
if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << dec << "\n";
coutMtx.unlock();
#endif
......@@ -379,35 +364,63 @@ void SimpleMQTTServerMessageThread::run()
}
}
bool SimpleMQTTServerMessageThread::hasCapacity()
{
return numConnections < SimpleMQTTConnectionsPerThread;
int SimpleMQTTServerMessageThread::queueConnection(int newsock) {
if (numConnections >= SimpleMQTTConnectionsPerThread) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Maximum number of connections reached, rejecting new connection (" << this << ", " << newsock << ", " << numConnections << ")...\n";
coutMtx.unlock();
#endif
return 2;
}
int nextWritePos = (fdQueueWritePos + 1) % SimpleMQTTConnectionsQueueLength;
if (nextWritePos == fdQueueReadPos) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Queue is full, rejecting new connection (" << this << ", " << newsock << ", " << fdQueueWritePos << ")...\n";
coutMtx.unlock();
#endif
return 1;
}
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Queued new connection (" << this << ", " << newsock << ", " << fdQueueWritePos << ")...\n";
coutMtx.unlock();
#endif
fdQueue[fdQueueWritePos] = newsock;
fdQueueWritePos = nextWritePos;
return 0;
}
void SimpleMQTTServerMessageThread::assignConnection(int newsock)
void SimpleMQTTServerMessageThread::assignConnections()
{
/*
* Find the first free slot in fds and assign connection.
*/
fdsMtx.lock();
for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
if (fds[i].fd == -1) {
fds[i].events = POLLIN | POLLPRI | POLLHUP;
fds[i].revents = 0;
fds[i].fd = newsock;
while (fdQueueWritePos != fdQueueReadPos) {
/* There is no free fd slot at the moment, defer.. */
if (numConnections >= SimpleMQTTConnectionsPerThread) {
return;
}
numConnections++;
for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
if (fds[i].fd == -1) {
fds[i].events = POLLIN | POLLPRI | POLLHUP;
fds[i].revents = 0;
fds[i].fd = fdQueue[fdQueueReadPos];
numConnections++;
fdQueueReadPos = (fdQueueReadPos + 1) % SimpleMQTTConnectionsQueueLength;
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Assigned connection (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
coutMtx.unlock();
coutMtx.lock();
cout << "Assigned connection (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
coutMtx.unlock();
#endif
break;
}
}
}
fdsMtx.unlock();
}
void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
......@@ -415,7 +428,6 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
/*
* Close the connection an clean up.
*/
fdsMtx.lock();
shutdown(fds[connectionId].fd, SHUT_RDWR);
close(fds[connectionId].fd);
fds[connectionId].fd = -1;
......@@ -432,7 +444,6 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
}
numConnections--;
fdsMtx.unlock();
#ifdef SimpleMQTTVerbose
coutMtx.lock();
......@@ -473,6 +484,13 @@ SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCa
numConnections = 0;
messageCallback = callback;
/*
* Initialize the fd queue for new connections
*/
fdQueue = new(int[SimpleMQTTConnectionsQueueLength]);
fdQueueReadPos = 0;
fdQueueWritePos = 0;
/*
* Release the lock to indicate that the constructor has
* finished. This causes the launcher to call the run function.
......@@ -490,6 +508,7 @@ SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
* order to avoid fdsMtx to be accessed after destruction.
*/
terminate = true;
delete fdQueue;
cleanupMtx.lock();
cleanupMtx.unlock();
......
......@@ -51,13 +51,16 @@ protected:
int numConnections;
struct pollfd fds[SimpleMQTTConnectionsPerThread];
SimpleMQTTMessage* msg[SimpleMQTTConnectionsPerThread];
boost::mutex fdsMtx;
int *fdQueue;
int fdQueueReadPos;
int fdQueueWritePos;
SimpleMQTTMessageCallback messageCallback;
void run();
public:
void assignConnection(int newsock);
int queueConnection(int newsock);
void assignConnections();
void releaseConnection(int connectionId);
bool hasCapacity();
void setMessageCallback(SimpleMQTTMessageCallback callback);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment