JobTSAggregatorOperator.cpp 5.09 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
//================================================================================
// Name        : JobTSAggregatorOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "JobTSAggregatorOperator.h"

#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>
#include <stddef.h>
#include <sys/types.h>
#include <cstdint>
#include <string>

#include "../../../common/include/logging.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"

JobTSAggregatorOperator::JobTSAggregatorOperator(const std::string& name) :
		OperatorTemplate(name), JobOperatorTemplate(name), number_of_even_quantiles(0) {
}

JobTSAggregatorOperator::~JobTSAggregatorOperator() {
}

void JobTSAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
    // Clearing the buffer, if already allocated
	_buffer.clear();
    size_t elCtr=0;
    uint64_t my_timestamp = getTimestamp(); //TODO minus 10 seconds or so...
    // Making sure that the aggregation boundaries do not go past the job start/end time
    uint64_t jobEnd   = jobData.endTime!=0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
    uint64_t jobStart = jobEnd-my_timestamp < jobData.startTime ? jobData.startTime : jobEnd-my_timestamp;
    // Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
    for(const auto& subUnit : unit->getSubUnits()) {
        // Getting the most recent values as specified in _window
        // Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
        for(const auto& in : subUnit->getInputs()) {
            elCtr = _buffer.size();
            _queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false);
            if (_buffer.size() <= elCtr) {
                LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
                return;
            }
        }
    }
    compute_internal(unit, _buffer);
}

void JobTSAggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> buffer) {
    reading_t reading;
    AggregatorSensorBase::aggregationOps_t op;
    reading.timestamp = getTimestamp();
    // Performing the actual aggregation operation
    for(const auto& out : unit->getOutputs()) {
        op = out->getOperation();
        if(op!=AggregatorSensorBase::QTL) {
            switch (op) {
                case AggregatorSensorBase::SUM:
                    reading.value = computeSum(buffer);
                    break;
                case AggregatorSensorBase::AVG:
                    reading.value = computeAvg(buffer);
                    break;
                case AggregatorSensorBase::MIN:
                    reading.value = computeMin(buffer);
                    break;
                case AggregatorSensorBase::MAX:
                    reading.value = computeMax(buffer);
                    break;
                case AggregatorSensorBase::STD:
                    reading.value = computeStd(buffer);
                    break;
                case AggregatorSensorBase::OBS:
                    reading.value = computeObs(buffer);
                    break;
                default:
                    LOG(warning) << _name << ": Encountered unknown operation!";
                    reading.value = 0;
                    break;
            }
            out->storeReading(reading);
        }
    }

    if(!_quantileSensors.empty()) {
    	vector<int64_t> quantiles;
      	computeEvenQuantiles(buffer, number_of_even_quantiles, quantiles);
        for(unsigned idx=0; idx<quantiles.size(); idx++) {
            reading.value = quantiles[idx];
            _quantileSensors[idx]->storeReading(reading);
        }
    }
}

void JobTSAggregatorOperator::compute(U_Ptr unit){
//nothing here!
}