Commit 2fa55ec2 authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: job analyzers have direct access to job information

- A jobData structure is now supplied alongside che corresponding unit
as input to the compute() method for job analyzers
parent c771b7f8
......@@ -33,18 +33,22 @@ JobAggregatorAnalyzer::JobAggregatorAnalyzer(const std::string& name) :
JobAggregatorAnalyzer::~JobAggregatorAnalyzer() {}
void JobAggregatorAnalyzer::compute(U_Ptr unit) {
void JobAggregatorAnalyzer::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
size_t elCtr=0;
uint64_t now = getTimestamp();
// Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd = jobData.endTime!=0 && now > jobData.endTime ? jobData.endTime : now;
uint64_t jobStart = jobEnd-_window < jobData.startTime ? jobData.startTime : jobEnd-_window;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for(const auto& subUnit : unit->getSubUnits()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer == nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
_buffer = _queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false);
if (!_buffer || _buffer->size() <= elCtr) {
LOG(debug) << "Job Analyzer " << _name << " cannot read from sensor " << in->getName() << "!";
return;
......
......@@ -40,10 +40,12 @@ public:
JobAggregatorAnalyzer(const std::string& name);
virtual ~JobAggregatorAnalyzer();
private:
void compute(U_Ptr unit) override;
protected:
using AggregatorAnalyzer::compute;
void compute(U_Ptr unit, qeJobData& jobData) override;
};
......
......@@ -145,12 +145,14 @@ public:
// Getting exclusive access to the analyzer
while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node);
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(buf->at(0));
U_Ptr jobUnit = jobDataToUnit(_jobDataVec->at(0));
this->compute(jobUnit);
this->compute(jobUnit, _jobDataVec->at(0));
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
......@@ -181,6 +183,20 @@ public:
protected:
using AnalyzerTemplate<S>::compute;
/**
* @brief Data analytics (job) computation logic
*
* This method contains the actual logic used by the analyzed, and is automatically called by
* the computeAsync method. This variant of the compute() method defined in AnalyzerTemplate also
* includes a job data structure in its list of arguments, and is specialized for job analyzers.
*
* @param unit Shared pointer to unit to be processed
* @param jobData Job data structure
*/
virtual void compute(U_Ptr unit, qeJobData& jobData) = 0;
/**
* @brief This method encapsulates all logic to generate and manage job units
*
......@@ -257,6 +273,8 @@ protected:
}
try {
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
if(buf) {
_jobDataVec = buf;
......@@ -265,17 +283,19 @@ protected:
for(const auto& job : *_jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) { continue; }
} catch(const invalid_argument& e2) { _tempUnits.push_back(nullptr); continue; }
}
// Performing actual computation on each unit
for(const auto& ju : _tempUnits)
this->compute(ju);
for(size_t idx=0; idx<_tempUnits.size(); idx++)
if(_tempUnits[idx])
this->compute(_tempUnits[idx], _jobDataVec->at(idx));
// Acquiring the spinlock to refresh the exposed units
while(_unitAccess.exchange(true)) {}
this->clearUnits();
for(const auto& ju : _tempUnits)
this->addUnit(ju);
if(ju)
this->addUnit(ju);
_unitAccess.store(false);
_tempUnits.clear();
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment