Commit 6b791e3c authored by Michael Ott's avatar Michael Ott
Browse files

Add preliminary support for MQTT CONNECT and DISCONNECT messages

parent a0b600c8
...@@ -226,7 +226,7 @@ void SimpleMQTTMessage::dump() ...@@ -226,7 +226,7 @@ void SimpleMQTTMessage::dump()
default: cout << "Unknown state (bad!)"; break; default: cout << "Unknown state (bad!)"; break;
} }
cout << "\n Fixed Header: Type="; cout << "\n Fixed Header: Type=";
switch (fixedHeader.bits.type) { switch (fixedHeader.type) {
case MQTT_RESERVED: cout << "RESERVED"; break; case MQTT_RESERVED: cout << "RESERVED"; break;
case MQTT_CONNECT: cout << "CONNECT"; break; case MQTT_CONNECT: cout << "CONNECT"; break;
case MQTT_CONNACK: cout << "CONNACK"; break; case MQTT_CONNACK: cout << "CONNACK"; break;
...@@ -244,9 +244,9 @@ void SimpleMQTTMessage::dump() ...@@ -244,9 +244,9 @@ void SimpleMQTTMessage::dump()
case MQTT_DISCONNECT: cout << "DISCONNECT"; break; case MQTT_DISCONNECT: cout << "DISCONNECT"; break;
default: cout << "Unknown type (bad!)"; break; default: cout << "Unknown type (bad!)"; break;
} }
cout << ", Dup=" << hex << (int)fixedHeader.bits.dup cout << ", Dup=" << hex << (int)fixedHeader.dup
<< ", QoS=" << hex << (int)fixedHeader.bits.qos << ", QoS=" << hex << (int)fixedHeader.qos
<< ", RETAIN=" << hex << (int)fixedHeader.bits.retain << "\n"; << ", RETAIN=" << hex << (int)fixedHeader.retain << "\n";
cout << " Bytes Processed: " << bytesProcessed << "\n"; cout << " Bytes Processed: " << bytesProcessed << "\n";
cout << " Remaining Length: " << remainingLength << "\n"; cout << " Remaining Length: " << remainingLength << "\n";
...@@ -268,7 +268,11 @@ bool SimpleMQTTMessage::complete() ...@@ -268,7 +268,11 @@ bool SimpleMQTTMessage::complete()
bool SimpleMQTTMessage::isPublish() bool SimpleMQTTMessage::isPublish()
{ {
return complete() && fixedHeader.bits.type == MQTT_PUBLISH; return complete() && fixedHeader.type == MQTT_PUBLISH;
}
uint8_t SimpleMQTTMessage::getType() {
return fixedHeader.type;
} }
string SimpleMQTTMessage::getTopic() string SimpleMQTTMessage::getTopic()
......
...@@ -53,7 +53,7 @@ typedef union { ...@@ -53,7 +53,7 @@ typedef union {
uint8_t dup :1; uint8_t dup :1;
uint8_t type :4; uint8_t type :4;
uint8_t remaining_length[4]; uint8_t remaining_length[4];
} bits; };
} MQTTFixedHeader; } MQTTFixedHeader;
#pragma pack(pop) #pragma pack(pop)
...@@ -85,9 +85,11 @@ public: ...@@ -85,9 +85,11 @@ public:
ssize_t appendRawData(void* buf, size_t len); ssize_t appendRawData(void* buf, size_t len);
bool complete(); bool complete();
bool isPublish(); bool isPublish();
uint8_t getType();
std::string getTopic(); std::string getTopic();
size_t getPayloadLength(); size_t getPayloadLength();
void* getPayload(); void* getPayload();
void dump(); void dump();
......
...@@ -352,13 +352,31 @@ void SimpleMQTTServerMessageThread::run() ...@@ -352,13 +352,31 @@ void SimpleMQTTServerMessageThread::run()
cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n"; cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
coutMtx.unlock(); coutMtx.unlock();
#endif #endif
if (messageCallback) { switch(msg[connectionId]->getType()) {
messageCallback(msg[connectionId]); case MQTT_CONNECT: {
cout << "Received CONNECT, sending CONNACK" << endl;
uint8_t buf[] = {MQTT_CONNACK << 4, 2, 0, 0};
write(fds[connectionId].fd, buf, sizeof(buf));
break;
}
case MQTT_DISCONNECT: {
cout << "Received DISCONNECT" << endl;
releaseConnection(connectionId);
break;
} }
else { default: {
if (messageCallback) {
cout << "Calling Callback function" << endl;
messageCallback(msg[connectionId]);
}
else {
cout << "Nothing to do.." << endl;
delete msg[connectionId]; delete msg[connectionId];
}
break;
} }
msg[connectionId] = NULL; }
msg[connectionId] = NULL;
} }
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment