Commit d51f4d99 authored by Daniele Tafani's avatar Daniele Tafani
Browse files

New working version of grafanaserver based on boost beast.

parent 441b212e
......@@ -8,14 +8,12 @@ SOURCEFORGE_MROR = vorboss
CASSANDRA_VERSION = 3.0.18
MOSQUITTO_VERSION = 1.5.5
BOOST_VERSION = 1.70.0
OPENSSL_VERSION = 1.0.2l
OPENSSL_VERSION = 1.1.1c
CPPDRV_VERSION = 2.10.0
LIBUV_VERSION = 1.24.0
BACNET-STACK_VERSION = 0.8.6
FREEIPMI_VERSION = 1.6.3
NET-SNMP_VERSION = 5.8
CPPRESTSDK_VERSION = 2.10.6
CPPNET_VERSION = 0.12.0-final
OPENCV_VERSION = 4.1.0
BOOST_VERSION_U = $(subst .,_,$(BOOST_VERSION))
......@@ -29,18 +27,14 @@ DISTFILES = apache-cassandra-$(CASSANDRA_VERSION).tar.gz;http://archive.apache.o
bacnet-stack-$(BACNET-STACK_VERSION).tgz;https://downloads.sourceforge.net/project/bacnet/bacnet-stack/bacnet-stack-$(BACNET-STACK_VERSION)/bacnet-stack-$(BACNET-STACK_VERSION).tgz \
freeipmi-$(FREEIPMI_VERSION).tar.gz;http://ftp.gnu.org/gnu/freeipmi/freeipmi-$(FREEIPMI_VERSION).tar.gz \
net-snmp-$(NET-SNMP_VERSION).tar.gz;https://sourceforge.net/projects/net-snmp/files/net-snmp/$(NET-SNMP_VERSION)/net-snmp-$(NET-SNMP_VERSION).tar.gz/download \
cpprestsdk-$(CPPRESTSDK_VERSION).tar.gz;https://github.com/Microsoft/cpprestsdk/archive/v$(CPPRESTSDK_VERSION).tar.gz \
cpp-netlib-$(CPPNET_VERSION).tar.gz;http://downloads.cpp-netlib.org/0.12.0/cpp-netlib-$(CPPNET_VERSION).tar.gz \
opencv-$(OPENCV_VERSION).tar.gz;https://github.com/opencv/opencv/archive/$(OPENCV_VERSION).tar.gz
DISTFILES_HASHES = apache-cassandra-3.0.18.tar.gz|94dbdaa58b366166c53f881b8e266bc8;\
mosquitto-1.5.5.tar.gz|a17dffc6f63b2a4ab2eb5c51139e60e9;\
boost_1_70_0.tar.gz|fea771fe8176828fabf9c09242ee8c26;\
openssl-1.0.2l.tar.gz|f85123cd390e864dfbe517e7616e6566;\
openssl-1.1.1c.tar.gz|15e21da6efe8aa0e0768ffd8cd37a5f6;\
cpp-driver-2.10.0.tar.gz|6d15dd2cccd2efd1fdc86089d26971d0;\
libuv-v1.24.0.tar.gz|90320330757253b07404d2a97f59c66b;\
cpprestsdk-2.10.6.tar.gz|0a9b2424578fbeb1ac8465173ce8fc71; \
cpp-netlib-0.12.0-final.tar.gz|29b87c0e8c1a9e7fbdea5afcec947d53; \
bacnet-stack-0.8.6.tgz|544ebd42ed959deb2213209b66bbc460;\
freeipmi-1.6.3.tar.gz|b2d97e20db9b81b460ce1b9dad5bf54e;\
net-snmp-5.8.tar.gz|63bfc65fbb86cdb616598df1aff6458a; \
......@@ -192,44 +186,6 @@ $(DCDBDEPSPATH)/libuv-v$(LIBUV_VERSION)/.installed: $(DCDBDEPSPATH)/libuv-v$(LIB
@echo "Installing libuv..."
cd $(@D)/build && make install && touch $(@)
$(DCDBDEPSPATH)/cpp-netlib-$(CPPNET_VERSION)/.built: $(DCDBDEPSPATH)/cpp-netlib-$(CPPNET_VERSION)/.patched
@echo "Building cpp-netlib..."
mkdir -p $(DCDBDEPSPATH)/cpp-netlib_build
cd $(DCDBDEPSPATH)/cpp-netlib_build && \
CC=$(FULL_CC) CXX=$(FULL_CXX) cmake $(CMAKE_CROSS_FLAGS) \
-DCPP-NETLIB_ENABLE_HTTPS=off \
-DCPP-NETLIB_BUILD_TESTS=OFF \
-DCPP-NETLIB_BUILD_EXAMPLES=OFF \
-DCMAKE_INSTALL_LIBDIR=lib \
-DCMAKE_INSTALL_PREFIX=$(DCDBDEPLOYPATH)/ \
-DBOOST_ROOT=$(DCDBDEPSPATH)/boost_$(BOOST_VERSION_U)/ \
$(@D) && \
make -j $(MAKETHREADS) && touch $(@)
$(DCDBDEPSPATH)/cpp-netlib-$(CPPNET_VERSION)/.installed: $(DCDBDEPSPATH)/cpp-netlib-$(CPPNET_VERSION)/.built | $(DCDBDEPLOYPATH)
@echo "Installing cpp-netlib..."
cd $(DCDBDEPSPATH)/cpp-netlib_build && make install && touch $(@)
$(DCDBDEPSPATH)/cpprestsdk-$(CPPRESTSDK_VERSION)/.built: $(DCDBDEPSPATH)/cpprestsdk-$(CPPRESTSDK_VERSION)/.patched
@echo "Building cpprestsdk..."
mkdir -p $(DCDBDEPSPATH)/cpprestsdk_build
cd $(DCDBDEPSPATH)/cpprestsdk_build && \
cmake -DCMAKE_BUILD_TYPE=Release \
-DBOOST_ROOT=$(DCDBDEPSPATH)/boost_$(BOOST_VERSION_U) \
-DOPENSSL_ROOT_DIR=$(DCDBDEPSPATH)/openssl-$(OPENSSL_VERSION) \
-DCPPREST_EXCLUDE_WEBSOCKETS=ON \
-DCPPREST_EXCLUDE_COMPRESSION=ON \
-DBUILD_TESTS=OFF \
-DBUILD_SAMPLES=OFF \
-DCMAKE_INSTALL_PREFIX=$(DCDBDEPLOYPATH)/ \
$(@D)/Release && \
make -j $(MAKETHREADS) && touch $(@)
$(DCDBDEPSPATH)/cpprestsdk-$(CPPRESTSDK_VERSION)/.installed: $(DCDBDEPSPATH)/cpprestsdk-$(CPPRESTSDK_VERSION)/.built
@echo "Installing cpprestsdk..."
cd $(DCDBDEPSPATH)/cpprestsdk_build && make install && touch $(@)
$(DCDBDEPSPATH)/cpp-driver-$(CPPDRV_VERSION)/.built: $(DCDBDEPSPATH)/cpp-driver-$(CPPDRV_VERSION)/.patched $(DCDBDEPSPATH)/openssl-$(OPENSSL_VERSION)/.built $(DCDBDEPSPATH)/libuv-v$(LIBUV_VERSION)/.built
@echo "Building cpp-driver..."
mkdir -p $(@D)/build
......
......@@ -336,7 +336,8 @@ bool OperatorManager::stop(const string& plugin, const string& operatorN) {
#define stdBind(fun) std::bind(&OperatorManager::fun, \
this, \
std::placeholders::_1, \
std::placeholders::_2)
std::placeholders::_2, \
std::placeholders::_3)
void OperatorManager::addRestEndpoints(RESTHttpsServer* restServer) {
restServer->addEndpoint("/analytics/help", {http::verb::get, stdBind(GET_analytics_help)});
......
......@@ -30,7 +30,8 @@
#define stdBind(fun) std::bind(&CARestAPI::fun, \
this, \
std::placeholders::_1, \
std::placeholders::_2)
std::placeholders::_2, \
std::placeholders::_3)
CARestAPI::CARestAPI(serverSettings_t settings,
SensorCache* sensorCache,
......
......@@ -51,7 +51,7 @@
#define SERVER_STRING "RestAPIServer"
#define endpointArgs http::response<http::string_body>& res, queries_t& queries
#define endpointArgs http::request<http::string_body>& req, http::response<http::string_body>& res, queries_t& queries
/******************************************************************************/
......@@ -80,7 +80,7 @@ using userBase_t = std::unordered_map<std::string, userAttributes_t>;
using queries_t = std::unordered_map<std::string, std::string>;
using apiEndpointHandler_t = std::function<void(http::response<http::string_body>&, queries_t&)>;
using apiEndpointHandler_t = std::function<void(http::request<http::string_body>&, http::response<http::string_body>&, queries_t&)>;
using apiEndpoint_t = std::pair<http::verb, apiEndpointHandler_t>;
using apiEndpoints_t = std::unordered_map<std::string, apiEndpoint_t>;
......
......@@ -134,6 +134,7 @@ void RESTHttpsServer::handle_session(tcp::socket& socket, ssl::context& ctx) {
while(true) {
// Read a request
http::request<http::string_body> req;
std::cout << req.body();
http::read(stream, buffer, req, ec);
if(ec == http::error::end_of_stream) {
break;
......@@ -198,7 +199,7 @@ void RESTHttpsServer::handle_request(http::request<Body>& req, Send&& send) {
if (endpoint.first == req.method()) {
//Everything matches --> call the endpoint function
ServerLOG(info) << req.method_string() << " " << endpointName << " requested";
endpoint.second(res, queries);
endpoint.second(req, res, queries);
} else {
const std::string msg = "Request method " + req.method_string().to_string() +
" does not match endpoint " + endpointName + "\n";
......@@ -212,7 +213,7 @@ void RESTHttpsServer::handle_request(http::request<Body>& req, Send&& send) {
res.body() = "Invalid endpoint\n";
}
ServerLOG(info) << "Responding:\n" << res.body();
//ServerLOG(info) << "Responding:\n" << res.body();
res.prepare_payload();
send(std::move(res));
......
......@@ -149,7 +149,12 @@ bool GlobalConfiguration::readRestAPIUsers(RESTHttpsServer* server) {
#ifdef DEBUG
LOG(info) << " Permission \"PUT\"";
#endif
attributes.second[PUT] = true;
attributes.second[POST] = true;
} else if (boost::iequals(perm.first, "POST")) {
#ifdef DEBUG
LOG(info) << " Permission \"POST\"";
#endif
attributes.second[POST] = true;
} else if (boost::iequals(perm.first, "password")) {
attributes.first = perm.second.data();
#ifdef DEBUG
......
/*
* HttpsServer.cpp
*
* Created on: 25.05.2018
* Author: Micha Mueller
*/
#include "HttpsServer.h"
#include "timestamp.h"
#include <iostream>
#include <memory>
#include <functional>
#include <string>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string/split.hpp>
#define LOGH(sev) LOG(sev) << "HttpsServer: "
HttpsServer::requestHandler::requestHandler(HttpsServer& httpsServer) : _httpsServer(httpsServer) {}
void HttpsServer::requestHandler::operator()(server::request const &request, server::connection_ptr connection) {
//first log some info about client
server::string_type ip = source(request);
unsigned int port = request.source_port;
LOGH(info) << ip << ":" << port << " connected";
//set appropriate default value to connection status
connection->set_status(server::connection::internal_server_error);
std::ostringstream data;
boost::network::uri::uri uri("https://" + request.destination);
server::string_type method = request.method;
server::string_type path = uri.path();
server::string_type query = uri.query();
std::string response = "";
std::string auth_value = "";
bool json = false;
std::vector<std::string> pathStrs;
std::vector<std::pair<std::string, std::string>> queries;
//first check if request is supported at all
if (method != "GET" && method != "PUT") {
LOGH(warning) << "Unsupported " << method << " request was made";
connection->set_status(server::connection::not_supported);
goto error;
}
LOGH(info) << method << " request of " << request.destination << " was made";
//do some string processing
//split path into its hierarchical parts
if (path.size() >= 2) {
if (path[0] == '/') {
path.erase(0,1);
}
if (path[path.size() -1] == '/') {
path.erase(path.size() -1);
}
boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
}
//split query part into the individual queries (key-value pairs)
{ //do not remove the enclosing brackets
//need to encapsulate this code block to keep queryStrs local
std::vector<std::string> queryStrs;
boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
for(auto& key : queryStrs) {
size_t pos = key.find("=");
if (pos != std::string::npos) {
std::string value;
value = key.substr(pos+1);
key.erase(pos);
queries.push_back(std::make_pair(key, value));
}
}
}
//finished string processing
for (auto& p : queries) {
//authkey is required in every case
if (p.first == "authkey") {
auth_value = p.second;
} else if (p.first == "json") {
if (stoi(p.second) > 0) {
json = true;
}
}
}
if (pathStrs.size() < 1) {
LOGH(warning) << "Received malformed request: No first path part";
connection->set_status(server::connection::bad_request);
goto error;
}
//select code depending on request
if (method == "GET") {
if (pathStrs[0] == "help") {
response = "dcdbpusher RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /analytics/help\n"
" An help message for data analytics commands\n"
" /plugins List of currently loaded plugins (Discovery)\n"
" /[plugin]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified plugin (Discovery)\n"
" /[plugin]/[sensor]/avg?interval=[timeInSec]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
" -PUT: /[plugin]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
"\n";
//"All resources have to be prepended by host:port and need at\n"
//"least the query ?authkey=[token] at the end. Multiple queries\n"
//"need to be separated by semicolons(';')\n";
response += _httpsServer._manager->restCheatSheet;
} else {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::GETReq)) {
LOGH(warning) << "Provided authentication token has insufficient permissions";
connection->set_status(server::connection::unauthorized);
goto error;
}
//Managing REST GET commands to the data analytics framework
if(pathStrs[0] == "analytics") {
try {
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::internal_server_error);
goto error;
}
} else if (pathStrs[0] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _httpsServer._plugins) {
plugins.put(p.id, "");
}
root.add_child("plugins", plugins);
boost::property_tree::write_json(data, root, true);
} else {
for(auto& p : _httpsServer._plugins) {
data << p.id << "\n";
}
}
} else {
//do some prior checks
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
goto error;
}
if (pathStrs[1] == "sensors") {
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
if (json) {
boost::property_tree::ptree root, sensors;
for(auto g : p.configurator->getSensorGroups()) {
boost::property_tree::ptree group;
for(auto s : g->getSensors()) {
group.put(s->getName(), s->getMqtt());
}
sensors.add_child(g->getGroupName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_json(data, root, true);
} else {
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
data << g->getGroupName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
}
response = "";
connection->set_status(server::connection::ok);
break;
}
}
} else {
if (pathStrs.size() < 3) {
LOGH(warning) << "Received malformed request: No third path part";
connection->set_status(server::connection::bad_request);
goto error;
}
if (pathStrs[2] != "avg") {
LOGH(warning) << "Unknown action " << pathStrs[2] << " requested";
connection->set_status(server::connection::not_supported);
goto error;
}
std::string sensor = pathStrs[1];
std::string action = pathStrs[2];
uint64_t time = 0;
for (auto& p : queries) {
if (p.first == "interval")
time = std::stoul(p.second);
}
//process actual request
bool found = false;
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
response = "Sensor not found!";
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor && s->isInit()) {
found = true;
uint64_t avg = 0;
try {
avg = s->getCache()->getAverage(S_TO_NS(time));
} catch(const std::exception& e) {
response = "Unable to compute average: ";
response += e.what();
connection->set_status(server::connection::internal_server_error);
break;
}
response = pathStrs[0] + "::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(avg);
connection->set_status(server::connection::ok);
break;
}
}
}
}
}
if(!found) {
for(auto& p : _httpsServer._manager->getPlugins())
if (p.id == pathStrs[0]) {
response = "Sensor not found!";
for(const auto& a : p.configurator->getAnalyzers())
if(a->getStreaming())
for(const auto& u : a->getUnits())
for (const auto& s : u->getBaseOutputs())
if (s->getName() == sensor && s->isInit()) {
found = true;
uint64_t avg = 0;
try {
avg = s->getCache()->getAverage(S_TO_NS(time));
} catch(const std::exception& e) {
response = "Unable to compute average: ";
response += e.what();
connection->set_status(server::connection::internal_server_error);
break;
}
response = pathStrs[0] + "::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(avg);
connection->set_status(server::connection::ok);
break;
}
}
}
}
}
}
} else if (method == "PUT") {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::PUTReq)) {
LOGH(warning) << "Provided authentication token has insufficient permissions";
connection->set_status(server::connection::unauthorized);
goto error;
}
//Managing REST PUT commands to the data analytics framework
if(pathStrs[0] == "analytics") {
if( pathStrs.back() == "reload" ) {
_httpsServer._mqttPusher->halt();
// Wait until MQTTPusher is paused in order to reload plugins
while (!_httpsServer._mqttPusher->isHalted()) { sleep(1); }
}
try {
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
response = e.what();
connection->set_status(server::connection::internal_server_error);
}
// Continue MQTTPusher when a reload was performed
if( pathStrs.back() == "reload" )
_httpsServer._mqttPusher->cont();
} else {
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
goto error;
}
std::string action = pathStrs[1];
//process actual request
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
//switch code depending on selected action
if (action == "start") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->start();
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
connection->set_status(server::connection::ok);
break;
}
}
} else if (action == "stop") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->stop();
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
connection->set_status(server::connection::ok);
break;
}
}
} else if (action == "reload") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
_httpsServer._mqttPusher->halt();
//wait until MQTTPusher is paused
while (!_httpsServer._mqttPusher->isHalted()) {
sleep(1);
}
// Removing obsolete MQTT topics
_httpsServer.removeTopics(p);
if (p.configurator->reReadConfig()) {
// Perform checks on MQTT topics
if(!_httpsServer.checkTopics(p)) {
response = "Plugin " + pathStrs[0] + ": problematic MQTT topics or sensor names, please check your config files!";
connection->set_status(server::connection::internal_server_error);
_httpsServer.removeTopics(p);
p.configurator->clearConfig();
} else {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
for (const auto &g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
}
} else {
response = "Plugin " + pathStrs[0] + ": Could not reload configuration";
connection->set_status(server::connection::internal_server_error);
}
//continue MQTTPusher
_httpsServer._mqttPusher->cont();
break;
}
}
} else {
LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
connection->set_status(server::connection::not_supported);
goto error;
}
//Updating the SensorNavigator on plugin reloads
if(action == "reload") {
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
for (const auto &p : _httpsServer._plugins)
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
}
}
LOGH(info) << "Responding: " << response;
data << response << std::endl;
//jump right here if an error was encountered.
//An empty response will be send while the connections status should be set to an appropriate error state
error:
server::response_header headers[] = { {"Connection", "close"}, {"Content-Type", "text/plain"} };
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write(data.str());
}
void HttpsServer::requestHandler::log(const server::string_type& message) {
LOGH(error) << message;
}
HttpsServer::HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, AnalyticsManager* manager, boost::asio::io_service& io) :
_plugins(plugins), _mqttPusher(mqttPusher), _manager(manager), _io(io), _handler(*this) {
std::shared_ptr<asio::ssl::context> ctx = std::make_shared<asio::ssl::context>(asio::ssl::context::sslv23);
ctx->set_options(asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use);
// Set certificate, private key and DH parameters