2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

simplemqttservermessage.cpp 9.29 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
//================================================================================
// Name        : simplemqttservermessage.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Implementation of a class for receiving a simple MQTT message
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
26
27

#include "simplemqttserver.h"
28
#include <arpa/inet.h> 
29
30

using namespace std;
31
using namespace boost::system;
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

#ifdef SimpleMQTTVerbose
/*
 * Only present in verbose builds: mutex that appendRawData() and dump()
 * lock around their diagnostic output to cout.
 */
static boost::mutex coutMtx;
#endif

SimpleMQTTMessage::SimpleMQTTMessage()
{
  /*
   * A freshly constructed message starts out empty: nothing has been
   * decoded yet and no receive buffer has been allocated.
   *
   * NOTE(review): fixedHeader is not assigned here; unless its type has
   * its own default initialization, its content is indeterminate until
   * decodeFixedHeader() writes the first header byte -- confirm.
   */

  /* Decoder state machine. */
  state = Empty;
  bytesProcessed = 0;
  fixedHeaderLength = 0;

  /* Receive buffer for the variable part of the message (not yet allocated). */
  remainingRaw = NULL;
  bufferLength = 0;
  remainingLength = 0;

  /* Fields extracted from the variable part of the message. */
  msgId = 0;
  payloadPtr = NULL;
  payloadLength = 0;
}

SimpleMQTTMessage::~SimpleMQTTMessage()
{
  /*
   * Release the receive buffer, if one was ever allocated.
   * free() ignores a NULL pointer, so no explicit check is needed.
   */
  free(remainingRaw);
}

63
64
65
66
67
68
69
70
71
72
73
void SimpleMQTTMessage::clear() {
    //We reset all variables except for the internal buffer, which is recycled
    state = Empty;
    bytesProcessed = 0;
    remainingLength = 0;
    fixedHeaderLength = 0;
    msgId = 0;
    payloadLength = 0;
    payloadPtr = NULL;
}

74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
ssize_t SimpleMQTTMessage::decodeFixedHeader(void* buf, size_t len)
{
  /*
   * Incrementally decode the MQTT fixed header from the len bytes at buf.
   *
   * The header consists of one control byte followed by one to four
   * length bytes. Each length byte contributes its lower 7 bits to
   * remainingLength; bit 7 signals that another length byte follows.
   * The function may be called repeatedly with further input until the
   * header is complete; progress is kept in state and bytesProcessed.
   *
   * Returns the number of bytes consumed from buf.
   */
  char* cursor = (char*)buf;
  ssize_t unread = len;

  /*
   * Step 1: the control byte. An empty message must not have consumed
   * anything yet and needs at least one byte of input.
   */
  if (state == Empty) {
      if (!unread || bytesProcessed != 0) {
          state = Error;
      }
      else {
          fixedHeader.raw[0] = *cursor;

          cursor++;
          unread--;
          bytesProcessed++;

          state = DecodingFixedHeader;
      }
  }

  /*
   * Step 2: the variable-length encoding of the remaining length.
   */
  if (state == DecodingFixedHeader && unread) {
      char lengthByte;
      int weight;

      do {
          lengthByte = *cursor;
          fixedHeader.raw[bytesProcessed] = lengthByte;

          /* The n-th length byte (n = 1..4) is weighted with 128^(n-1). */
          weight = 1 << ((bytesProcessed-1)*7);
          remainingLength += (lengthByte & 127) * weight;

          cursor++;
          unread--;
          bytesProcessed++;
      }
      while (unread && (bytesProcessed < 5) && ((lengthByte&128) != 0));

      if ((lengthByte&128) == 0) {
          /*
           * Last length byte seen: the fixed header is complete.
           * Messages without a variable part are already done,
           * all others continue in receiveMessage().
           */
          fixedHeaderLength = bytesProcessed;
          state = (remainingLength == 0) ? Complete : FixedHeaderOk;

          /* From here on, bytesProcessed counts bytes of the variable part. */
          bytesProcessed = 0;
      }
      else if (bytesProcessed >= 5) {
          /* A fifth length byte is not allowed. */
          state = Error;
      }
  }

  return len-unread;
}

ssize_t SimpleMQTTMessage::receiveMessage(void* buf, size_t len)
{
  /*
   * Receive the remainder of an MQTT message.
   */
  ssize_t lbytes = len;

  /*
   * If we are in this function for the first time,
   * we need to allocate the buffer.
   */
154
155
  if (!remainingRaw || remainingLength > bufferLength) {
      if(remainingRaw) free(remainingRaw);
156
      remainingRaw = malloc(remainingLength);
157
      bufferLength = remainingLength;
158
      if (!remainingRaw) {
159
          throw new boost::system::system_error(errno, boost::system::system_category(), "Error in SimpleMQTTMessage::receiveMessage().");
160
161
162
163
164
165
166
167
168
169
170
      }
  }

  /*
   * Fill the buffer with all we have (until full).
   */
  char* writePtr = (char*)remainingRaw;
  writePtr += bytesProcessed;

  if (bytesProcessed+len >= remainingLength) {
    memcpy(writePtr, buf, remainingLength-bytesProcessed);
171
    lbytes -= remainingLength-bytesProcessed;
172
173
174
175
176
177
    bytesProcessed += remainingLength-bytesProcessed;

    /*
     * In this case, we have received the entire message.
     */
    state = Complete;
178
179
180
181
182
183
184
185
186
187
    switch(fixedHeader.type) {
      case MQTT_PUBLISH: {
        char* data = (char*) remainingRaw;
        /*
         * The topic is contained at the beginning of the remainingRaw field
         * if the message is a publish message.
         * Bytes 0 and 1 of the remainingRaw field encode the string length.
         */
        ssize_t topicLen = ntohs(((uint16_t*) data)[0]);
        data+= 2;
188

189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
        topic = string(data, topicLen);
        data+= topicLen;
        
        /* if qos is 1 or 2, the msg id follow in the next 2 bytes */
        if (fixedHeader.qos > 0) {
          msgId = ntohs(((uint16_t*) data)[0]);
          data+= 2;
        }
        
        /* The rest of the message is its payload */
        payloadPtr = (void*) data;
        payloadLength = remainingLength - ((uint8_t*)payloadPtr - (uint8_t*)remainingRaw);
        break;
      }
      case MQTT_PUBREL: {
        msgId = ntohs(((uint16_t*) remainingRaw)[0]);
        break;
      }
    }
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248

  }
  else {
    memcpy(writePtr, buf, len);
    lbytes -= len;
    bytesProcessed += len;
  }

  return len-lbytes;
}

ssize_t SimpleMQTTMessage::appendRawData(void* buf, size_t len)
{
  /*
   * Feed len bytes of raw input from buf into the message.
   *
   * Depending on the current state, the data is handed to
   * decodeFixedHeader() or receiveMessage() until the input is used up,
   * the message is complete or an error occurred.
   *
   * Returns the number of bytes consumed from buf. This may be less
   * than len, e.g. if buf holds more than one message.
   */
  char* readPtr = (char*)buf;
  ssize_t bytes = 0, lbytes = len;

  /*
   * Process the data in buf.
   */
  while(lbytes > 0 && state != Error && state != Complete) {
      switch(state) {
      case Empty:
      case DecodingFixedHeader:
        bytes = decodeFixedHeader(readPtr, lbytes);
        break;
      case FixedHeaderOk:
        bytes = receiveMessage(readPtr, lbytes);
        break;
      default:
        cout << "Unhandled state in SimpleMQTTMessage::appendRawData!\n";
        /*
         * Nothing was consumed in this iteration. Do not reuse the byte
         * count of the previous iteration and flag the message as
         * erroneous, otherwise this loop would never terminate.
         */
        bytes = 0;
        state = Error;
        break;
      }

      readPtr += bytes;
      lbytes -= bytes;
  }

#ifdef SimpleMQTTVerbose
  /* The lock is released before dump() is called, as dump() locks coutMtx itself. */
  coutMtx.lock();
  cout << "Finished appendRawData() function. Results follow...\n";
  coutMtx.unlock();
  dump();
#endif

  return len-lbytes;
}

void SimpleMQTTMessage::dump()
{
261
#ifdef SimpleMQTTVerbose
262
  coutMtx.lock();
263
#endif
264
265
266
267
268
269
270
271
272
273
274
  cout << "Dump of SimpleMQTTMessage (" << this << "):\n";
  cout << "    State: ";
  switch (state) {
  case Empty: cout << "Empty"; break;
  case DecodingFixedHeader: cout << "DecodingFixedHeader"; break;
  case FixedHeaderOk: cout << "FixedHeaderOk"; break;
  case Complete: cout << "Complete"; break;
  case Error: cout << "Error"; break;
  default: cout << "Unknown state (bad!)"; break;
  }
  cout << "\n    Fixed Header: Type=";
275
  switch (fixedHeader.type) {
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
  case MQTT_RESERVED: cout << "RESERVED"; break;
  case MQTT_CONNECT: cout << "CONNECT"; break;
  case MQTT_CONNACK: cout << "CONNACK"; break;
  case MQTT_PUBLISH: cout << "PUBLISH"; break;
  case MQTT_PUBACK: cout << "PUBACK"; break;
  case MQTT_PUBREC: cout << "PUBREC"; break;
  case MQTT_PUBREL: cout << "PUBREL"; break;
  case MQTT_PUBCOMP: cout << "PUBCOMP"; break;
  case MQTT_SUBSCRIBE: cout << "SUBSCRIBE"; break;
  case MQTT_SUBACK: cout << "SUBACK"; break;
  case MQTT_UNSUBSCRIBE: cout << "UNSUBSCRIBE"; break;
  case MQTT_UNSUBACK: cout << "UNSUBACK"; break;
  case MQTT_PINGREQ: cout << "PINGREQ"; break;
  case MQTT_PINGRESP: cout << "PINGRESP"; break;
  case MQTT_DISCONNECT: cout << "DISCONNECT"; break;
  default: cout << "Unknown type (bad!)"; break;
  }
293
294
  cout << ", Dup=" << hex << (int)fixedHeader.dup
      << ", QoS=" << hex << (int)fixedHeader.qos
295
      << ", RETAIN=" << hex << (int)fixedHeader.retain << "\n" << dec;
296
297
  cout << "    Bytes Processed: " << bytesProcessed << "\n";
  cout << "    Remaining Length: " << remainingLength << "\n";
298

299
  cout << "    MessageID: " << msgId << "\n";
300
301
302
303
304
305
  if (isPublish()) {
      cout << "    Message Topic: " << getTopic() << "\n";
      cout << "    Message Length: " << getPayloadLength() << "\n";
      cout << "    Message Payload: " << string((char*)getPayload(), getPayloadLength()) << "\n";
  }

306
#ifdef SimpleMQTTVerbose
307
  coutMtx.unlock();
308
#endif
309
310
311
312
313
314
}

bool SimpleMQTTMessage::complete()
{
  /* The message is complete once the decoder has reached the Complete state. */
  const bool finished = (state == Complete);
  return finished;
}
315
316
317

bool SimpleMQTTMessage::isPublish()
{
318
319
320
321
322
  return complete() && fixedHeader.type == MQTT_PUBLISH;
}

uint8_t SimpleMQTTMessage::getType()
{
  /* Message type as stored in the type field of the fixed header. */
  const uint8_t messageType = fixedHeader.type;
  return messageType;
}
324

325
const string& SimpleMQTTMessage::getTopic() {
  /*
   * Reference to the topic member, which receiveMessage() fills
   * when it decodes a publish message.
   */
  const string& topicRef = topic;
  return topicRef;
}
329

330
331
332
uint16_t SimpleMQTTMessage::getMsgId()
{
  /* Message id extracted by receiveMessage(); 0 if none has been set. */
  const uint16_t id = msgId;
  return id;
}
333

334
335
uint8_t SimpleMQTTMessage::getQoS()
{
  /* QoS level as stored in the qos field of the fixed header. */
  const uint8_t qosLevel = fixedHeader.qos;
  return qosLevel;
}

size_t SimpleMQTTMessage::getPayloadLength()
{
340
  return payloadLength;
341
342
343
344
345
346
}

void* SimpleMQTTMessage::getPayload() {
  /*
   * Start of the payload. The pointer refers to the internal receive
   * buffer of the message and is NULL if no payload has been decoded.
   */
  void* const start = payloadPtr;
  return start;
}