Commit da72f15e authored by Alessio Netti's avatar Alessio Netti

Multithreaded REST API

- Now using the global thread pool in dcdbpusher, collectagent and
grafanaserver to boost performance
parent c94e3e18
......@@ -37,14 +37,15 @@
std::placeholders::_2, \
std::placeholders::_3)
CARestAPI::CARestAPI(serverSettings_t settings,
influx_t* influxSettings,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
CARestAPI::CARestAPI(serverSettings_t settings,
influx_t* influxSettings,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer) :
RESTHttpsServer(settings),
SimpleMQTTServer* mqttServer,
boost::asio::io_context& io) :
RESTHttpsServer(settings, io),
_influxSettings(influxSettings),
_sensorCache(sensorCache),
_sensorDataStore(sensorDataStore),
......
......@@ -46,11 +46,12 @@ class CARestAPI : public RESTHttpsServer {
public:
CARestAPI(serverSettings_t settings,
influx_t* influxSettings,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
SensorCache* sensorCache,
SensorDataStore* sensorDataStore,
SensorConfig *sensorConfig,
AnalyticsController* analyticsController,
SimpleMQTTServer* mqttServer);
SimpleMQTTServer* mqttServer,
boost::asio::io_context& io);
virtual ~CARestAPI() {}
......
......@@ -869,7 +869,7 @@ int main(int argc, char* const argv[]) {
* Start the HTTP Server for the REST API
*/
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms);
httpsServer = new CARestAPI(restAPISettings, &config.influxSettings, &mySensorCache, mySensorDataStore, mySensorConfig, analyticsController, &ms, analyticsController->getIoService());
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
......@@ -934,14 +934,13 @@ int main(int argc, char* const argv[]) {
}
LOG(info) << "Stopping...";
analyticsController->stop();
ms.stop();
LOG(info) << "MQTT Server stopped...";
if (restAPISettings.enabled) {
httpsServer->stop();
delete httpsServer;
LOG(info) << "HTTP Server stopped...";
if (restAPISettings.enabled) {
httpsServer->stop();
LOG(info) << "HTTP Server stopped...";
}
analyticsController->stop();
delete mySensorDataStore;
delete myJobDataStore;
delete mySensorConfig;
......@@ -950,6 +949,9 @@ int main(int argc, char* const argv[]) {
delete dcdbConn;
delete metadataStore;
delete analyticsController;
if (restAPISettings.enabled) {
delete httpsServer;
}
LOG(info) << "Collect Agent closed. Bye bye...";
}
catch (const std::runtime_error& e) {
......
......@@ -185,7 +185,7 @@ public:
return;
}
ServerLOG(info) << "Starting...";
_serverThread = std::thread(&RESTHttpsServer::run, this);
startAccept();
_isRunning = true;
ServerLOG(info) << "Started!";
......@@ -202,11 +202,10 @@ public:
}
ServerLOG(info) << "Stopping...";
_io->stop();
_serverThread.join();
_io->restart();
_isRunning = false;
_acceptor->cancel();
_acceptor->close();
ServerLOG(info) << "Stopped!";
}
......@@ -253,7 +252,7 @@ public:
protected:
//This class should not be constructible on its own
RESTHttpsServer(serverSettings_t settings);
RESTHttpsServer(serverSettings_t settings, boost::asio::io_context& io);
virtual ~RESTHttpsServer() {
if(_isRunning)
......@@ -272,27 +271,15 @@ private:
void startAccept() {
// Start asynchronous wait until we get a connection.
// On connection start launch the session, transferring ownership of the socket
_acceptor->async_accept(*_socket, _remoteEndpoint,
// This will receive the new connection
auto newSocket = new tcp::socket(_io);
_acceptor->async_accept(*newSocket, _remoteEndpoint,
std::bind(&RESTHttpsServer::handle_session,
this,
std::ref(*_socket),
std::ref(*newSocket),
std::ref(*_ctx)));
}
/**
* @brief Main work function for the server thread.
*
* @details Kicks of the main server loop:
* 1. Asynchronously wait for connection
* 2. Process incoming request.
* 3. Repeat
* Runs until interruption (i.e. stop() is called)
*/
void run() {
startAccept();
_io->run();
}
/**
* @brief Handles an HTTP server connection session.
*
......@@ -357,15 +344,12 @@ private:
//FIXME remove once boost::beast includes a URI parser (https://github.com/boostorg/beast/issues/787)
std::string splitUri(const std::string& uri, queries_t& queries);
std::unique_ptr<boost::asio::io_context> _io; /**< Central io_context for all I/O */
std::unique_ptr<ssl::context> _ctx; /**< SSL context hold the certificates and is required for https support */
std::unique_ptr<tcp::socket> _socket; /** Socket object used for connections */
boost::asio::io_context& _io; /**< Central io_context for all I/O */
std::shared_ptr<ssl::context> _ctx; /**< SSL context hold the certificates and is required for https support */
std::unique_ptr<tcp::acceptor> _acceptor; /**< Acceptor receives incoming connections */
tcp::endpoint _remoteEndpoint; /**< Used to store information about the connecting client endpoint */
std::thread _serverThread; /**< Thread responsible for handling incoming requests */
bool _isRunning; /**< Indicate whether the server is already running */
apiEndpoints_t _endpoints;/**< Store all supported endpoints for later lookup*/
......
......@@ -61,14 +61,12 @@ struct send_lambda {
}
};
RESTHttpsServer::RESTHttpsServer(serverSettings_t settings) :
_retCode(0),
RESTHttpsServer::RESTHttpsServer(serverSettings_t settings, boost::asio::io_context& io) :
_retCode(0),
_io(io),
_isRunning(false) {
_io = std::unique_ptr<boost::asio::io_context>(
new boost::asio::io_context(1));
_ctx = std::unique_ptr<ssl::context>(new ssl::context(ssl::context::tls_server));
_ctx = std::shared_ptr<ssl::context>(new ssl::context(ssl::context::tls_server));
_ctx->set_options(ssl::context::default_workarounds |
ssl::context::no_tlsv1 |
......@@ -96,15 +94,12 @@ RESTHttpsServer::RESTHttpsServer(serverSettings_t settings) :
// 2048bit Diffie-Hellman parameters from RFC3526
static unsigned char const s_dh2048_pem[] = { 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x42, 0x45, 0x47, 0x49, 0x4E, 0x20, 0x44, 0x48, 0x20, 0x50, 0x41, 0x52, 0x41, 0x4D, 0x45, 0x54, 0x45, 0x52, 0x53, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x0A, 0x4D, 0x49, 0x49, 0x42, 0x43, 0x41, 0x4B, 0x43, 0x41, 0x51, 0x45, 0x41, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x4A, 0x44, 0x39, 0x71, 0x69, 0x49, 0x57, 0x6A, 0x43, 0x4E, 0x4D, 0x54, 0x47, 0x59, 0x6F, 0x75, 0x41, 0x33, 0x42, 0x7A, 0x52, 0x4B, 0x51, 0x4A, 0x4F, 0x43, 0x49, 0x70, 0x6E, 0x7A, 0x48, 0x51, 0x43, 0x43, 0x37, 0x36, 0x6D, 0x4F, 0x78, 0x4F, 0x62, 0x0A, 0x49, 0x6C, 0x46, 0x4B, 0x43, 0x48, 0x6D, 0x4F, 0x4E, 0x41, 0x54, 0x64, 0x37, 0x35, 0x55, 0x5A, 0x73, 0x38, 0x30, 0x36, 0x51, 0x78, 0x73, 0x77, 0x4B, 0x77, 0x70, 0x74, 0x38, 0x6C, 0x38, 0x55, 0x4E, 0x30, 0x2F, 0x68, 0x4E, 0x57, 0x31, 0x74, 0x55, 0x63, 0x4A, 0x46, 0x35, 0x49, 0x57, 0x31, 0x64, 0x6D, 0x4A, 0x65, 0x66, 0x73, 0x62, 0x30, 0x54, 0x45, 0x4C, 0x70, 0x70, 0x6A, 0x66, 0x74, 0x0A, 0x61, 0x77, 0x76, 0x2F, 0x58, 0x4C, 0x62, 0x30, 0x42, 0x72, 0x66, 0x74, 0x37, 0x6A, 0x68, 0x72, 0x2B, 0x31, 0x71, 0x4A, 0x6E, 0x36, 0x57, 0x75, 0x6E, 0x79, 0x51, 0x52, 0x66, 0x45, 0x73, 0x66, 0x35, 0x6B, 0x6B, 0x6F, 0x5A, 0x6C, 0x48, 0x73, 0x35, 0x46, 0x73, 0x39, 0x77, 0x67, 0x42, 0x38, 0x75, 0x4B, 0x46, 0x6A, 0x76, 0x77, 0x57, 0x59, 0x32, 0x6B, 0x67, 0x32, 0x48, 0x46, 0x58, 0x54, 0x0A, 0x6D, 0x6D, 0x6B, 0x57, 0x50, 0x36, 0x6A, 0x39, 0x4A, 0x4D, 0x39, 0x66, 0x67, 0x32, 0x56, 0x64, 0x49, 0x39, 0x79, 0x6A, 0x72, 0x5A, 0x59, 0x63, 0x59, 0x76, 0x4E, 0x57, 0x49, 0x49, 0x56, 0x53, 0x75, 0x35, 0x37, 0x56, 0x4B, 0x51, 0x64, 0x77, 0x6C, 0x70, 0x5A, 0x74, 0x5A, 0x77, 0x77, 0x31, 0x54, 0x6B, 0x71, 0x38, 0x6D, 0x41, 0x54, 0x78, 0x64, 0x47, 0x77, 0x49, 0x79, 0x68, 0x67, 0x68, 0x0A, 0x66, 0x44, 0x4B, 0x51, 0x58, 0x6B, 0x59, 0x75, 0x4E, 0x73, 0x34, 0x37, 0x34, 0x35, 0x35, 0x33, 0x4C, 0x42, 0x67, 0x4F, 0x68, 0x67, 0x4F, 0x62, 0x4A, 0x34, 0x4F, 0x69, 0x37, 0x41, 0x65, 0x69, 0x6A, 0x37, 0x58, 0x46, 0x58, 0x66, 0x42, 0x76, 0x54, 0x46, 0x4C, 0x4A, 0x33, 0x69, 0x76, 0x4C, 0x39, 0x70, 0x56, 0x59, 0x46, 0x78, 0x67, 0x35, 0x6C, 0x55, 0x6C, 0x38, 0x36, 0x70, 0x56, 0x71, 0x0A, 0x35, 0x52, 0x58, 0x53, 0x4A, 0x68, 0x69, 0x59, 0x2B, 0x67, 0x55, 0x51, 0x46, 0x58, 0x4B, 0x4F, 0x57, 0x6F, 0x71, 0x73, 0x71, 0x6D, 0x6A, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x2F, 0x77, 0x49, 0x42, 0x41, 0x67, 0x3D, 0x3D, 0x0A, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D, 0x45, 0x4E, 0x44, 0x20, 0x44, 0x48, 0x20, 0x50, 0x41, 0x52, 0x41, 0x4D, 0x45, 0x54, 0x45, 0x52, 0x53, 0x2D, 0x2D, 0x2D, 0x2D, 0x2D };
_ctx->use_tmp_dh(boost::asio::buffer(s_dh2048_pem));
// This will receive the new connection
_socket = std::unique_ptr<tcp::socket>(new tcp::socket(*_io));
try {
auto const address = boost::asio::ip::make_address(settings.host);
auto const port = static_cast<unsigned short>(std::stoul(settings.port));
_acceptor = std::unique_ptr<tcp::acceptor>(new tcp::acceptor(*_io, {address, port}));
_acceptor = std::unique_ptr<tcp::acceptor>(new tcp::acceptor(io, {address, port}));
_acceptor->set_option(tcp::acceptor::reuse_address(true));
} catch (const std::exception& e) {
ServerLOG(fatal) << "RestAPI address invalid! Please make sure IP address and port are valid!";
......@@ -115,29 +110,35 @@ RESTHttpsServer::RESTHttpsServer(serverSettings_t settings) :
void RESTHttpsServer::handle_session(tcp::socket& socket, ssl::context& ctx) {
ServerLOG(debug) << _remoteEndpoint.address().to_string() << ":"
<< _remoteEndpoint.port() << " connecting";
if(_isRunning) {
ServerLOG(debug) << _remoteEndpoint.address().to_string() << ":" << _remoteEndpoint.port() << " connecting";
// Launching a new async accept call
startAccept();
} else {
delete &socket;
return;
}
bool close = false;
boost::beast::error_code ec;
// Construct the stream around the socket
boost::beast::ssl_stream<tcp::socket&> stream{socket, ctx};
// Perform the SSL handshake
stream.handshake(ssl::stream_base::server, ec);
if(ec) {
ServerLOG(debug) << "handshake error: " << ec.message();
goto serverError;
}
{//scope, so any goto before does not cross variable initialization
// This buffer is required to persist across reads
boost::beast::flat_buffer buffer;
// This lambda is used to send messages
send_lambda<boost::beast::ssl_stream<tcp::socket&>> lambda{stream, close, ec};
while(true) {
// Declare a parser for a request with a string body
http::request_parser<http::string_body> parser;
......@@ -207,7 +208,9 @@ serverError:
socket.close(ec);
if(ec) { ServerLOG(debug) << "socket close error: " << ec.message(); }
startAccept();
// Getting rid of the allocated socket
delete &socket;
}
template<class Body, class Send>
......
......@@ -44,8 +44,8 @@ RestAPI::RestAPI(serverSettings_t settings,
PluginManager * pluginManager,
MQTTPusher * mqttPusher,
OperatorManager * manager,
boost::asio::io_service &io)
: RESTHttpsServer(settings),
boost::asio::io_context &io)
: RESTHttpsServer(settings, io),
_pluginManager(pluginManager),
_mqttPusher(mqttPusher),
_manager(manager),
......
......@@ -48,7 +48,7 @@ class RestAPI : public RESTHttpsServer {
PluginManager * pluginManager,
MQTTPusher * mqttPusher,
OperatorManager * manager,
boost::asio::io_service &io);
boost::asio::io_context &io);
virtual ~RestAPI() {}
......@@ -293,7 +293,7 @@ class RestAPI : public RESTHttpsServer {
PluginManager * _pluginManager;
MQTTPusher * _mqttPusher;
OperatorManager * _manager;
boost::asio::io_service &_io;
boost::asio::io_context &_io;
};
#endif /* DCDBPUSHER_RESTAPI_H_ */
......@@ -69,6 +69,7 @@
using namespace std;
int retCode = 0;
bool restAPIEnabled = false;
Configuration * _configuration;
MQTTPusher * _mqttPusher;
PluginManager * _pluginManager;
......@@ -146,6 +147,12 @@ void sigHandler(int sig) {
//Stop all sensors
_pluginManager->stopPlugin();
if (restAPIEnabled) {
//Stop https server
LOG(info) << "Stopping REST API Server...";
_httpsServer->stop();
}
//Stop io service by killing keepAliveWork
keepAliveWork.reset();
......@@ -477,7 +484,7 @@ int main(int argc, char **argv) {
LOG(info) << "Signal handlers registered!";
LOG(info) << "Cleaning up...";
bool restAPIEnabled = restAPISettings.enabled;
restAPIEnabled = restAPISettings.enabled;
delete _configuration;
LOG(info) << "Setup complete!";
......@@ -492,19 +499,15 @@ int main(int argc, char **argv) {
mqttThread.join();
LOG(info) << "MQTTPusher stopped.";
if (restAPIEnabled) {
//Stop https server
LOG(info) << "Stopping REST API Server...";
_httpsServer->stop();
delete _httpsServer;
}
LOG(info) << "Tearing down objects...";
_queryEngine.setNavigator(nullptr);
_queryEngine.setSensorMap(nullptr);
delete _mqttPusher;
delete _operatorManager;
delete _pluginManager;
if (restAPIEnabled) {
delete _httpsServer;
}
}
catch (const std::runtime_error& e) {
LOG(fatal) << e.what();
......
......@@ -77,13 +77,13 @@ void sigHandler(int sig) {
else if( sig == SIGTERM )
LOG(fatal) << "Received SIGTERM";
//Stop io service by killing keepAliveWork
keepAliveWork.reset();
//Stop https server
LOG(info) << "Stopping REST API Server...";
_httpsServer->stop();
//Stop io service by killing keepAliveWork
keepAliveWork.reset();
//Stop the Cassandra connection.
LOG(info) << "Closing Cassandra connection...";
_cassandraConnection->disconnect();
......@@ -247,7 +247,7 @@ int main(int argc, char *argv[])
throw std::runtime_error("Failed to connect to the Cassandra database!");
}
_httpsServer = new RestAPI(restAPISettings, hierarchySettings, _cassandraConnection);
_httpsServer = new RestAPI(restAPISettings, hierarchySettings, _cassandraConnection, io);
_configuration->readRestAPIUsers(_httpsServer);
LOG(info) << "Retrieving published sensor names and topics...";
......
......@@ -39,8 +39,9 @@
RestAPI::RestAPI(serverSettings_t settings,
hierarchySettings_t hierarchySettings,
DCDB::Connection* cassandraConnection) :
RESTHttpsServer(settings),
DCDB::Connection* cassandraConnection,
boost::asio::io_context &io) :
RESTHttpsServer(settings, io),
_connection(cassandraConnection),
_updating(false),
_hierarchySettings(hierarchySettings) {
......
......@@ -58,7 +58,8 @@ class RestAPI : public RESTHttpsServer {
public:
RestAPI(serverSettings_t settings,
hierarchySettings_t hierarchySettings,
DCDB::Connection* cassandraConnection);
DCDB::Connection* cassandraConnection,
boost::asio::io_context &io);
virtual ~RestAPI() {
if(_sensorConfig) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment