Commit f8703fc3 authored by Micha Mueller's avatar Micha Mueller
Browse files

Batch of compiler fixes

parent fd8c374a
......@@ -261,3 +261,240 @@ bool AnalyticsManager::stop(const string& plugin, const string& analyzer) {
}
return out;
}
restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io) {
// Some preliminary checks
if(_status != LOADED)
throw runtime_error("Cannot forward REST command, AnalyticsManager is not loaded!");
if (method != "GET" && method != "PUT")
throw invalid_argument("Unsupported REST method!");
if(pathStrs.size() < 2 || pathStrs[0] != "analytics")
throw invalid_argument("Received malformed request!");
// Determining if JSON output was requested
bool json = false;
for (auto& p : queries)
if (p.first == "json")
json = stoi(p.second) > 0;
restResponse_t reply;
std::ostringstream data;
// GET block of commands
if (method == "GET") {
// Help cheatsheet command
if (pathStrs[1] == "help") {
reply.response = restCheatSheet;
// Command to list data analytics plugins
} else if (pathStrs[1] == "plugins") {
if (json) {
boost::property_tree::ptree root, plugins;
for(auto& p : _plugins)
plugins.put(p.id, "");
root.add_child("plugins", plugins);
boost::property_tree::write_json(data, root, true);
} else
for(auto& p : _plugins)
data << p.id << "\n";
reply.data = data.str();
// Managing commands that have a path length greater than 2
} else {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Listing all sensors in one or all analyzers of a plugin; the [analyzer] block is optional
if (action == "sensors") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, sensors;
// In JSON mode, sensors are arranged hierarchically by plugin->analyzer->sensor
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
// Explicitly adding nodes to the ptree as to prevent BOOST from performing
// parsing on the node names
group.push_back(boost::property_tree::ptree::value_type(s->getName(), boost::property_tree::ptree(s->getMqtt())));
sensors.add_child(a->getName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
for (auto &s : u->getBaseOutputs())
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "units") {
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, units;
// In JSON mode, units are arranged hierarchically by plugin->analyzer->unit
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
boost::property_tree::ptree group;
for (auto &u : a->getUnits())
group.push_back(boost::property_tree::ptree::value_type(u->getName(), boost::property_tree::ptree()));
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
found = true;
for (auto &u : a->getUnits())
data << a->getName() << "." << u->getName() << "\n";
}
}
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
} else if (action == "analyzers") {
if(analyzer != "")
throw invalid_argument("Analyzers GET command does not support analyzer names!");
bool found = false;
for (auto &p : _plugins) {
if (p.id == plugin) {
if (json) {
boost::property_tree::ptree root, analyzers;
// For each analyzer, we output its type as well
for (auto &a : p.configurator->getAnalyzers())
analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
root.add_child(p.id, analyzers);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
}
found = true;
reply.data = data.str();
break;
}
}
if (!found)
throw domain_error("Plugin not found!");
} else
throw invalid_argument("Unknown action " + action + " requested");
}
// PUT block of commands
} else if (method == "PUT") {
if (pathStrs.size() < 3)
throw invalid_argument("Received malformed request, no second path part!");
string analyzer = pathStrs.size() > 3 ? pathStrs[2] : "";
string action = pathStrs[pathStrs.size() - 1];
string plugin = pathStrs[1];
// Managing generic plugin PUT actions
if (action == "start") {
if( start(plugin, analyzer) ) {
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors started";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "stop") {
if( stop(plugin, analyzer) ) {
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors stopped";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "reload") {
if(!reload(io, plugin))
throw domain_error("Plugin not found or reload failed, please check the config files and MQTT topics!");
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
reply.response = "Plugin " + plugin + ": Sensors reloaded";
} else if (action == "compute") {
if(pathStrs.size() < 4)
throw invalid_argument("Received malformed request, no third path part!");
string unit = SensorNavigator::rootKey;
for (auto& p : queries)
if (p.first == "unit")
unit = p.second;
bool found=false, unitFound=false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if( a->getName() == analyzer ) {
found = true;
map <string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate())
continue;
else
throw;
}
if (json) {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
for (const auto& kv : outMap) {
boost::property_tree::ptree sensor;
sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
}
root.add_child(a->getName(), outputs);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& kv : outMap)
data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
}
reply.data = data.str();
break;
}
if(!found)
throw domain_error("Plugin or analyzer not found!");
// This if branch is accessed only if the target analyzer is streaming and duplicated
else if(!unitFound)
throw domain_error("Node " + unit + " does not belong to the domain of " + analyzer + "!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if (analyzer == "" || analyzer == a->getName()) {
found = true;
// Any thrown exception is catched outside in the HTTPserver
reply = a->REST(action, queries);
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
}
}
return reply;
}
......@@ -135,6 +135,48 @@ public:
*/
bool stop(const string& plugin="", const string& analyzer="");
/**
* @brief Supply a REST command to the manager
*
* Commands must be plugin-specific. Those will be forwarded to said plugins, if of GET type, and
* the result will be collected. If PUT type, one or more actions will be performed on the plugin
* (e.g. start or stop). Some generic commands are available as well (see cheatsheet).
*
* @param pathStrs resource path to be accessed
* @param queries vector of queries
* @param method Either GET or PUT
* @param io BOOST IO Service, required to reload plugins
* @return Response as a <data, response> pair
*/
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io);
// String used as a response for the REST GET /help command
const string restCheatSheet = "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the analyzers of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/compute\n"
" Perform computation of the given analyzer\n"
" in real-time. A \"unit\" query, specifying\n"
" the target unit must be included\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
/**
* @brief Get the vector of currently loaded plugins
*
......
......@@ -147,6 +147,23 @@ public:
*/
virtual restResponse_t REST(const string& action, const unordered_map<string, string>& queries) = 0;
/**
* For LEGACY SUPPORT only.
* TODO switch to new Rest API design with the method above
*
* @brief Perform a REST-triggered PUT action
*
* This method must be implemented in derived classes. It will perform an action (if any)
* on the analyzer according to the input action string. Any thrown
* exceptions will be reported in the response string.
*
* @param action Name of the action to be performed
* @param queries Vector of queries (key-value pairs)
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) = 0;
/**
* @brief Starts this analyzer
*
......
......@@ -225,6 +225,24 @@ public:
throw invalid_argument("Unknown plugin action " + action + " requested!");
}
/**
* For LEGACY SUPPORT only.
* TODO switch to new Rest API design with the method above
*
* @brief Perform a REST-triggered PUT action
*
* This is a dummy implementation that can be overridden in user plugins. Any thrown
* exceptions will be reported in the response string.
*
* @param action Name of the action to be performed
* @param queries Vector of queries (key-value pairs)
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) override {
throw invalid_argument("Unknown plugin action " + action + " requested!");
}
/**
* @brief Performs an on-demand compute task
*
......
......@@ -501,7 +501,7 @@ int main(int argc, char* const argv[]) {
Configuration& settings = config;
cassandra_t& cassandraSettings = config.cassandraSettings;
pluginSettings_t& pluginSettings = config.pluginSettings;
restAPISettings_t& restAPISettings = config.restAPISettings;
serverSettings_t& restAPISettings = config.restAPISettings;
analyticsSettings_t& analyticsSettings = config.analyticsSettings;
optind = 1;
......@@ -661,7 +661,7 @@ int main(int argc, char* const argv[]) {
#endif
LOG(info) << "RestAPI Settings:";
LOG(info) << " REST Server: " << restAPISettings.restHost << ":" << restAPISettings.restPort;
LOG(info) << " REST Server: " << restAPISettings.host << ":" << restAPISettings.port;
#ifdef SimpleMQTTVerbose
LOG(info) << " Certificate: " << restAPISettings.certificate;
LOG(info) << " Private key file: " << restAPISettings.privateKey;
......@@ -701,7 +701,7 @@ int main(int argc, char* const argv[]) {
httpOptions.reuse_address(true);
httpOptions.thread_pool(std::make_shared<boost::network::utils::thread_pool>());
httpServer_t httpServer(httpOptions.address(restAPISettings.restHost).port(restAPISettings.restPort));
httpServer_t httpServer(httpOptions.address(restAPISettings.host).port(restAPISettings.port));
httpThread = std::thread([&httpServer] { httpServer.run(); });
LOG(info) << "HTTP Server running...";
......
......@@ -51,8 +51,8 @@ public:
* @param cfgFileName Name of the file containing the config
*/
Configuration(const std::string& cfgFilePath, const std::string& cfgFileName) : GlobalConfiguration(cfgFilePath, cfgFileName) {
restAPISettings.restPort = string(RESTAPIPORT);
restAPISettings.restHost = string(RESTAPIHOST);
restAPISettings.port = string(RESTAPIPORT);
restAPISettings.host = string(RESTAPIHOST);
}
Configuration() {}
......
......@@ -10,7 +10,6 @@
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
#include "RestAPI.h"
#include "logging.h"
// Wrapper class for plugin-specific settings
......
......@@ -13,6 +13,7 @@
#include <boost/log/trivial.hpp>
#include "includes/PluginDefinitions.h"
#include "mqttchecker.h"
#include "RestAPI.h"
#define BROKERPORT 1883
#define BROKERHOST "127.0.0.1"
......
......@@ -13,6 +13,7 @@
#include <boost/asio.hpp>
#include "includes/PluginDefinitions.h"
#include "globalconfiguration.h"
#include "../analytics/AnalyticsManager.h"
#include "mqttchecker.h"
#include "MQTTPusher.h"
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment