Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

simplemqttserverthread.cpp 10.9 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
/*
 * simplemqttserverthread.cpp
 *
 *  Created on: May 3, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

using namespace std;
11
using namespace boost::system;
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109

/*
 * Construction gate shared by all server threads in this file. It is
 * locked in the SimpleMQTTServerThread constructor and unlocked at the
 * end of the child constructors; 'launch' acquires it before calling
 * run(), so run() cannot start before the child is fully constructed.
 */
static boost::mutex threadMtx;

#ifdef SimpleMQTTVerbose
/*
 * Serializes the verbose diagnostics written to cout so that lines
 * emitted by different threads do not interleave.
 */
static boost::mutex coutMtx;
#endif

SimpleMQTTServerThread::SimpleMQTTServerThread()
{
  /*
   * Close the construction gate before the thread exists.
   *
   * The new thread enters 'launch', which blocks on threadMtx
   * before it calls the child-specific 'run' function. The gate
   * is opened again at the very end of the child's constructor,
   * so 'run' never sees a partially constructed child object.
   */
  threadMtx.lock();

  terminate = false;

  const int rc = pthread_create(&t, NULL, launch, this);
  if (rc != 0) {
      cout << "Error creating new MQTT server thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  {
    boost::mutex::scoped_lock coutGuard(coutMtx);
    cout << "Started Thread (" << t << ") of class (" << this << ")...\n";
  }
#endif
}

SimpleMQTTServerThread::~SimpleMQTTServerThread()
{
  /*
   * Ask the thread's main loop to stop, then wait for the thread
   * to finish so that it is gone before the object is destroyed.
   */
  terminate = true;

  const int rc = pthread_join(t, NULL);
  if (rc != 0) {
      cout << "Error joining thread.\n";
      exit(EXIT_FAILURE);
  }

#ifdef SimpleMQTTVerbose
  {
    boost::mutex::scoped_lock coutGuard(coutMtx);
    cout << "Terminated Thread (" << t << ") of class (" << this << ")...\n";
  }
#endif
}

void* SimpleMQTTServerThread::launch(void *selfPtr)
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running launcher for class (" << selfPtr << ")...\n";
  coutMtx.unlock();
#endif

  /*
   * The following lines guard the run function to be called
   * before the constructor of the child class has finished.
   * The lock is released at the end of the child's constructor.
   */
  threadMtx.lock();
  threadMtx.unlock();

  /*
   * Call the child's run function...
   */
  ((SimpleMQTTServerThread*)selfPtr)->run();

  return NULL;
}

void SimpleMQTTServerAcceptThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerAcceptThread for socket " << socket << "...\n";
  coutMtx.unlock();
#endif

  int newsock = -1;
  struct pollfd fd;

  while (!terminate) {
      /*
       * Wait for something to happen on the socket...
       */
      fd.fd = socket;
      fd.events = POLLIN | POLLPRI;
      fd.revents = 0;
      if(poll(&fd, 1, SimpleMQTTPollTimeout) > 0 && (fd.revents & (POLLIN | POLLPRI))) {

110
111
112
113
114
115
116
117
118
          /*
           * Find a free message thread to take over
           * the next incoming connection.
           */
          SimpleMQTTServerMessageThread *mt = NULL;
          for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); i++) {
              if (i->hasCapacity())
                mt = &*i;
          }
119

120
121
122
123
124
125
126
127
128
129
130
131
          /*
           * In case no free message thread was found,
           * try to create a new one as long as we do
           * not exceed the maximum.
           */
          if (!mt) {
              if (messageThreads.size() >= SimpleMQTTMaxThreadsPerSocket) {
                  cout << "Warning: socket " << socket << " cannot accept more connections.\n";
                  // FIXME: There must be nicer ways to handle such situations...
                  sleep(1);
                  continue;
              }
132
              messageThreads.push_back(new SimpleMQTTServerMessageThread(messageCallback));
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
              continue;
          }

          /*
           * Take the next incoming connection.
           */
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Thread (" << this << ") waiting in accept()...\n";
          coutMtx.unlock();
#endif
          newsock = accept(socket, NULL, 0);
          if (newsock != -1) {
              int opt = fcntl(newsock, F_GETFL, 0);
              if (opt == -1 || fcntl(newsock, F_SETFL, opt | O_NONBLOCK)==-1) {
                  close(newsock);
              }
              else {
                  mt->assignConnection(newsock);
              }
          }
154
155
156
157
      }
  }
}

158
void SimpleMQTTServerAcceptThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  /*
   * Remember the function that will be called for each received
   * MQTT message. The stored value is also what newly created
   * message threads are constructed with in run().
   */
  messageCallback = callback;

  /*
   * Propagate the new callback to all existing message threads.
   */
  for (boost::ptr_list<SimpleMQTTServerMessageThread>::iterator i = messageThreads.begin(); i != messageThreads.end(); ++i) {
      i->setMessageCallback(callback);
  }
}

SimpleMQTTServerAcceptThread::SimpleMQTTServerAcceptThread(int listenSock, SimpleMQTTMessageCallback callback)
{
  /*
   * Store the message callback and the listening socket that
   * run() polls and accepts connections on.
   */
  this->messageCallback = callback;
  this->socket = listenSock;

  /*
   * Construction is complete: open the gate so that the launcher,
   * which is blocked on threadMtx, goes on to call run().
   */
  threadMtx.unlock();
}

/*
 * Nothing to do here. Stopping and joining the thread is presumably
 * left to the SimpleMQTTServerThread destructor (assumes this class
 * derives from it -- the class declaration is not in this file).
 */
SimpleMQTTServerAcceptThread::~SimpleMQTTServerAcceptThread()
{
}

void SimpleMQTTServerMessageThread::run()
{
#ifdef SimpleMQTTVerbose
  coutMtx.lock();
  cout << "Running SimpleMQTTServerMessageThread (" << this << ")...\n";
  coutMtx.unlock();
#endif

  int numfds = -1;
199
  char inbuf[SimpleMQTTReadBufferSize];
200
201
202
203
204

  while (!terminate) {
      /*
       * Check for activity on our sockets...
       */
205
      fdsMtx.lock();
206
      numfds = poll(fds, SimpleMQTTConnectionsPerThread, SimpleMQTTPollTimeout);
207
      fdsMtx.unlock();
208
209
210
211
212
213
214
215
216

      if (numfds == -1)
        throw new system_error(errno, system_category(), "Error in poll().");

      /*
       * Apparently, there is work to do...
       */
      if (numfds > 0) {
          for (int connectionId=0; connectionId<SimpleMQTTConnectionsPerThread; connectionId++) {
217
218
219
220
221
222
223
              if (fds[connectionId].fd != -1) {
#ifdef SimpleMQTTVerbose
                  coutMtx.lock();
                  cout << "fd(" << fds[connectionId].fd << ") revents: " << hex << fds[connectionId].revents << "\n";
                  coutMtx.unlock();
#endif

224
                  if (fds[connectionId].revents & POLLIN) {
225
226
227
228
229
230
231
                      char* readPtr;
                      ssize_t rbytes, lbytes, bytes;

                      rbytes = read(fds[connectionId].fd, inbuf, SimpleMQTTReadBufferSize);

                      readPtr = inbuf;
                      lbytes = rbytes;
232
233

                      /*
234
                       * If read() returns 0, the connection was closed on the
235
236
                       * remote side. In this case, release it from our list.
                       */
237
                      if (rbytes == 0) {
238
239
                          releaseConnection(connectionId);
                      }
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269

                      while (lbytes > 0) {
                          /*
                           * Allocate new message if there is none.
                           */
                          if (!msg[connectionId]) {
                              msg[connectionId] = new SimpleMQTTMessage();
                              if (!msg[connectionId]) {
                                  cout << "Warning: Out of memory! Discarding network input!\n";
                                  continue;
                              }
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Allocated new SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
                          }

                          /*
                           * Append received data to message.
                           */
                          bytes = msg[connectionId]->appendRawData(readPtr, lbytes);
                          readPtr += bytes;
                          lbytes -= bytes;

                          /*
                           * Check if message is complete.
                           */
                          if (msg[connectionId]->complete()) {
                              /*
270
271
272
273
                               * Forward message upstream!
                               * If there is a callback function, it is responsible
                               * for freeing the message using delete. Otherwise, we
                               * have to take care of this, here.
274
275
276
277
278
279
                               */
#ifdef SimpleMQTTVerbose
                              coutMtx.lock();
                              cout << "Completed receiving SimpleMQTTMessage (" << msg[connectionId] << ")...\n";
                              coutMtx.unlock();
#endif
280
281
282
283
284
285
                              if (messageCallback) {
                                  messageCallback(msg[connectionId]);
                              }
                              else {
                                  delete msg[connectionId];
                              }
286
287
288
                              msg[connectionId] = NULL;
                          }
                      }
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
                  }
              }
          }
      }
  }
}

/*
 * Tell whether this thread can take over another connection.
 *
 * numConnections counts the occupied slots in fds (it is incremented
 * in assignConnection and decremented in releaseConnection), and there
 * are SimpleMQTTConnectionsPerThread slots. There is room only while
 * the count is strictly below that number; with '<=' a completely
 * full thread would still claim capacity and assignConnection would
 * find no free slot for the socket.
 */
bool SimpleMQTTServerMessageThread::hasCapacity()
{
  return numConnections < SimpleMQTTConnectionsPerThread;
}

void SimpleMQTTServerMessageThread::assignConnection(int newsock)
{
  /*
   * Find the first free slot in fds and assign connection.
   */
  fdsMtx.lock();
  for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
      if (fds[i].fd == -1) {
          fds[i].fd = newsock;
310
          fds[i].events = POLLIN | POLLPRI | POLLHUP;
311
312
313
314
          fds[i].revents = 0;

          numConnections++;

315
316
317
318
319
320
#ifdef SimpleMQTTVerbose
          coutMtx.lock();
          cout << "Assigned connection  (" << this << ", " << i << ", " << fds[i].fd << ")...\n";
          coutMtx.unlock();
#endif

321
322
323
324
325
326
327
328
329
330
331
332
          break;
      }
  }
  fdsMtx.unlock();
}

void SimpleMQTTServerMessageThread::releaseConnection(int connectionId)
{
  /*
   * Close the connection and clean up: shut the socket down in both
   * directions, close it and mark the slot in fds as free again.
   */
  fdsMtx.lock();

  struct pollfd &slot = fds[connectionId];
  const int sock = slot.fd;

  shutdown(sock, SHUT_RDWR);
  close(sock);

  slot.revents = 0;
  slot.events = 0;
  slot.fd = -1;

  --numConnections;

  fdsMtx.unlock();

#ifdef SimpleMQTTVerbose
  {
    boost::mutex::scoped_lock coutGuard(coutMtx);
    cout << "Released connection  (" << this << ", " << connectionId << ")...\n";
  }
#endif
}

349
350
351
352
353
354
/*
 * Replace the function that run() calls with each completely
 * received message. If the stored callback evaluates to false,
 * run() deletes completed messages itself.
 */
void SimpleMQTTServerMessageThread::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  messageCallback = callback;
}

SimpleMQTTServerMessageThread::SimpleMQTTServerMessageThread(SimpleMQTTMessageCallback callback)
{
  /*
   * Clear the fds array. Warning: This will only work when the
   * size of the fds array is determined at compile time.
   *
   * Filling with -1 bytes sets every fd to -1, which is the value
   * the rest of this class uses to recognize a free slot.
   */
  memset(fds, -1, sizeof(fds));

  /*
   * Start without any message under construction. run() tests
   * msg[connectionId] against NULL to decide whether a new message
   * has to be allocated, so every slot must hold a defined value
   * before the thread starts.
   * NOTE(review): assumes msg has one entry per connection slot --
   * confirm against the class declaration.
   */
  for (int i=0; i<SimpleMQTTConnectionsPerThread; i++) {
      msg[i] = NULL;
  }

  /*
   * Initialize the number of active connections to 0 and set
   * the messageCallback function.
   */
  numConnections = 0;
  messageCallback = callback;

  /*
   * Release the lock to indicate that the constructor has
   * finished. This causes the launcher to call the run function.
   */
  threadMtx.unlock();
}

/*
 * Nothing to do here.
 * NOTE(review): sockets that are still stored in fds and a partially
 * received message in msg are not released by this destructor --
 * confirm whether that cleanup is expected to happen elsewhere.
 */
SimpleMQTTServerMessageThread::~SimpleMQTTServerMessageThread()
{
}