Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

simplemqttserver.cpp 5.16 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * simplemqttserver.cpp
 *
 *  Created on: Apr 5, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
using namespace std;

void SimpleMQTTServer::initSockets(void)
{
  struct addrinfo hints;
  struct addrinfo *ainfo_head, *ainfo_cur;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = PF_UNSPEC;

  if (getaddrinfo(listenAddress.c_str(), listenPort.c_str(), &hints, &ainfo_head))
    throw new runtime_error("Error initializing socket.");

  ainfo_cur = ainfo_head;
  for (ainfo_cur = ainfo_head; ainfo_cur; ainfo_cur = ainfo_cur->ai_next) {

#ifdef SimpleMQTTVerbose
      /*
       * Print some details about the current addrinfo.
       */
      char buf[INET_ADDRSTRLEN+1], buf6[INET6_ADDRSTRLEN+1];
      if (ainfo_cur->ai_family == AF_INET) {
          buf[INET_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET, &((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_addr, buf, INET_ADDRSTRLEN);
          cout << "Initializing IPv4 socket:\n"
              << "\tAddress: " << buf << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_port) << "\n";
      }
      else if (ainfo_cur->ai_family == AF_INET6) {
          buf6[INET6_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET6, &((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_addr, buf6, INET6_ADDRSTRLEN);
          cout << "Initializing IPv6 socket...\n"
              << "\tAddress: " << buf6 << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_port) << "\n";
      }
      else {
          cout << "Unclear socket type.\n";
      }
#endif

      /*
       * Open the socket.
       */
      int sock = socket(ainfo_cur->ai_family, ainfo_cur->ai_socktype, ainfo_cur->ai_protocol);
      if (sock == -1) {
#ifdef SimpleMQTTVerbose
          cout << "Error: could not create socket.\n";
#endif
          continue;
      }

      /*
       * Set socket options.
       */
      int sopt = 1;
      setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sopt, sizeof(sopt));
      sopt = 1;
      setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &sopt, sizeof(sopt));
#if 1
      sopt = fcntl(sock, F_GETFL, 0);
      if (sopt == -1) {
          cout << "Warning: could not get socket options, ignoring socket.\n";
          close(sock);
          continue;
      }
      sopt |= O_NONBLOCK;
      if (fcntl(sock, F_SETFL, sopt) == -1) {
          cout << "Warning: could not set socket options, ignoring socket.\n";
          close(sock);
          continue;
      }
#endif
      /*
       * Bind and listen on socket.
       */
      if (::bind(sock, ainfo_cur->ai_addr, ainfo_cur->ai_addrlen) == -1) {
          cout << "Warning: could not bind to socket, ignoring socket.\n";
          close(sock);
          continue;
      }
      if (listen(sock, SimpleMQTTMaxBacklog) == -1) {
          cout << "Warning: could not listen on socket, ignoring socket.\n";
          close(sock);
          continue;
      }

      listenSockets.push_back(new int(sock));
  }

  freeaddrinfo(ainfo_head);

#ifdef SimpleMQTTVerbose
104
  cout << "Opened " << listenSockets.size() << " network socket(s) for MQTT connections.\n";
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
#endif
}

void SimpleMQTTServer::start()
{
  /*
   * Check if at least one socket is open.
   */
  if (listenSockets.size() == 0) {
      throw new invalid_argument("Failed to establish a listen socket with the given configuration.");
  }

  /*
   * Start one accept thread per socket.
   */
120
  for (unsigned int i=0; i<listenSockets.size(); i++)
121
    acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback));
122
123
124
125
126
127
128
129
130
131
}

void SimpleMQTTServer::stop()
{
  /*
   * Terminate all running server threads.
   */
  acceptThreads.clear();
}

132
133
134
135
136
137
138
139
140
141
142
143
144
void SimpleMQTTServer::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  /*
   * Set the function that will be called for each received
   * MQTT message and propagate to all accept threads.
   */
  messageCallback = callback;
  for (boost::ptr_list<SimpleMQTTServerAcceptThread>::iterator i = acceptThreads.begin(); i != acceptThreads.end(); i++) {
      (*i).setMessageCallback(callback);
  }
}


145
146
147
148
149
150
151
152
153
154
155
156
157
158
void SimpleMQTTServer::init(string addr, string port)
{
  /*
   * Assign all class internal variables with sane values and
   * do some simple validation checks.
   */
  if (addr.empty() || addr.length() == 0)
    throw new invalid_argument("The listen address cannot be empty.");
  listenAddress = addr;

  if (port.empty() || (strtol(port.c_str(), NULL, 10) == 0))
    throw new invalid_argument("Network port is not numeric.");
  listenPort = port;

159
160
  messageCallback = NULL;

161
162
163
164
165
166
  /*
   * Set up the sockets.
   */
  initSockets();
}

167
168
SimpleMQTTServer::SimpleMQTTServer()
{
169
170
171
172
173
  /*
   * Initialize server with default settings.
   */
  init("localhost", "1883");
}
174

175
176
177
178
179
180
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port)
{
  /*
   * Initialize server to listen on specified address and port.
   */
  init(addr, port);
181
182
183
184
}

SimpleMQTTServer::~SimpleMQTTServer()
{
185
186
187
188
189
190
191
192
  /*
   * Stop the server operation.
   */
  stop();

  /*
   * Close all listen sockets.
   */
193
  for (unsigned int i=0; i<listenSockets.size(); i++)
194
195
196
    close(listenSockets[i]);

  listenSockets.clear();
197
198
}