AggregatorOperator.cpp 6.25 KB
Newer Older
1
//================================================================================
2
// Name        : AggregatorOperator.cpp
3
// Author      : Alessio Netti
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
5
6
7
8
9
10
11
12
13
14
15
16
// Copyright   : Leibniz Supercomputing Centre
// Description :
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2019-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
Alessio Netti's avatar
Alessio Netti committed
17
//
18
19
20
21
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
Alessio Netti's avatar
Alessio Netti committed
22
//
23
24
25
26
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Alessio Netti's avatar
Alessio Netti committed
27

28
#include "AggregatorOperator.h"
Alessio Netti's avatar
Alessio Netti committed
29

30
AggregatorOperator::AggregatorOperator(const std::string& name) : OperatorTemplate(name) { 
31
    _window = 0;
32
    _buffer = nullptr;
33
}
Alessio Netti's avatar
Alessio Netti committed
34

35
AggregatorOperator::AggregatorOperator(const AggregatorOperator& other) : OperatorTemplate(other) {
Alessio Netti's avatar
Alessio Netti committed
36
    _window = other._window;
37
    _buffer = nullptr;
Alessio Netti's avatar
Alessio Netti committed
38
39
}

40
AggregatorOperator::~AggregatorOperator() {
Alessio Netti's avatar
Alessio Netti committed
41
42
43
44
    if(_buffer)
        delete _buffer;
}

45
void AggregatorOperator::printConfig(LOG_LEVEL ll) {
Alessio Netti's avatar
Alessio Netti committed
46
    LOG_VAR(ll) << "            Window:          " << _window;
47
    OperatorTemplate<AggregatorSensorBase>::printConfig(ll);
Alessio Netti's avatar
Alessio Netti committed
48
49
}

50
void AggregatorOperator::compute(U_Ptr unit) {
51
52
53
54
    // Clearing the buffer, if already allocated
    if(_buffer) 
        _buffer->clear();
    size_t elCtr=0;
55
    for(const auto& in : unit->getInputs()) {
Alessio Netti's avatar
Alessio Netti committed
56
        // Getting the most recent values as specified in _window
57
58
        // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
        elCtr = _buffer==nullptr ? 0 : _buffer->size();
Alessio Netti's avatar
Alessio Netti committed
59
        _buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer);
60
        if(!_buffer || _buffer->size()<=elCtr)
61
            throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + in->getName() + "!");
Alessio Netti's avatar
Alessio Netti committed
62
    }
63
64
65
    compute_internal(unit, _buffer);
}

66
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer) {
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
    _quantileSensors.clear();
    reading_t reading;
    AggregatorSensorBase::aggregationOps_t op;
    reading.timestamp = getTimestamp();
    // Performing the actual aggregation operation
    for(const auto& out : unit->getOutputs()) {
        op = out->getOperation();
        if(op!=AggregatorSensorBase::QTL) {
            switch (op) {
                case AggregatorSensorBase::SUM:
                    reading.value = computeSum(_buffer);
                    break;
                case AggregatorSensorBase::AVG:
                    reading.value = computeAvg(_buffer);
                    break;
                case AggregatorSensorBase::MIN:
                    reading.value = computeMin(_buffer);
                    break;
                case AggregatorSensorBase::MAX:
                    reading.value = computeMax(_buffer);
                    break;
                case AggregatorSensorBase::STD:
                    reading.value = computeStd(_buffer);
                    break;
91
92
93
                case AggregatorSensorBase::OBS:
                    reading.value = computeObs(_buffer);
                    break;
94
                default:
95
96
                    LOG(warning) << _name << ": Encountered unknown operation!";
                    reading.value = 0;
97
98
99
100
101
102
                    break;
            }
            out->storeReading(reading);
        } else
            _quantileSensors.push_back(out);
    }
103

104
105
106
107
108
    if(!_quantileSensors.empty()) {
        vector<int64_t> result = computeQuantiles(_buffer);
        for(unsigned idx=0; idx<result.size(); idx++) {
            reading.value = result[idx];
            _quantileSensors[idx]->storeReading(reading);
Alessio Netti's avatar
Alessio Netti committed
109
        }
110
111
    }
}
Alessio Netti's avatar
Alessio Netti committed
112

113
int64_t AggregatorOperator::computeObs(vector<reading_t> *buffer) {
114
115
116
    return buffer->size();
}

117
int64_t AggregatorOperator::computeSum(vector<reading_t> *buffer) {
118
119
    int64_t acc=0;
    for(const auto& v : *buffer)
Alessio Netti's avatar
Alessio Netti committed
120
            acc += v.value;
121
122
    return acc;
}
Alessio Netti's avatar
Alessio Netti committed
123

124
int64_t AggregatorOperator::computeAvg(vector<reading_t> *buffer) {
125
126
127
    int64_t acc=0, ctr=buffer->size();
    for(const auto& v : *buffer)
        acc += v.value;
Alessio Netti's avatar
Alessio Netti committed
128
    acc = ctr > 0 ? acc/ctr : acc;
129
    return acc;
Alessio Netti's avatar
Alessio Netti committed
130
131
}

132
int64_t AggregatorOperator::computeMax(vector<reading_t> *buffer) {
133
    int64_t acc=0;
134
135
136
137
138
    bool maxInit=false;
    for(const auto& v : *_buffer)
        if(v.value>acc || !maxInit) {
            acc = v.value;
            maxInit = true;
Alessio Netti's avatar
Alessio Netti committed
139
        }
140
    return acc;
Alessio Netti's avatar
Alessio Netti committed
141
142
}

143
int64_t AggregatorOperator::computeMin(vector<reading_t> *buffer) {
144
    int64_t acc=0;
Alessio Netti's avatar
Alessio Netti committed
145
    bool minInit=false;
146
147
148
149
    for(const auto& v : *_buffer)
        if(v.value<acc || !minInit) {
            acc = v.value;
            minInit = true;
Alessio Netti's avatar
Alessio Netti committed
150
        }
151
152
    return acc;
}
Alessio Netti's avatar
Alessio Netti committed
153

154
int64_t AggregatorOperator::computeStd(vector<reading_t> *buffer) {
155
156
157
158
159
    int64_t avg = computeAvg(buffer);
    int64_t acc=0, val=0, ctr=buffer->size();
    for(const auto& v : *buffer) {
        val = v.value - avg;
        acc += val*val;
Alessio Netti's avatar
Alessio Netti committed
160
    }
161
162
163
    acc = ctr > 0 ? sqrt(acc/ctr) : sqrt(acc);
    return acc;
}
Alessio Netti's avatar
Alessio Netti committed
164

165
vector<int64_t> AggregatorOperator::computeQuantiles(vector<reading_t> *buffer) {
166
167
168
169
170
171
172
173
174
175
    size_t idx, mod;
    vector<int64_t> result;
    // Sorting the sensor reading buffer to extract quantiles
    std::sort(buffer->begin(), buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
    for(const auto& q : _quantileSensors) {
        idx = (buffer->size() * q->getQuantile()) / 100;
        mod = (buffer->size() * q->getQuantile()) % 100;
        result.push_back((mod==0 || idx==buffer->size()-1) ? buffer->at(idx).value : (buffer->at(idx).value + buffer->at(idx+1).value)/2);
    }
    return result;
Alessio Netti's avatar
Alessio Netti committed
176
}