Commit 6b086567 authored by Micha Mueller's avatar Micha Mueller
Browse files

Refactor analytics for new RestAPI implementation

parent 2a026683
......@@ -261,240 +261,3 @@ bool AnalyticsManager::stop(const string& plugin, const string& analyzer) {
}
return out;
}
restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io) {
// Some preliminary checks
if(_status != LOADED)
throw runtime_error("Cannot forward REST command, AnalyticsManager is not loaded!");
if (method != "GET" && method != "PUT")
throw invalid_argument("Unsupported REST method!");
if(pathStrs.size() < 2 || pathStrs[0] != "analytics")
throw invalid_argument("Received malformed request!");
// Determining if JSON output was requested
bool json = false;
for (auto& p : queries)
if (p.first == "json")
json = stoi(p.second) > 0;
restResponse_t reply;
std::ostringstream data;
// GET block of commands
if (method == "GET") {
// Help cheatsheet command
if (pathStrs[1] == "help") {
reply.response = restCheatSheet;
// Command to list data analytics plugins
} else if (pathStrs[1] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _plugins)
plugins.put(p.id, "");
root.add_child("plugins", plugins);
boost::property_tree::write_json(data, root, true);
} else
for(auto& p : _plugins)
data << p.id << "\n";
reply.data = data.str();
// Managing commands that have a path length greater than 2
} else {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Listing all sensors in one or all analyzers of a plugin; the [analyzer] block is optional
if (action == "sensors") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, sensors;
// In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
// Explicitly adding nodes to the ptree as to prevent BOOST from performing
// parsing on the node names
group.push_back(boost::property_tree::ptree::value_type(s->getName(), boost::property_tree::ptree(s->getMqtt())));
sensors.add_child(a->getName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "units") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, units;
// In JSON mode, units are arranged hierarchically by plugin->analyzer->unit
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
group.push_back(boost::property_tree::ptree::value_type(u->getName(), boost::property_tree::ptree()));
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
data << a->getName() << "." << u->getName() << "\n";
}
}
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "analyzers") {
if(analyzer != "")
throw invalid_argument("Analyzers GET command does not support analyzer names!");
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, analyzers;
// For each analyzer, we output its type as well
for (auto &a : p.configurator->getAnalyzers())
analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
root.add_child(p.id, analyzers);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
}
found = true;
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin not found!");
} else
throw invalid_argument("Unknown action " + action + " requested");
}
// PUT block of commands
} else if (method == "PUT") {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Managing generic plugin PUT actions
if (action == "start") {
if( start(plugin, analyzer) ) {
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors started";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "stop") {
if( stop(plugin, analyzer) ) {
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors stopped";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "reload") {
if(!reload(io, plugin))
throw domain_error("Plugin not found or reload failed, please check the config files and MQTT topics!");
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
reply.response = "Plugin " + plugin + ": Sensors reloaded";
} else if (action == "compute") {
if(pathStrs.size() < 4)
throw invalid_argument("Received malformed request, no third path part!");
string unit = SensorNavigator::rootKey;
for (auto& p : queries)
if (p.first == "unit")
unit = p.second;
bool found=false, unitFound=false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if( a->getName() == analyzer ) {
found = true;
map <string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate())
continue;
else
throw;
}
if (json) {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
for (const auto& kv : outMap) {
boost::property_tree::ptree sensor;
sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
}
root.add_child(a->getName(), outputs);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& kv : outMap)
data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
}
reply.data = data.str();
break;
}
if(!found)
throw domain_error("Plugin or analyzer not found!");
// This if branch is accessed only if the target analyzer is streaming and duplicated
else if(!unitFound)
throw domain_error("Node " + unit + " does not belong to the domain of " + analyzer + "!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if (analyzer == "" || analyzer == a->getName()) {
found = true;
// Any thrown exception is catched outside in the HTTPserver
reply = a->REST(action, queries);
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
}
}
return reply;
}
......@@ -135,21 +135,6 @@ public:
*/
bool stop(const string& plugin="", const string& analyzer="");
/**
* @brief Supply a REST command to the manager
*
* Commands must be plugin-specific. Those will be forwarded to said plugins, if of GET type, and
* the result will be collected. If PUT type, one or more actions will be performed on the plugin
* (e.g. start or stop). Some generic commands are available as well (see cheatsheet).
*
* @param pathStrs resource path to be accessed
* @param queries vector of queries
* @param method Either GET or PUT
* @param io BOOST IO Service, required to reload plugins
* @return Response as a <data, response> pair
*/
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io);
/**
* @brief Get the vector of currently loaded plugins
*
......@@ -167,33 +152,6 @@ public:
*/
managerState_t getStatus() { return _status; }
// String used as a response for the REST GET /help command
const string restCheatSheet = "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the analyzers of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/compute\n"
" Perform computation of the given analyzer\n"
" in real-time. A \"unit\" query, specifying\n"
" the target unit must be included\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)\n"
"\n"
"All resources have to be prepended by host:port.\n"
"A query can be appended as ?query=[value] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
protected:
// Utility method to drop all topics associated to a certain plugin
......
......@@ -7,6 +7,7 @@
#include <atomic>
#include <memory>
#include <unordered_map>
#include <vector>
#include <map>
#include <boost/asio.hpp>
......@@ -144,7 +145,7 @@ public:
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) = 0;
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
/**
* @brief Starts this analyzer
......
......@@ -221,7 +221,7 @@ public:
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) override {
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) override {
throw invalid_argument("Unknown plugin action " + action + " requested!");
}
......
......@@ -47,7 +47,7 @@ void RestAPI::GET_analytics_help(endpointArgs){
if (!managerLoaded(res)) {
return;
}
res.body() = _manager->restCheatSheet;
res.body() = anRestCheatSheet;
res.result(http::status::ok);
}
......@@ -222,7 +222,7 @@ void RestAPI::GET_analytics_analyzers(endpointArgs) {
}
void RestAPI::GET_help(endpointArgs) {
res.body() = restCheatSheet + _manager->restCheatSheet;
res.body() = restCheatSheet + anRestCheatSheet;
res.result(http::status::ok);
}
......
......@@ -52,6 +52,33 @@ public:
" reload the plugin configuration\n"
"\n";
// String used as a response for the REST GET /help command
const string anRestCheatSheet = "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the analyzers of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/compute\n"
" Perform computation of the given analyzer\n"
" in real-time. A \"unit\" query, specifying\n"
" the target unit must be included\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)\n"
"\n"
"All resources have to be prepended by host:port.\n"
"A query can be appended as ?query=[value] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
private:
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment