Commit 4a7f20b6 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: cleanup JobAnalyzerTemplate

- The class is now derived from AnalyzerTemplate, reducing code
duplication
parent 9c0d7df4
......@@ -27,7 +27,7 @@
#ifndef PROJECT_JOBANALYZERTEMPLATE_H
#define PROJECT_JOBANALYZERTEMPLATE_H
#include "AnalyzerInterface.h"
#include "AnalyzerTemplate.h"
#include <boost/lockfree/spsc_queue.hpp>
#include <boost/property_tree/ptree.hpp>
#include <boost/property_tree/info_parser.hpp>
......@@ -41,7 +41,7 @@
*
*/
template <typename S>
class JobAnalyzerTemplate : public AnalyzerInterface {
class JobAnalyzerTemplate : public AnalyzerTemplate<S> {
// The template shall only be instantiated for classes which derive from SensorBase
static_assert(is_base_of<SensorBase, S>::value, "S must derive from SensorBase!");
......@@ -59,10 +59,7 @@ public:
* @param name Name of the analyzer
*/
JobAnalyzerTemplate(const string name) :
AnalyzerInterface(name),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()),
AnalyzerTemplate<S>(name),
_jobUnitsQueue(nullptr),
_jobDataVec(nullptr) {
......@@ -74,18 +71,11 @@ public:
*
*/
JobAnalyzerTemplate(const JobAnalyzerTemplate& other) :
AnalyzerInterface(other),
_unitCache(nullptr),
_insertionLUT(nullptr),
_queryEngine(QueryEngine::getInstance()),
AnalyzerTemplate<S>(other),
_jobUnitsQueue(nullptr),
_jobDataVec(nullptr) {
_mapAccess.store(false);
for(auto u : other._units) {
_units.push_back(u);
_baseUnits.push_back(u);
}
}
/**
......@@ -93,13 +83,7 @@ public:
*
*/
JobAnalyzerTemplate& operator=(const JobAnalyzerTemplate& other) {
AnalyzerInterface::operator=(other);
_units.clear();
for(auto u : other._units) {
_units.push_back(u);
_baseUnits.push_back(u);
}
AnalyzerTemplate<S>::operator=(other);
_jobUnitsQueue.reset(nullptr);
_jobDataVec = nullptr;
}
......@@ -108,38 +92,10 @@ public:
* @brief Class destructor
*/
virtual ~JobAnalyzerTemplate() {
_units.clear();
_baseUnits.clear();
if(_unitCache) {
_unitCache->clear();
delete _unitCache;
}
if(_insertionLUT) {
_insertionLUT->clear();
delete _insertionLUT;
}
if(_jobDataVec)
delete _jobDataVec;
}
/**
* @brief Prints the current analyzer configuration
*
* @param ll Logging level at which the configuration is printed
*/
virtual void printConfig(LOG_LEVEL ll) override {
if(_mqttPart!="")
LOG_VAR(ll) << " MQTT part: " << _mqttPart;
LOG_VAR(ll) << " Sync readings: " << (_sync ? "enabled" : "disabled");
LOG_VAR(ll) << " Streaming mode: " << (_streaming ? "enabled" : "disabled");
LOG_VAR(ll) << " MinValues: " << _minValues;
LOG_VAR(ll) << " Interval: " << _interval;
LOG_VAR(ll) << " Unit Cache Size: " << _unitCacheLimit;
LOG_VAR(ll) << " Start delay: " << _delayInterval;
LOG_VAR(ll) << " Units: none";
}
/**
* @brief Perform a REST-triggered PUT action
*
......@@ -157,22 +113,22 @@ public:
uint32_t maxJobs = queries.count("max")>0 ? stoull(queries.at("max")) : 100;
bool json = queries.count("json")>0 ? queries.at("json")=="true" : false;
uint32_t jobCtr = 0;
if(!_unitCache)
throw std::runtime_error("Initialization error in analyzer " + _name + "!");
if(!this->_unitCache)
throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
while( _mapAccess.exchange(true) ) {}
if(json) {
boost::property_tree::ptree root, units, sensors;
for (auto it = _insertionLUT->rbegin(); it != _insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (auto it = this->_insertionLUT->rbegin(); it != this->_insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (const auto &s : *it->getBaseOutputs())
sensors.push_back(boost::property_tree::ptree::value_type("", boost::property_tree::ptree(s->getName())));
units.add_child(*it->getName(), sensors);
jobCtr++;
}
root.add_child(_name, units);
root.add_child(this->_name, units);
boost::property_tree::write_json(data, root, true);
} else {
for (auto it = _insertionLUT->rbegin(); it != _insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (auto it = this->_insertionLUT->rbegin(); it != this->_insertionLUT->rend() && jobCtr < maxJobs; ++it) {
for (const auto &s : *it->getBaseOutputs())
data << *it->getName() << "::" << s->getMqtt() << "\n";
jobCtr++;
......@@ -185,22 +141,6 @@ public:
resp.data = data.str();
return resp;
}
/**
* @brief Adds an unit to this analyzer
*
* @param u Shared pointer to a UnitInterface object
*/
virtual void addUnit(UnitPtr u) override {
// Since the AnalyzerInterface method accepts UnitInterface objects, we must cast the input argument
// to its actual type, which is UnitTemplate<S>
if (U_Ptr dUnit = dynamic_pointer_cast< UnitTemplate<S> >(u)) {
_units.push_back(dUnit);
_baseUnits.push_back(u);
}
else
LOG(error) << "Analyzer " << _name << ": Type mismatch when storing output sensor! Sensor omitted";
}
/**
* @brief Returns the units of this analyzer
......@@ -218,61 +158,18 @@ public:
*/
virtual vector<UnitPtr>& getUnits(bool pop=false) override {
if(pop) {
_baseUnits.clear();
_units.clear();
this->_baseUnits.clear();
this->_units.clear();
unordered_set<string> unitSet;
U_Ptr u;
while(_jobUnitsQueue.pop(u))
if(unitSet.insert(u->getName()))
addUnit(u);
return _baseUnits;
return this->_baseUnits;
} else
return _dummyBaseUnits;
}
/**
* @brief Clears all the units contained in this analyzer
*/
virtual void clearUnits() override { _units.clear(); _baseUnits.clear(); _unitID = -1; }
/**
* @brief Starts this analyzer
*/
virtual void start() override {
if(_keepRunning) {
LOG(info) << "Analyzer " << _name << " already running.";
return;
} else if(!_streaming) {
LOG(error) << "On-demand analyzer " << _name << " cannot be started.";
return;
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(bind(&AnalyzerTemplate<S>::computeAsync, this));
if(_delayInterval == 0)
LOG(info) << "Analyzer " << _name << " started.";
else
LOG(info) << "Analyzer " << _name << " will be started after a delay of " << _delayInterval << " seconds.";
}
/**
* @brief Stops this analyzer
*/
virtual void stop() override {
if(_keepRunning == 0) {
LOG(info) << "Analyzer " << _name << " already stopped.";
return;
} else if(!_streaming) {
LOG(error) << "On-demand analyzer " << _name << " cannot be stopped.";
return;
}
_keepRunning = 0;
wait();
LOG(info) << "Analyzer " << _name << " stopped.";
}
/**
* @brief Initializes this analyzer
*
......@@ -280,7 +177,7 @@ public:
*/
virtual void init(boost::asio::io_service& io) override {
AnalyzerInterface::init(io);
_jobUnitsQueue.reset(new boost::lockfree::spsc_queue<U_Ptr>(_unitCacheLimit));
_jobUnitsQueue.reset(new boost::lockfree::spsc_queue<U_Ptr>(this->_unitCacheLimit));
}
/**
......@@ -296,12 +193,12 @@ public:
*/
virtual map<string, reading_t> computeOnDemand(const string& node="__root__") override {
map<string, reading_t> outMap;
if( !_streaming ) {
if( !this->_streaming ) {
try {
// Getting exclusive access to the analyzer
while( _onDemandLock.exchange(true) ) {}
while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node);
vector<qeJobData>* buf = _queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(buf[0]);
......@@ -312,16 +209,16 @@ public:
o->clearReadingQueue();
}
} else
throw std::runtime_error("Analyzer " + _name + ": cannot retrieve job data!");
throw std::runtime_error("Analyzer " + this->_name + ": cannot retrieve job data!");
} catch(const exception& e) {
_onDemandLock.store(false);
this->_onDemandLock.store(false);
throw;
}
_onDemandLock.store(false);
} else if( _keepRunning ) {
this->_onDemandLock.store(false);
} else if( this->_keepRunning ) {
bool found = false;
while( _mapAccess.exchange(true) ) {}
for(const auto& kv : _unitCache)
for(const auto& kv : this->_unitCache)
if(kv.first == node) {
found = true;
for(const auto& o : kv.second->getBaseOutputs())
......@@ -330,64 +227,14 @@ public:
_mapAccess.store(false);
if(!found)
throw std::domain_error("Job " + node + " does not belong to the domain of " + _name + "!");
throw std::domain_error("Job " + node + " does not belong to the domain of " + this->_name + "!");
} else
throw std::runtime_error("Analyzer " + _name + ": not available for on-demand query!");
throw std::runtime_error("Analyzer " + this->_name + ": not available for on-demand query!");
return outMap;
}
/**
* @brief Adds an unit to the internal cache of units
*
* The cache is used to speed up response times to queries of on-demand analyzers, and reduce
* overall overhead. The cache has a limited size: once this size is reached, at every insertion
* the oldest entry in the cache is removed.
*
* @param unit Shared pointer to the Unit objecy to be added to the cache
*/
void addToUnitCache(U_Ptr unit) {
if (!_unitCache) {
_unitCache = new map<string, U_Ptr>();
_insertionLUT = new map<uint64_t, U_Ptr>();
}
if (_unitCache->size() >= _unitCacheLimit) {
U_Ptr oldest = _insertionLUT->begin()->second;
_unitCache->erase(oldest->getName());
}
_unitCache->insert(make_pair(unit->getName(), unit));
// The template unit must never be deleted, even if the cache is full; therefore, we omit its entry from
// the insertion time LUT, so that it is never picked for deletion
if (unit->getName() != SensorNavigator::templateKey)
_insertionLUT->insert(make_pair(getTimestamp(), unit));
}
protected:
/**
* @brief Returns the timestamp associated with the next compute task
*
* If the sync option is enabled, the timestamp will be adjusted to be synchronized with the
* other sensor readings.
*
* @return Timestamp of the next compute task
*/
uint64_t nextReadingTime() {
uint64_t now = getTimestamp();
uint64_t next;
if (_sync) {
uint64_t interval64 = static_cast<uint64_t>(_interval);
uint64_t now_ms = now / 1000 / 1000;
uint64_t waitToStart = interval64 - (now_ms%interval64); //synchronize all measurements with other sensors
if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
return (now_ms + interval64)*1000*1000;
}
return (now_ms + waitToStart)*1000*1000;
} else {
return now + MS_TO_NS(_interval);
}
}
/**
* @brief This method encapsulates all logic to generate and manage job units
*
......@@ -402,29 +249,29 @@ protected:
virtual U_Ptr jobDataToUnit(qeJobData& jobData) {
string jobTopic = MQTTChecker::jobToTopic(jobData.jobId);
U_Ptr jobUnit = nullptr;
if(!_unitCache)
throw std::runtime_error("Initialization error in analyzer " + _name + "!");
if(!this->_unitCache)
throw std::runtime_error("Initialization error in analyzer " + this->_name + "!");
if (_unitCache->count(jobTopic)) {
jobUnit = _unitCache->at(jobTopic);
LOG(debug) << "Analyzer " << _name << ": cache hit for unit " << jobTopic << ".";
if (this->_unitCache->count(jobTopic)) {
jobUnit = this->_unitCache->at(jobTopic);
LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
} else {
if (!_unitCache->count(SensorNavigator::templateKey))
throw std::runtime_error("No template unit in analyzer " + _name + "!");
LOG(debug) << "Analyzer " << _name << ": cache miss for unit " << jobTopic << ".";
U_Ptr uTemplate = _unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = _queryEngine.getNavigator();
if (!this->_unitCache->count(SensorNavigator::templateKey))
throw std::runtime_error("No template unit in analyzer " + this->_name + "!");
LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
vector<string> nodes;
for (const auto &n : jobData.nodes)
nodes.push_back(translateNodeName(n));
jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, _relaxed);
jobUnit = unitGen.generateJobUnit(jobTopic, nodes, uTemplate->getInputs(), uTemplate->getOutputs(), uTemplate->getInputMode(), jobTopic, this->_relaxed);
// Initializing sensors if necessary
for (const auto s : jobUnit->getOutputs())
if (!s->isInit())
s->initSensor(_cacheSize);
s->initSensor(this->_cacheSize);
// Spinlock to regulate access to the internal unit map - normally innocuous
while( _mapAccess.exchange(true) ) {}
......@@ -460,14 +307,14 @@ protected:
*
*/
virtual void computeAsync() override {
if(_delayInterval > 0) {
sleep(_delayInterval);
_delayInterval = 0;
LOG(info) << "Analyzer " + _name + ": starting computation after delayed start!";
if(this->_delayInterval > 0) {
sleep(this->_delayInterval);
this->_delayInterval = 0;
LOG(info) << "Analyzer " + this->_name + ": starting computation after delayed start!";
}
try {
vector<qeJobData>* buf = _queryEngine.queryJob(0, _interval * 1000000, 0, _jobDataVec, true, true);
vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
if(buf) {
_jobDataVec = buf;
for(const auto& job : *_jobDataVec) {
......@@ -477,40 +324,19 @@ protected:
}
}
else
LOG(error) << "Analyzer " + _name + ": cannot retrieve job data!";
LOG(error) << "Analyzer " + this->_name + ": cannot retrieve job data!";
} catch(const exception& e) {
LOG(error) << "Analyzer " + _name + ": internal error " + e.what() + " during computation!";
LOG(error) << "Analyzer " + this->_name + ": internal error " + e.what() + " during computation!";
}
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
_pendingTasks++;
_timer->async_wait(bind(&JobAnalyzerTemplate::computeAsync, this));
if (this->_timer && this->_keepRunning) {
this->_timer->expires_at(timestamp2ptime(this->nextReadingTime()));
this->_pendingTasks++;
this->_timer->async_wait(bind(&JobAnalyzerTemplate::computeAsync, this));
}
_pendingTasks--;
this->_pendingTasks--;
}
/**
* @brief Data analytics computation logic
*
* This method contains the actual logic used by the analyzed, and is automatically called by
* the computeAsync method.
*
* @param unit Shared pointer to unit to be processed
*/
virtual void compute(U_Ptr unit) = 0;
// Cache for frequently used units in ondemand and job modes
map<string, U_Ptr>* _unitCache;
// Helper map to keep track of the cache insertion times
map<uint64_t, U_Ptr>* _insertionLUT;
// Vector of pointers to the internal units
vector<U_Ptr> _units;
// Vector of pointers to the internal units, casted to UnitInterface - only efficient way to do this in C++
// unless we use raw arrays
vector<UnitPtr> _baseUnits;
// Instance of a QueryEngine object to get sensor data
QueryEngine& _queryEngine;
// Queue of recently-modified units that is periodically emptied when messages are pushed
std::unique_ptr<boost::lockfree::spsc_queue<U_Ptr>> _jobUnitsQueue;
// Spinlock used to regulate access to the internal units map, for "visualization" purposes
......@@ -520,6 +346,8 @@ protected:
vector<UnitPtr> _dummyBaseUnits;
// Vector of job data structures used to retrieve job data at runtime
vector<qeJobData>* _jobDataVec;
// Logger object
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment