Commit 9c0d7df4 authored by Alessio Netti's avatar Alessio Netti

Analytics: JobAnalyzerTemplate class

- Template for Job Analyzers, derived from AnalyzerInterface
parent 642fce1a
......@@ -158,6 +158,7 @@ file. The following is instead a list of configuration parameters that are avail
| interval | Specifies how often the analyzer will be invoked to perform computations, and thus the sampling interval of its output sensors. Only used for analyzers in _streaming_ mode.
| relaxed | If set to _true_, the units of this analyzer will be instantiated even if some of the respective input sensors do not exist.
| delay | Delay in milliseconds to be applied to the start of the analyzer. This parameter only applies to streaming analyzers. It can be used to allow for input sensor caches to be populated before the analyzer is started.
| unitCacheLimit | Defines the maximum size of the unit cache that is used in the on-demand and job modes. Default is 1000.
| minValues | Minimum number of readings that need to be stored in output sensors before these are pushed as MQTT messages. Only used for analyzers in _streaming_ mode.
| mqttPart | Part of the MQTT topic associated to this analyzer. Only used when the Unit system is not employed (see this [section](#mqttTopics)).
| sync | If set to _true_, computation will be performed at time intervals synchronized with sensor readings.
......
......@@ -386,6 +386,8 @@ protected:
an.setDuplicate(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "relaxed")) {
an.setRelaxed(to_bool(val.second.data()));
} else if (boost::iequals(val.first, "unitCacheLimit")) {
an.setUnitCacheLimit(stoull(val.second.data()));
} else if (boost::iequals(val.first, "streaming")) {
an.setStreaming(to_bool(val.second.data()));
} else if (boost::iequals(val.first, INPUT_BLOCK) || boost::iequals(val.first, OUTPUT_BLOCK)) {
......@@ -448,7 +450,7 @@ protected:
}
} else {
if (unit(*u)) {
an.addToOndemandCache(u);
an.addToUnitCache(u);
LOG(debug) << " Template unit for on-demand operation " + u->getName() + " generated.";
} else {
LOG(error) << " Template unit " << u->getName() << " did not pass the final check!";
......
......@@ -76,6 +76,7 @@ public:
_minValues(1),
_interval(1000),
_cacheInterval(900000),
_unitCacheLimit(1000),
_cacheSize(1),
_delayInterval(0),
_pendingTasks(0),
......@@ -98,6 +99,7 @@ public:
_minValues(other._minValues),
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_unitCacheLimit(other._unitCacheLimit),
_cacheSize(other._cacheSize),
_delayInterval(other._delayInterval),
_pendingTasks(0),
......@@ -125,6 +127,7 @@ public:
_minValues = other._minValues;
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_unitCacheLimit = other._unitCacheLimit;
_cacheSize = other._cacheSize;
_delayInterval = other._delayInterval;
_pendingTasks.store(0);
......@@ -237,6 +240,7 @@ public:
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getCacheSize() const { return _cacheSize; }
unsigned getUnitCacheLimit() const { return _unitCacheLimit; }
unsigned getDelayInterval() const { return _delayInterval; }
int getUnitID() const { return _unitID; }
......@@ -251,9 +255,10 @@ public:
void setDuplicate(bool duplicate) { _duplicate = duplicate; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setUnitCacheLimit(unsigned uc) { _unitCacheLimit = uc+1; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setDelayInterval(unsigned delayInterval) { _delayInterval = delayInterval; }
virtual vector<UnitPtr>& getUnits() = 0;
virtual vector<UnitPtr>& getUnits(bool pop=false) = 0;
protected:
......@@ -292,6 +297,8 @@ protected:
unsigned int _interval;
// Size of the cache in time for the output sensors in this analyzer
unsigned int _cacheInterval;
// Maximum number of units that can be contained in the unit cache
unsigned int _unitCacheLimit;
// Real size of the cache, as determined from cacheInterval
unsigned int _cacheSize;
// Time in seconds to wait for before starting computation
......
......@@ -57,9 +57,7 @@ protected:
using U_Ptr = shared_ptr< UnitTemplate<S> >;
public:
static const unsigned cacheLimit = 100;
/**
* @brief Class constructor
*
......@@ -67,7 +65,7 @@ public:
*/
AnalyzerTemplate(const string name) :
AnalyzerInterface(name),
_ondemandCache(nullptr),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()) {}
......@@ -81,6 +79,8 @@ public:
*/
AnalyzerTemplate(const AnalyzerTemplate& other) :
AnalyzerInterface(other),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()) {
for(auto u : other._units) {
......@@ -114,9 +114,9 @@ public:
_units.clear();
_baseUnits.clear();
if(_ondemandCache) {
_ondemandCache->clear();
delete _ondemandCache;
if(_unitCache) {
_unitCache->clear();
delete _unitCache;
}
if(_insertionLUT) {
_insertionLUT->clear();
......@@ -137,6 +137,7 @@ public:
LOG_VAR(ll) << " Duplicated mode: " << (_duplicate ? "enabled" : "disabled");
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
LOG_VAR(ll) << " Start delay: " << _delayInterval;
if(!_units.empty()) {
LOG_VAR(ll) << " Units:";
......@@ -167,10 +168,11 @@ public:
*
* The units returned by this method are of the UnitInterface type. The actual units, in their
* derived type, are used internally.
*
*
* @param pop This parameter does not affect the output of this method
* @return The vector of UnitInterface objects of this analyzer
*/
virtual vector<UnitPtr>& getUnits() override { return _baseUnits; }
virtual vector<UnitPtr>& getUnits(bool pop=false) override { return _baseUnits; }
/**
* @brief Clears all the units contained in this analyzer
......@@ -265,7 +267,7 @@ public:
shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
// We check whether the input node belongs to this analyzer's unit domain
if(!_ondemandCache)
if(!_unitCache)
throw std::runtime_error("Initialization error in analyzer " + _name + "!");
// Getting exclusive access to the analyzer
......@@ -275,16 +277,16 @@ public:
// and therefore we can use such unit without doing any resolution
try {
U_Ptr tempUnit = nullptr;
if (_ondemandCache->count(node)) {
if (_unitCache->count(node)) {
LOG(debug) << "Analyzer " << _name << ": cache hit for unit " << node << ".";
tempUnit = _ondemandCache->at(node);
tempUnit = _unitCache->at(node);
} else {
if (!_ondemandCache->count(SensorNavigator::templateKey))
if (!_unitCache->count(SensorNavigator::templateKey))
throw std::runtime_error("No template unit in analyzer " + _name + "!");
LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << node << ".";
U_Ptr uTemplate = _ondemandCache->at(SensorNavigator::templateKey);
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
tempUnit = unitGen.generateUnit(node, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), "", _relaxed);
addToOndemandCache(tempUnit);
addToUnitCache(tempUnit);
}
// Initializing sensors if necessary
......@@ -322,7 +324,7 @@ public:
}
/**
* @brief Adds an unit to the internal cache of on-demand units
* @brief Adds an unit to the internal cache of units
*
* The cache is used to speed up response times to queries of on-demand analyzers, and reduce
* overall overhead. The cache has a limited size: once this size is reached, at every insertion
......@@ -330,17 +332,17 @@ public:
*
* @param unit Shared pointer to the Unit objecy to be added to the cache
*/
void addToOndemandCache(U_Ptr unit) {
if(!_ondemandCache) {
_ondemandCache = new map<string, U_Ptr>();
void addToUnitCache(U_Ptr unit) {
if(!_unitCache) {
_unitCache = new map<string, U_Ptr>();
_insertionLUT = new map<uint64_t, U_Ptr>();
}
if(_ondemandCache->size() >= cacheLimit) {
if(_unitCache->size() >= _unitCacheLimit) {
U_Ptr oldest = _insertionLUT->begin()->second;
_ondemandCache->erase(oldest->getName());
_unitCache->erase(oldest->getName());
}
_ondemandCache->insert(make_pair(unit->getName(), unit));
_unitCache->insert(make_pair(unit->getName(), unit));
// The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
// the insertion time LUT, so that it is never picked for deletion
if(unit->getName() != SensorNavigator::templateKey)
......@@ -437,8 +439,8 @@ protected:
*/
virtual void compute(U_Ptr unit) = 0;
// Cache for frequently used units in ondemand mode
map<string, U_Ptr>* _ondemandCache;
// Cache for frequently used units in ondemand and job modes
map<string, U_Ptr>* _unitCache;
// Helper map to keep track of the cache insertion times
map<uint64_t, U_Ptr>* _insertionLUT;
// Vector of pointers to the internal units
......
This diff is collapsed.
......@@ -135,7 +135,7 @@ void AnalyticsController::run() {
if (_doHalt) break;
for (const auto &a : p.configurator->getAnalyzers())
if(a->getStreaming())
for (const auto &u : a->getUnits())
for (const auto &u : a->getUnits(true))
for (const auto &s : u->getBaseOutputs())
if (s->getSizeOfReadingQueue() >= a->getMinValues() && sid.mqttTopicConvert(s->getMqtt())) {
readings.clear();
......
......@@ -57,6 +57,26 @@ public:
return m;
}
/**
* @brief Converts a numerical job ID to its internal MQTT topic representation
*
* @param jobID The job ID value to be processed
* @return The processed MQTT topic
*/
static std::string jobToTopic(uint32_t jobId) {
return "/job" + std::to_string(jobId);
}
/**
* @brief Converts an MQTT topic to its corresponding numerical job ID, if applicable
*
* @param topic The topic to be processed
* @return The numerical job ID
*/
static uint32_t topicToJob(const std::string& topic) {
return 0;
}
/**
* @brief Converts a sensor name to its internal MQTT topic representation
*
......
......@@ -153,7 +153,7 @@ void MQTTPusher::push() {
}
for (const auto &a : p.configurator->getAnalyzers()) {
if(a->getStreaming()) {
for (const auto &u : a->getUnits()) {
for (const auto &u : a->getUnits(true)) {
for (const auto &s : u->getBaseOutputs()) {
if (s->getSizeOfReadingQueue() >= a->getMinValues()) {
if (_msgCap == DISABLED || totalCount < (unsigned) _maxNumberOfMessages) {
......
......@@ -74,6 +74,7 @@ QueryEngine& _queryEngine = QueryEngine::getInstance();
boost::shared_ptr<boost::asio::io_service::work> keepAliveWork;
//TODO: fix sensormap rebuilding after plugin unloads
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
//Initializing the sensor map if necessary. Thread safe!
if(_queryEngine.updated.load()) {
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment