/* * simplemqttserverthread.cpp * * Created on: May 3, 2013 * Author: Axel Auweter */ #include "simplemqttserver.h" #include "abrt.h" using namespace std; using namespace boost::system; #ifdef SimpleMQTTVerbose static boost::mutex coutMtx; #endif static boost::mutex launchMtx; SimpleMQTTServerThread::SimpleMQTTServerThread() { /* * Start the thread at the 'launch' function. The 'launch' * function will take care of calling the child-specific * 'run' function which contains the thread's main loop. * * Special care must be taken not to call the run function * before the child has gone through its constructor. Thus, * we lock a mutex (launchMtx) that will cause the 'launch' * function to wait until the mutex is released by the * child's constructor. * * A second mutex (cleanupMtx) is held to allow others to * wait for a thread to return from its child-specific * 'run' function. This is necessary for cases where the * child class has local objects that can only be destroyed * after the 'run' function has finished (to avoid access * to these objects in the 'run'function after destroying * them). In such cases, the child class destructor can try * to obtain the mutex in the destructor. */ launchMtx.lock(); cleanupMtx.lock(); terminate = false; if (pthread_create(&t, NULL, launch, this) != 0) { cout << "Error creating new MQTT server thread.\n"; abrt(EXIT_FAILURE, INTERR); } #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Started Thread (" << t << ") of class (" << this << ")...\n"; coutMtx.unlock(); #endif } SimpleMQTTServerThread::~SimpleMQTTServerThread() { /* * Terminate the thread and join it to ensure proper * thread termination before the class is destroyed. */ terminate = true; if (pthread_join(t, NULL) != 0) { cout << "Error joining thread.\n"; abrt(EXIT_FAILURE, INTERR); } #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n"; coutMtx.unlock(); #endif } void* SimpleMQTTServerThread::launch(void *selfPtr) { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Running launcher for class (" << selfPtr << ")...\n"; coutMtx.unlock(); #endif /* * The following lines guard the run function to be called * before the constructor of the child class has finished. * The lock is released at the end of the child's constructor. */ launchMtx.lock(); launchMtx.unlock(); /* * Call the child's run function... */ ((SimpleMQTTServerThread*)selfPtr)->run(); /* * The thread will terminate. Unlock the cleanupMtx... */ ((SimpleMQTTServerThread*)selfPtr)->cleanupMtx.unlock(); return NULL; } void SimpleMQTTServerThread::yield() { /* * Yield this thread's execution to give way to other threads. * This is being used in the SimpleMQTTServer to allow the * accept thread acquiring the fdsMtx lock. Otherwise, in * particular on single-CPU systems (or when the acceptThread * and the messageThread share a core), waiting connections * may have to wait a long time before the lock will be given * to the acceptThread. */ boost::this_thread::yield(); } void SimpleMQTTServerAcceptThread::run() { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n"; coutMtx.unlock(); #endif int newsock = -1; struct pollfd fd; while (!terminate) { /* * Wait for something to happen on the socket... */ fd.fd = socket; fd.events = POLLIN | POLLPRI; fd.revents = 0; if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) { /* * Find a free message thread to take over * the next incoming connection. */ SimpleMQTTServerMessageThread *mt = NULL; for (boost::ptr_list::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) { if (i->hasCapacity()) { mt = &*i; #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Found a message thread with capacity: " << mt << "...\n"; coutMtx.unlock(); #endif } else { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "No capacity in message thread: " << mt << "...\n"; coutMtx.unlock(); #endif } } /* * In case no free message thread was found, * try to create a new one as long as we do * not exceed the maximum. */ if (!mt) { if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) { cout << "Warning: socket " << socket << " cannot accept more connections.\n"; // FIXME: There must be nicer ways to handle such situations... sleep(1); continue; } messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback)); continue; } /* * Take the next incoming connection. */ #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Thread (" << this << ") waiting in accept()...\n"; coutMtx.unlock(); #endif newsock = accept(socket, NULL, 0); if (newsock != -1) { int opt = fcntl(newsock, F_GETFL, 0); if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Setting socket to non-blocking in thread (" << this << ") failed for socket " << newsock << "...\n"; coutMtx.unlock(); #endif close(newsock); } else { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Successfully set socket " << newsock << " nonblocing in thread (" << this << ")...\n"; coutMtx.unlock(); #endif mt->assignConnection(newsock); } } else { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Accept() in thread (" << this << ") returned -1...\n"; coutMtx.unlock(); #endif } } } } void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback) { /* * Set the function that will be called for each received * MQTT message and propagate to all message threads. */ messageCallback = callback; for (boost::ptr_list::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) { (*i).setMessageCallback(callback); } } SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback) { /* * Assign socket and message callback. */ socket = listenSock; messageCallback = callback; /* * Release the lock to indicate that the constructor has * finished. This causes the launcher to call the run function. */ launchMtx.unlock(); } SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread() { } void SimpleMQTTServerMessageThread::run() { #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n"; coutMtx.unlock(); #endif int numfds = -1; char inbuf[SimpleMQTTReadBufferSize]; while (!terminate) { /* * Check for activity on our sockets... */ fdsMtx.lock(); numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout); fdsMtx.unlock(); /* * Allow other threads to run (avoid instantaneous * re-acquisition of the fdsMtx lock in the next * iteration while connections are pending in the * acceptThread). */ yield(); if (numfds == -1) throw new boost::system::system_error(errno, boost::system::system_category(), "Error in poll()."); /* * Apparently, there is work to do... */ if (numfds > 0) { for (int connectionId=0; connectionId 0) { /* * Allocate new message if there is none. */ if (!msg[connectionId]) { msg[connectionId] = new SimpleMQTTMessage(); if (!msg[connectionId]) { cout << "Warning: Out of memory! Discarding network input!\n"; continue; } #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n"; coutMtx.unlock(); #endif } /* * Append received data to message. */ bytes = msg[connectionId]->appendRawData(readPtr, lbytes); readPtr += bytes; lbytes -= bytes; /* * Check if message is complete. */ if (msg[connectionId]->complete()) { /* * Forward message upstream! * If there is a callback function, it is responsible * for freeing the message using delete. Otherwise, we * have to take care of this, here. */ #ifdef SimpleMQTTVerbose coutMtx.lock(); cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n"; coutMtx.unlock(); #endif if (messageCallback) { messageCallback(msg[connectionId]); } else { delete msg[connectionId]; } msg[connectionId] = NULL; } } } } } } } } bool SimpleMQTTServerMessageThread::hasCapacity() { return numConnections < SimpleMQTTConnectionsPerThread; } void SimpleMQTTServerMessageThread::assignConnection(int newsock) { /* * Find the first free slot in fds and assign connection. */ fdsMtx.lock(); for (int i=0; i