Commit 68a61f34 authored by Alessio Netti's avatar Alessio Netti
Browse files

On-demand data analytics

- By setting the "streaming" attribute to false, users
can instantiate analyzers that are available only on-demand for requests
on the REST API
- The input/output config will be used to instantiate a unit on the fly
starting from the node queried by the user
- Computation is performed and the result is returned, no data is pushed
on MQTT or to the database
parent e0cb93a1
......@@ -136,22 +136,23 @@ bool AnalyticsManager::mqttCheck(pluginVector_t& pushers) {
// Check if an MQTT-suffix was assigned twice and if it is correctly formatted
for(const auto& p : _plugins)
for(const auto& a : p.configurator->getAnalyzers())
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs()) {
std::string str(o->getMqtt());
str.erase(std::remove(str.begin(), str.end(), '/'), str.end());
if (str.length() != 28) {
LOG(error) << "MQTT-Topic \"" << o->getMqtt() << "\" contains " << str.length() << " hex characters, not 28 as required!";
return false;
}
if( a->getStreaming() )
for(const auto& u : a->getUnits())
for(const auto& o: u->getBaseOutputs()) {
std::string str(o->getMqtt());
str.erase(std::remove(str.begin(), str.end(), '/'), str.end());
if (str.length() != 28) {
LOG(error) << "MQTT-Topic \"" << o->getMqtt() << "\" contains " << str.length() << " hex characters, not 28 as required!";
return false;
}
auto returnIt = _mqttTopics.insert(str);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << o->getMqtt() << "\" used twice!";
return false;
auto returnIt = _mqttTopics.insert(str);
if (!returnIt.second) {
LOG(error) << "MQTT-Topic \"" << o->getMqtt() << "\" used twice!";
return false;
}
}
}
_mqttTopics.clear();
return true;
}
......@@ -397,6 +398,42 @@ restResponse_t AnalyticsManager::REST(const vector<string>& pathStrs, const vect
else if(!start(plugin))
throw runtime_error("Plugin cannot be restarted!");
reply.response = "Plugin " + plugin + ": Sensors reloaded";
} else if (action == "compute") {
if(pathStrs.size() < 4)
throw invalid_argument("Received malformed request, no third path part!");
string unit = "root";
for (auto& p : queries)
if (p.first == "unit")
unit = p.second;
bool found = false;
for (auto &p : _plugins)
if (p.id == plugin)
for (auto &a : p.configurator->getAnalyzers())
if( a->getName() == analyzer ) {
found = true;
map<string, reading_t> outMap = a->computeOnDemand(unit);
if (json) {
boost::property_tree::ptree root, outputs;
// Iterating through the outputs of the on-demand computation and adding them to a JSON
for (const auto& kv : outMap) {
boost::property_tree::ptree sensor;
sensor.push_back(boost::property_tree::ptree::value_type("timestamp", boost::property_tree::ptree(to_string(kv.second.timestamp))));
sensor.push_back(boost::property_tree::ptree::value_type("value", boost::property_tree::ptree(to_string(kv.second.value))));
outputs.push_back(boost::property_tree::ptree::value_type(kv.first, sensor));
}
root.add_child(a->getName(), outputs);
boost::property_tree::write_json(data, root, true);
} else {
for (const auto& kv : outMap)
data << kv.first << " ts: " << kv.second.timestamp << " v: " << kv.second.value << "\n";
}
reply.data = data.str();
break;
}
if(!found)
throw domain_error("Plugin or analyzer not found!");
} else {
// Managing custom REST PUT actions defined at the analyzer level
bool found = false;
......
......@@ -175,6 +175,10 @@ public:
" -PUT: /analytics/[plugin]/[analyzer]/[start|stop|reload]\n"
" Start/stop the analyzers of the plugin or\n"
" reload the plugin configuration\n"
" /analytics/[plugin]/[analyzer]/compute\n"
" Perform computation of the given analyzer\n"
" in real-time. A \"unit\" query, specifying\n"
" the target unit must be included\n"
" /analytics/[plugin]/[analyzer]/[action]\n"
" Perform plugin-specific actions\n"
" (refer to documentation)\n"
......
......@@ -13,7 +13,6 @@
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
#include "AnalyzerTemplate.h"
#include "UnitGenerator.h"
#include "AnalyzerConfiguratorInterface.h"
#include "../../includes/SensorBase.h"
......@@ -275,7 +274,7 @@ protected:
*/
bool readAnalyzer(Analyzer& an, CFG_VAL config) {
// Vectors containing "prototype" inputs and outputs to be modified with the actual compute units
std::vector<SBase> protoInputs, protoOutputs;
std::vector<shared_ptr<SBase>> protoInputs, protoOutputs;
inputMode_t inputMode = SELECTIVE;
// Check for the existence of a template definition to initialize the analyzer
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
......@@ -316,7 +315,8 @@ protected:
LOG(debug) << " I/O " << _baseName << " " << valInner.second.data();
SBase sensor = SBase(valInner.second.data());
if (readSensorBase(sensor, valInner.second)) {
val.first==INPUT_BLOCK ? protoInputs.push_back(sensor) : protoOutputs.push_back(sensor);
shared_ptr<SBase> sensorPtr = make_shared<SBase>(sensor);
val.first==INPUT_BLOCK ? protoInputs.push_back(sensorPtr) : protoOutputs.push_back(sensorPtr);
} else {
LOG(warning) << "I/O " << _baseName << " " << an.getName() << "::" << sensor.getName() << " could not be read! Omitting";
}
......@@ -337,32 +337,46 @@ protected:
// Instantiating units and returning the result
if(!an.getTemplate()) {
vector< shared_ptr<UnitTemplate<SBase>>> *units=NULL;
vector <shared_ptr<UnitTemplate<SBase>>> *units = NULL;
try {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart()); }
catch(const std::exception& e) {
units = _unitGen.generateUnits(protoInputs, protoOutputs, inputMode, _mqttPrefix + an.getMqttPart(),
!an.getStreaming());
}
catch (const std::exception &e) {
LOG(error) << _analyzerName << " " << an.getName() << ": Error when creating units: " << e.what();
delete units;
return false; }
for(auto& u: *units) {
LOG(debug) << " Unit \"" << u->getName() << "\"";
for(const auto& i : u->getInputs())
LOG(debug) << " Input " << i->getName();
for(const auto& o : u->getOutputs())
LOG(debug) << " Output " << o->getName() << " using MQTT-topic \"" << o->getMqtt() << "\"";
if(!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false; }
else
an.addUnit(u);
return false;
}
for (auto &u: *units) {
if (an.getStreaming()) {
LOG(debug) << " Unit \"" << u->getName() << "\"";
for (const auto &i : u->getInputs())
LOG(debug) << " Input " << i->getName();
for (const auto &o : u->getOutputs())
LOG(debug) << " Output " << o->getName() << " using MQTT-topic \"" << o->getMqtt() << "\"";
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
} else
an.addUnit(u);
} else {
if (unit(*u)) {
an.addUnit(u);
LOG(debug) << " Template unit for on-demand operation generated!";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
delete units;
return false;
}
}
}
delete units;
}
return true;
}
......
......@@ -8,15 +8,18 @@
#include <atomic>
#include <memory>
#include <vector>
#include <map>
#include <boost/asio.hpp>
#include "../../includes/Logging.h"
#include "UnitInterface.h"
using namespace std;
// Struct defining a response to a REST request
typedef struct {
std::string response;
std::string data;
string response;
string data;
} restResponse_t;
/**
......@@ -37,7 +40,7 @@ public:
*
* @param name Name of the analyzer
*/
AnalyzerInterface(const std::string& name) :
AnalyzerInterface(const string& name) :
_name(name),
_mqttPart(""),
_isTemplate(false),
......@@ -51,6 +54,7 @@ public:
_cacheInterval(900000),
_cacheSize(1),
_pendingTasks(0),
_onDemandLock(false),
_timer(nullptr) {}
/**
......@@ -69,9 +73,9 @@ public:
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_timer(nullptr) {
_pendingTasks.store(other._pendingTasks.load());
}
_pendingTasks(0),
_onDemandLock(false),
_timer(nullptr) {}
/**
* @brief Class destructor
......@@ -94,7 +98,8 @@ public:
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_pendingTasks.store(other._pendingTasks.load());
_pendingTasks.store(0);
_onDemandLock.store(false);
_timer = nullptr;
return *this;
......@@ -136,7 +141,7 @@ public:
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) = 0;
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) = 0;
/**
* @brief Starts this analyzer
......@@ -172,9 +177,22 @@ public:
*/
virtual void clearUnits() = 0;
/**
 * @brief Performs an on-demand compute task
 *
 * Unlike the protected computeAsync and compute methods, computeOnDemand lets a caller query
 * the analyzer interactively and get the results back directly as a map, instead of relying
 * on the periodic computation.
 *
 * In the AnalyzerTemplate implementation, an analyzer whose _streaming attribute is false
 * generates a unit on the fly for the given node, runs the computation on it and returns the
 * latest value of each output; a streaming analyzer that is currently running returns the
 * latest output values of its already-instantiated units whose name matches the node, without
 * triggering a new computation. In any other state the request is rejected with an exception.
 *
 * NOTE(review): the default argument is an empty string, while the REST "compute" handler
 * falls back to "root" when no "unit" query is supplied - confirm which default is intended.
 *
 * @param node Name of the unit (sensor tree node) that defines the query
 * @return a map<string, reading_t> containing the output of the query, keyed by output sensor name
 */
virtual map<string, reading_t> computeOnDemand(const string& node="") = 0;
// Getter methods
const std::string& getName() const { return _name; }
const std::string& getMqttPart() const { return _mqttPart; }
const string& getName() const { return _name; }
const string& getMqttPart() const { return _mqttPart; }
bool getTemplate() const { return _isTemplate; }
bool getSync() const { return _sync; }
bool getDuplicate() const { return _duplicate; }
......@@ -185,8 +203,8 @@ public:
int getUnitID() const { return _unitID; }
// Setter methods
void setName(const std::string& name) { _name = name; }
void setMqttPart(const std::string& mqttPart) { _mqttPart = mqttPart; }
void setName(const string& name) { _name = name; }
void setMqttPart(const string& mqttPart) { _mqttPart = mqttPart; }
void setTemplate(bool t) { _isTemplate = t; }
void setSync(bool sync) { _sync = sync; }
void setUnitID(int u) { _unitID = u; }
......@@ -195,7 +213,7 @@ public:
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
virtual std::vector<UnitPtr>& getUnits() = 0;
virtual vector<UnitPtr>& getUnits() = 0;
protected:
......@@ -221,9 +239,9 @@ protected:
// Name of this analyzer
std::string _name;
string _name;
// MQTT part (see docs) of this analyzer
std::string _mqttPart;
string _mqttPart;
// To distinguish between templates and actual analyzers
bool _isTemplate;
......@@ -246,15 +264,17 @@ protected:
// Real size of the cache, as determined from cacheInterval
unsigned int _cacheSize;
// Number of pending ASIO tasks
std::atomic_uint _pendingTasks;
atomic_uint _pendingTasks;
// Lock used to serialize access to the ondemand functionality
atomic_bool _onDemandLock;
// Timer for scheduling tasks
std::unique_ptr<boost::asio::deadline_timer> _timer;
unique_ptr<boost::asio::deadline_timer> _timer;
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
//for better readability
using AnalyzerPtr = std::shared_ptr<AnalyzerInterface>;
using AnalyzerPtr = shared_ptr<AnalyzerInterface>;
#endif //PROJECT_ANALYZERINTERFACE_H
......@@ -7,6 +7,7 @@
#include "AnalyzerInterface.h"
#include "UnitTemplate.h"
#include "UnitGenerator.h"
#include "timestamp.h"
#include "QueryEngine.h"
......@@ -14,6 +15,7 @@
#include <map>
#include <memory>
using namespace std;
/**
* Template that implements features needed by Analyzers and complying to AnalyzerInterface.
......@@ -25,12 +27,12 @@
template <typename S>
class AnalyzerTemplate : public AnalyzerInterface {
// The template shall only be instantiated for classes which derive from SensorBase
static_assert(std::is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
protected:
// For readability
using S_Ptr = std::shared_ptr<S>;
using U_Ptr = std::shared_ptr< UnitTemplate<S> >;
using S_Ptr = shared_ptr<S>;
using U_Ptr = shared_ptr< UnitTemplate<S> >;
public:
......@@ -39,7 +41,7 @@ public:
*
* @param name Name of the analyzer
*/
AnalyzerTemplate(const std::string name) :
AnalyzerTemplate(const string name) :
AnalyzerInterface(name),
_queryEngine(QueryEngine::getInstance()) {}
......@@ -95,7 +97,7 @@ public:
virtual void addUnit(UnitPtr u) override {
// Since the AnalyzerInterface method accepts UnitInterface objects, we must cast the input argument
// to its actual type, which is UnitTemplate<S>
if (U_Ptr dUnit = std::dynamic_pointer_cast< UnitTemplate<S> >(u)) {
if (U_Ptr dUnit = dynamic_pointer_cast< UnitTemplate<S> >(u)) {
_units.push_back(dUnit);
_baseUnits.push_back(u);
}
......@@ -111,7 +113,7 @@ public:
*
* @return The vector of UnitInterface objects of this analyzer
*/
virtual std::vector<UnitPtr>& getUnits() override { return _baseUnits; }
virtual vector<UnitPtr>& getUnits() override { return _baseUnits; }
/**
* @brief Clears all the units contained in this analyzer
......@@ -150,7 +152,7 @@ public:
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&AnalyzerTemplate<S>::computeAsync, this));
_timer->async_wait(bind(&AnalyzerTemplate<S>::computeAsync, this));
LOG(info) << "Analyzer " << _name << " started.";
}
......@@ -182,8 +184,61 @@ public:
*
* @return Response to the request as a <response, data> pair
*/
virtual restResponse_t REST(const std::string& action, const std::vector<std::pair<std::string,std::string>>& queries) override {
throw std::invalid_argument("Unknown plugin action " + action + " requested!");
virtual restResponse_t REST(const string& action, const vector<pair<string,string>>& queries) override {
throw invalid_argument("Unknown plugin action " + action + " requested!");
}
/**
 * @brief Performs an on-demand compute task
 *
 * Unlike the protected computeAsync and compute methods, computeOnDemand lets callers query
 * the analyzer interactively. The behaviour depends on the mode of the analyzer:
 *
 * - If _streaming is false, the first stored unit is used as a template: a temporary unit is
 *   generated on the fly for the given node, compute() is invoked on it, and the latest value
 *   of each of its outputs is returned. The temporary unit is removed again before returning.
 * - If _streaming is true and the analyzer is running (_keepRunning), no computation is
 *   triggered: the latest values of the outputs of every unit whose name equals the node
 *   are returned.
 *
 * On-demand computations are serialized through _onDemandLock. The lock is released, and the
 * temporary unit removed, even when unit generation or compute() throws, so that a failed
 * request can neither block later requests nor leave a stale unit behind.
 *
 * @param node Unit name for which the query must be performed
 * @return a map<string, reading_t> from output sensor name to its latest reading
 * @throws std::runtime_error if a non-streaming analyzer has no template unit, or if a
 *                            streaming analyzer is not running
 * @throws std::domain_error  if a running streaming analyzer has no unit named like the node
 *
 * Any exception raised while generating the temporary unit or computing is propagated unchanged.
 */
virtual map<string, reading_t> computeOnDemand(const string& node="") override {
    map<string, reading_t> outMap;
    if( !_streaming ) {
        // Scoped owner of the on-demand flag: spins until the flag is acquired and always
        // releases it on scope exit, including when an exception propagates
        struct OnDemandGuard {
            explicit OnDemandGuard(std::atomic_bool& flag) : _flag(flag) { while( _flag.exchange(true) ) {} }
            ~OnDemandGuard() { _flag.store(false); }
            OnDemandGuard(const OnDemandGuard&) = delete;
            OnDemandGuard& operator=(const OnDemandGuard&) = delete;
        private:
            std::atomic_bool& _flag;
        };

        UnitGenerator<S> unitGen(_queryEngine.getNavigator());
        // Getting exclusive access to the analyzer before touching _units, since concurrent
        // on-demand requests append to and erase from that vector
        OnDemandGuard guard(_onDemandLock);
        // The first unit acts as the template from which on-demand units are generated
        if(_units.empty())
            throw std::runtime_error("Initialization issue in analyzer " + _name + "!");
        U_Ptr tempUnit = unitGen.generateUnit(node, _units[0]->getInputs(), _units[0]->getOutputs(), _units[0]->getInputMode(), "");
        for(const auto& s : tempUnit->getOutputs())
            s->initSensor(_cacheSize);

        // The temporary unit is appended so that compute() can address it by index
        addUnit(tempUnit);
        const int onDemandUnitID = _units.size() - 1;
        const auto dropTempUnit = [this, onDemandUnitID]() {
            _units.erase(_units.begin() + onDemandUnitID);
            _baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
        };
        try {
            compute(onDemandUnitID);
            for(const auto& o : _units[onDemandUnitID]->getOutputs())
                outMap.insert(make_pair(o->getName(), o->getLatestValue()));
        } catch(...) {
            // Never leave the temporary unit behind when the computation fails
            dropTempUnit();
            throw;
        }
        dropTempUnit();
    } else if( _keepRunning ) {
        // Streaming analyzer: only report what the periodic computation already produced
        bool found = false;
        for(const auto& u : _units)
            if(u->getName() == node) {
                found = true;
                for(const auto& o : u->getOutputs())
                    outMap.insert(make_pair(o->getName(), o->getLatestValue()));
            }
        if(!found)
            throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
    } else
        throw std::runtime_error("Analyzer " + _name + " not available for on-demand query!");
    return outMap;
}
protected:
......@@ -230,18 +285,18 @@ protected:
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(std::bind(&AnalyzerTemplate::computeAsync, this));
_timer->async_wait(bind(&AnalyzerTemplate::computeAsync, this));
}
_pendingTasks--;
}
// Vector of pointers to the internal units
std::vector<U_Ptr> _units;
vector<U_Ptr> _units;
// Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
// unless we use raw arrays
std::vector<UnitPtr> _baseUnits;
vector<UnitPtr> _baseUnits;
// Instance of a QueryEngine object to get sensor data
QueryEngine& _queryEngine;
QueryEngine& _queryEngine;
};
#endif //PROJECT_ANALYZERTEMPLATE_H
......@@ -13,8 +13,6 @@
using namespace std;
typedef enum inputMode_t { SELECTIVE = 1, ALL = 2, ALL_RECURSIVE = 3 } inputMode_t;
/**
* Helper template to generate Analyzer Units
*
......@@ -104,6 +102,9 @@ public:
*/
set<string> *resolveNodeLevelString(const string& s, const string& node, const bool replace=true) {
int level = parseNodeLevelString(s);
if(!_navi->nodeExists(node))
throw invalid_argument("UnitGenerator: Node " + node + " does not exist!");
set<string> *sensors = new set<string>();
if( level <= -1 )
sensors->insert(s);
......@@ -136,23 +137,24 @@ public:
* @param outputs The vector of "prototype" sensor objects for outputs
* @param inputMode Defines the method with which input sensors are instantiated for each unit
* @param mqttPrefix MQTT prefix to use for output sensors if only the "root" unit is defined
* @param ondemand If True, no unit resolution is performed and a raw unit template is stored
* @return A vector of shared pointers to the generated unit objects
*/
vector<shared_ptr<UnitTemplate<SBase>>> *generateUnits(vector<SBase>& inputs, vector<SBase>& outputs, inputMode_t inputMode, string mqttPrefix="") {
vector<shared_ptr<UnitTemplate<SBase>>> *generateUnits(vector<shared_ptr<SBase>>& inputs, vector<shared_ptr<SBase>>& outputs, inputMode_t inputMode, string mqttPrefix="", bool ondemand=false) {
// If no outputs are defined, no units can be instantiated
if((inputs.size()==0 && inputMode==SELECTIVE) || outputs.size() == 0)
if((inputs.empty() && inputMode==SELECTIVE) || outputs.empty())
throw invalid_argument("UnitGenerator: Invalid inputs or outputs!");
// Output sensors must share the same unit pattern
if(!isConsistent(outputs))
throw invalid_argument("UnitGenerator: Incoherent output levels!");
// We iterate over the outputs, and compute their depth level in the current sensor tree. From such depth level,
// the list of units is defined, consisting in all nodes in the sensor tree at the level of the outputs
int unitLevel = parseNodeLevelString(outputs[0].getName());
for(const auto& out : outputs)
if(unitLevel != parseNodeLevelString(out.getName()))
throw invalid_argument("UnitGenerator: Incoherent output levels!");
int unitLevel = parseNodeLevelString(outputs[0]->getName());
set<string>* units = NULL;
if(unitLevel > -1)
units = resolveNodeLevelString(outputs[0].getName(), _navi->rootKey, false);
units = resolveNodeLevelString(outputs[0]->getName(), _navi->rootKey, false);
// If no depth level was found (output sensor names do not contain any <unit-X> keyword) we assume that
// everything relates to root
else if(unitLevel == -1) {
......@@ -163,93 +165,166 @@ public:
if(!units || units->empty())
throw invalid_argument("UnitGenerator: Invalid output level or unit specification!");
// We iterate over the units, and resolve their inputs and outputs starting from the prototype definitions
vector<shared_ptr<SBase>> unitInputs, unitOutputs;
vector<shared_ptr<UnitTemplate<SBase>>> *unitObjects = new vector<shared_ptr<UnitTemplate<SBase>>>();
// AddedSensors keeps track of which sensor names were added already to the input set, to prevent duplicates
set<string> addedSensors, *sensors;
for(const auto& u : *units) {
unitInputs.clear();
unitOutputs.clear();
// Mapping inputs
for(const auto& in : inputs) {
// Depending on the relationship of an input prototype sensor to the output level, it could be
// mapped to one sensor or more: for example, if output has level <unit-1>, and an input sensor
// has level <unit-2>, than the input will be unique, and the sensor associated to the father of the
// unit. If the other way around, the input will consist of multiple sensors, one for each child of
// the unit
sensors = resolveNodeLevelString(in.getName(), u);
if (sensors->empty()) {
if(!ondemand)
for(const auto& u : *units) {
try {
unitObjects->push_back(_generateUnit(u, inputs, outputs, inputMode, mqttPrefix));
} catch( const exception& e) {
delete units;
delete sensors;
delete unitObjects;
throw invalid_argument("UnitGenerator: String " + in.getName() + " cannot be resolved!");
} else
for (const auto &s : *sensors) {
if (!addedSensors.count(s)) {
SBase uIn(in);
uIn.setName(s);
if (!_navi->sensorExists(uIn.getName())) {
delete units;
delete sensors;
delete unitObjects;
throw invalid_argument("UnitGenerator: Sensor " + uIn.getName() + " does not exist!");
}
addedSensors.insert(s);
unitInputs.push_back(make_shared<SBase>(uIn));
}
}
delete sensors;
throw e;
}
}
else {
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>("__template__", inputs, outputs);
unPtr->setInputMode(inputMode);
unitObjects->push_back(unPtr);
}
delete units;
return unitObjects;