Commit 370035c1 authored by Axel Auweter's avatar Axel Auweter
Browse files

Implemented SimpleMQTTMessage class. Creation of MQTT messages from network

stream works. Fixed a race condition to the message threads struct pollfd fds
array.
parent 075b3220
include ../config.mk
CXXFLAGS = $(shell ./cxx11flags.sh $(CXX) -O0 -g -Wall -Wno-null-conversion -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ | head -1)
OBJS = collectagent.o simplemqttserver.o simplemqttserverthread.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
OBJS = collectagent.o simplemqttserver.o simplemqttserverthread.o simplemqttservermessage.o cassandra/Cassandra.o cassandra/cassandra_constants.o cassandra/cassandra_types.o
SRC = $(patsubst cassandra/%,,$(OBJS:.o=.cpp))
LIBS = $(shell ./cxx11flags.sh $(CXX) -L$(DCDBDEPLOYPATH)/lib/ -lthrift -lmosquitto -lssl -lcrypto -lpthread -lboost_system | tail -1)
TARGET = collectagent
......
/*
* simplemqttservermessage.cpp
*
* Created on: May 26, 2013
* Author: Axel Auweter
*/
#include "simplemqttserver.h"
using namespace std;
#ifdef SimpleMQTTVerbose
static boost::mutex coutMtx;
#endif
SimpleMQTTMessage::SimpleMQTTMessage()
{
/*
* Initialize all class variables to an
* empty message state.
*/
state = Empty;
bytesProcessed = 0;
remainingRaw = NULL;
remainingLength = 0;
fixedHeaderLength = 0;
}
SimpleMQTTMessage::~SimpleMQTTMessage()
{
/*
* Free the memory allocated for the remainingRaw buffer.
*/
if(remainingRaw)
free(remainingRaw);
}
ssize_t SimpleMQTTMessage::decodeFixedHeader(void* buf, size_t len)
{
/*
* Decode the MQTTFixedHeader.
*/
char* readPtr = (char*)buf;
ssize_t lbytes = len;
/*
* Check for the first byte in the MQTT Fixed Header.
*/
if (state == Empty) {
if (lbytes && bytesProcessed == 0) {
fixedHeader.raw[0] = *readPtr;
lbytes--;
bytesProcessed++;
readPtr++;
state = DecodingFixedHeader;
}
else {
state = Error;
}
}
/*
* Decode the length of the message.
*/
if (state == DecodingFixedHeader) {
if (lbytes) {
int multiplier;
char digit;
do {
digit = *readPtr;
fixedHeader.raw[bytesProcessed] = digit;
multiplier = 1 << ((bytesProcessed-1)*7);
remainingLength += (digit & 127) * multiplier;
lbytes--;
bytesProcessed++;
readPtr++;
}
while (lbytes && (bytesProcessed < 5) && ((digit&128) != 0));
if ((digit&128) == 0) {
fixedHeaderLength = bytesProcessed;
/*
* If this message has no variable length part,
* we're alreay done. Otherwise, we need to
* receive a little more.
*/
if (remainingLength == 0)
state = Complete;
else
state = FixedHeaderOk;
bytesProcessed = 0;
}
else if (bytesProcessed >= 5) {
state = Error;
}
}
}
return len-lbytes;
}
ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
{
/*
* Receive the remainder of an MQTT message.
*/
ssize_t lbytes = len;
/*
* If we are in this function for the first time,
* we need to allocate the buffer.
*/
if (!remainingRaw) {
remainingRaw = malloc(remainingLength);
if (!remainingRaw) {
throw new system_error(errno, system_category(), "Error in SimpleMQTTMessage::receiveMessage().");
}
}
/*
* Fill the buffer with all we have (until full).
*/
char* writePtr = (char*)remainingRaw;
writePtr += bytesProcessed;
if (bytesProcessed+len >= remainingLength) {
memcpy(writePtr, buf, remainingLength-bytesProcessed);
lbytes -= remainingLength;
bytesProcessed += remainingLength-bytesProcessed;
/*
* In this case, we have received the entire message.
*/
state = Complete;
}
else {
memcpy(writePtr, buf, len);
lbytes -= len;
bytesProcessed += len;
}
return len-lbytes;
}
ssize_t SimpleMQTTMessage::appendRawData(void* buf, size_t len)
{
/*
* This function appends len bytes to the message.
* The function returns the number of processed bytes.
*/
char* readPtr = (char*)buf;
ssize_t bytes = 0, lbytes = len;
/*
* Process the data in buf.
*/
while(lbytes > 0 && state != Error && state != Complete) {
switch(state) {
case Empty:
case DecodingFixedHeader:
bytes = decodeFixedHeader(readPtr, lbytes);
break;
case FixedHeaderOk:
bytes = receiveMessage(readPtr, lbytes);
break;
default:
cout << "Unhandled state in SimpleMQTTMessage::appendRawData!\n";
break;
}
readPtr += bytes;
lbytes -= bytes;
}
#if 0
coutMtx.lock();
cout << "Finished appendRawData() function. Results follow...\n";
coutMtx.unlock();
dump();
#endif
return len-lbytes;
}
void SimpleMQTTMessage::dump()
{
coutMtx.lock();
cout << "Dump of SimpleMQTTMessage (" << this << "):\n";
cout << " State: ";
switch (state) {
case Empty: cout << "Empty"; break;
case DecodingFixedHeader: cout << "DecodingFixedHeader"; break;
case FixedHeaderOk: cout << "FixedHeaderOk"; break;
case Complete: cout << "Complete"; break;
case Error: cout << "Error"; break;
default: cout << "Unknown state (bad!)"; break;
}
cout << "\n Fixed Header: Type=";
switch (fixedHeader.bits.type) {
case MQTT_RESERVED: cout << "RESERVED"; break;
case MQTT_CONNECT: cout << "CONNECT"; break;
case MQTT_CONNACK: cout << "CONNACK"; break;
case MQTT_PUBLISH: cout << "PUBLISH"; break;
case MQTT_PUBACK: cout << "PUBACK"; break;
case MQTT_PUBREC: cout << "PUBREC"; break;
case MQTT_PUBREL: cout << "PUBREL"; break;
case MQTT_PUBCOMP: cout << "PUBCOMP"; break;
case MQTT_SUBSCRIBE: cout << "SUBSCRIBE"; break;
case MQTT_SUBACK: cout << "SUBACK"; break;
case MQTT_UNSUBSCRIBE: cout << "UNSUBSCRIBE"; break;
case MQTT_UNSUBACK: cout << "UNSUBACK"; break;
case MQTT_PINGREQ: cout << "PINGREQ"; break;
case MQTT_PINGRESP: cout << "PINGRESP"; break;
case MQTT_DISCONNECT: cout << "DISCONNECT"; break;
default: cout << "Unknown type (bad!)"; break;
}
cout << ", Dup=" << hex << (int)fixedHeader.bits.dup
<< ", QoS=" << hex << (int)fixedHeader.bits.qos
<< ", RETAIN=" << hex << (int)fixedHeader.bits.retain << "\n";
cout << " Bytes Processed: " << bytesProcessed << "\n";
cout << " Remaining Length: " << remainingLength << "\n";
coutMtx.unlock();
}
bool SimpleMQTTMessage::complete()
{
return state == Complete;
}
/*
* simplemqttmessage.h
* simplemqttservermessage.h
*
* Created on: May 3, 2013
* Author: Axel Auweter
......@@ -8,6 +8,7 @@
#ifndef SIMPLEMQTTMESSAGE_H_
#define SIMPLEMQTTMESSAGE_H_
#define MQTT_RESERVED 0x0
#define MQTT_CONNECT 0x1
#define MQTT_CONNACK 0x2
#define MQTT_PUBLISH 0x3
......@@ -38,5 +39,35 @@ typedef union {
#pragma pack(pop)
enum MQTTMessageState {
Empty,
DecodingFixedHeader,
FixedHeaderOk,
Complete,
Error
};
class SimpleMQTTMessage
{
protected:
MQTTMessageState state;
MQTTFixedHeader fixedHeader;
size_t fixedHeaderLength;
size_t bytesProcessed;
void *remainingRaw;
size_t remainingLength;
ssize_t decodeFixedHeader(void* buf, size_t len);
ssize_t receiveMessage(void* buf, size_t len);
public:
ssize_t appendRawData(void* buf, size_t len);
bool complete();
void dump();
SimpleMQTTMessage();
virtual ~SimpleMQTTMessage();
};
#endif /* SIMPLEMQTTMESSAGE_H_ */
......@@ -135,10 +135,21 @@ void SimpleMQTTServerAcceptThread::run()
/*
* Take the next incoming connection.
*/
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Thread (" << this << ") waiting in accept()...\n";
coutMtx.unlock();
#endif
newsock = accept(socket, NULL, 0);
if (newsock != -1)
if (newsock != -1) {
int opt = fcntl(newsock, F_GETFL, 0);
if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
close(newsock);
}
else {
mt->assignConnection(newsock);
}
}
}
}
}
......@@ -173,7 +184,9 @@ void SimpleMQTTServerMessageThread::run()
/*
* Check for activity on our sockets...
*/
fdsMtx.lock();
numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
fdsMtx.unlock();
if (numfds == -1)
throw new system_error(errno, system_category(), "Error in poll().");
......@@ -183,19 +196,76 @@ void SimpleMQTTServerMessageThread::run()
*/
if (numfds > 0) {
for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
if (fds[connectionId].fd) {
if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
coutMtx.unlock();
#endif
if (fds[connectionId].revents & POLLIN) {
int nbytes;
char* readPtr;
ssize_t rbytes, lbytes, bytes;
cout << "Data ready on fd: " << fds[connectionId].fd << "\n";
nbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);
rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);
readPtr = inbuf;
lbytes = rbytes;
/*
* If read() returns 0, the connection was closed on the
* remote side. In this case, release it from our list.
*/
if (nbytes == 0) {
if (rbytes == 0) {
releaseConnection(connectionId);
}
while (lbytes > 0) {
/*
* Allocate new message if there is none.
*/
if (!msg[connectionId]) {
msg[connectionId] = new SimpleMQTTMessage();
if (!msg[connectionId]) {
cout << "Warning: Out of memory! Discarding network input!\n";
continue;
}
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock();
#endif
}
/*
* Append received data to message.
*/
bytes = msg[connectionId]->appendRawData(readPtr, lbytes);
readPtr += bytes;
lbytes -= bytes;
/*
* Check if message is complete.
*/
if (msg[connectionId]->complete()) {
/*
* TODO: Forward message upstream!
* For now, to test things, simply discard the message!
*/
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock();
msg[connectionId]->dump();
coutMtx.lock();
cout << "Now freeing SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock();
#endif
delete msg[connectionId];
msg[connectionId] = NULL;
}
}
}
}
}
......@@ -217,21 +287,21 @@ void SimpleMQTTServerMessageThread::assignConnection(int newsock)
for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
if (fds[i].fd == -1) {
fds[i].fd = newsock;
fds[i].events = POLLIN;
fds[i].events = POLLIN | POLLPRI | POLLHUP;
fds[i].revents = 0;
numConnections++;
break;
}
}
fdsMtx.unlock();
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Assigned connection (" << this << ")...\n";
cout << "Assigned connection (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
coutMtx.unlock();
#endif
break;
}
}
fdsMtx.unlock();
}
void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
......@@ -240,6 +310,7 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
* Close the connection an clean up.
*/
fdsMtx.lock();
shutdown(fds[connectionId].fd, SHUT_RDWR);
close(fds[connectionId].fd);
fds[connectionId].fd = -1;
fds[connectionId].events = 0;
......@@ -247,6 +318,12 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
numConnections--;
fdsMtx.unlock();
#ifdef SimpleMQTTVerbose
coutMtx.lock();
cout << "Released connection (" << this << ", " << connectionId << ")...\n";
coutMtx.unlock();
#endif
}
SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread()
......
......@@ -30,6 +30,7 @@ class SimpleMQTTServerMessageThread : SimpleMQTTServerThread
protected:
int numConnections;
struct pollfd fds[SimpleMQTTConnectionsPerThread];
SimpleMQTTMessage* msg[SimpleMQTTConnectionsPerThread];
boost::mutex fdsMtx;
void run();
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment