Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

simplemqttserver.cpp 4.71 KB
Newer Older
1
2
3
4
5
6
7
8
9
/*
 * simplemqttserver.cpp
 *
 *  Created on: Apr 5, 2013
 *      Author: Axel Auweter
 */

#include "simplemqttserver.h"

10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
using namespace std;

void SimpleMQTTServer::initSockets(void)
{
  struct addrinfo hints;
  struct addrinfo *ainfo_head, *ainfo_cur;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = PF_UNSPEC;

  if (getaddrinfo(listenAddress.c_str(), listenPort.c_str(), &hints, &ainfo_head))
    throw new runtime_error("Error initializing socket.");

  ainfo_cur = ainfo_head;
  for (ainfo_cur = ainfo_head; ainfo_cur; ainfo_cur = ainfo_cur->ai_next) {

#ifdef SimpleMQTTVerbose
      /*
       * Print some details about the current addrinfo.
       */
      char buf[INET_ADDRSTRLEN+1], buf6[INET6_ADDRSTRLEN+1];
      if (ainfo_cur->ai_family == AF_INET) {
          buf[INET_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET, &((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_addr, buf, INET_ADDRSTRLEN);
          cout << "Initializing IPv4 socket:\n"
              << "\tAddress: " << buf << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_port) << "\n";
      }
      else if (ainfo_cur->ai_family == AF_INET6) {
          buf6[INET6_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET6, &((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_addr, buf6, INET6_ADDRSTRLEN);
          cout << "Initializing IPv6 socket...\n"
              << "\tAddress: " << buf6 << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_port) << "\n";
      }
      else {
          cout << "Unclear socket type.\n";
      }
#endif

      /*
       * Open the socket.
       */
      int sock = socket(ainfo_cur->ai_family, ainfo_cur->ai_socktype, ainfo_cur->ai_protocol);
      if (sock == -1) {
#ifdef SimpleMQTTVerbose
          cout << "Error: could not create socket.\n";
#endif
          continue;
      }

      /*
       * Set socket options.
       */
      int sopt = 1;
      setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sopt, sizeof(sopt));
      sopt = 1;
      setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &sopt, sizeof(sopt));
#if 1
      sopt = fcntl(sock, F_GETFL, 0);
      if (sopt == -1) {
          cout << "Warning: could not get socket options, ignoring socket.\n";
          close(sock);
          continue;
      }
      sopt |= O_NONBLOCK;
      if (fcntl(sock, F_SETFL, sopt) == -1) {
          cout << "Warning: could not set socket options, ignoring socket.\n";
          close(sock);
          continue;
      }
#endif
      /*
       * Bind and listen on socket.
       */
      if (::bind(sock, ainfo_cur->ai_addr, ainfo_cur->ai_addrlen) == -1) {
          cout << "Warning: could not bind to socket, ignoring socket.\n";
          close(sock);
          continue;
      }
      if (listen(sock, SimpleMQTTMaxBacklog) == -1) {
          cout << "Warning: could not listen on socket, ignoring socket.\n";
          close(sock);
          continue;
      }

      listenSockets.push_back(new int(sock));
  }

  freeaddrinfo(ainfo_head);

#ifdef SimpleMQTTVerbose
  cout << "Opened " << listenSockets.size() << " network sockets for MQTT connections.\n";
#endif
}

void SimpleMQTTServer::start()
{
  /*
   * Check if at least one socket is open.
   */
  if (listenSockets.size() == 0) {
      throw new invalid_argument("Failed to establish a listen socket with the given configuration.");
  }

  /*
   * Start one accept thread per socket.
   */
120
  for (unsigned int i=0; i<listenSockets.size(); i++)
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
    acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i]));
}

void SimpleMQTTServer::stop()
{
  /*
   * Terminate all running server threads.
   */
  acceptThreads.clear();
}

void SimpleMQTTServer::init(string addr, string port)
{
  /*
   * Assign all class internal variables with sane values and
   * do some simple validation checks.
   */
  if (addr.empty() || addr.length() == 0)
    throw new invalid_argument("The listen address cannot be empty.");
  listenAddress = addr;

  if (port.empty() || (strtol(port.c_str(), NULL, 10) == 0))
    throw new invalid_argument("Network port is not numeric.");
  listenPort = port;

  /*
   * Set up the sockets.
   */
  initSockets();
}

152
153
SimpleMQTTServer::SimpleMQTTServer()
{
154
155
156
157
158
  /*
   * Initialize server with default settings.
   */
  init("localhost", "1883");
}
159

160
161
162
163
164
165
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port)
{
  /*
   * Initialize server to listen on specified address and port.
   */
  init(addr, port);
166
167
168
169
}

SimpleMQTTServer::~SimpleMQTTServer()
{
170
171
172
173
174
175
176
177
  /*
   * Stop the server operation.
   */
  stop();

  /*
   * Close all listen sockets.
   */
178
  for (unsigned int i=0; i<listenSockets.size(); i++)
179
180
181
    close(listenSockets[i]);

  listenSockets.clear();
182
183
}