//================================================================================ // Name : JobTSAggregatorOperator.cpp // Author : Carla Guillen // Contact : info@dcdb.it // Copyright : Leibniz Supercomputing Centre // Description : Template implementing features to use Units in Operators. //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2018-2019 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include "JobTSAggregatorOperator.h" #include #include #include #include #include #include #include #include #include "../../../common/include/logging.h" #include "../../../common/include/timestamp.h" #include "../../includes/CommonStatistics.h" #include "../../includes/QueryEngine.h" JobTSAggregatorOperator::JobTSAggregatorOperator(const std::string& name) : OperatorTemplate(name), JobOperatorTemplate(name), number_of_even_quantiles(0) { } JobTSAggregatorOperator::~JobTSAggregatorOperator() { } void JobTSAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) { // Clearing the buffer, if already allocated _buffer.clear(); size_t elCtr=0; uint64_t my_timestamp = getTimestamp(); //TODO minus 10 seconds or so... // Making sure that the aggregation boundaries do not go past the job start/end time uint64_t jobEnd = jobData.endTime!=0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp; uint64_t jobStart = jobEnd-my_timestamp < jobData.startTime ? jobData.startTime : jobEnd-my_timestamp; // Job units are hierarchical, and thus we iterate over all sub-units associated to each single node for(const auto& subUnit : unit->getSubUnits()) { // Getting the most recent values as specified in _window // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector for(const auto& in : subUnit->getInputs()) { elCtr = _buffer.size(); _queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false); if (_buffer.size() <= elCtr) { LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!"; return; } } } compute_internal(unit, _buffer); } void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector buffer) { reading_t reading; AggregatorSensorBase::aggregationOps_t op; reading.timestamp = getTimestamp() - 10e9; // Performing the actual aggregation operation for(const auto& out : unit->getOutputs()) { op = out->getOperation(); if(op!=AggregatorSensorBase::QTL) { switch (op) { case AggregatorSensorBase::SUM: reading.value = computeSum(buffer); break; case AggregatorSensorBase::AVG: reading.value = computeAvg(buffer); break; case AggregatorSensorBase::MIN: reading.value = computeMin(buffer); break; case AggregatorSensorBase::MAX: reading.value = computeMax(buffer); break; case AggregatorSensorBase::STD: reading.value = computeStd(buffer); break; case AggregatorSensorBase::OBS: reading.value = computeObs(buffer); break; default: LOG(warning) << _name << ": Encountered unknown operation!"; reading.value = 0; break; } out->storeReading(reading); } } if(!_quantileSensors.empty()) { vector quantiles; computeEvenQuantiles(buffer, number_of_even_quantiles, quantiles); for(unsigned idx=0; idxstoreReading(reading); } } } void JobTSAggregatorOperator::compute(U_Ptr unit){ //nothing here! }