Commit 83c38fc1 authored by Alessio Netti's avatar Alessio Netti
Browse files

On-demand units cache

- Analyzers in on-demand mode now cache units that have been queried
and resolved recently, to speed up response times and reduce overhead
- Once the cache reaches its maximum size, the oldest entry is removed
upon every insertion
parent c65c6d53
......@@ -4,7 +4,8 @@
#include "SensorNavigator.h"
const string SensorNavigator::rootKey = "root";
const string SensorNavigator::rootKey = "__root__";
const string SensorNavigator::templateKey = "__template__";
void SensorNavigator::clearTree() {
if(_sensorTree) {
......
......@@ -28,6 +28,8 @@ public:
//Identifies the root supernode
static const string rootKey;
//Template identifier
static const string templateKey;
//Internal structure to store nodes
struct Node {
......@@ -145,7 +147,7 @@ public:
* @param depth maximum depth of the desired subtree
* @return The internal map used to store the sensor tree (or subtree)
*/
map<string, Node> *getSubTree(const string& node="root", int depth=INT_MAX);
map<string, Node> *getSubTree(const string& node="__root__", int depth=INT_MAX);
/**
* @brief Returns all nodes in the sensor tree at a certain depth
......@@ -171,7 +173,7 @@ public:
* @param recursive Enabled recursive node retrieval
* @return A set of node names
*/
set<string> *getNodes(const string& node="root", bool recursive=true);
set<string> *getNodes(const string& node="__root__", bool recursive=true);
/**
* @brief Returns all sensors in the sensor tree at a certain depth
......@@ -197,7 +199,7 @@ public:
* @param recursive Enabled recursive sensor retrieval
* @return A set of sensor names
*/
set<string> *getSensors(const string& node="root", bool recursive=true);
set<string> *getSensors(const string& node="__root__", bool recursive=true);
/**
* @brief Determines whether a node in the tree is a sensor
......@@ -265,7 +267,7 @@ public:
* @param direction Defines where navigation should be heading
* @return The set of destination nodes on the given navigation path
*/
set<string> *navigate(const string& node="root", int direction=1);
set<string> *navigate(const string& node="__root__", int direction=1);
/**
* @brief Builds a MQTT topic for a new sensors associated with a given node
......
......@@ -369,7 +369,7 @@ protected:
an.addUnit(u);
} else {
if (unit(*u)) {
an.addUnit(u);
an.addToOndemandCache(u);
LOG(debug) << " Template unit for on-demand operation generated!";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
......
......@@ -36,6 +36,8 @@ protected:
public:
static const unsigned cacheLimit = 100;
/**
* @brief Class constructor
*
......@@ -43,6 +45,8 @@ public:
*/
AnalyzerTemplate(const string name) :
AnalyzerInterface(name),
_ondemandCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()) {}
/**
......@@ -87,6 +91,15 @@ public:
virtual ~AnalyzerTemplate() {
_units.clear();
_baseUnits.clear();
if(_ondemandCache) {
_ondemandCache->clear();
delete _ondemandCache;
}
if(_insertionLUT) {
_insertionLUT->clear();
delete _insertionLUT;
}
}
/**
......@@ -199,40 +212,54 @@ public:
* @param node Unit name for which the query must be performed
* @return a map<string, reading_t> containing the output of the query
*/
virtual map<string, reading_t> computeOnDemand(const string& node="root") override {
virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
map<string, reading_t> outMap;
if( !_streaming ) {
shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// We check whether the input node belongs to this analyzer's unit domain
if(_units.size()==0)
throw std::runtime_error("Initialization issue in analyzer " + _name + "!");
// If the ondemand template unit refers to root it means it has been completely resolved already,
// and therefore we can duplicate such unit without doing any resolution
U_Ptr tempUnit = nullptr;
if(_units[0]->getName() == SensorNavigator::rootKey) {
if (node == SensorNavigator::rootKey)
tempUnit = unitGen.duplicateUnit(_units[0]);
else
throw domain_error("UnitGenerator: Node " + node + " does not belong to this unit domain!");
} else
tempUnit = unitGen.generateUnit(node, _units[0]->getInputs(), _units[0]->getOutputs() , _units[0]->getInputMode(), "");
for(const auto s : tempUnit->getOutputs())
s->initSensor(_cacheSize);
if(!_ondemandCache)
throw std::runtime_error("Initialization error in analyzer " + _name + "!");
// Getting exclusive access to the analyzer
while( _onDemandLock.exchange(true) ) {}
addUnit(tempUnit);
int onDemandUnitID = _units.size() - 1;
compute(onDemandUnitID);
// If the ondemand template unit refers to root it means it has been completely resolved already,
// and therefore we can use such unit without doing any resolution
try {
U_Ptr tempUnit = nullptr;
if (_ondemandCache->count(node)) {
LOG(debug) << "Analyzer " << _name << ": cache hit for unit " << node << ".";
tempUnit = _ondemandCache->at(node);
} else {
if (!_ondemandCache->count(SensorNavigator::templateKey))
throw std::runtime_error("No template unit in analyzer " + _name + "!");
LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _ondemandCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), "");
addToOndemandCache(tempUnit);
}
// Initializing sensors if necessary
for (const auto s : tempUnit->getOutputs())
if (!s->isInit())
s->initSensor(_cacheSize);
addUnit(tempUnit);
int onDemandUnitID = _units.size() - 1;
compute(onDemandUnitID);
for(const auto& o : _units[onDemandUnitID]->getOutputs())
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
for (const auto &o : _units[onDemandUnitID]->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
}
_units.erase(_units.begin() + onDemandUnitID);
_baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
_units.erase(_units.begin() + onDemandUnitID);
_baseUnits.erase(_baseUnits.begin() + onDemandUnitID);
} catch(const exception& e) {
_onDemandLock.store(false);
throw;
}
_onDemandLock.store(false);
} else if( _keepRunning ) {
......@@ -247,10 +274,36 @@ public:
if(!found)
throw std::domain_error("Node " + node + " does not belong to the domain of " + _name + "!");
} else
throw std::runtime_error("Analyzer " + _name + " not available for on-demand query!");
throw std::runtime_error("Analyzer " + _name + ": not available for on-demand query!");
return outMap;
}
/**
* @brief Adds an unit to the internal cache of on-demand units
*
* The cache is used to speed up response times to queries of on-demand analyzers, and reduce
* overall overhead. The cache has a limited size: once this size is reached, at every insertion
* the oldest entry in the cache is removed.
*
* @param unit Shared pointer to the Unit objecy to be added to the cache
*/
void addToOndemandCache(U_Ptr unit) {
if(!_ondemandCache) {
_ondemandCache = new map<string, U_Ptr>();
_insertionLUT = new map<uint64_t, U_Ptr>();
}
if(_ondemandCache->size() >= cacheLimit) {
U_Ptr oldest = _insertionLUT->begin()->second;
_ondemandCache->erase(oldest->getName());
}
_ondemandCache->insert(make_pair(unit->getName(), unit));
// The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
// the insertion time LUT, so that it is never picked for deletion
if(unit->getName() != SensorNavigator::templateKey)
_insertionLUT->insert(make_pair(getTimestamp(), unit));
}
protected:
/**
......@@ -286,11 +339,15 @@ protected:
*
*/
virtual void computeAsync() override {
if(_duplicate && _unitID>=0)
compute(_unitID);
else
for(int i=0; i < _units.size(); i++)
compute(i);
try {
if (_duplicate && _unitID >= 0)
compute(_unitID);
else
for (int i = 0; i < _units.size(); i++)
compute(i);
} catch(const exception& e) {
LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
}
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
......@@ -300,6 +357,10 @@ protected:
_pendingTasks--;
}
// Cache for frequently used units in ondemand mode
map<string, U_Ptr>* _ondemandCache;
// Helper map to keep track of the cache insertion times
map<uint64_t, U_Ptr>* _insertionLUT;
// Vector of pointers to the internal units
vector<U_Ptr> _units;
// Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
......
......@@ -176,11 +176,12 @@ public:
} catch( const exception& e) {
delete units;
delete unitObjects;
throw e;
throw;
}
}
else {
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>("template", inputs, outputs);
if(ondemand) {
shared_ptr<UnitTemplate<SBase>> unPtr = make_shared<UnitTemplate<SBase>>(SensorNavigator::templateKey, inputs, outputs);
unPtr->setInputMode(inputMode);
unitObjects->push_back(unPtr);
}
......
......@@ -106,6 +106,7 @@ public:
unsigned getSubsampling() const { return _subsamplingFactor; }
const reading_t * const getCache() const { return _cache.get(); }
const reading_t& getLatestValue() const { return _latestValue; }
const bool isInit() const { return _cache && _readingQueue; }
void setSkipConstVal(bool skipConstVal) { _skipConstVal = skipConstVal; }
void setDelta(const bool delta) { _delta = delta; }
......@@ -117,6 +118,7 @@ public:
const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); }
void clearReadingQueue() const { reading_t buf; while(_readingQueue->pop(buf)) {} }
void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); }
void initSensor(unsigned interval) {
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment