Commit 7b4349f7 authored by Alessio Netti's avatar Alessio Netti
Browse files

Data Analytics REST API

- Mostly working, still under testing
- Users can define custom analyzer-specific REST PUT actions
- Support for on-demand data analytics computation via REST coming up
parent ad6d7139
......@@ -105,6 +105,8 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
if (pathStrs[0] == "help") {
response = "dcdbpusher RESTful API cheatsheet:\n"
" -GET: /help This help message\n"
" /analytics/help\n"
" An help message for data analytics commands\n"
" /plugins List of currently loaded plugins (Discovery)\n"
" /[plugin]/sensors\n"
" List of currently running sensors which belong\n"
......@@ -128,7 +130,23 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
goto error;
}
if (pathStrs[0] == "plugins") {
//Managing REST GET commands to the data analytics framework
if(pathStrs[0] == "analytics") {
try {
data << _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::internal_server_error);
goto error;
}
} else if (pathStrs[0] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _httpsServer._plugins) {
......@@ -205,6 +223,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
//process actual request
bool found = false;
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
......@@ -214,6 +233,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor) {
found = true;
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, time);
connection->set_status(server::connection::ok);
break;
......@@ -222,6 +242,23 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
}
}
}
if(!found) {
for(auto& p : _httpsServer._manager->getPlugins())
if (p.id == pathStrs[0]) {
response = "Sensor not found!";
for(const auto& a : p.configurator->getAnalyzers())
if(a->getStreaming())
for(const auto& u : a->getUnits())
for (const auto& s : u->getBaseOutputs())
if (s->getName() == sensor) {
found = true;
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, time);
connection->set_status(server::connection::ok);
break;
}
}
}
}
}
}
......@@ -234,90 +271,115 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
goto error;
}
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
goto error;
}
//Managing REST PUT commands to the data analytics framework
if(pathStrs[0] == "analytics") {
if( pathStrs.back() == "reload" ) {
_httpsServer._mqttPusher->halt();
// Wait until MQTTPusher is paused in order to reload plugins
while (!_httpsServer._mqttPusher->isHalted()) { sleep(1); }
}
try {
data << _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
goto error;
} catch(const std::domain_error &e) {
response = e.what();
connection->set_status(server::connection::not_found);
} catch(const std::exception &e) {
response = e.what();
connection->set_status(server::connection::internal_server_error);
}
// Continue MQTTPusher when a reload was performed
if( pathStrs.back() == "reload" )
_httpsServer._mqttPusher->cont();
} else {
if (pathStrs.size() < 2) {
LOGH(warning) << "Received malformed request: No second path part";
connection->set_status(server::connection::bad_request);
goto error;
}
std::string action = pathStrs[1];
std::string action = pathStrs[1];
//process actual request
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
//process actual request
response = "Plugin not found!";
connection->set_status(server::connection::not_found);
//switch code depending on selected action
if (action == "start") {
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(const auto& g : p.configurator->getSensorGroups()) {
g->start();
//switch code depending on selected action
if (action == "start") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->start();
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
connection->set_status(server::connection::ok);
break;
}
response = "Plugin " + pathStrs[0] + ": Sensors started";
connection->set_status(server::connection::ok);
break;
}
}
} else if (action == "stop") {
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for(const auto& g : p.configurator->getSensorGroups()) {
g->stop();
} else if (action == "stop") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
for (const auto &g : p.configurator->getSensorGroups()) {
g->stop();
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
connection->set_status(server::connection::ok);
break;
}
response = "Plugin " + pathStrs[0] + ": Sensors stopped";
connection->set_status(server::connection::ok);
break;
}
}
} else if (action == "reload") {
for(auto& p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
_httpsServer._mqttPusher->halt();
//wait until MQTTPusher is paused
while (!_httpsServer._mqttPusher->isHalted()) {
sleep(1);
}
} else if (action == "reload") {
for (auto &p : _httpsServer._plugins) {
if (p.id == pathStrs[0]) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
_httpsServer._mqttPusher->halt();
//wait until MQTTPusher is paused
while (!_httpsServer._mqttPusher->isHalted()) {
sleep(1);
}
if (p.configurator->reReadConfig()) {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
} else {
response = "Plugin " + pathStrs[0] + ": Could not reload configuration";
connection->set_status(server::connection::internal_server_error);
}
if (p.configurator->reReadConfig()) {
response = "Plugin " + pathStrs[0] + ": Configuration reloaded";
connection->set_status(server::connection::ok);
} else {
response = "Plugin " + pathStrs[0] + ": Could not reload configuration";
connection->set_status(server::connection::internal_server_error);
}
for(const auto& g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
for (const auto &g : p.configurator->getSensorGroups()) {
g->init(_httpsServer._io);
g->start();
}
//continue MQTTPusher
_httpsServer._mqttPusher->cont();
break;
//continue MQTTPusher
_httpsServer._mqttPusher->cont();
break;
}
}
} else {
LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
connection->set_status(server::connection::not_supported);
goto error;
}
} else {
LOGH(warning) << "Unknown action " << pathStrs[1] << " requested";
connection->set_status(server::connection::not_supported);
goto error;
}
//Updating the SensorNavigator on plugin changes
QueryEngine& qEngine = QueryEngine::getInstance();
std::shared_ptr<SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector<std::string> names, topics;
for(const auto& p : _httpsServer._plugins)
for(const auto& g : p.configurator->getSensorGroups())
for(const auto& s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
//Updating the SensorNavigator on plugin changes
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
vector <std::string> names, topics;
for (const auto &p : _httpsServer._plugins)
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
}
LOGH(info) << "Responding: " << response;
......@@ -335,8 +397,8 @@ void HttpsServer::requestHandler::log(const server::string_type& message) {
LOGH(error) << message;
}
HttpsServer::HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, boost::asio::io_service& io) :
_plugins(plugins), _mqttPusher(mqttPusher), _io(io), _handler(*this) {
HttpsServer::HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, AnalyticsManager* manager, boost::asio::io_service& io) :
_plugins(plugins), _mqttPusher(mqttPusher), _manager(manager), _io(io), _handler(*this) {
std::shared_ptr<asio::ssl::context> ctx = std::make_shared<asio::ssl::context>(asio::ssl::context::sslv23);
ctx->set_options(asio::ssl::context::default_workarounds | asio::ssl::context::no_sslv3 | asio::ssl::context::single_dh_use);
......
......@@ -25,6 +25,7 @@
#include "includes/Logging.h"
#include "includes/PluginDefinitions.h"
#include "MQTTPusher.h"
#include "analytics/AnalyticsManager.h"
typedef struct {
std::string restHost;
......@@ -47,7 +48,7 @@ typedef std::map<std::string, std::bitset<NUM_PERMISSIONS>> authkeyMap_t;
class HttpsServer {
public:
HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, boost::asio::io_service& io);
HttpsServer(restAPISettings_t restAPISettings, pluginVector_t& plugins, MQTTPusher* mqttPusher, AnalyticsManager* manager, boost::asio::io_service& io);
virtual ~HttpsServer();
bool addAuthkey(authkeyMap_t::value_type authkey) {
......@@ -108,6 +109,7 @@ private:
pluginVector_t& _plugins;
authkeyMap_t _authkeys;
MQTTPusher* _mqttPusher;
AnalyticsManager* _manager;
boost::asio::io_service& _io;
server* _server;
......
......@@ -3,6 +3,7 @@
//
#include "AnalyticsManager.h"
#include "timestamp.h"
void AnalyticsManager::clear() {
for(const auto& p : _plugins)
......@@ -155,14 +156,16 @@ bool AnalyticsManager::init(boost::asio::io_service& io, const string& plugin) {
LOG(error) << "Cannot init, AnalyticsManager is not loaded!";
return false;
}
bool out=false;
for (const auto &p : _plugins)
//Actions always affect either one or all plugins, and always all analyzers within said plugin
if(plugin=="" || plugin==p.id) {
out = true;
LOG(info) << "Init \"" << p.id << "\" data analytics plugin";
for (const auto &a : p.configurator->getAnalyzers())
a->init(io);
}
return true;
return out;
}
bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin) {
......@@ -170,50 +173,254 @@ bool AnalyticsManager::reload(boost::asio::io_service& io, const string& plugin)
LOG(error) << "Cannot reload, AnalyticsManager is not loaded!";
return false;
}
bool out=false;
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Reload \"" << p.id << "\" data analytics plugin";
out = true;
if( !p.configurator->reReadConfig() )
return false;
for (const auto &a : p.configurator->getAnalyzers())
a->init(io);
}
return true;
return out;
}
bool AnalyticsManager::start(const string& plugin) {
bool AnalyticsManager::start(const string& plugin, const string& analyzer) {
if(_status != LOADED) {
LOG(error) << "Cannot start, AnalyticsManager is not loaded!";
return false;
}
bool out=false;
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Start \"" << p.id << "\" data analytics plugin";
for (const auto &a : p.configurator->getAnalyzers())
a->start();
// Only streaming analyzers can be started
if(a->getStreaming() && (analyzer=="" || analyzer==a->getName())) {
a->start();
out=true;
}
}
return true;
return out;
}
bool AnalyticsManager::stop(const string& plugin) {
bool AnalyticsManager::stop(const string& plugin, const string& analyzer) {
if(_status != LOADED) {
LOG(error) << "Cannot stop, AnalyticsManager is not loaded!";
return false;
}
bool out=false;
for (const auto &p : _plugins)
if(plugin=="" || plugin==p.id) {
LOG(info) << "Stop \"" << p.id << "\" data analytics plugin";
for (const auto &a : p.configurator->getAnalyzers())
a->stop();
// Only streaming analyzers can be stopped
if(a->getStreaming() && (analyzer=="" || analyzer==a->getName())) {
a->stop();
out=true;
}
}
return true;
return out;
}
string AnalyticsManager::forwardREST(const string& command) {
if(_status != LOADED) {
LOG(error) << "Cannot forward REST command, AnalyticsManager is not loaded!";
return "";
//TODO: fix responses / data stuff
string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io) {
// Some preliminary checks
if(_status != LOADED)
throw runtime_error("Cannot forward REST command, AnalyticsManager is not loaded!");
if (method != "GET" && method != "PUT")
throw invalid_argument("Unsupported REST method!");
if(pathStrs.size() < 2 || pathStrs[0] != "analytics")
throw invalid_argument("Received malformed request!");
// Determining if JSON output was requested
bool json = false;
for (auto& p : queries)
json = p.first == "json" && stoi(p.second) > 0;
std::ostringstream data;
// GET block of commands
if (method == "GET") {
// Help cheatsheet command
if (pathStrs[1] == "help") {
data << "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/help This help message\n"
" /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
// Command to list data analytics plugins
} else if (pathStrs[1] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _plugins)
plugins.put(p.id, "");
root.add_child("plugins", plugins);
boost::property_tree::write_info(data, root);
} else
for(auto& p : _plugins)
data << p.id << "\n";
// Managing commands that have a path length greater than 2
} else {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Listing all sensors in one or all analyzers of a plugin; the [analyzer] block is optional
if (action == "sensors") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, sensors;
// In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
// Explicitly adding nodes to the ptree as to prevent BOOST from performing
// parsing on the node names
group.push_back(boost::property_tree::ptree::value_type(s->getName(), boost::property_tree::ptree(s->getMqtt())));
sensors.add_child(a->getName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "units") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, units;
// In JSON mode, units are arranged hierarchically by plugin->analyzer->unit
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
group.push_back(boost::property_tree::ptree::value_type(u->getName(), boost::property_tree::ptree()));
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
boost::property_tree::write_info(data, root);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
data << a->getName() << "." << u->getName() << "\n";
}
}
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "analyzers") {
if(analyzer != "")
throw invalid_argument("Analyzers GET command does not support analyzer names!");
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, analyzers;
// For each analyzer, we output its type as well
for (auto &a : p.configurator->getAnalyzers())
analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
root.add_child(p.id, analyzers);
boost::property_tree::write_info(data, root);
} else {
for (auto &a : p.configurator->getAnalyzers())
data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
}
found = true;
break;
}
}
if (!found)
throw domain_error("Plugin not found!");
} else
throw invalid_argument("Unknown action " + action + " requested");
}
// PUT block of commands
} else if (method == "PUT") {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Managing generic plugin PUT actions
if (action == "start") {
if( start(plugin, analyzer) ) {
data << "Plugin " + plugin + " " + analyzer + ": Sensors started";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "stop") {
if( stop(plugin, analyzer) ) {
data << "Plugin " + plugin + " " + analyzer + ": Sensors stopped";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "reload") {
if(!reload(io, plugin))
throw domain_error("Plugin not found or reload failed!");
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if (analyzer == "" || analyzer == a->getName()) {
found = true;
// Any thrown exception is catched outside in the HTTPserver
data << a->REST(action, queries);
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
}
}
//TODO: implement REST interface integration
return "";
return data.str();
}
......@@ -7,6 +7,7 @@
#include <set>
#include <boost/foreach.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>