Commit 53421384 authored by Michael Ott's avatar Michael Ott
Browse files

Add support for QoS and MQTT CONNECT, DISCONNECT, and PINGREQ messages.

parent 6b791e3c
......@@ -113,7 +113,7 @@ struct httpHandler_t {
};
void mqttCallback(SimpleMQTTMessage *msg)
int mqttCallback(SimpleMQTTMessage *msg)
{
/*
* Increment the msgCtr/vmsgCtr for statistics.
......@@ -144,8 +144,7 @@ void mqttCallback(SimpleMQTTMessage *msg)
//...otherwise this message is malformed -> ignore...
else {
cout << "Message malformed\n";
delete msg;
return;
return 1;
}
/*
......@@ -179,10 +178,11 @@ void mqttCallback(SimpleMQTTMessage *msg)
#if 1
else {
cout << "Wrong topic format: " << msg->getTopic() << "\n";
return 1;
}
#endif
}
delete msg;
return 0;
}
......
......@@ -93,7 +93,7 @@
//#define SimpleMQTTVerbose
#include "simplemqttservermessage.h"
typedef void (*SimpleMQTTMessageCallback)(SimpleMQTTMessage*);
typedef int (*SimpleMQTTMessageCallback)(SimpleMQTTMessage*);
#include "simplemqttserverthread.h"
/*
......
......@@ -25,6 +25,7 @@
//================================================================================
#include "simplemqttserver.h"
#include <arpa/inet.h>
using namespace std;
using namespace boost::system;
......@@ -44,8 +45,9 @@ SimpleMQTTMessage::SimpleMQTTMessage()
remainingRaw = NULL;
remainingLength = 0;
fixedHeaderLength = 0;
msgId = 0;
payloadLength = 0;
payloadPtr = NULL;
topicLen = 0;
}
SimpleMQTTMessage::~SimpleMQTTMessage()
......@@ -159,6 +161,36 @@ ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
* In this case, we have received the entire message.
*/
state = Complete;
switch(fixedHeader.type) {
case MQTT_PUBLISH: {
char* data = (char*) remainingRaw;
/*
* The topic is contained at the beginning of the remainingRaw field
* if the message is a publish message.
* Bytes 0 and 1 of the remainingRaw field encode the string length.
*/
ssize_t topicLen = ntohs(((uint16_t*) data)[0]);
data+= 2;
topic = string(data, topicLen);
data+= topicLen;
/* if qos is 1 or 2, the msg id follow in the next 2 bytes */
if (fixedHeader.qos > 0) {
msgId = ntohs(((uint16_t*) data)[0]);
data+= 2;
}
/* The rest of the message is its payload */
payloadPtr = (void*) data;
payloadLength = remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
break;
}
case MQTT_PUBREL: {
msgId = ntohs(((uint16_t*) remainingRaw)[0]);
break;
}
}
}
else {
......@@ -246,10 +278,11 @@ void SimpleMQTTMessage::dump()
}
cout << ", Dup=" << hex << (int)fixedHeader.dup
<< ", QoS=" << hex << (int)fixedHeader.qos
<< ", RETAIN=" << hex << (int)fixedHeader.retain << "\n";
<< ", RETAIN=" << hex << (int)fixedHeader.retain << "\n" << dec;
cout << " Bytes Processed: " << bytesProcessed << "\n";
cout << " Remaining Length: " << remainingLength << "\n";
cout << " MessageID: " << msgId << "\n";
if (isPublish()) {
cout << " Message Topic: " << getTopic() << "\n";
cout << " Message Length: " << getPayloadLength() << "\n";
......@@ -275,57 +308,25 @@ uint8_t SimpleMQTTMessage::getType() {
return fixedHeader.type;
}
string SimpleMQTTMessage::getTopic()
const string& SimpleMQTTMessage::getTopic()
{
/*
* The topic is contained at the beginning of the remainingRaw field
* if the message is a publish message.
* Bytes 0 and 1 of the remainingRaw field encode the string length.
*/
uint8_t *data;
int msb, lsb;
if(!isPublish())
return NULL;
return topic;
}
data = (uint8_t*)remainingRaw;
msb = data[0];
lsb = data[1];
topicLen = (msb<<8) | lsb;
payloadPtr = &data[2]+topicLen;
uint16_t SimpleMQTTMessage::getMsgId() {
return msgId;
}
return string((const char *)(data+2), topicLen);
uint8_t SimpleMQTTMessage::getQoS() {
return fixedHeader.qos;
}
size_t SimpleMQTTMessage::getPayloadLength()
{
if(!isPublish())
return 0;
if(!payloadPtr || !topicLen) {
uint8_t msb, lsb, *data;
data = (uint8_t*)remainingRaw;
msb = data[0];
lsb = data[1];
topicLen = (((size_t)msb)<<8) | lsb;
payloadPtr = data+2+topicLen;
}
if(!payloadPtr || !topicLen) {
return 0;
}
return remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
return payloadLength;
}
void* SimpleMQTTMessage::getPayload()
{
if(!isPublish())
return NULL;
if(!payloadPtr)
getPayloadLength();
return payloadPtr;
}
......@@ -74,8 +74,10 @@ protected:
size_t fixedHeaderLength;
size_t bytesProcessed;
size_t remainingLength;
size_t topicLen;
uint16_t msgId;
std::string topic;
void *remainingRaw;
size_t payloadLength;
void *payloadPtr;
ssize_t decodeFixedHeader(void* buf, size_t len);
......@@ -86,10 +88,11 @@ public:
bool complete();
bool isPublish();
uint8_t getType();
std::string getTopic();
const std::string& getTopic();
uint16_t getMsgId();
uint8_t getQoS();
size_t getPayloadLength();
void* getPayload();
void* getPayload();
void dump();
......
......@@ -255,6 +255,49 @@ SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
}
int SimpleMQTTServerMessageThread::sendAck(int connectionId) {
uint8_t buf[] = {0, 0, 0, 0};
switch(msg[connectionId]->getType()) {
case MQTT_CONNECT: {
buf[0] = MQTT_CONNACK << 4;
buf[1] = 2;
break;
}
case MQTT_PUBLISH: {
if (msg[connectionId]->getQoS() == 0) {
return 1;
} else {
if (msg[connectionId]->getQoS() == 1) {
buf[0] = MQTT_PUBACK << 4;
} else {
buf[0] = MQTT_PUBREC << 4;
}
buf[1] = 2;
uint16_t msgId = htons(msg[connectionId]->getMsgId());
buf[2] = ((uint8_t*)&msgId)[0];
buf[3] = ((uint8_t*)&msgId)[1];
}
break;
}
case MQTT_PUBREL: {
buf[0] = MQTT_PUBCOMP << 4;
buf[1] = 2;
uint16_t msgId = htons(msg[connectionId]->getMsgId());
buf[2] = ((uint8_t*)&msgId)[0];
buf[3] = ((uint8_t*)&msgId)[1];
break;
}
case MQTT_PINGRESP: {
buf[0] = MQTT_PINGRESP << 4;
buf[1] = 0;
break;
}
default:
return 1;
}
return !(write(fds[connectionId].fd, buf, buf[1]+2) == buf[1]+2);
}
void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
......@@ -354,28 +397,38 @@ void SimpleMQTTServerMessageThread::run()
#endif
switch(msg[connectionId]->getType()) {
case MQTT_CONNECT: {
cout << "Received CONNECT, sending CONNACK" << endl;
uint8_t buf[] = {MQTT_CONNACK << 4, 2, 0, 0};
write(fds[connectionId].fd, buf, sizeof(buf));
sendAck(connectionId);
break;
}
case MQTT_PUBLISH: {
if ((messageCallback(msg[connectionId]) == 0) && (msg[connectionId]->getQoS() > 0)) {
sendAck(connectionId);
}
break;
}
case MQTT_PUBREL: {
sendAck(connectionId);
break;
}
case MQTT_PINGREQ: {
sendAck(connectionId);
break;
}
case MQTT_DISCONNECT: {
cout << "Received DISCONNECT" << endl;
releaseConnection(connectionId);
break;
}
default: {
if (messageCallback) {
cout << "Calling Callback function" << endl;
messageCallback(msg[connectionId]);
}
else {
cout << "Nothing to do.." << endl;
delete msg[connectionId];
}
break;
}
}
delete msg[connectionId];
msg[connectionId] = NULL;
}
}
......@@ -456,16 +509,6 @@ void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
fds[connectionId].fd = -1;
fds[connectionId].events = 0;
fds[connectionId].revents = 0;
/*
* If the connection was closed while we were receiving
* a message, delete the corresponding message object.
*/
if(msg[connectionId]) {
delete msg[connectionId];
msg[connectionId] = NULL;
}
numConnections--;
#ifdef SimpleMQTTVerbose
......
......@@ -57,6 +57,7 @@ protected:
SimpleMQTTMessageCallback messageCallback;
void run();
int sendAck(int connectionId);
public:
int queueConnection(int newsock);
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment