Commit f2e6a8bb authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: refactored percentile computation in Aggregator operator

parent ef21e4f5
......@@ -88,5 +88,20 @@ void computeEvenQuantiles(std::vector<reading_t> &data, const unsigned int NUMBE
}
}
void computePercentiles(std::vector<reading_t> &data, const std::vector<size_t> &percentilePositions, std::vector<int64_t> &percentiles) {
if (data.empty() || percentilePositions.empty())
return;
size_t idx, mod;
percentiles.clear();
// Sorting the sensor reading buffer to extract quantiles
std::sort(data.begin(), data.end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
for(const auto& q : percentilePositions) {
idx = (data.size() * q) / 100;
mod = (data.size() * q) % 100;
percentiles.push_back((mod==0 || idx==data.size()-1) ? data[idx].value : (data[idx].value + data[idx+1].value)/2);
}
}
#endif /* ANALYTICS_INCLUDES_COMMONSTATISTICS_H_ */
......@@ -56,7 +56,7 @@ void AggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config)
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
s.setQuantile(quantile);
s.setPercentile(quantile);
}
}
}
......
......@@ -71,7 +71,8 @@ void AggregatorOperator::compute(U_Ptr unit) {
}
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer) {
_quantileSensors.clear();
_percentileSensors.clear();
_percentiles.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp();
......@@ -104,28 +105,16 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
break;
}
out->storeReading(reading);
} else
_quantileSensors.push_back(out);
}
if(!_quantileSensors.empty()) {
vector<int64_t> result = computeQuantiles(buffer);
for(unsigned idx=0; idx<result.size(); idx++) {
reading.value = result[idx];
_quantileSensors[idx]->storeReading(reading);
} else {
_percentileSensors.push_back(out);
_percentiles.push_back(out->getPercentile());
}
}
}
vector<int64_t> AggregatorOperator::computeQuantiles(vector<reading_t> *buffer) {
size_t idx, mod;
vector<int64_t> result;
// Sorting the sensor reading buffer to extract quantiles
std::sort(buffer->begin(), buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
for(const auto& q : _quantileSensors) {
idx = (buffer->size() * q->getQuantile()) / 100;
mod = (buffer->size() * q->getQuantile()) % 100;
result.push_back((mod==0 || idx==buffer->size()-1) ? buffer->at(idx).value : (buffer->at(idx).value + buffer->at(idx+1).value)/2);
if(!_percentileSensors.empty()) {
computePercentiles(*buffer, _percentiles, _percentileResult);
for(unsigned idx=0; idx<_percentileResult.size(); idx++) {
reading.value = _percentileResult[idx];
_percentileSensors[idx]->storeReading(reading);
}
}
return result;
}
......@@ -60,12 +60,11 @@ protected:
virtual void compute(U_Ptr unit) override;
// Internal method containing the actual logic of the operator
void compute_internal(U_Ptr unit, vector<reading_t> *buffer);
// A separate method for each operation implies code redundancy, but also better efficiency and less useless
// variables used by specific operations lying around
vector<int64_t> computeQuantiles(vector<reading_t> *buffer);
vector<reading_t> *_buffer;
vector<AggregatorSBPtr> _quantileSensors;
vector<AggregatorSBPtr> _percentileSensors;
vector<size_t> _percentiles;
vector<int64_t> _percentileResult;
unsigned long long _window;
bool _relative;
......
......@@ -52,29 +52,29 @@ public:
// Constructor and destructor
AggregatorSensorBase(const std::string& name) : SensorBase(name) {
_opType = SUM;
_quantile = 50;
_percentile = 50;
}
// Copy constructor
AggregatorSensorBase(AggregatorSensorBase& other) : SensorBase(other) {
_opType = other._opType;
_quantile = other._quantile;
_percentile = other._percentile;
}
virtual ~AggregatorSensorBase() {}
void setOperation(aggregationOps_t op) { _opType = op; }
void setQuantile(size_t q) { _quantile = q; }
void setPercentile(size_t q) { _percentile = q; }
aggregationOps_t getOperation() { return _opType; }
size_t getQuantile() { return _quantile; }
size_t getPercentile() { return _percentile; }
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
SensorBase::printConfig(ll, lg, leadingSpaces);
std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Operation: " << getOpString(_opType);
if(_opType==QTL)
LOG_VAR(ll) << leading << " Percentile: " << _quantile;
LOG_VAR(ll) << leading << " Percentile: " << _percentile;
}
protected:
......@@ -111,7 +111,7 @@ protected:
}
aggregationOps_t _opType;
size_t _quantile;
size_t _percentile;
};
using AggregatorSBPtr = std::shared_ptr<AggregatorSensorBase>;
......
......@@ -56,7 +56,7 @@ void JobAggregatorConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL conf
} else if (boost::iequals(val.first, "percentile")) {
size_t quantile = stoull(val.second.data());
if( quantile>0 && quantile<100 )
s.setQuantile(quantile);
s.setPercentile(quantile);
}
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment