simplemqttserverthread.cpp 10.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
/*
 * simplemqttserverthread.cpp
 *
 *  Created on: May 3, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

using namespace std;

static boost::mutex threadMtx;

#ifdef SimpleMQTTVerbose
static boost::mutex coutMtx;
#endif

SimpleMQTTServerThread::SimpleMQTTServerThread()
{
  /*
   * Start the thread at the 'launch' function. The 'launch'
   * function will take care of calling the child-specific
   * 'run' function which contains the thread's main loop.
   *
   * Special care must be taken not to call the run function
   * before the child has gone through its constructor. Thus,
   * we lock a mutex that will cause the 'launch' function to
   * wait until the mutex is released by the child's
   * constructor.
   */
  threadMtx.lock();

  terminate = false;
  if (pthread_create(&t, NULL, launch, this) != 0) {
      cout << "Error creating new MQTT server thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

SimpleMQTTServerThread::~SimpleMQTTServerThread()
{
  /*
   * Terminate the thread and join it to ensure proper
   * thread termination before the class is destroyed.
   */
  terminate = true;
  if (pthread_join(t, NULL) != 0) {
      cout << "Error joining thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

void* SimpleMQTTServerThread::launch(void *selfPtr)
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running launcher for class (" << selfPtr << ")...\n";
  coutMtx.unlock();
#endif

  /*
   * The following lines guard the run function to be called
   * before the constructor of the child class has finished.
   * The lock is released at the end of the child's constructor.
   */
  threadMtx.lock();
  threadMtx.unlock();

  /*
   * Call the child's run function...
   */
  ((SimpleMQTTServerThread*)selfPtr)->run();

  return NULL;
}

void SimpleMQTTServerAcceptThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n";
  coutMtx.unlock();
#endif

  int newsock = -1;
  struct pollfd fd;

  while (!terminate) {
      /*
       * Wait for something to happen on the socket...
       */
      fd.fd = socket;
      fd.events = POLLIN | POLLPRI;
      fd.revents = 0;
      if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {

109
110
111
112
113
114
115
116
117
          /*
           * Find a free message thread to take over
           * the next incoming connection.
           */
          SimpleMQTTServerMessageThread *mt = NULL;
          for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
              if (i->hasCapacity())
                mt = &*i;
          }
118

119
120
121
122
123
124
125
126
127
128
129
130
          /*
           * In case no free message thread was found,
           * try to create a new one as long as we do
           * not exceed the maximum.
           */
          if (!mt) {
              if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
                  cout << "Warning: socket " << socket << " cannot accept more connections.\n";
                  // FIXME: There must be nicer ways to handle such situations...
                  sleep(1);
                  continue;
              }
131
              messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
              continue;
          }

          /*
           * Take the next incoming connection.
           */
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Thread (" << this << ") waiting in accept()...\n";
          coutMtx.unlock();
#endif
          newsock = accept(socket, NULL, 0);
          if (newsock != -1) {
              int opt = fcntl(newsock, F_GETFL, 0);
              if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
                  close(newsock);
              }
              else {
                  mt->assignConnection(newsock);
              }
          }
153
154
155
156
      }
  }
}

157
void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback)
158
{
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
  messageCallback = callback;
  /*
   * Set the function that will be called for each received
   * MQTT message and propagate to all message threads.
   */
  messageCallback = callback;
  for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
      (*i).setMessageCallback(callback);
  }
}

SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback)
{
  /*
   * Assign socket and message callback.
   */
175
  socket = listenSock;
176
  messageCallback = callback;
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
}

void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n";
  coutMtx.unlock();
#endif

  int numfds = -1;
198
  char inbuf[SimpleMQTTReadBufferSize];
199
200
201
202
203

  while (!terminate) {
      /*
       * Check for activity on our sockets...
       */
204
      fdsMtx.lock();
205
      numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
206
      fdsMtx.unlock();
207
208
209
210
211
212
213
214
215

      if (numfds == -1)
        throw new system_error(errno, system_category(), "Error in poll().");

      /*
       * Apparently, there is work to do...
       */
      if (numfds > 0) {
          for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
216
217
218
219
220
221
222
              if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
                  coutMtx.lock();
                  cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
                  coutMtx.unlock();
#endif

223
                  if (fds[connectionId].revents & POLLIN) {
224
225
226
227
228
229
230
                      char* readPtr;
                      ssize_t rbytes, lbytes, bytes;

                      rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);

                      readPtr = inbuf;
                      lbytes = rbytes;
231
232

                      /*
233
                       * If read() returns 0, the connection was closed on the
234
235
                       * remote side. In this case, release it from our list.
                       */
236
                      if (rbytes == 0) {
237
238
                          releaseConnection(connectionId);
                      }
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268

                      while (lbytes > 0) {
                          /*
                           * Allocate new message if there is none.
                           */
                          if (!msg[connectionId]) {
                              msg[connectionId] = new SimpleMQTTMessage();
                              if (!msg[connectionId]) {
                                  cout << "Warning: Out of memory! Discarding network input!\n";
                                  continue;
                              }
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
                          }

                          /*
                           * Append received data to message.
                           */
                          bytes = msg[connectionId]->appendRawData(readPtr, lbytes);
                          readPtr += bytes;
                          lbytes -= bytes;

                          /*
                           * Check if message is complete.
                           */
                          if (msg[connectionId]->complete()) {
                              /*
269
270
271
272
                               * Forward message upstream!
                               * If there is a callback function, it is responsible
                               * for freeing the message using delete. Otherwise, we
                               * have to take care of this, here.
273
274
275
276
277
278
                               */
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
279
280
281
282
283
284
                              if (messageCallback) {
                                  messageCallback(msg[connectionId]);
                              }
                              else {
                                  delete msg[connectionId];
                              }
285
286
287
                              msg[connectionId] = NULL;
                          }
                      }
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
                  }
              }
          }
      }
  }
}

bool SimpleMQTTServerMessageThread::hasCapacity()
{
  return numConnections <= SimpleMQTTConnectionsPerThread;
}

void SimpleMQTTServerMessageThread::assignConnection(int newsock)
{
  /*
   * Find the first free slot in fds and assign connection.
   */
  fdsMtx.lock();
  for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
      if (fds[i].fd == -1) {
          fds[i].fd = newsock;
309
          fds[i].events = POLLIN | POLLPRI | POLLHUP;
310
311
312
313
          fds[i].revents = 0;

          numConnections++;

314
315
316
317
318
319
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Assigned connection  (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
          coutMtx.unlock();
#endif

320
321
322
323
324
325
326
327
328
329
330
331
          break;
      }
  }
  fdsMtx.unlock();
}

void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
{
  /*
   * Close the connection an clean up.
   */
  fdsMtx.lock();
332
  shutdown(fds[connectionId].fd, SHUT_RDWR);
333
334
335
336
337
338
339
  close(fds[connectionId].fd);
  fds[connectionId].fd = -1;
  fds[connectionId].events = 0;
  fds[connectionId].revents = 0;

  numConnections--;
  fdsMtx.unlock();
340
341
342
343
344
345

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Released connection  (" << this << ", " << connectionId << ")...\n";
  coutMtx.unlock();
#endif
346
347
}

348
349
350
351
352
353
void SimpleMQTTServerMessageThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  messageCallback = callback;
}

SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback)
354
355
356
357
358
359
360
361
{
  /*
   * Clear the fds array. Warning: This will only work when the
   * size of the fds array is determined at compile time.
   */
  memset(fds, -1, sizeof(fds));

  /*
362
363
   * Initialize the number of active connections to 0 and set
   * the messageCallback function.
364
365
   */
  numConnections = 0;
366
  messageCallback = callback;
367
368
369
370
371
372
373
374
375
376
377
378

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
{
}