//================================================================================
// Name        : simplemqttserver.cpp
// Author      : Axel Auweter
// Copyright   : Leibniz Supercomputing Centre
// Description : Implementation of a simple MQTT message server
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "simplemqttserver.h"

using namespace std;

/*
 * Resolve the configured listen address/port and open the listen sockets.
 *
 * getaddrinfo() is queried with SOCK_STREAM / AI_PASSIVE / PF_UNSPEC for
 * listenAddress and listenPort. For every address returned, a socket is
 * created, switched to non-blocking mode, bound and put into listen state
 * with a backlog of SimpleMQTTMaxBacklog. A socket that fails at any of
 * these steps is closed and skipped; every socket that succeeds is
 * appended to listenSockets.
 *
 * Throws a pointer to a heap-allocated runtime_error if getaddrinfo()
 * fails.
 * NOTE(review): the exception is raised with "throw new", so handlers
 * catching by reference will not match it. This is kept as-is to not
 * change what existing callers have to catch - confirm against callers.
 */
void SimpleMQTTServer::initSockets(void)
{
  struct addrinfo hints;
  struct addrinfo *ainfo_head, *ainfo_cur;

  memset(&hints, 0, sizeof(struct addrinfo));
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_flags = AI_PASSIVE;
  hints.ai_family = PF_UNSPEC;

  if (getaddrinfo(listenAddress.c_str(), listenPort.c_str(), &hints, &ainfo_head))
    throw new runtime_error("Error initializing socket.");

  for (ainfo_cur = ainfo_head; ainfo_cur; ainfo_cur = ainfo_cur->ai_next) {

#ifdef SimpleMQTTVerbose
      /*
       * Print some details about the current addrinfo.
       */
      char buf[INET_ADDRSTRLEN+1], buf6[INET6_ADDRSTRLEN+1];
      if (ainfo_cur->ai_family == AF_INET) {
          buf[INET_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET, &((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_addr, buf, INET_ADDRSTRLEN);
          cout << "Initializing IPv4 socket:\n"
              << "\tAddress: " << buf << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in *)ainfo_cur->ai_addr)->sin_port) << "\n";
      }
      else if (ainfo_cur->ai_family == AF_INET6) {
          buf6[INET6_ADDRSTRLEN] = 0;
          inet_ntop(AF_INET6, &((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_addr, buf6, INET6_ADDRSTRLEN);
          cout << "Initializing IPv6 socket...\n"
              << "\tAddress: " << buf6 << "\n"
              << "\tPort: " << ntohs(((struct sockaddr_in6 *)ainfo_cur->ai_addr)->sin6_port) << "\n";
      }
      else {
          cout << "Unclear socket type.\n";
      }
#endif

      /*
       * Open the socket.
       */
      int sock = socket(ainfo_cur->ai_family, ainfo_cur->ai_socktype, ainfo_cur->ai_protocol);
      if (sock == -1) {
#ifdef SimpleMQTTVerbose
          cout << "Error: could not create socket.\n";
#endif
          continue;
      }

      /*
       * Set socket options. Both calls are best effort: a failure here
       * does not prevent the socket from being used.
       */
      int sopt = 1;
      setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &sopt, sizeof(sopt));

      /*
       * IPV6_V6ONLY is an IPv6-level option, so it is only requested on
       * IPv6 sockets. It keeps the IPv6 socket from also claiming the
       * IPv4 address space, which would conflict with the separate IPv4
       * socket opened for the same port.
       */
      if (ainfo_cur->ai_family == AF_INET6) {
          sopt = 1;
          setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &sopt, sizeof(sopt));
      }

      /*
       * Switch the socket to non-blocking mode.
       */
      int flags = fcntl(sock, F_GETFL, 0);
      if (flags == -1) {
          LOG(warning) << "Could not get socket options, ignoring socket.";
          close(sock);
          continue;
      }
      flags |= O_NONBLOCK;
      if (fcntl(sock, F_SETFL, flags) == -1) {
          LOG(warning) << "Could not set socket options, ignoring socket.";
          close(sock);
          continue;
      }

      /*
       * Bind and listen on socket.
       */
      if (::bind(sock, ainfo_cur->ai_addr, ainfo_cur->ai_addrlen) == -1) {
          LOG(warning) << "Could not bind to socket, ignoring socket.";
          close(sock);
          continue;
      }
      if (listen(sock, SimpleMQTTMaxBacklog) == -1) {
          LOG(warning) << "Could not listen on socket, ignoring socket.";
          close(sock);
          continue;
      }

      /*
       * The descriptor is stored as a heap-allocated int; listenSockets
       * is presumably a pointer container that takes ownership of it.
       */
      listenSockets.push_back(new int(sock));
  }

  freeaddrinfo(ainfo_head);

#ifdef SimpleMQTTVerbose
  cout << "Opened " << listenSockets.size() << " network socket(s) for MQTT connections.\n";
#endif
}

void SimpleMQTTServer::start()
{
  /*
   * Check if at least one socket is open.
   */
  if (listenSockets.size() == 0) {
      throw new invalid_argument("Failed to establish a listen socket with the given configuration.");
  }

  /*
   * Start one accept thread per socket.
   */
139
  for (unsigned int i=0; i<listenSockets.size(); i++)
140
    acceptThreads.push_back(new SimpleMQTTServerAcceptThread(listenSockets[i], messageCallback, this->_maxThreads, this->_maxConnPerThread));
141
142
143
144
145
146
147
148
149
150
}

void SimpleMQTTServer::stop()
{
  /*
   * Terminate all running server threads by emptying acceptThreads.
   * acceptThreads is a boost::ptr_list, so clearing it destroys every
   * accept thread object it holds. The actual thread shutdown presumably
   * happens in the SimpleMQTTServerAcceptThread destructor, which is not
   * part of this file.
   */
  acceptThreads.clear();
}

void SimpleMQTTServer::setMessageCallback(SimpleMQTTMessageCallback callback)
{
  typedef boost::ptr_list<SimpleMQTTServerAcceptThread>::iterator AcceptThreadIter;

  /*
   * Remember the function to be invoked for each received MQTT
   * message; start() hands the stored value to the accept threads
   * it creates.
   */
  messageCallback = callback;

  /*
   * Forward the new callback to every accept thread that already exists.
   */
  AcceptThreadIter thread = acceptThreads.begin();
  while (thread != acceptThreads.end()) {
      thread->setMessageCallback(callback);
      ++thread;
  }
}


/*
 * Common initialization used by the constructors.
 *
 * Validates and stores the listen address and port, resets the message
 * callback and then opens the listen sockets via initSockets().
 *
 * addr: address to listen on; must not be empty.
 * port: TCP port as a decimal string in the range 1-65535.
 *
 * Throws a pointer to a heap-allocated invalid_argument if addr is empty
 * or port is not a valid port number. Exceptions raised by initSockets()
 * are passed on unchanged.
 */
void SimpleMQTTServer::init(string addr, string port)
{
  /*
   * Assign all class internal variables with sane values and
   * do some simple validation checks.
   */
  if (addr.empty())
    throw new invalid_argument("The listen address cannot be empty.");
  listenAddress = addr;

  /*
   * The whole string has to be consumed by strtol(); otherwise input
   * such as "80abc" would slip through. The range check also rejects
   * values that strtol() clamps on overflow, as well as numbers that
   * would be silently truncated to 16 bits when used as a port.
   */
  char *parseEnd = NULL;
  const long portNumber = strtol(port.c_str(), &parseEnd, 10);
  if (port.empty() || *parseEnd != '\0')
    throw new invalid_argument("Network port is not numeric.");
  if (portNumber < 1 || portNumber > 65535)
    throw new invalid_argument("Network port is out of range.");
  listenPort = port;

  messageCallback = NULL;

  /*
   * Set up the sockets.
   */
  initSockets();
}

SimpleMQTTServer::SimpleMQTTServer()
{
  /*
   * Initialize server with default settings: listen on "localhost",
   * port "1883".
   *
   * NOTE(review): _maxThreads and _maxConnPerThread are not assigned on
   * this path although start() reads them. Presumably the class
   * declaration provides defaults - verify against the header.
   */
  init("localhost", "1883");
}
/*
 * Construct a server listening on the given address and port.
 *
 * addr:             address to listen on (validated by init()).
 * port:             TCP port as a decimal string (validated by init()).
 * maxThreads:       limit handed to every accept thread created by start().
 * maxConnPerThread: limit handed to every accept thread created by start().
 *
 * Exceptions raised by init() leave the constructor unchanged.
 */
SimpleMQTTServer::SimpleMQTTServer(std::string addr, std::string port, uint64_t maxThreads, uint64_t maxConnPerThread)
{
  /*
   * Store the limits first, then initialize the server to listen on
   * the specified address and port.
   */
  this->_maxThreads = maxThreads;
  this->_maxConnPerThread = maxConnPerThread;
  init(addr, port);
}

/*
 * Shut the server down and release its listen sockets.
 */
SimpleMQTTServer::~SimpleMQTTServer()
{
  /*
   * Stop the server operation. This has to happen before the sockets
   * are closed, since the accept threads were given these descriptors.
   */
  stop();

  /*
   * Close all listen sockets.
   */
  for (size_t i = 0; i < listenSockets.size(); i++)
    close(listenSockets[i]);

  /*
   * Drop the stored descriptors. They were added as heap-allocated
   * ints; listenSockets is presumably a pointer container that frees
   * them here - verify against the header.
   */
  listenSockets.clear();
}