Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

simplemqttserverthread.cpp 12.5 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * simplemqttserverthread.cpp
 *
 *  Created on: May 3, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

using namespace std;
11
using namespace boost::system;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

/*
 * Start-up handshake between a thread object's constructors and
 * 'launch': locked in the SimpleMQTTServerThread constructor,
 * unlocked at the end of the child class constructor, and briefly
 * acquired by 'launch' before it calls 'run'.
 */
static boost::mutex threadMtx;

#ifdef SimpleMQTTVerbose
/*
 * Held around every verbose write to cout in this file.
 */
static boost::mutex coutMtx;
#endif

SimpleMQTTServerThread::SimpleMQTTServerThread()
{
  /*
   * The new thread enters through 'launch', which in turn invokes
   * the child class' 'run' function (the thread's main loop).
   *
   * 'run' must not start before the child class constructor has
   * completed. To enforce this, threadMtx is taken here, before
   * the thread is created, and is only given back at the end of
   * the child's constructor; 'launch' blocks on it until then.
   */
  threadMtx.lock();
  terminate = false;

  const int rc = pthread_create(&t, NULL, launch, this);
  if (rc != 0) {
      cout << "Error creating new MQTT server thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

SimpleMQTTServerThread::~SimpleMQTTServerThread()
{
  /*
   * Ask the main loop to stop, then wait for the thread to exit
   * so that it is no longer running once the object is gone.
   */
  terminate = true;

  const int rc = pthread_join(t, NULL);
  if (rc != 0) {
      cout << "Error joining thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

void* SimpleMQTTServerThread::launch(void *selfPtr)
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running launcher for class (" << selfPtr << ")...\n";
  coutMtx.unlock();
#endif

  /*
   * The following lines guard the run function to be called
   * before the constructor of the child class has finished.
   * The lock is released at the end of the child's constructor.
   */
  threadMtx.lock();
  threadMtx.unlock();

  /*
   * Call the child's run function...
   */
  ((SimpleMQTTServerThread*)selfPtr)->run();

  return NULL;
}

90
91
92
93
94
95
96
97
98
99
100
void SimpleMQTTServerThread::yield()
{
  /*
   * Yield this thread's execution to give way to other threads.
   * This is being used in the SimpleMQTTServer to allow the
   * accept thread acquiring the fdsMtx lock. Otherwise, in
   * particular on single-CPU systems (or when the acceptThread
   * and the messageThread share a core), waiting connections
   * may have to wait a long time before the lock will be given
   * to the acceptThread.
   */
101
  boost::this_thread::yield();
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
void SimpleMQTTServerAcceptThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n";
  coutMtx.unlock();
#endif

  int newsock = -1;
  struct pollfd fd;

  while (!terminate) {
      /*
       * Wait for something to happen on the socket...
       */
      fd.fd = socket;
      fd.events = POLLIN | POLLPRI;
      fd.revents = 0;
      if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {

124
125
126
127
128
129
130
131
132
          /*
           * Find a free message thread to take over
           * the next incoming connection.
           */
          SimpleMQTTServerMessageThread *mt = NULL;
          for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
              if (i->hasCapacity())
                mt = &*i;
          }
133

134
135
136
137
138
139
140
141
142
143
144
145
          /*
           * In case no free message thread was found,
           * try to create a new one as long as we do
           * not exceed the maximum.
           */
          if (!mt) {
              if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
                  cout << "Warning: socket " << socket << " cannot accept more connections.\n";
                  // FIXME: There must be nicer ways to handle such situations...
                  sleep(1);
                  continue;
              }
146
              messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
              continue;
          }

          /*
           * Take the next incoming connection.
           */
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Thread (" << this << ") waiting in accept()...\n";
          coutMtx.unlock();
#endif
          newsock = accept(socket, NULL, 0);
          if (newsock != -1) {
              int opt = fcntl(newsock, F_GETFL, 0);
              if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
162
163
164
165
166
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Setting socket to non-blocking in thread (" << this << ") failed for socket " << newsock << "...\n";
          coutMtx.unlock();
#endif
167
168
169
                  close(newsock);
              }
              else {
170
171
172
173
174
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Successfully set socket " << newsock << " nonblocing in thread (" << this << ")...\n";
          coutMtx.unlock();
#endif
175
176
177
                  mt->assignConnection(newsock);
              }
          }
178
179
180
181
182
183
184
          else {
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Accept() in thread (" << this << ") returned -1...\n";
          coutMtx.unlock();
#endif
          }
185
186
187
188
      }
  }
}

189
void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  /*
   * Set the function that will be called for each received
   * MQTT message and propagate to all message threads. The
   * stored copy is also what 'run' passes to message threads
   * it creates later.
   */
  messageCallback = callback;
  for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); ++i) {
      i->setMessageCallback(callback);
  }
}

SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback)
{
  /*
   * Assign socket and message callback. 'listenSock' is the
   * socket that 'run' polls and accepts connections on;
   * 'callback' is handed to every message thread 'run' creates.
   */
  socket = listenSock;
  messageCallback = callback;

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
  /*
   * Intentionally empty: this class performs no explicit
   * clean-up of its own.
   */
}

void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n";
  coutMtx.unlock();
#endif

  int numfds = -1;
229
  char inbuf[SimpleMQTTReadBufferSize];
230
231
232
233
234

  while (!terminate) {
      /*
       * Check for activity on our sockets...
       */
235
      fdsMtx.lock();
236
      numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
237
      fdsMtx.unlock();
238

239
240
241
242
243
244
245
246
      /*
       * Allow other threads to run (avoid instantaneous
       * re-acquisition of the fdsMtx lock in the next
       * iteration while connections are pending in the
       * acceptThread).
       */
      yield();

247
248
249
250
251
252
253
254
      if (numfds == -1)
        throw new system_error(errno, system_category(), "Error in poll().");

      /*
       * Apparently, there is work to do...
       */
      if (numfds > 0) {
          for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
255
256
257
258
259
260
261
              if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
                  coutMtx.lock();
                  cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
                  coutMtx.unlock();
#endif

262
                  if (fds[connectionId].revents & POLLIN) {
263
264
265
266
267
268
269
                      char* readPtr;
                      ssize_t rbytes, lbytes, bytes;

                      rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);

                      readPtr = inbuf;
                      lbytes = rbytes;
270
271

                      /*
272
                       * If read() returns 0, the connection was closed on the
273
274
                       * remote side. In this case, release it from our list.
                       */
275
                      if (rbytes == 0) {
276
277
                          releaseConnection(connectionId);
                      }
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307

                      while (lbytes > 0) {
                          /*
                           * Allocate new message if there is none.
                           */
                          if (!msg[connectionId]) {
                              msg[connectionId] = new SimpleMQTTMessage();
                              if (!msg[connectionId]) {
                                  cout << "Warning: Out of memory! Discarding network input!\n";
                                  continue;
                              }
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
                          }

                          /*
                           * Append received data to message.
                           */
                          bytes = msg[connectionId]->appendRawData(readPtr, lbytes);
                          readPtr += bytes;
                          lbytes -= bytes;

                          /*
                           * Check if message is complete.
                           */
                          if (msg[connectionId]->complete()) {
                              /*
308
309
310
311
                               * Forward message upstream!
                               * If there is a callback function, it is responsible
                               * for freeing the message using delete. Otherwise, we
                               * have to take care of this, here.
312
313
314
315
316
317
                               */
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
318
319
320
321
322
323
                              if (messageCallback) {
                                  messageCallback(msg[connectionId]);
                              }
                              else {
                                  delete msg[connectionId];
                              }
324
325
326
                              msg[connectionId] = NULL;
                          }
                      }
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
                  }
              }
          }
      }
  }
}

/*
 * Returns true if at least one connection slot is still free.
 * The comparison has to be strict: with numConnections equal to
 * SimpleMQTTConnectionsPerThread every slot in fds is taken and
 * assignConnection would not find a place for another socket.
 */
bool SimpleMQTTServerMessageThread::hasCapacity()
{
  return numConnections < SimpleMQTTConnectionsPerThread;
}

void SimpleMQTTServerMessageThread::assignConnection(int newsock)
{
  /*
   * Find the first free slot in fds and assign connection.
   */
  fdsMtx.lock();
  for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
      if (fds[i].fd == -1) {
347
          fds[i].events = POLLIN | POLLPRI | POLLHUP;
348
          fds[i].revents = 0;
Axel Auweter's avatar
Axel Auweter committed
349
          fds[i].fd = newsock;
350
351
352

          numConnections++;

353
354
355
356
357
358
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Assigned connection  (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
          coutMtx.unlock();
#endif

359
360
361
362
363
364
365
366
367
368
369
370
          break;
      }
  }
  fdsMtx.unlock();
}

void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
{
  /*
   * Close the connection an clean up.
   */
  fdsMtx.lock();
371
  shutdown(fds[connectionId].fd, SHUT_RDWR);
372
373
374
375
376
  close(fds[connectionId].fd);
  fds[connectionId].fd = -1;
  fds[connectionId].events = 0;
  fds[connectionId].revents = 0;

377
378
379
380
  /*
   * If the connection was closed while we were receiving
   * a message, delete the corresponding message object.
   */
381
382
383
384
385
  if(msg[connectionId]) {
    delete msg[connectionId];
    msg[connectionId] = NULL;
  }

386
387
  numConnections--;
  fdsMtx.unlock();
388
389
390
391
392
393

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Released connection  (" << this << ", " << connectionId << ")...\n";
  coutMtx.unlock();
#endif
394
395
}

396
397
398
399
400
401
void SimpleMQTTServerMessageThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  /*
   * Replace the function that 'run' invokes for every completely
   * received message.
   */
  this->messageCallback = callback;
}

SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback)
{
  /*
   * Mark every slot of the fds array as free, using the same
   * cleared state that releaseConnection leaves behind (fd -1,
   * no events). Warning: This will only work when the size of
   * the fds array is determined at compile time.
   */
  const size_t numFds = sizeof(fds) / sizeof(fds[0]);
  for (size_t i = 0; i < numFds; i++) {
      fds[i].fd = -1;
      fds[i].events = 0;
      fds[i].revents = 0;
  }

  /*
   * Clear the msg array. Warning: This will only work when the
   * size of the msg array is determined at compile time.
   */
  const size_t numMsgs = sizeof(msg) / sizeof(msg[0]);
  for (size_t i = 0; i < numMsgs; i++) {
      msg[i] = NULL;
  }

  /*
   * Initialize the number of active connections to 0 and set
   * the messageCallback function.
   */
  numConnections = 0;
  messageCallback = callback;

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
{
  /*
   * Intentionally empty: this class performs no explicit
   * clean-up of its own.
   * NOTE(review): connections still held in fds and partially
   * received messages in msg are not released here - confirm
   * whether that is intended.
   */
}