16.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 058da43e authored by Micha Mueller's avatar Micha Mueller
Browse files

Move analytics related Rest API methods back to AnalyticsManager

CollectAgent should be switched to new Rest API design as soon as possible
Minor improvements to README, Makefile and PUT /analytics/reload methods
parent f8703fc3
......@@ -498,3 +498,400 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
}
return reply;
}
/******************************************************************************/
/* Rest API endpoint methods */
/******************************************************************************/
#define stdBind(fun) std::bind(&AnalyticsManager::fun, \
this, \
std::placeholders::_1, \
std::placeholders::_2)
void AnalyticsManager::addRestEndpoints(RESTHttpsServer* restServer) {
restServer->addEndpoint("/analytics/help", {http::verb::get, stdBind(GET_analytics_help)});
restServer->addEndpoint("/analytics/plugins", {http::verb::get, stdBind(GET_analytics_plugins)});
restServer->addEndpoint("/analytics/sensors", {http::verb::get, stdBind(GET_analytics_sensors)});
restServer->addEndpoint("/analytics/units", {http::verb::get, stdBind(GET_analytics_units)});
restServer->addEndpoint("/analytics/analyzers", {http::verb::get, stdBind(GET_analytics_analyzers)});
restServer->addEndpoint("/analytics/start", {http::verb::put, stdBind(PUT_analytics_start)});
restServer->addEndpoint("/analytics/stop", {http::verb::put, stdBind(PUT_analytics_stop)});
restServer->addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
restServer->addEndpoint("/analytics/compute", {http::verb::put, stdBind(PUT_analytics_compute)});
restServer->addEndpoint("/analytics/analyzer", {http::verb::put, stdBind(PUT_analytics_analyzer)});
}
void AnalyticsManager::GET_analytics_help(endpointArgs){
if (!managerLoaded(res)) {
return;
}
res.body() = newRestCheatSheet;
res.result(http::status::ok);
}
void AnalyticsManager::GET_analytics_plugins(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
std::ostringstream data;
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, plugins;
for(const auto& p : _plugins) {
plugins.put(p.id, "");
}
root.add_child("plugins", plugins);
boost::property_tree::write_json(data, root, true);
} else {
for(const auto& p : _plugins) {
data << p.id << "\n";
}
}
res.body() = data.str();
res.result(http::status::ok);
}
void AnalyticsManager::GET_analytics_sensors(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (!hasPlugin(plugin, res)) {
return;
}
bool found = false;
std::ostringstream data;
for (const auto& p : _plugins) {
if (p.id == plugin) {
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, sensors;
// In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
for (const auto& a : p.configurator->getAnalyzers()) {
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (const auto& u : a->getUnits()) {
for (const auto& s : u->getBaseOutputs()) {
// Explicitly adding nodes to the ptree as to prevent BOOST from performing
// parsing on the node names
group.push_back(boost::property_tree::ptree::value_type(s->getName(), boost::property_tree::ptree(s->getMqtt())));
}
}
sensors.add_child(a->getName(), group);
}
}
root.add_child(p.id, sensors);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& a : p.configurator->getAnalyzers()) {
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (const auto& u : a->getUnits()) {
for (const auto& s : u->getBaseOutputs()) {
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
}
}
}
res.body() = data.str();
res.result(http::status::ok);
break;
}
}
if (!found) {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void AnalyticsManager::GET_analytics_units(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (!hasPlugin(plugin, res)) {
return;
}
bool found = false;
std::ostringstream data;
for (const auto& p : _plugins) {
if (p.id == plugin) {
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, units;
// In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
for (const auto& a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (const auto& u : a->getUnits()) {
group.push_back(boost::property_tree::ptree::value_type(u->getName(), boost::property_tree::ptree()));
}
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& a : p.configurator->getAnalyzers()) {
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (const auto& u : a->getUnits()) {
data << a->getName() << "." << u->getName() << "\n";
}
}
}
}
res.body() = data.str();
res.result(http::status::ok);
break;
}
}
if (!found) {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void AnalyticsManager::GET_analytics_analyzers(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
std::ostringstream data;
for (const auto& p : _plugins) {
if (p.id == plugin) {
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, analyzers;
// For each analyzer, we output its type as well
for (const auto& a : p.configurator->getAnalyzers()) {
analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
}
root.add_child(p.id, analyzers);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& a : p.configurator->getAnalyzers()) {
data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
}
}
res.body() = data.str();
res.result(http::status::ok);
return;
}
}
}
void AnalyticsManager::PUT_analytics_start(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (start(plugin, analyzer)) {
res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors started!\n";
res.result(http::status::ok);
} else {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void AnalyticsManager::PUT_analytics_stop(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (stop(plugin, analyzer)) {
res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors stopped!\n";
res.result(http::status::ok);
} else {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void AnalyticsManager::PUT_analytics_reload(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
/*
* This endpoint must either be overwritten (by adding a custom
* "analyzer/reload" endpoint) or must not be used. A reload requires
* an external io_service object and can therefore not be conducted by the
* AnalyticsManager itself.
*/
res.body() = "Sorry! It seems like this endpoint was not properly implemented.";
res.result(http::status::not_implemented);
/*
const std::string plugin = getQuery("plugin", queries);
if (!reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded";
res.result(http::status::ok);
}
*/
}
void AnalyticsManager::PUT_analytics_compute(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
std::string unit = getQuery("unit", queries);
if (plugin == "" || analyzer == "") {
const std::string err = "Request malformed: plugin or analyzer query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
return;
}
if (unit == "") {
unit = SensorNavigator::rootKey;
}
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
std::ostringstream data;
bool unitFound=false;
for (const auto &p : _plugins) {
if (p.id == plugin) {
for (const auto &a : p.configurator->getAnalyzers()) {
if( a->getName() == analyzer ) {
std::map<std::string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate()) {
continue;
} else {
res.body() = e.what();
res.result(http::status::not_found);
return;
}
}
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
for (const auto& kv : outMap) {
boost::property_tree::ptree sensor;
sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
}
root.add_child(a->getName(), outputs);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& kv : outMap) {
data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
}
}
res.body() = data.str();
res.result(http::status::ok);
return;
}
}
}
}
// This if branch is accessed only if the target analyzer is streaming and duplicated
if(!unitFound) {
res.body() = "Node " + unit + " does not belong to the domain of " + analyzer + "!";
res.result(http::status::not_found);
}
}
void AnalyticsManager::PUT_analytics_analyzer(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
const std::string action = getQuery("action", queries);
if (plugin == "" || action == "") {
const std::string err = "Request malformed: plugin or action query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
return;
}
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
// Managing custom REST PUT actions defined at the analyzer level
for (const auto &p : _plugins) {
if (p.id == plugin) {
for (const auto &a : p.configurator->getAnalyzers()) {
if (analyzer == "" || analyzer == a->getName()) {
// Any thrown exception is catched outside in the HTTPserver
try {
restResponse_t reply = a->REST(action, queries);
res.body() = reply.data;
res.body() += reply.response;
res.result(http::status::ok);
} catch(const std::invalid_argument &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::bad_request);
} catch(const std::domain_error &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::not_found);
} catch(const std::exception &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::internal_server_error);
}
}
}
}
}
}
......@@ -14,6 +14,7 @@
#include <boost/asio.hpp>
#include <dlfcn.h>
#include "logging.h"
#include "RESTHttpsServer.h"
#include "mqttchecker.h"
#include "includes/UnitInterface.h"
#include "includes/AnalyzerConfiguratorInterface.h"
......@@ -188,12 +189,54 @@ public:
std::vector<an_dl_t>& getPlugins() { return _plugins; }
/**
* @brief Get current status of analytics manager
* @brief Get the current status of the AnalyticsManager.
*
* @return Status of this analytics manager
* @return Status of the AnalyticsManager.
*/
managerState_t getStatus() { return _status; }
/**
* @brief Adds analytics RestAPI endpoints to an RestAPI server.
*
* @param restServer The RestAPI server which should offer analytics
* endpoints.
*/
void addRestEndpoints(RESTHttpsServer* restServer);
// String used as a response for the REST GET /help command
const string newRestCheatSheet = "dcdbpusher analytics RESTful API cheatsheet:\n"
"(All commands must be prepended by \"/analytics\" !)\n"
" -GET: /plugins?[json] D List off currently loaded plugins.\n"
" /sensors?plugin;[analyzer];[json]\n"
" D List of currently running sensors which belong to\n"
" the specified plugin (and analyzer).\n"
" /analyzers?plugin;[json]\n"
" D List of running analyzers in the specified data\n"
" analytics plugin.\n"
" /units?plugin;[analyzer];[json]\n"
" D List of units to which sensors are associated in\n"
" the specified data analytics plugin (and analyzer).\n"
" -PUT: /start?[plugin];[analyzer]\n"
" Start all or only a specific analytics plugin or\n"
" start only a specific analyzer within a plugin.\n"
" /stop?[plugin];[analyzer]\n"
" Stop all or only a specific analytics plugin or\n"
" stop only a specific analyzer within a plugin.\n"
" /reload?[plugin] Reload all or only a specific analytics plugin.\n"
" /compute?plugin;analyzer;[unit];[json]\n"
" Query the specified analyzer for a unit. Default\n"
" unit is the root.\n"
" /analyzer?plugin;action;[analyzer]\n"
" Do a custom analyzer action for all or only an\n"
" selected analyzer within a plugin (refer to plugin\n"
" documentation).\n"
"\n"
"D = Discovery method\n"
"All resources have to be prepended by host:port.\n"
"A query can be appended as ?query=[value] at the end. Multiple queries\n"
"need to be separated by semicolons(';'). \"query=value\" syntax was shortened\n"
"to \"query\" for readability. Optional queries are marked with [ ]\n";
protected:
// Utility method to drop all topics associated to a certain plugin
......@@ -213,6 +256,184 @@ protected:
//Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
private:
// all stuff related to REST API
// Utility method to check the status of the analytics manager. If not
// loaded: prepares the response accordingly so no further actions are
// required.
// Return true if loaded, false otherwise.
inline bool managerLoaded(http::response<http::string_body>& res) {
if (_status != LOADED) {
const std::string err = "AnalyticsManager is not loaded!\n";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::internal_server_error);
return false;
}
return true;
}
// methods for REST API endpoints
/**
* GET "/analytics/help"
*
* @brief Return a cheatsheet of available REST API endpoints specific for
* the analytics manager.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
*/
void GET_analytics_help(endpointArgs);
/**
* GET "/analytics/plugins"
*
* @brief (Discovery) List all currently loaded data analytic plugins.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | json | true | format response as json
*/
void GET_analytics_plugins(endpointArgs);
/**
* GET "/analytics/sensors"
*
* @brief (Discovery) List all sensors of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | plugin | all analyzer plugin | specify the plugin
* | | names |
* Optional | analyzer| all analyzers of a | restrict sensors list to an
* | | plugin | analyzer
* | json | true | format response as json
*/
void GET_analytics_sensors(endpointArgs);
/**
* GET "/analytics/units"
*
* @brief (Discovery) List all units of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | plugin | all analyzer plugin | specify the plugin
* | | names |
* Optional | analyzer| all analyzers of a | restrict unit list to an
* | | plugin | analyzer
* | json | true | format response as json
*/
void GET_analytics_units(endpointArgs);
/**
* GET "/analytics/analyzers"
*
* @brief (Discovery) List all active analyzers of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | plugin | all analyzer plugin | specify the plugin
* | | names |
* Optional | json | true | format response as json
*/
void GET_analytics_analyzers(endpointArgs);
/**
* PUT "/analytics/start"
*
* @brief Start all or only a specific plugin. Or only start a specific
* streaming analyzer within a specific plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | plugin | all analyzer plugin | only start the specified
* | | names | plugin
* | analyzer| all analyzers of a | only start the specified
* | | plugin | analyzer. Requires a plugin
* | | | to be specified. Limited to
* | | | streaming analyzers.
*/
void PUT_analytics_start(endpointArgs);
/**
* PUT "/analytics/stop"
*
* @brief Stop all or only a specific plugin. Or only stop a specific
* streaming analyzer within a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | plugin | all analyzer plugin | only stop the specified
* | | names | plugin
* | analyzer| all analyzers of a | only stop the specified
* | | plugin | analyzer. Requires a plugin
* | | | to be specified. Limited to
* | | | streaming analyzers.
*/
void PUT_analytics_stop(endpointArgs);
/**
* This endpoint must either be overwritten (by adding a custom
* "analyzer/reload" endpoint) or must not be used. A reload requires
* an external io_service object and can therefore not be conducted by the
* AnalyticsManager itself.
*
* PUT "/analytics/reload"
*
* @brief Reload configuration and initialization of all or only a specific
* analytics plugin.
* </