Commit 27d43ecc authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: adding goBack parameter in Aggregator and Job Aggregator plugins

parent f8aa4f83
......@@ -804,6 +804,7 @@ The configuration parameters specific to the _Aggregator_ plugin are the followi
| Value | Explanation |
|:----- |:----------- |
| window | Length in milliseconds of the time window that is used to retrieve recent readings for the input sensors, starting from the latest one.
| goBack | Value in milliseconds that can be used to shift back the aggregation window. Useful when very recent sensor data is not always available.
Additionally, output sensors in operators of the _Aggregator_ plugin accept the following parameters:
......
......@@ -66,6 +66,8 @@ void AggregatorConfigurator::operatorAttributes(AggregatorOperator& op, CFG_VAL
{
if (boost::iequals(val.first, "window"))
op.setWindow(stoull(val.second.data()) * 1000000);
else if (boost::iequals(val.first, "goBack"))
op.setGoBack(stoull(val.second.data()) * 1000000);
else if (boost::iequals(val.first, "relative"))
op.setRelative(to_bool(val.second.data()));
}
......
......@@ -30,11 +30,13 @@
AggregatorOperator::AggregatorOperator(const std::string& name) : OperatorTemplate(name) {
_window = 0;
_goBack = 0;
_relative = true;
}
AggregatorOperator::AggregatorOperator(const AggregatorOperator& other) : OperatorTemplate(other) {
_window = other._window;
_goBack = other._goBack;
_relative = other._relative;
}
......@@ -42,14 +44,15 @@ AggregatorOperator::~AggregatorOperator() {}
void AggregatorOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
LOG_VAR(ll) << " Go Back: " << _goBack;
LOG_VAR(ll) << " Relative mode: " << (_relative ? "enabled" : "disabled");
OperatorTemplate<AggregatorSensorBase>::printConfig(ll);
}
void AggregatorOperator::compute(U_Ptr unit) {
uint64_t startTs=0, endTs=0, now=getTimestamp();
startTs = _relative ? _window : now - _window;
endTs = _relative ? 0 : now;
startTs = _relative ? (_window + _goBack) : (now - _window - _goBack);
endTs = _relative ? (_goBack) : (now - _goBack);
// Clearing the buffer
_buffer.clear();
std::vector<std::string> sensorNames;
......@@ -68,7 +71,7 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t>& buffer)
_percentiles.clear();
reading_t reading;
AggregatorSensorBase::aggregationOps_t op;
reading.timestamp = getTimestamp();
reading.timestamp = getTimestamp() - _goBack;
// Performing the actual aggregation operation
for(const auto& out : unit->getOutputs()) {
op = out->getOperation();
......
......@@ -48,9 +48,11 @@ public:
virtual ~AggregatorOperator();
void setWindow(unsigned long long w) { _window = w; }
void setGoBack(unsigned long long g) { _goBack = g; }
void setRelative(bool r) { _relative = r; }
unsigned long long getWindow() { return _window; }
unsigned long long getGoBack() { return _goBack; }
bool getRelative() { return _relative; }
void printConfig(LOG_LEVEL ll) override;
......@@ -66,6 +68,7 @@ protected:
vector<size_t> _percentiles;
vector<int64_t> _percentileResult;
unsigned long long _window;
unsigned long long _goBack;
bool _relative;
};
......
......@@ -66,6 +66,8 @@ void JobAggregatorConfigurator::operatorAttributes(JobAggregatorOperator& op, CF
{
if (boost::iequals(val.first, "window"))
op.setWindow(stoull(val.second.data()) * 1000000);
else if (boost::iequals(val.first, "goBack"))
op.setGoBack(stoull(val.second.data()) * 1000000);
}
}
......
......@@ -37,7 +37,7 @@ JobAggregatorOperator::JobAggregatorOperator(const JobAggregatorOperator& other)
JobAggregatorOperator::~JobAggregatorOperator() {}
void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
uint64_t now = getTimestamp();
uint64_t now = getTimestamp() - _goBack;
// Making sure that the aggregation boundaries do not go past the job start/end time
uint64_t jobEnd = jobData.endTime!=0 && now > jobData.endTime ? jobData.endTime : now;
uint64_t jobStart = jobEnd-_window < jobData.startTime ? jobData.startTime : jobEnd-_window;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment