AggregatorOperator.cpp 4.6 KB
Newer Older
1
//================================================================================
2
// Name        : AggregatorOperator.cpp
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
#include "AggregatorOperator.h"
29
#include "../../includes/CommonStatistics.h"
Alessio Netti's avatar
Alessio Netti committed
30

31
AggregatorOperator::AggregatorOperator(const std::string& name) : OperatorTemplate(name) { 
32
    _window = 0;
33
    _relative = true;
34
}
Alessio Netti's avatar
Alessio Netti committed
35

36
AggregatorOperator::AggregatorOperator(const AggregatorOperator& other) : OperatorTemplate(other) {
Alessio Netti's avatar
Alessio Netti committed
37
    _window = other._window;
38
    _relative = other._relative;
Alessio Netti's avatar
Alessio Netti committed
39
40
}

41
AggregatorOperator::~AggregatorOperator() {}
Alessio Netti's avatar
Alessio Netti committed
42

43
void AggregatorOperator::printConfig(LOG_LEVEL ll) {
Alessio Netti's avatar
Alessio Netti committed
44
    LOG_VAR(ll) << "            Window:          " << _window;
45
    LOG_VAR(ll) << "            Relative mode:   " << (_relative ? "enabled" : "disabled");
46
    OperatorTemplate<AggregatorSensorBase>::printConfig(ll);
Alessio Netti's avatar
Alessio Netti committed
47
48
}

49
void AggregatorOperator::compute(U_Ptr unit) {
50
51
    // Clearing the buffer
    _buffer.clear();
52
    size_t elCtr=0;
53
54
55
    uint64_t startTs=0, endTs=0, now=getTimestamp();
    startTs = _relative ? _window : now - _window;
    endTs   = _relative ? 0 : now;
56
    for(const auto& in : unit->getInputs()) {
Alessio Netti's avatar
Alessio Netti committed
57
        // Getting the most recent values as specified in _window
58
        // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
59
60
        elCtr = _buffer.size();
        if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr)
61
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + in->getName() + "!");
Alessio Netti's avatar
Alessio Netti committed
62
    }
63
64
65
    compute_internal(unit, _buffer);
}

66
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t>& buffer) {
67
68
    _percentileSensors.clear();
    _percentiles.clear();
69
70
71
72
73
74
75
76
77
    reading_t reading;
    AggregatorSensorBase::aggregationOps_t op;
    reading.timestamp = getTimestamp();
    // Performing the actual aggregation operation
    for(const auto& out : unit->getOutputs()) {
        op = out->getOperation();
        if(op!=AggregatorSensorBase::QTL) {
            switch (op) {
                case AggregatorSensorBase::SUM:
78
                    reading.value = computeSum(buffer);
79
80
                    break;
                case AggregatorSensorBase::AVG:
81
                    reading.value = computeAvg(buffer);
82
83
                    break;
                case AggregatorSensorBase::MIN:
84
                    reading.value = computeMin(buffer);
85
86
                    break;
                case AggregatorSensorBase::MAX:
87
                    reading.value = computeMax(buffer);
88
89
                    break;
                case AggregatorSensorBase::STD:
90
                    reading.value = computeStd(buffer);
91
                    break;
92
                case AggregatorSensorBase::OBS:
93
                    reading.value = computeObs(buffer);
94
                    break;
95
                default:
96
97
                    LOG(warning) << _name << ": Encountered unknown operation!";
                    reading.value = 0;
98
99
100
                    break;
            }
            out->storeReading(reading);
101
102
103
        } else {
            _percentileSensors.push_back(out);
            _percentiles.push_back(out->getPercentile());
Alessio Netti's avatar
Alessio Netti committed
104
        }
105
    }
106
    if(!_percentileSensors.empty()) {
107
        computePercentiles(buffer, _percentiles, _percentileResult);
108
109
110
111
        for(unsigned idx=0; idx<_percentileResult.size(); idx++) {
            reading.value = _percentileResult[idx];
            _percentileSensors[idx]->storeReading(reading);
        }
112
    }
Alessio Netti's avatar
Alessio Netti committed
113
}