Commit a2bb421d authored by Axel Auweter's avatar Axel Auweter
Browse files

Added decoding of PUBLISH messages (topic, payload) to SimpleMQTTServer.

FilePusher now accepts a topic string at the command line.
parent ef350209
...@@ -72,20 +72,22 @@ void mqttCallback(SimpleMQTTMessage *msg) ...@@ -72,20 +72,22 @@ void mqttCallback(SimpleMQTTMessage *msg)
msg->dump(); msg->dump();
if (msg->isPublish()) {
#if 0 #if 0
key = convert(&sid, sizeof(msgCtr)); key = convert(&sid, sizeof(msgCtr));
name = convert(&ts, sizeof(ts)); name = convert(&ts, sizeof(ts));
float tmp = 3.141; float tmp = 3.141;
value = convert(&tmp, 4); value = convert(&tmp, 4);
c.name = name; c.name = name;
c.value = value; c.value = value;
c.__isset.value = true; c.__isset.value = true;
c.timestamp = ts/1000; c.timestamp = ts/1000;
c.__isset.timestamp = true; c.__isset.timestamp = true;
myClient->insert(key, cparent, c, ConsistencyLevel::ONE); myClient->insert(key, cparent, c, ConsistencyLevel::ONE);
#endif #endif
}
} }
catch(TTransportException *te){ catch(TTransportException *te){
cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n"; cout << "TP Exception: " << te->what() << "[" << te->getType() << "]\n";
......
...@@ -25,6 +25,8 @@ SimpleMQTTMessage::SimpleMQTTMessage() ...@@ -25,6 +25,8 @@ SimpleMQTTMessage::SimpleMQTTMessage()
remainingRaw = NULL; remainingRaw = NULL;
remainingLength = 0; remainingLength = 0;
fixedHeaderLength = 0; fixedHeaderLength = 0;
payloadPtr = NULL;
topicLen = 0;
} }
SimpleMQTTMessage::~SimpleMQTTMessage() SimpleMQTTMessage::~SimpleMQTTMessage()
...@@ -228,6 +230,13 @@ void SimpleMQTTMessage::dump() ...@@ -228,6 +230,13 @@ void SimpleMQTTMessage::dump()
<< ", RETAIN=" << hex << (int)fixedHeader.bits.retain << "\n"; << ", RETAIN=" << hex << (int)fixedHeader.bits.retain << "\n";
cout << " Bytes Processed: " << bytesProcessed << "\n"; cout << " Bytes Processed: " << bytesProcessed << "\n";
cout << " Remaining Length: " << remainingLength << "\n"; cout << " Remaining Length: " << remainingLength << "\n";
if (isPublish()) {
cout << " Message Topic: " << getTopic() << "\n";
cout << " Message Length: " << getPayloadLength() << "\n";
cout << " Message Payload: " << string((char*)getPayload(), getPayloadLength()) << "\n";
}
#ifdef SimpleMQTTVerbose #ifdef SimpleMQTTVerbose
coutMtx.unlock(); coutMtx.unlock();
#endif #endif
...@@ -242,3 +251,58 @@ bool SimpleMQTTMessage::isPublish() ...@@ -242,3 +251,58 @@ bool SimpleMQTTMessage::isPublish()
{ {
return complete() && fixedHeader.bits.type == MQTT_PUBLISH; return complete() && fixedHeader.bits.type == MQTT_PUBLISH;
} }
string SimpleMQTTMessage::getTopic()
{
/*
* The topic is contained at the beginning of the remainingRaw field
* if the message is a publish message.
* Bytes 0 and 1 of the remainingRaw field encode the string length.
*/
uint8_t *data;
int msb, lsb;
if(!isPublish())
return NULL;
data = (uint8_t*)remainingRaw;
msb = data[0];
lsb = data[1];
topicLen = (msb<<8) | lsb;
payloadPtr = &data[2]+topicLen;
return string((const char *)(data+2), topicLen);
}
size_t SimpleMQTTMessage::getPayloadLength()
{
if(!isPublish())
return 0;
if(!payloadPtr || !topicLen) {
uint8_t msb, lsb, *data;
data = (uint8_t*)remainingRaw;
msb = data[0];
lsb = data[1];
topicLen = (((size_t)msb)<<8) | lsb;
payloadPtr = data+2+topicLen;
}
if(!payloadPtr || !topicLen) {
return 0;
}
return remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
}
void* SimpleMQTTMessage::getPayload()
{
if(!isPublish())
return NULL;
if(!payloadPtr)
getPayloadLength();
return payloadPtr;
}
...@@ -54,8 +54,10 @@ protected: ...@@ -54,8 +54,10 @@ protected:
MQTTFixedHeader fixedHeader; MQTTFixedHeader fixedHeader;
size_t fixedHeaderLength; size_t fixedHeaderLength;
size_t bytesProcessed; size_t bytesProcessed;
void *remainingRaw;
size_t remainingLength; size_t remainingLength;
size_t topicLen;
void *remainingRaw;
void *payloadPtr;
ssize_t decodeFixedHeader(void* buf, size_t len); ssize_t decodeFixedHeader(void* buf, size_t len);
ssize_t receiveMessage(void* buf, size_t len); ssize_t receiveMessage(void* buf, size_t len);
...@@ -64,7 +66,9 @@ public: ...@@ -64,7 +66,9 @@ public:
ssize_t appendRawData(void* buf, size_t len); ssize_t appendRawData(void* buf, size_t len);
bool complete(); bool complete();
bool isPublish(); bool isPublish();
// std::string getTopic(); std::string getTopic();
size_t getPayloadLength();
void* getPayload();
void dump(); void dump();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment