16.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit e0cb93a1 authored by Alessio Netti's avatar Alessio Netti
Browse files

Various fixes

- Added basic obsolescence checks in QueryEngine
- REST API GET requests now output actual JSON when requested
- Other minor changes
parent 7b4349f7
......@@ -59,3 +59,7 @@ plugins {
}
}
analyzerPlugins {
}
......@@ -14,7 +14,7 @@
#include <string>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string/split.hpp>
#define LOGH(sev) LOG(sev) << "HttpsServer: "
......@@ -118,10 +118,11 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
" -PUT: /[plugin]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
"\n";
//"All resources have to be prepended by host:port and need at\n"
//"least the query ?authkey=[token] at the end. Multiple queries\n"
//"need to be separated by semicolons(';')\n";
response += _httpsServer._manager->restCheatSheet;
} else {
//first check permission
if (!_httpsServer.check_authkey(auth_value, permission::GETReq)) {
......@@ -133,7 +134,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
//Managing REST GET commands to the data analytics framework
if(pathStrs[0] == "analytics") {
try {
data << _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
......@@ -153,7 +156,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
plugins.put(p.id, "");
}
root.add_child("plugins", plugins);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for(auto& p : _httpsServer._plugins) {
data << p.id << "\n";
......@@ -184,7 +187,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
sensors.add_child(g->getGroupName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for(auto g : p.configurator->getSensorGroups()) {
for(auto s : g->getSensors()) {
......@@ -279,7 +282,9 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
while (!_httpsServer._mqttPusher->isHalted()) { sleep(1); }
}
try {
data << _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
restResponse_t reply = _httpsServer._manager->REST(pathStrs, queries, method, _httpsServer._io);
data << reply.data;
response = reply.response;
} catch(const std::invalid_argument &e) {
LOGH(warning) << e.what();
connection->set_status(server::connection::bad_request);
......
......@@ -28,6 +28,11 @@ bool AnalyticsManager::load(const string& path, const string& globalFile, const
return false;
}
if(cfg.find("analyzerPlugins") == cfg.not_found()) {
LOG(warning) << "No analyzerPlugins block found, skipping data analytics initialization!";
return false;
}
BOOST_FOREACH(boost::property_tree::iptree::value_type &plugin, cfg.get_child("analyzerPlugins")) {
if (boost::iequals(plugin.first, "analyzerPlugin")) {
if (!plugin.second.empty()) {
......@@ -224,8 +229,7 @@ bool AnalyticsManager::stop(const string& plugin, const string& analyzer) {
return out;
}
//TODO: fix responses / data stuff
string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io) {
restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io) {
// Some preliminary checks
if(_status != LOADED)
throw runtime_error("Cannot forward REST command, AnalyticsManager is not loaded!");
......@@ -237,36 +241,17 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
// Determining if JSON output was requested
bool json = false;
for (auto& p : queries)
json = p.first == "json" && stoi(p.second) > 0;
if (p.first == "json")
json = stoi(p.second) > 0;
restResponse_t reply;
std::ostringstream data;
// GET block of commands
if (method == "GET") {
// Help cheatsheet command
if (pathStrs[1] == "help") {
data << "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/help This help message\n"
" /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the sensors of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
reply.response = restCheatSheet;
// Command to list data analytics plugins
} else if (pathStrs[1] == "plugins") {
if (json) {
......@@ -274,10 +259,11 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
for(auto& p : _plugins)
plugins.put(p.id, "");
root.add_child("plugins", plugins);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else
for(auto& p : _plugins)
data << p.id << "\n";
reply.data = data.str();
// Managing commands that have a path length greater than 2
} else {
if (pathStrs.size() < 3)
......@@ -308,7 +294,7 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
sensors.add_child(a->getName(), group);
}
root.add_child(p.id, sensors);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
......@@ -318,6 +304,7 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
data << a->getName() << "." << s->getName() << " " << s->getMqtt() << "\n";
}
}
reply.data = data.str();
break;
}
}
......@@ -340,7 +327,7 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
units.add_child(a->getName(), group);
}
root.add_child(p.id, units);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
if (a->getStreaming() && (analyzer == "" || analyzer == a->getName())) {
......@@ -349,6 +336,7 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
data << a->getName() << "." << u->getName() << "\n";
}
}
reply.data = data.str();
break;
}
}
......@@ -367,12 +355,13 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
for (auto &a : p.configurator->getAnalyzers())
analyzers.push_back(boost::property_tree::ptree::value_type(a->getName(), boost::property_tree::ptree(a->getStreaming() ? "streaming" : "on-demand")));
root.add_child(p.id, analyzers);
boost::property_tree::write_info(data, root);
boost::property_tree::write_json(data, root, true);
} else {
for (auto &a : p.configurator->getAnalyzers())
data << a->getName() << " " << (a->getStreaming() ? "streaming\n" : "on-demand\n");
}
found = true;
reply.data = data.str();
break;
}
}
......@@ -394,12 +383,12 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
// Managing generic plugin PUT actions
if (action == "start") {
if( start(plugin, analyzer) ) {
data << "Plugin " + plugin + " " + analyzer + ": Sensors started";
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors started";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "stop") {
if( stop(plugin, analyzer) ) {
data << "Plugin " + plugin + " " + analyzer + ": Sensors stopped";
reply.response = "Plugin " + plugin + " " + analyzer + ": Sensors stopped";
} else
throw domain_error("Plugin or analyzer not found!");
} else if (action == "reload") {
......@@ -407,6 +396,7 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
throw domain_error("Plugin not found or reload failed!");
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
reply.response = "Plugin " + plugin + ": Sensors reloaded";
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
......@@ -416,11 +406,11 @@ string AnalyticsManager::REST(const vector<string>& pathStrs, const vector<pair<
if (analyzer == "" || analyzer == a->getName()) {
found = true;
// Any thrown exception is catched outside in the HTTPserver
data << a->REST(action, queries);
reply = a->REST(action, queries);
}
if (!found)
throw domain_error("Plugin or analyzer not found!");
}
}
return data.str();
return reply;
}
......@@ -9,6 +9,7 @@
#include <boost/foreach.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/property_tree/json_parser.hpp>
#include <boost/algorithm/string.hpp>
#include <boost/asio.hpp>
#include <dlfcn.h>
......@@ -20,6 +21,7 @@
using namespace std;
// Struct of values required to load analyzer dynamic libraries.
typedef struct {
std::string id;
......@@ -133,9 +135,9 @@ public:
* @param queries vector of queries
* @param method Either GET or PUT
* @param io BOOST IO Service, required to reload plugins
* @return Response as a plain string
* @return Response as a <data, response> pair
*/
string REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io);
restResponse_t REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method, boost::asio::io_service& io);
/**
* @brief Get the vector of currently loaded plugins
......@@ -158,6 +160,29 @@ public:
*/
bool mqttCheck(pluginVector_t& pushers);
// String used as a response for the REST GET /help command
const string restCheatSheet = "dcdbpusher analytics RESTful API cheatsheet:\n"
" -GET: /analytics/plugins List of currently loaded plugins (Discovery)\n"
" /analytics/[plugin]/analyzers\n"
" List of running analyzers in the specified\n"
" data analytics plugin\n"
" /analytics/[plugin]/[analyzer]/sensors\n"
" List of currently running sensors which belong\n"
" to the specified data analytics plugin (Discovery)\n"
" /analytics/[plugin]/[analyzer]/units\n"
" List of units to which sensors are associated in the\n"
" specified data analytics plugin (Discovery)\n"
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the analyzers of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)\n"
"\n"
"All resources have to be prepended by host:port and need at\n"
"least the query ?authkey=[token] at the end. Multiple queries\n"
"need to be separated by semicolons(';')\n";
protected:
// Vector of plugins represented as an_dl_t structures
......
......@@ -12,7 +12,7 @@
#include "AnalyzerInterface.h"
#include "../../includes/ConfiguratorInterface.h"
//TODO: fix auto-publish behavior on plugin reload
/**
* Interface to configurators for data analyzer plugins
*
......
......@@ -13,6 +13,12 @@
#include "../../includes/Logging.h"
#include "UnitInterface.h"
// Struct defining a response to a REST request
typedef struct {
std::string response;
std::string data;
} restResponse_t;
/**
* Interface to data analyzers
*
......@@ -128,9 +134,9 @@ public:
* @param action Name of the action to be performed
* @param queries Vector of queries (key-value pairs)
*
* @return Response string to the request
* @return Response to the request as a <response, data> pair
*/
virtual std::string REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) = 0;
virtual restResponse_t REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) = 0;
/**
* @brief Starts this analyzer
......
......@@ -180,10 +180,10 @@ public:
* @param action Name of the action to be performed
* @param queries Vector of queries (key-value pairs)
*
* @return Response string to the request
* @return Response to the request as a <response, data> pair
*/
virtual std::string REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) override {
throw std::invalid_argument("Unknown action " + action + " requested!");
virtual restResponse_t REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) override {
throw std::invalid_argument("Unknown plugin action " + action + " requested!");
}
protected:
......
......@@ -64,6 +64,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
//Initializing the sensor map if necessary. Thread safe!
if(_queryEngine.updated.load()) {
if(!_queryEngine.updating.exchange(true)) {
_sensorMap.clear();
for (auto &p : _configuration->getPlugins())
for (auto &g : p.configurator->getSensorGroups())
for (auto &s : g->getSensors())
......@@ -80,16 +81,26 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
SBasePtr sensor = _sensorMap[name];
// Converting absolute timestamps to relative offsets for cache access
uint64_t interval = ((uint64_t)sensor->getCacheInterval() / ((uint64_t)sensor->getCacheSize() - 1)) * 2 * 1000000;
uint64_t now = getTimestamp();
uint64_t startTsInt = rel ? startTs : now - startTs;
uint64_t endTsInt = rel ? endTs : now - endTs;
//TODO: replace these two lines of code with the calls from CacheEntry used in collectagent
//Converting absolute timestamps to relative offsets for cache access
//Getting the cache indexes to access sensor data
int64_t startIdx = sensor->getCacheOffset(rel ? startTs : getTimestamp() - startTs);
int64_t endIdx = sensor->getCacheOffset(rel ? endTs : getTimestamp() - endTs);
int64_t startIdx = sensor->getCacheOffset(startTsInt);
int64_t endIdx = sensor->getCacheOffset(endTsInt);
//Managing invalid time offsets
if( startIdx < 0 || endIdx < 0)
return buffer;
//TODO: better ways to manage data obsolescence?
//Managing obsolete data
if(now - startTsInt > sensor->getCache()[startIdx].timestamp + interval || now - endTsInt > sensor->getCache()[endIdx].timestamp + interval)
return buffer;
if( startIdx <= endIdx )
buffer->insert(buffer->end(), sensor->getCache() + startIdx, sensor->getCache() + endIdx + 1);
else {
......
......@@ -102,6 +102,7 @@ public:
const std::string& getSinkPath() const { return _sinkPath; }
bool getSkipConstVal() const { return _skipConstVal; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getCacheInterval() const { return _cacheInterval; }
unsigned getSubsampling() const { return _subsamplingFactor; }
const reading_t * const getCache() const { return _cache.get(); }
const reading_t& getLatestValue() const { return _latestValue; }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment