Commit 0466b434 authored by Alessio Netti's avatar Alessio Netti
Browse files

DA: REST API in collectagent

- The REST API of the data analytics framework is now functional in
the collectagent as well
- This implementation is temporary and should be replaced with a generic
REST API server whenever possible
parent ae32094b
......@@ -21,6 +21,11 @@ void AnalyticsController::stop() {
_threads.join_all();
}
//TODO: error checking on _io
restResponse_t AnalyticsController::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method) {
return _manager->REST(pathStrs, queries, method, _io);
}
bool AnalyticsController::initialize(globalCA_t& settings, const string& configPath) {
_settings = settings;
_configPath = configPath;
......
......@@ -95,7 +95,22 @@ public:
*
* @return True if the controller is currently stopped, false otherwise
*/
bool isStopped() { return _halted; }
bool isHalted() { return _halted; }
/**
* @brief Triggers a temporary halt of the internal management thread
*
* @param wait If set to true, the method returns only when the thread has stopped
*/
void halt(bool wait=false) {
_doHalt = true;
if(wait) while (!_halted) { sleep(1); }
}
/**
* @brief Resumes the internal management thread
*/
void resume() { _doHalt = false; }
/**
* @brief Returns the internal AnalyticsManager object
......@@ -128,6 +143,18 @@ public:
*/
uint64_t getReadingCtr() { uint64_t ctr=_readingCtr; _readingCtr=0; return ctr; }
/**
* @brief Supply a REST command to the manager
*
* This method simply forwards the request to the internal AnalyticsManager.
*
* @param pathStrs resource path to be accessed
* @param queries vector of queries
* @param method Either GET or PUT
* @return Response as a <data, response> pair
*/
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method);
private:
// Method implementing the main loop of the internal thread
......
......@@ -142,46 +142,179 @@ void abrtHandler(int sig)
abrt(EXIT_FAILURE, SIGNAL);
}
//TODO: trim common code with dcdbpusher
struct httpHandler_t;
typedef boost::network::http::server<httpHandler_t> httpServer_t;
struct httpHandler_t {
void operator()(httpServer_t::request const &request, httpServer_t::connection_ptr connection) {
httpServer_t::string_type ip = source(request);
std::ostringstream data;
static httpServer_t::response_header headers[] = { { "Connection", "close" }, { "Content-Type", "text/plain" } };
//try getting the latest value
try {
boost::network::uri::uri uri("http://localhost"+request.destination);
std::map<std::string, std::string> queries;
boost::network::uri::query_map(uri, queries);
int avg = atoi(queries.find("avg")->second.c_str());
int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
connection->set_status(httpServer_t::connection::ok);
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write(data.str());
boost::network::uri::uri uri("https://" + request.destination);
httpServer_t::string_type method = request.method;
httpServer_t::string_type path = uri.path();
httpServer_t::string_type query = uri.query();
std::string response = "";
std::ostringstream data;
std::string auth_value = "";
bool json = false;
std::vector<std::string> pathStrs;
std::vector<std::pair<std::string, std::string>> queries;
//first check if request is supported at all
if (method != "GET" && method != "PUT") {
LOGH(warning) << "Unsupported " << method << " request was made";
connection->set_status(httpServer_t::connection::not_supported);
goto error;
}
catch (const std::invalid_argument& e) {
connection->set_status(httpServer_t::connection::not_found);
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write("Error: Sensor id not found.\n");
} catch (const std::out_of_range &e) {
connection->set_status(httpServer_t::connection::no_content);
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write("Error: Sensor unavailable.\n");
} catch (const std::exception& e) {
connection->set_status(httpServer_t::connection::internal_server_error);
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write("Server error.\n");
LOGH(info) << method << " request of " << request.destination << " was made";
//do some string processing
//split path into its hierarchical parts
if (path.size() >= 2) {
if (path[0] == '/')
path.erase(0,1);
if (path[path.size() -1] == '/')
path.erase(path.size() -1);
boost::split(pathStrs, path, boost::is_any_of("/"), boost::token_compress_off);
}
//split query part into the individual queries (key-value pairs)
{ //do not remove the enclosing brackets
//need to encapsulate this code block to keep queryStrs local
std::vector<std::string> queryStrs;
boost::split(queryStrs, query, boost::is_any_of(";"), boost::token_compress_on);
for(auto& key : queryStrs) {
size_t pos = key.find("=");
if (pos != std::string::npos) {
std::string value;
value = key.substr(pos+1);
key.erase(pos);
queries.push_back(std::make_pair(key, value));
}
}
}
//finished string processing
for (auto& p : queries) {
//authkey is required in every case
//if (p.first == "authkey") {
// auth_value = p.second;
//} else
if (p.first == "json")
if (stoi(p.second) > 0)
json = true;
}
if (pathStrs.size() < 1) {
LOGH(warning) << "Received malformed request: No first path part";
connection->set_status(httpServer_t::connection::bad_request);
goto error;
}
//select code depending on request
if (method == "GET") {
if (pathStrs[0] == "help") {
response = "collectagent RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /analytics/help\n"
" /[sensor]/avg?interval=[timeInSec]\n"
" Average of last sensor readings from the last\n"
" [interval] seconds or of all cached readings\n"
" if no interval is given\n"
"\n";
//"All resources have to be prepended by host:port and need at\n"
//"least the query ?authkey=[token] at the end. Multiple queries\n"
//"need to be separated by semicolons(';')\n";
response += analyticsController->getManager()->restCheatSheet;
} else if (pathStrs[0] == "analytics") {
try {
restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(httpServer_t::connection::not_found);
} catch(const std::exception &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::internal_server_error);
goto error;
}
} else {
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(httpServer_t::connection::bad_request);
goto error;
}
if (pathStrs[1] != "avg") {
LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
connection->set_status(httpServer_t::connection::not_supported);
goto error;
}
uint64_t time = 0;
for (auto &p : queries)
if (p.first == "interval")
time = std::stoul(p.second);
//try getting the latest value
try {
//TODO: switch from SID input to sensor name input
int64_t val = mySensorCache.getSensor(pathStrs[0], (uint64_t) time * 1000000000);
connection->set_status(httpServer_t::connection::ok);
response = "collectagent::" + pathStrs[0] + " Average of last " +
std::to_string(time) + " seconds is " + std::to_string(val);
//data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
}
catch (const std::invalid_argument &e) {
connection->set_status(httpServer_t::connection::not_found);
response = "Error: Sensor id not found.\n";
} catch (const std::out_of_range &e) {
connection->set_status(httpServer_t::connection::no_content);
response = "Error: Sensor unavailable.\n";
} catch (const std::exception &e) {
connection->set_status(httpServer_t::connection::internal_server_error);
LOGH(warning) << "Internal server error.\n";
goto error;
}
}
} else if(method == "PUT") {
if( pathStrs.back() == "reload" )
analyticsController->halt(true);
try {
restResponse_t reply = analyticsController->REST(pathStrs, queries, method);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(httpServer_t::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(httpServer_t::connection::not_found);
} catch(const std::exception &e) {
response = e.what();
connection->set_status(httpServer_t::connection::internal_server_error);
}
// Continue MQTTPusher when a reload was performed
if( pathStrs.back() == "reload" )
analyticsController->resume();
}
LOGH(info) << "Responding: " << response;
data << response << std::endl;
//Error management section
error:
connection->set_headers(boost::make_iterator_range(headers, headers + 2));
connection->write(data.str());
}
};
......
......@@ -32,6 +32,8 @@
//to use it, only a boost severity-logger named lg is required
#define LOG(sev) BOOST_LOG_SEV(lg, boost::log::trivial::sev)
#define LOGH(sev) LOG(sev) << "HttpsServer: "
//another shortcut which can take the severity level as variable
#define LOG_VAR(var) BOOST_LOG_SEV(lg, var)
......
......@@ -17,8 +17,6 @@
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string/split.hpp>
#define LOGH(sev) LOG(sev) << "HttpsServer: "
HttpsServer::requestHandler::requestHandler(HttpsServer& httpsServer) : _httpsServer(httpsServer) {}
void HttpsServer::requestHandler::operator()(server::request const &request, server::connection_ptr connection) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment