Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

simplemqttserverthread.cpp 12.2 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * simplemqttserverthread.cpp
 *
 *  Created on: May 3, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

using namespace std;
11
using namespace boost::system;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

static boost::mutex threadMtx;

#ifdef SimpleMQTTVerbose
static boost::mutex coutMtx;
#endif

SimpleMQTTServerThread::SimpleMQTTServerThread()
{
  /*
   * Start the thread at the 'launch' function. The 'launch'
   * function will take care of calling the child-specific
   * 'run' function which contains the thread's main loop.
   *
   * Special care must be taken not to call the run function
   * before the child has gone through its constructor. Thus,
   * we lock a mutex that will cause the 'launch' function to
   * wait until the mutex is released by the child's
   * constructor.
   */
  threadMtx.lock();

  terminate = false;
  if (pthread_create(&t, NULL, launch, this) != 0) {
      cout << "Error creating new MQTT server thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

SimpleMQTTServerThread::~SimpleMQTTServerThread()
{
  /*
   * Terminate the thread and join it to ensure proper
   * thread termination before the class is destroyed.
   */
  terminate = true;
  if (pthread_join(t, NULL) != 0) {
      cout << "Error joining thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n";
  coutMtx.unlock();
#endif
}

void* SimpleMQTTServerThread::launch(void *selfPtr)
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running launcher for class (" << selfPtr << ")...\n";
  coutMtx.unlock();
#endif

  /*
   * The following lines guard the run function to be called
   * before the constructor of the child class has finished.
   * The lock is released at the end of the child's constructor.
   */
  threadMtx.lock();
  threadMtx.unlock();

  /*
   * Call the child's run function...
   */
  ((SimpleMQTTServerThread*)selfPtr)->run();

  return NULL;
}

90
91
92
93
94
95
96
97
98
99
100
void SimpleMQTTServerThread::yield()
{
  /*
   * Yield this thread's execution to give way to other threads.
   * This is being used in the SimpleMQTTServer to allow the
   * accept thread acquiring the fdsMtx lock. Otherwise, in
   * particular on single-CPU systems (or when the acceptThread
   * and the messageThread share a core), waiting connections
   * may have to wait a long time before the lock will be given
   * to the acceptThread.
   */
101
  boost::this_thread::yield();
102
103
}

104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
void SimpleMQTTServerAcceptThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n";
  coutMtx.unlock();
#endif

  int newsock = -1;
  struct pollfd fd;

  while (!terminate) {
      /*
       * Wait for something to happen on the socket...
       */
      fd.fd = socket;
      fd.events = POLLIN | POLLPRI;
      fd.revents = 0;
      if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {

124
125
126
127
128
129
130
131
132
          /*
           * Find a free message thread to take over
           * the next incoming connection.
           */
          SimpleMQTTServerMessageThread *mt = NULL;
          for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
              if (i->hasCapacity())
                mt = &*i;
          }
133

134
135
136
137
138
139
140
141
142
143
144
145
          /*
           * In case no free message thread was found,
           * try to create a new one as long as we do
           * not exceed the maximum.
           */
          if (!mt) {
              if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
                  cout << "Warning: socket " << socket << " cannot accept more connections.\n";
                  // FIXME: There must be nicer ways to handle such situations...
                  sleep(1);
                  continue;
              }
146
              messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
              continue;
          }

          /*
           * Take the next incoming connection.
           */
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Thread (" << this << ") waiting in accept()...\n";
          coutMtx.unlock();
#endif
          newsock = accept(socket, NULL, 0);
          if (newsock != -1) {
              int opt = fcntl(newsock, F_GETFL, 0);
              if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
162
163
164
165
166
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Setting socket to non-blocking in thread (" << this << ") failed for socket " << newsock << "...\n";
          coutMtx.unlock();
#endif
167
168
169
                  close(newsock);
              }
              else {
170
171
172
173
174
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Successfully set socket " << newsock << " nonblocing in thread (" << this << ")...\n";
          coutMtx.unlock();
#endif
175
176
177
                  mt->assignConnection(newsock);
              }
          }
178
179
180
181
182
183
184
          else {
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Accept() in thread (" << this << ") returned -1...\n";
          coutMtx.unlock();
#endif
          }
185
186
187
188
      }
  }
}

189
void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback)
190
{
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
  messageCallback = callback;
  /*
   * Set the function that will be called for each received
   * MQTT message and propagate to all message threads.
   */
  messageCallback = callback;
  for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
      (*i).setMessageCallback(callback);
  }
}

SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback)
{
  /*
   * Assign socket and message callback.
   */
207
  socket = listenSock;
208
  messageCallback = callback;
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
}

void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n";
  coutMtx.unlock();
#endif

  int numfds = -1;
230
  char inbuf[SimpleMQTTReadBufferSize];
231
232
233
234
235

  while (!terminate) {
      /*
       * Check for activity on our sockets...
       */
236
      fdsMtx.lock();
237
      numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
238
      fdsMtx.unlock();
239

240
241
242
243
244
245
246
247
      /*
       * Allow other threads to run (avoid instantaneous
       * re-acquisition of the fdsMtx lock in the next
       * iteration while connections are pending in the
       * acceptThread).
       */
      yield();

248
249
250
251
252
253
254
255
      if (numfds == -1)
        throw new system_error(errno, system_category(), "Error in poll().");

      /*
       * Apparently, there is work to do...
       */
      if (numfds > 0) {
          for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
256
257
258
259
260
261
262
              if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
                  coutMtx.lock();
                  cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
                  coutMtx.unlock();
#endif

263
                  if (fds[connectionId].revents & POLLIN) {
264
265
266
267
268
269
270
                      char* readPtr;
                      ssize_t rbytes, lbytes, bytes;

                      rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);

                      readPtr = inbuf;
                      lbytes = rbytes;
271
272

                      /*
273
                       * If read() returns 0, the connection was closed on the
274
275
                       * remote side. In this case, release it from our list.
                       */
276
                      if (rbytes == 0) {
277
278
                          releaseConnection(connectionId);
                      }
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308

                      while (lbytes > 0) {
                          /*
                           * Allocate new message if there is none.
                           */
                          if (!msg[connectionId]) {
                              msg[connectionId] = new SimpleMQTTMessage();
                              if (!msg[connectionId]) {
                                  cout << "Warning: Out of memory! Discarding network input!\n";
                                  continue;
                              }
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
                          }

                          /*
                           * Append received data to message.
                           */
                          bytes = msg[connectionId]->appendRawData(readPtr, lbytes);
                          readPtr += bytes;
                          lbytes -= bytes;

                          /*
                           * Check if message is complete.
                           */
                          if (msg[connectionId]->complete()) {
                              /*
309
310
311
312
                               * Forward message upstream!
                               * If there is a callback function, it is responsible
                               * for freeing the message using delete. Otherwise, we
                               * have to take care of this, here.
313
314
315
316
317
318
                               */
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
319
320
321
322
323
324
                              if (messageCallback) {
                                  messageCallback(msg[connectionId]);
                              }
                              else {
                                  delete msg[connectionId];
                              }
325
326
327
                              msg[connectionId] = NULL;
                          }
                      }
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
                  }
              }
          }
      }
  }
}

bool SimpleMQTTServerMessageThread::hasCapacity()
{
  return numConnections <= SimpleMQTTConnectionsPerThread;
}

void SimpleMQTTServerMessageThread::assignConnection(int newsock)
{
  /*
   * Find the first free slot in fds and assign connection.
   */
  fdsMtx.lock();
  for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
      if (fds[i].fd == -1) {
          fds[i].fd = newsock;
349
          fds[i].events = POLLIN | POLLPRI | POLLHUP;
350
351
352
353
          fds[i].revents = 0;

          numConnections++;

354
355
356
357
358
359
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Assigned connection  (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
          coutMtx.unlock();
#endif

360
361
362
363
364
365
366
367
368
369
370
371
          break;
      }
  }
  fdsMtx.unlock();
}

void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
{
  /*
   * Close the connection an clean up.
   */
  fdsMtx.lock();
372
  shutdown(fds[connectionId].fd, SHUT_RDWR);
373
374
375
376
377
378
379
  close(fds[connectionId].fd);
  fds[connectionId].fd = -1;
  fds[connectionId].events = 0;
  fds[connectionId].revents = 0;

  numConnections--;
  fdsMtx.unlock();
380
381
382
383
384
385

#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Released connection  (" << this << ", " << connectionId << ")...\n";
  coutMtx.unlock();
#endif
386
387
}

388
389
390
391
392
393
void SimpleMQTTServerMessageThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  messageCallback = callback;
}

SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback)
394
395
396
397
398
399
400
401
{
  /*
   * Clear the fds array. Warning: This will only work when the
   * size of the fds array is determined at compile time.
   */
  memset(fds, -1, sizeof(fds));

  /*
402
403
   * Initialize the number of active connections to 0 and set
   * the messageCallback function.
404
405
   */
  numConnections = 0;
406
  messageCallback = callback;
407
408
409
410
411
412
413
414
415
416
417
418

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
{
}