Commit eb9b1a6c authored by Micha Mueller's avatar Micha Mueller
Browse files

CollectAgent: switch to new uniform CommonRestServer design

parent 294e940d
/*
* CARestAPI.cpp
*
* Created on: 24.05.2019
* Author: Micha Mueller
*/
#include "CARestAPI.h"
#define stdBind(fun) std::bind(&CARestAPI::fun, \
this, \
std::placeholders::_1, \
std::placeholders::_2)
CARestAPI::CARestAPI(serverSettings_t settings,
SensorCache* sensorCache,
AnalyticsController* analyticsController) :
RESTHttpsServer(settings),
_sensorCache(sensorCache),
_analyticsController(analyticsController) {
addEndpoint("/help", {http::verb::get, stdBind(GET_help)});
addEndpoint("/average", {http::verb::get, stdBind(GET_average)});
_analyticsController->getManager()->addRestEndpoints(this);
//Overwrite analytic's reload endpoint because we need to stop analyticscontroller beforehand
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
}
void CARestAPI::GET_help(endpointArgs) {
res.body() = caRestCheatSheet + _analyticsController->getManager()->restCheatSheet;
res.result(http::status::ok);
}
void CARestAPI::GET_average(endpointArgs) {
const std::string sensor = getQuery("sensor", queries);
const std::string interval = getQuery("interval", queries);
if (sensor == "") {
res.body() = "Request malformed: sensor query missing";
res.result(http::status::bad_request);
return;
}
uint64_t time = 0;
if (interval != "") {
try {
time = std::stoul(interval);
} catch (const std::exception& e) {
RESTAPILOG(warning) << "Bad interval query: " << e.what();
res.body() = "Bad interval query!\n";
res.result(http::status::bad_request);
return;
}
}
//try getting the latest value
try {
//TODO: switch from SID input to sensor name input
int64_t val = _sensorCache->getSensor(sensor, (uint64_t) time * 1000000000);
res.body() = "collectagent::" + sensor + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(val);
res.result(http::status::ok);
//std::ostringstream data;
//data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
//res.body() = data.str();
} catch (const std::invalid_argument &e) {
res.body() = "Error: Sensor id not found.\n";
res.result(http::status::not_found);
} catch (const std::out_of_range &e) {
res.body() = "Error: Sensor unavailable.\n";
res.result(http::status::no_content);
} catch (const std::exception &e) {
res.body() = "Internal server error.\n";
res.result(http::status::internal_server_error);
RESTAPILOG(warning) << "Internal server error: " << e.what();
}
}
void CARestAPI::PUT_analytics_reload(endpointArgs) {
if (_analyticsController->getManager()->getStatus() != AnalyticsManager::LOADED) {
res.body() = "AnalyticsManager is not loaded!\n";
res.result(http::status::internal_server_error);
return;
}
const std::string plugin = getQuery("plugin", queries);
// Wait until controller is paused in order to reload plugins
_analyticsController->halt(true);
if (!_analyticsController->getManager()->reload(_analyticsController->getIoService(), plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (_analyticsController->getManager()->start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded\n";
res.result(http::status::ok);
}
_analyticsController->resume();
}
/*
* CARestAPI.h
*
* Created on: 24.05.2019
* Author: Micha Mueller
*/
#ifndef COLLECTAGENT_CARESTAPI_H_
#define COLLECTAGENT_CARESTAPI_H_
#include "RESTHttpsServer.h"
#include "analyticscontroller.h"
class CARestAPI : public RESTHttpsServer {
public:
CARestAPI(serverSettings_t settings,
SensorCache* sensorCache,
AnalyticsController* analyticsController);
virtual ~CARestAPI() {}
// String used as a response for the REST GET /help command
const std::string caRestCheatSheet = "collectAgent RESTful API cheatsheet:\n"
" -GET: /help This help message.\n"
" /analytics/help\n"
" An help message for data analytics commands.\n"
" /average?sensor;[interval]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
"\n";
private:
/**
* GET "/help"
*
* @brief Return a cheatsheet of possible REST API endpoints.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
*/
void GET_help(endpointArgs);
/**
* GET "/average"
*
* @brief Get the average of the last readings of a sensor.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | sensor | all sensor names of | specify the sensor within the
* | | the plugin within the| plugin
* | | sensor cache |
* Optional | interval| number of seconds | use only readings more recent
* | | | than (now - interval) for
* | | | average calculation
*/
void GET_average(endpointArgs);
/**
* PUT "/analytics/reload"
*
* @brief Reload configuration and initialization of all or only a specific
* analytics plugin.
*
* @detail Overwrites the method from the AnalyticsManager to ensure that
* analyticsctonroller is stopped before reloading plugins.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | plugin | all analyzer plugin | reload only the specified
* | | names | plugin
*/
void PUT_analytics_reload(endpointArgs);
SensorCache* _sensorCache;
AnalyticsController* _analyticsController;
};
#endif /* COLLECTAGENT_CARESTAPI_H_ */
......@@ -16,6 +16,7 @@ OBJS = ../common/src/logging.o \
../common/src/globalconfiguration.o \
../common/src/RESTHttpsServer.o \
analyticscontroller.o \
CARestAPI.o \
sensorcache.o \
collectagent.o \
configuration.o \
......@@ -35,12 +36,11 @@ LIBS = -L../lib \
-lboost_system \
-lboost_random \
-lboost_thread \
-lboost_filesystem \
-lboost_date_time \
-lboost_log_setup \
-lboost_log \
-lboost_regex \
-lcppnetlib-server-parsers \
-lcppnetlib-uri
-lboost_regex
TARGET = collectagent
......
......@@ -23,12 +23,6 @@ void AnalyticsController::stop() {
_initialized = false;
}
restResponse_t AnalyticsController::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method) {
if(_initialized)
throw runtime_error("Cannot forward REST command, AnalyticsController is not initialized!");
return _manager->REST(pathStrs, queries, method, _io);
}
bool AnalyticsController::initialize(Configuration& settings, const string& configPath) {
_settings = settings;
_configPath = configPath;
......
......@@ -144,16 +144,11 @@ public:
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
/**
* @brief Supply a REST command to the manager
*
* This method simply forwards the request to the internal AnalyticsManager.
*
* @param pathStrs resource path to be accessed
* @param queries vector of queries
* @param method Either GET or PUT
* @return Response as a <data, response> pair
*/
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method);
* @brief Return the io_service used by the analytics controller.
*
* @return Reference to this object's boost::asio::io_service.
*/
boost::asio::io_service& getIoService() { return _io; }
private:
......
......@@ -24,10 +24,6 @@
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include <boost/network/protocol/http/server.hpp>
#include <boost/network/utils/thread_pool.hpp>
#include <boost/network/uri.hpp>
#include <iostream>
#include <cstdlib>
#include <signal.h>
#include <unistd.h>
......@@ -42,6 +38,7 @@
#include <dcdb/sensor.h>
#include "version.h"
#include "CARestAPI.h"
#include "configuration.h"
#include "simplemqttserver.h"
#include "messaging.h"
......@@ -141,184 +138,6 @@ void abrtHandler(int sig)
abrt(EXIT_FAILURE, SIGNAL);
}
//TODO: trim common code with dcdbpusher
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
httpServer_t::string_type ip = source(request);
static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
boost::network::uri::uri uri("https://" + request.destination);
httpServer_t::string_type method = request.method;
httpServer_t::string_type path = uri.path();
httpServer_t::string_type query = uri.query();
std::string response = "";
std::ostringstream data;
std::string auth_value = "";
std::vector<std::string> pathStrs;
std::vector<std::pair<std::string, std::string>> queries;
//first check if request is supported at all
if (method != "GET" && method != "PUT") {
LOGH(warning) << "Unsupported " << method << " request was made";
connection->set_status(httpServer_t::connection::not_supported);
goto error;
}
LOGH(info) << method << " request of " << request.destination << " was made";
//do some string processing
//split path into its hierarchical parts
if (path.size() >= 2) {
if (path[0] == '/')
path.erase(0,1);
if (path[path.size() -1] == '/')
path.erase(path.size() -1);
boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
}
//split query part into the individual queries (key-value pairs)
{ //do not remove the enclosing brackets
//need to encapsulate this code block to keep queryStrs local
std::vector<std::string> queryStrs;
boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
for(auto& key : queryStrs) {
size_t pos = key.find("=");
if (pos != std::string::npos) {
std::string value;
value = key.substr(pos+1);
key.erase(pos);
queries.push_back(std::make_pair(key, value));
}
}
}
//finished string processing
//bool json = false;
//for (auto& p : queries) {
// authkey is required in every case
//if (p.first == "authkey") {
// auth_value = p.second;
//} else
//if (p.first == "json")
// if (stoi(p.second) > 0)
// json = true;
//}
if (pathStrs.size() < 1) {
LOGH(warning) << "Received malformed request: No first path part";
connection->set_status(httpServer_t::connection::bad_request);
goto error;
}
//select code depending on request
if (method == "GET") {
if (pathStrs[0] == "help") {
response = "collectagent RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /analytics/help\n"
" /[sensor]/avg?interval=[timeInSec]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
"\n";
//"All resources have to be prepended by host:port and need at\n"
//"least the query ?authkey=[token] at the end. Multiple queries\n"
//"need to be separated by semicolons(';')\n";
response += analyticsController->getManager()->restCheatSheet;
} else if (pathStrs[0] == "analytics") {
try {
restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(httpServer_t::connection::not_found);
} catch(const std::exception &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::internal_server_error);
goto error;
}
} else {
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(httpServer_t::connection::bad_request);
goto error;
}
if (pathStrs[1] != "avg") {
LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
connection->set_status(httpServer_t::connection::not_supported);
goto error;
}
uint64_t time = 0;
for (auto &p : queries)
if (p.first == "interval")
time = std::stoul(p.second);
//try getting the latest value
try {
//TODO: switch from SID input to sensor name input
int64_t val = mySensorCache.getSensor(pathStrs[0], (uint64_t) time * 1000000000);
connection->set_status(httpServer_t::connection::ok);
response = "collectagent::" + pathStrs[0] + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(val);
//data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
}
catch (const std::invalid_argument &e) {
connection->set_status(httpServer_t::connection::not_found);
response = "Error: Sensor id not found.\n";
} catch (const std::out_of_range &e) {
connection->set_status(httpServer_t::connection::no_content);
response = "Error: Sensor unavailable.\n";
} catch (const std::exception &e) {
connection->set_status(httpServer_t::connection::internal_server_error);
LOGH(warning) << "Internal server error.\n";
goto error;
}
}
} else if(method == "PUT") {
if( pathStrs.back() == "reload" )
analyticsController->halt(true);
try {
restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(httpServer_t::connection::not_found);
} catch(const std::exception &e) {
response = e.what();
connection->set_status(httpServer_t::connection::internal_server_error);
}
// Continue MQTTPusher when a reload was performed
if( pathStrs.back() == "reload" )
analyticsController->resume();
}
LOGH(info) << "Responding: " << response;
data << response << std::endl;
//Error management section
error:
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write(data.str());
}
};
int mqttCallback(SimpleMQTTMessage *msg)
{
/*
......@@ -695,14 +514,9 @@ int main(int argc, char* const argv[]) {
/*
* Start the HTTP Server for the REST API
*/
std::thread httpThread;
httpHandler_t httpHandler;
httpServer_t::options httpOptions(httpHandler);
httpOptions.reuse_address(true);
httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
httpServer_t httpServer(httpOptions.address(restAPISettings.host).port(restAPISettings.port));
httpThread = std::thread([&httpServer] { httpServer.run(); });
CARestAPI httpsServer(restAPISettings, &mySensorCache, analyticsController);
config->readRestAPIUsers(&httpsServer);
httpsServer.start();
LOG(info) << "HTTP Server running...";
......@@ -748,8 +562,7 @@ int main(int argc, char* const argv[]) {
analyticsController->stop();
ms.stop();
LOG(info) << "MQTT Server stopped...";
httpServer.stop();
httpThread.join();
httpsServer.stop();
LOG(info) << "HTTP Server stopped...";
delete mySensorDataStore;
delete mySensorConfig;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment