Commit b4830c2b authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: switching to group queries for Aggregator plugins

parent e98e25a3
...@@ -47,20 +47,18 @@ void AggregatorOperator::printConfig(LOG_LEVEL ll) { ...@@ -47,20 +47,18 @@ void AggregatorOperator::printConfig(LOG_LEVEL ll) {
} }
void AggregatorOperator::compute(U_Ptr unit) { void AggregatorOperator::compute(U_Ptr unit) {
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t startTs=0, endTs=0, now=getTimestamp(); uint64_t startTs=0, endTs=0, now=getTimestamp();
startTs = _relative ? _window : now - _window; startTs = _relative ? _window : now - _window;
endTs = _relative ? 0 : now; endTs = _relative ? 0 : now;
// Clearing the buffer
_buffer.clear();
std::vector<std::string> sensorNames;
for(const auto& in : unit->getInputs()) { for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window sensorNames.push_back(in->getName());
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector }
elCtr = _buffer.size(); if(!_queryEngine.querySensor(sensorNames, startTs, endTs, _buffer, _relative)) {
if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr) { LOG(debug) << "Operator " + _name + ": cannot read from any sensor for unit " + unit->getName() + "!";
LOG(debug) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!"; return;
return;
}
} }
compute_internal(unit, _buffer); compute_internal(unit, _buffer);
} }
......
...@@ -37,24 +37,22 @@ JobAggregatorOperator::JobAggregatorOperator(const JobAggregatorOperator& other) ...@@ -37,24 +37,22 @@ JobAggregatorOperator::JobAggregatorOperator(const JobAggregatorOperator& other)
JobAggregatorOperator::~JobAggregatorOperator() {} JobAggregatorOperator::~JobAggregatorOperator() {}
void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) { void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t now = getTimestamp(); uint64_t now = getTimestamp();
// Making sure that the aggregation boundaries do not go past the job start/end time // Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd = jobData.endTime!=0 && now > jobData.endTime ? jobData.endTime : now; uint64_t jobEnd = jobData.endTime!=0 && now > jobData.endTime ? jobData.endTime : now;
uint64_t jobStart = jobEnd-_window < jobData.startTime ? jobData.startTime : jobEnd-_window; uint64_t jobStart = jobEnd-_window < jobData.startTime ? jobData.startTime : jobEnd-_window;
// Clearing the buffer
_buffer.clear();
std::vector<std::string> sensorNames;
// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node // Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
for(const auto& subUnit : unit->getSubUnits()) { for(const auto& subUnit : unit->getSubUnits()) {
// Getting the most recent values as specified in _window for (const auto &in : subUnit->getInputs()) {
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector sensorNames.push_back(in->getName());
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer.size();
if (!_queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false) || _buffer.size() <= elCtr) {
LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
} }
} }
if (!_queryEngine.querySensor(sensorNames, jobStart, jobEnd, _buffer, false)) {
LOG(debug) << "Job Operator " << _name << ": cannot read from any sensor for unit " + unit->getName() + "!";
return;
}
compute_internal(unit, _buffer); compute_internal(unit, _buffer);
} }
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment