Commit 7df0a639 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: REST API action to rebuild Sensor Navigator

- Especially useful in Collect Agent: now there is no need to restart
the CA to instantiate operator plugins after sensors have been
published
parent 148ef195
......@@ -247,6 +247,7 @@ public:
" Do a custom operator action for all or only an\n"
" selected operator within a plugin (refer to plugin\n"
" documentation).\n"
" /navigator Reloads the sensor navigator.\n"
"\n"
"D = Discovery method\n"
"All resources have to be prepended by host:port.\n"
......
......@@ -671,6 +671,16 @@ Prefix `/analytics` left out!
</tr>
</table>
<table>
<tr>
<td colspan="2"><b>PUT /navigator</b></td>
<td colspan="2">Rebuild the Sensor Navigator used for instantiating operators.</td>
</tr>
<tr>
<td colspan="4">No queries.</td>
</tr>
</table>
<table>
<tr>
<td colspan="2"><b>PUT /compute</b></td>
......
......@@ -48,6 +48,7 @@ CARestAPI::CARestAPI(serverSettings_t settings,
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
addEndpoint("/analytics/navigator", {http::verb::put, stdBind(PUT_analytics_navigator)});
}
void CARestAPI::GET_help(endpointArgs) {
......@@ -116,7 +117,7 @@ void CARestAPI::PUT_analytics_reload(endpointArgs) {
if (!_analyticsController->getManager()->reload(_analyticsController->getIoService(), plugin)) {
res.body() = "Plugin not found or reload failed, please check the config files and MQTT topics!\n";
res.result(http::status::not_found);
} else if (_analyticsController->getManager()->start(plugin)){
} else if (!_analyticsController->getManager()->start(plugin)) {
res.body() = "Plugin cannot be restarted!\n";
res.result(http::status::internal_server_error);
} else {
......@@ -168,3 +169,14 @@ void CARestAPI::PUT_analytics_unload(endpointArgs) {
_analyticsController->resume();
}
void CARestAPI::PUT_analytics_navigator(endpointArgs) {
if(!_analyticsController->rebuildSensorNavigator()) {
res.body() = "Sensor hierarchy tree could not be rebuilt.\n";
res.result(http::status::internal_server_error);
} else {
std::shared_ptr <SensorNavigator> navigator = QueryEngine::getInstance().getNavigator();
res.body() = "Built a sensor hierarchy tree of size " + std::to_string(navigator->getTreeSize()) + " and depth " + std::to_string(navigator->getTreeDepth()) + ".\n";
res.result(http::status::ok);
}
}
......@@ -32,6 +32,7 @@
#include "analyticscontroller.h"
#include "mqttchecker.h"
#include "configuration.h"
/**
* @brief Class providing a RESTful API to collect agent via network (HTTPs only).
......@@ -137,6 +138,17 @@ private:
*/
void PUT_analytics_unload(endpointArgs);
/**
* PUT "/analytics/navigator"
*
* @brief Reloads the sensor navigator.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
*/
void PUT_analytics_navigator(endpointArgs);
SensorCache* _sensorCache;
AnalyticsController* _analyticsController;
};
......
......@@ -211,3 +211,41 @@ bool AnalyticsController::publishSensors() {
LOG(info) << "Sensor name auto-publish performed for " << publishCtr << " sensors!";
return true;
}
bool AnalyticsController::rebuildSensorNavigator() {
QueryEngine &qEngine = QueryEngine::getInstance();
// Locking access to the QueryEngine
qEngine.lock();
_navigator = std::make_shared<SensorNavigator>();
vector<string> topics;
list<DCDB::PublicSensor> publicSensors;
SensorMetadata sBuf;
// Fetching sensor names and topics from the Cassandra datastore
if(_dcdbCfg->getPublicSensorsVerbose(publicSensors)!=SC_OK)
LOG(error) << "Failed to retrieve public sensors. Sensor Navigator will be empty.";
for (const auto &s : publicSensors)
if (!s.is_virtual) {
sBuf = Configuration::publicSensorToMetadata(s);
topics.push_back(sBuf.pattern);
}
publicSensors.clear();
// Building the sensor navigator
try {
_navigator->setFilter(_settings.analyticsSettings.filter);
_navigator->buildTree(_settings.analyticsSettings.hierarchy, &topics);
} catch (const std::invalid_argument &e) {
_navigator->clearTree();
qEngine.getNavigator()->clearTree();
qEngine.unlock();
return false;
}
topics.clear();
// Assigning the newly-built sensor navigator to the QueryEngine
qEngine.setNavigator(_navigator);
// Unlocking the QueryEngine
qEngine.unlock();
return true;
}
......@@ -185,6 +185,16 @@ public:
* @return Reference to this object's boost::asio::io_service.
*/
boost::asio::io_service& getIoService() { return _io; }
/**
* @brief Rebuilds the internal sensor navigator.
*
* This method does not rely on a MetadataStore, unlike initialize(), but queries the list of public
* sensors on its own; this was done to prevent race conditions with the collectagent's MetadataStore.
*
* @return True if successful, false otherwise
*/
bool rebuildSensorNavigator();
private:
......
......@@ -50,7 +50,6 @@
#include <string>
#include <boost/date_time/posix_time/posix_time.hpp>
#include <boost/algorithm/string/trim.hpp>
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
......@@ -123,6 +122,9 @@ bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64
}
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
// Returning NULL if the query engine is being updated
if(queryEngine.updating.load()) return false;
++queryEngine.access;
std::string topic=name;
// Getting the topic of the queried sensor from the Navigator
// If not found, we try to use the input name as topic
......@@ -131,12 +133,16 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
} catch(const std::domain_error& e) {}
DCDB::SensorId sid;
// Creating a SID to perform the query
if(!sid.mqttTopicConvert(topic))
if(!sid.mqttTopicConvert(topic)) {
--queryEngine.access;
return false;
}
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
if (entry.getView(startTs, endTs, buffer, rel))
if (entry.getView(startTs, endTs, buffer, rel)) {
--queryEngine.access;
return true;
}
}
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
......@@ -151,8 +157,10 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 3600000000000);
if(results.empty())
if(results.empty()) {
--queryEngine.access;
return false;
}
reading_t reading;
for (const auto &r : results) {
reading.value = r.value;
......@@ -161,8 +169,10 @@ bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint6
}
}
catch(const std::exception& e) {
--queryEngine.access;
return false;
}
--queryEngine.access;
return true;
}
......@@ -500,12 +510,12 @@ int main(int argc, char* const argv[]) {
// Fetching public sensor information from the Cassandra datastore
list<DCDB::PublicSensor> publicSensors;
metadataStore = new MetadataStore();
mySensorConfig->getPublicSensorsVerbose(publicSensors);
if(mySensorConfig->getPublicSensorsVerbose(publicSensors)!=SC_OK)
LOG(error) << "Failed to retrieve public sensors. Metadata Store and Sensor Navigator will be empty.";
SensorMetadata sBuf;
for (const auto &s : publicSensors)
if (!s.is_virtual) {
sBuf = Configuration::publicSensorToMetadata(s);
boost::algorithm::trim(sBuf.pattern);
metadataStore->store(sBuf.pattern, sBuf);
}
......@@ -590,10 +600,10 @@ int main(int argc, char* const argv[]) {
*/
CARestAPI* httpsServer = nullptr;
if (restAPISettings.enabled) {
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
httpsServer = new CARestAPI(restAPISettings, &mySensorCache, analyticsController);
config.readRestAPIUsers(httpsServer);
httpsServer->start();
LOG(info) << "HTTP Server running...";
}
/*
......
......@@ -97,7 +97,11 @@ SensorMetadata Configuration::publicSensorToMetadata(const DCDB::PublicSensor& p
SensorMetadata sm;
sm.publicName = ps.name;
sm.isVirtual = ps.is_virtual;
// Stripping whitespace from the sensor pattern in the SID
sm.pattern = ps.pattern;
boost::algorithm::trim(sm.pattern);
sm.unit = ps.unit;
sm.scale = ps.scaling_factor;
sm.ttl = ps.ttl;
......
......@@ -30,6 +30,7 @@
#include <string>
#include <unistd.h>
#include <boost/algorithm/string/trim.hpp>
#include "logging.h"
#include "globalconfiguration.h"
......
......@@ -65,6 +65,7 @@ RestAPI::RestAPI(serverSettings_t settings,
addEndpoint("/analytics/reload", {http::verb::put, stdBind(PUT_analytics_reload)});
addEndpoint("/analytics/load", {http::verb::put, stdBind(PUT_analytics_load)});
addEndpoint("/analytics/unload", {http::verb::put, stdBind(PUT_analytics_unload)});
addEndpoint("/analytics/navigator", {http::verb::put, stdBind(PUT_analytics_navigator)});
}
void RestAPI::GET_help(endpointArgs) {
......@@ -424,10 +425,22 @@ void RestAPI::PUT_analytics_unload(endpointArgs) {
reloadQueryEngine();
}
void RestAPI::reloadQueryEngine(const bool force) {
void RestAPI::PUT_analytics_navigator(endpointArgs) {
unloadQueryEngine();
if(!reloadQueryEngine(true)) {
res.body() = "Sensor hierarchy tree could not be rebuilt.\n";
res.result(http::status::internal_server_error);
} else {
std::shared_ptr<SensorNavigator> navigator = QueryEngine::getInstance().getNavigator();
res.body() = "Built a sensor hierarchy tree of size " + std::to_string(navigator->getTreeSize()) + " and depth " + std::to_string(navigator->getTreeDepth()) + ".\n";
res.result(http::status::ok);
}
}
bool RestAPI::reloadQueryEngine(const bool force) {
//Updating the SensorNavigator on plugin reloads, if operator plugins are currently running
if (!force && (_manager==NULL || _manager->getPlugins().empty()))
return;
return false;
QueryEngine &qEngine = QueryEngine::getInstance();
std::shared_ptr <std::map<std::string, SBasePtr>> sensorMap = std::make_shared<std::map<std::string, SBasePtr>>();
......@@ -458,14 +471,17 @@ void RestAPI::reloadQueryEngine(const bool force) {
try {
navigator->buildTree(qEngine.getSensorHierarchy(), &topics);
} catch(const std::exception& e) {
navigator->clearTree();
qEngine.getNavigator()->clearTree();
qEngine.unlock();
return;
return false;
}
qEngine.setSensorMap(sensorMap);
qEngine.setNavigator(navigator);
// Unlocking the QueryEngine
qEngine.unlock();
return true;
}
void RestAPI::unloadQueryEngine() {
......
......@@ -248,10 +248,21 @@ private:
*/
void PUT_analytics_unload(endpointArgs);
/**
* PUT "/analytics/navigator"
*
* @brief Reloads the sensor navigator.
*
* Queries | key | possible values | explanation
* -------------------------------------------------------------------------
* Required | - | - | -
*/
void PUT_analytics_navigator(endpointArgs);
/******************************************************************************/
// Utility method to reload the sensor navigator whenever needed
void reloadQueryEngine(const bool force=false);
bool reloadQueryEngine(const bool force=false);
void unloadQueryEngine();
PluginManager* _pluginManager;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment