Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 2a026683 authored by Micha Mueller's avatar Micha Mueller
Browse files

Finish RestAPI class prototype

parent 91d5be44
......@@ -80,11 +80,7 @@ void RestAPI::GET_analytics_sensors(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (plugin == "") {
const std::string err = "Request malformed: plugin query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
if (!hasPlugin(plugin, res)) {
return;
}
......@@ -131,7 +127,7 @@ void RestAPI::GET_analytics_sensors(endpointArgs) {
}
}
if (!found) {
res.body() = "Plugin or analyzer not found!";
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
......@@ -144,11 +140,7 @@ void RestAPI::GET_analytics_units(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (plugin == "") {
const std::string err = "Request malformed: plugin query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
if (!hasPlugin(plugin, res)) {
return;
}
......@@ -188,7 +180,7 @@ void RestAPI::GET_analytics_units(endpointArgs) {
}
}
if (!found) {
res.body() = "Plugin or analyzer not found!";
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
......@@ -200,17 +192,10 @@ void RestAPI::GET_analytics_analyzers(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (plugin == "") {
const std::string err = "Request malformed: plugin query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
if (!hasPlugin(plugin, res)) {
return;
}
res.body() = "Plugin not found!\n";
res.result(http::status::not_found);
std::ostringstream data;
for (const auto& p : _manager->getPlugins()) {
......@@ -261,17 +246,11 @@ void RestAPI::GET_plugins(endpointArgs) {
void RestAPI::GET_sensors(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (plugin == "") {
const std::string err = "Request malformed: plugin query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
if (!hasPlugin(plugin, res)) {
return;
}
res.body() = "Plugin not found!\n";
res.result(http::status::not_found);
for(const auto& p : _plugins) {
if (p.id == plugin) {
std::ostringstream data;
......@@ -325,7 +304,7 @@ void RestAPI::GET_average(endpointArgs) {
for(const auto& p : _plugins) {
if (p.id == plugin) {
res.body() = "Sensor not found!";
res.body() = "Sensor not found!\n";
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor && s->isInit()) {
......@@ -378,7 +357,292 @@ void RestAPI::GET_average(endpointArgs) {
}
}
//TODO put methods
void RestAPI::PUT_analytics_start(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (_manager->start(plugin, analyzer)) {
res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors started!\n";
res.result(http::status::ok);
} else {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void RestAPI::PUT_analytics_stop(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
if (_manager->stop(plugin, analyzer)) {
res.body() = "Plugin " + plugin + " " + analyzer + ": Sensors stopped!\n";
res.result(http::status::ok);
} else {
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
}
}
void RestAPI::PUT_analytics_reload(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
_mqttPusher->halt();
// Wait until MQTTPusher is paused in order to reload plugins
while (!_mqttPusher->isHalted()) { sleep(1); }
if (!_manager->reload(_io, plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (!_manager->start(plugin)){
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
res.body() = "Plugin " + plugin + ": Sensors reloaded";
res.result(http::status::ok);
}
_mqttPusher->cont();
}
void RestAPI::PUT_analytics_compute(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
std::string unit = getQuery("unit", queries);
if (plugin == "" || analyzer == "") {
const std::string err = "Request malformed: plugin or analyzer query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
return;
}
if (unit == "") {
unit = SensorNavigator::rootKey;
}
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
std::ostringstream data;
bool unitFound=false;
for (const auto &p : _manager->getPlugins()) {
if (p.id == plugin) {
for (const auto &a : p.configurator->getAnalyzers()) {
if( a->getName() == analyzer ) {
std::map<std::string, reading_t> outMap;
try {
outMap = a->computeOnDemand(unit);
unitFound = true;
} catch(const domain_error& e) {
// In the particular case where an analyzer is duplicated, it could be that the right
// unit is found only after a few tries. Therefore, we handle the domain_error
// exception raised in AnalyzerTemplate, and allow the search to continue
if(a->getStreaming() && a->getDuplicate()) {
continue;
} else {
res.body() = e.what();
res.result(http::status::not_found);
return;
}
}
if (getQuery("json", queries) == "true") {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
for (const auto& kv : outMap) {
boost::property_tree::ptree sensor;
sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
}
root.add_child(a->getName(), outputs);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& kv : outMap) {
data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
}
}
res.body() = data.str();
res.result(http::status::ok);
return;
}
}
}
}
// This if branch is accessed only if the target analyzer is streaming and duplicated
if(!unitFound) {
res.body() = "Node " + unit + " does not belong to the domain of " + analyzer + "!";
res.result(http::status::not_found);
}
}
void RestAPI::PUT_analytics_analyzer(endpointArgs) {
if (!managerLoaded(res)) {
return;
}
const std::string plugin = getQuery("plugin", queries);
const std::string analyzer = getQuery("analyzer", queries);
const std::string action = getQuery("action", queries);
if (plugin == "" || action == "") {
const std::string err = "Request malformed: plugin or action query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
return;
}
res.body() = "Plugin or analyzer not found!\n";
res.result(http::status::not_found);
// Managing custom REST PUT actions defined at the analyzer level
for (const auto &p : _manager->getPlugins()) {
if (p.id == plugin) {
for (const auto &a : p.configurator->getAnalyzers()) {
if (analyzer == "" || analyzer == a->getName()) {
// Any thrown exception is catched outside in the HTTPserver
try {
restResponse_t reply = a->REST(action, queries);
res.body() = reply.data;
res.body() += reply.response;
res.result(http::status::ok);
} catch(const std::invalid_argument &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::bad_request);
} catch(const std::domain_error &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::not_found);
} catch(const std::exception &e) {
const std::string err = e.what();
RESTAPILOG(warning) << err;
res.body() = err;
res.result(http::status::internal_server_error);
}
}
}
}
}
}
void RestAPI::PUT_start(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
for (const auto& p : _plugins) {
if (p.id == plugin) {
for (const auto& g : p.configurator->getSensorGroups()) {
g->start();
}
res.body() = "Plugin " + plugin + ": Sensors started\n";
res.result(http::status::ok);
return;
}
}
}
void RestAPI::PUT_stop(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
for (const auto& p : _plugins) {
if (p.id == plugin) {
for (const auto& g : p.configurator->getSensorGroups()) {
g->stop();
}
res.body() = "Plugin " + plugin + ": Sensors stopped\n";
res.result(http::status::ok);
return;
}
}
}
void RestAPI::PUT_reload(endpointArgs) {
const std::string plugin = getQuery("plugin", queries);
if (!hasPlugin(plugin, res)) {
return;
}
for (const auto &p : _plugins) {
if (p.id == plugin) {
//before modifying the plugin we need to ensure that we have exclusive access
//therefore pause the only other concurrent user (MQTTPusher)
_mqttPusher->halt();
//wait until MQTTPusher is paused
while (!_mqttPusher->isHalted()) {
sleep(1);
}
// Removing obsolete MQTT topics
removeTopics(p);
if (p.configurator->reReadConfig()) {
// Perform checks on MQTT topics
if(!checkTopics(p)) {
res.body() = "Plugin " + plugin + ": problematic MQTT topics or sensor names, please check your config files!\n";
res.result(http::status::internal_server_error);
removeTopics(p);
p.configurator->clearConfig();
} else {
res.body() = "Plugin " + plugin + ": Configuration reloaded\n";
res.result(http::status::ok);
for (const auto& g : p.configurator->getSensorGroups()) {
g->init(_io);
g->start();
}
}
} else {
res.body() = "Plugin " + plugin + ": Could not reload configuration";
res.result(http::status::internal_server_error);
}
//continue MQTTPusher
_mqttPusher->cont();
break;
}
}
//Updating the SensorNavigator on plugin reloads
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <SensorNavigator> navigator = std::make_shared<SensorNavigator>();
std::vector<std::string> names, topics;
for (const auto &p : _plugins)
for (const auto &g : p.configurator->getSensorGroups())
for (const auto &s : g->getSensors()) {
names.push_back(s->getName());
topics.push_back(s->getMqtt());
}
navigator->buildTree(qEngine.getSensorHierarchy(), &names, &topics);
qEngine.setNavigator(navigator);
qEngine.triggerUpdate();
}
void RestAPI::removeTopics(dl_t p) {
MQTTChecker& mqttCheck = MQTTChecker::getInstance();
......
......@@ -70,7 +70,7 @@ private:
/**
* GET "/analytics/plugins"
*
* @brief List all data analytic plugins.
* @brief (Discovery) List all currently loaded data analytic plugins.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
......@@ -82,7 +82,7 @@ private:
/**
* GET "/analytics/sensors"
*
* @brief List all running sensors in one or all analyzers of a plugin.
* @brief (Discovery) List all sensors of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
......@@ -97,7 +97,7 @@ private:
/**
* GET "/analytics/units"
*
* @brief List all units of a plugin sensors are associated with
* @brief (Discovery) List all units of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
......@@ -112,7 +112,7 @@ private:
/**
* GET "/analytics/analyzers"
*
* @brief List all running analyzers of a plugin.
* @brief (Discovery) List all active analyzers of a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
......@@ -180,71 +180,97 @@ private:
/**
* PUT "/analytics/start"
*
* @brief
* @brief Start all or only a specific plugin. Or only start a specific
* streaming analyzer within a specific plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
* Optional | plugin | all analyzer plugin | only start the specified
* | | names | plugin
* | analyzer| all analyzers of a | only start the specified
* | | plugin | analyzer. Requires a plugin
* | | | to be specified. Limited to
* | | | streaming analyzers.
*/
void PUT_analytics_start(endpointArgs);
/**
* PUT "/analytics/stop"
*
* @brief
* @brief Stop all or only a specific plugin. Or only stop a specific
* streaming analyzer within a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
* Optional | plugin | all analyzer plugin | only stop the specified
* | | names | plugin
* | analyzer| all analyzers of a | only stop the specified
* | | plugin | analyzer. Requires a plugin
* | | | to be specified. Limited to
* | | | streaming analyzers.
*/
void PUT_analytics_stop(endpointArgs);
/**
* PUT "/analytics/reload"
*
* @brief
* @brief Reload configuration and initialization of all or only a specific
* analytics plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
* Optional | plugin | all analyzer plugin | reload only the specified
* | | names | plugin
*/
void PUT_analytics_reload(endpointArgs);
/**
* PUT "/analytics/compute"
*
* @brief
* @brief Query the given analyzer for a certain input unit. Intended for
* "on-demand" analyzers, but works with "streaming" analyzers as
* well.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
* Required | plugin | all analyzer plugin | select plugin
* | | names |
* | analyzer| all analyzers of a | select analyzer
* | | plugin |
* Optional | unit | all units of a plugin| select target unit
* | json | true | format response as json
*/
void PUT_analytics_compute(endpointArgs);
/**
* PUT "/analytics/analyzer"
*
* @brief
* @brief Perform a custom REST PUT action defined at analyzer level.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Optional | - | - | -
* Required | plugin | all analyzer plugin | select plugin
* | | names |
* | action | see analyzer | select custom action
* | | documentation |
* | custom action may require more queries!
* Optional | analyzer| all analyzers of a | select analyzer
* | | plugin |
* | custom action may allow for more queries!
*/
void PUT_analytics_analyzer(endpointArgs);
/**
* PUT "/start"
*
* @brief
* @brief Start a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Required | plugin | all plugin names | specify the plugin
* Optional | - | - | -
*/
void PUT_start(endpointArgs);
......@@ -252,11 +278,11 @@ private:
/**
* PUT "/stop"
*
* @brief
* @brief Stop a plugin.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
* Required | plugin | all plugin names | specify the plugin
* Optional | - | - | -
*/
void PUT_stop(endpointArgs);
......@@ -264,7 +290,7 @@ private:
/**
* PUT "/reload"
*
* @brief
* @brief Reload a plugin's configuration (includes plugin restart).
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
......@@ -275,6 +301,24 @@ private:
/******************************************************************************/
// Utility method to check if a plugin query was given.
// Prepares the response accordingly to continue execution on success or
// immediately return on failure.
// Return true if plugin was given, false otherwise.
inline bool hasPlugin(const std::string& plugin, http::response<http::string_body>& res) {
if (plugin == "") {
const std::string err = "Request malformed: plugin query missing";
RESTAPILOG(error) << err;
res.body() = err;
res.result(http::status::bad_request);
return false;
} else {
res.body() = "Plugin not found!\n";
res.result(http::status::not_found);
return true;
}
}
// Utility method to check the status of the analytics manager.
// Return true if loaded, false otherwise.
inline bool managerLoaded(http::response<http::string_body>& res) {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment