//================================================================================
// Name        : PerSystSqlOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "PerSystSqlOperator.h"

#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>
#include <stddef.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"

Carla Guillen's avatar
Carla Guillen committed
49

50
/**
 * Constructs an operator with the given name.
 *
 * All numeric settings start at zero, except the scaling factor which starts
 * at 1. The severity formula starts as NOFORMULA and the backend as DEFAULT.
 * The MariaDB handle is taken from MariaDB::getInstance().
 *
 * @param name Name forwarded to the OperatorTemplate and JobOperatorTemplate bases.
 */
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
		OperatorTemplate(name),
		JobOperatorTemplate(name),
		_number_of_even_quantiles(0),
		_severity_formula(NOFORMULA),
		_severity_threshold(0),
		_severity_exponent(0),
		_severity_max_memory(0),
		_go_back_ns(0),
		_backend(DEFAULT),
		_scaling_factor(1),
		_property_id(0) {
	_persystdb = MariaDB::getInstance();
}

58
/**
 * Destructor. Intentionally empty: the database connection is closed in
 * execOnStop(), and the handle obtained from MariaDB::getInstance() is not
 * released here.
 */
PerSystSqlOperator::~PerSystSqlOperator() {
}

61

62
/**
 * Copy constructor.
 *
 * The base classes are constructed from the name of the source operator only;
 * every member of this class is then filled in by copy().
 *
 * @param other Operator to copy from.
 */
PerSystSqlOperator::PerSystSqlOperator(const PerSystSqlOperator& other) :
		OperatorTemplate(other._name),
		JobOperatorTemplate<AggregatorSensorBase>(other._name) {
	copy(other);
}

/**
 * Copy assignment.
 *
 * Assigns the JobOperatorTemplate part first and then copies the members of
 * this class through copy(). Self-assignment is a no-op.
 *
 * @param other Operator to copy from.
 * @return Reference to this operator.
 */
PerSystSqlOperator& PerSystSqlOperator::operator=(const PerSystSqlOperator& other){
	if (this != &other) {
		JobOperatorTemplate::operator=(other);
		copy(other);
	}
	return *this;
}

void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
	this->_buffer = other._buffer;
	this->_quantileSensors = other._quantileSensors;
	this->_number_of_even_quantiles = other._number_of_even_quantiles;
	this->_severity_formula = other._severity_formula;
	this->_severity_threshold = other._severity_threshold;
	this->_severity_exponent = other._severity_exponent;
	this->_severity_max_memory = other._severity_max_memory;
	this->_severities = other._severities;
	this->_go_back_ns = other._go_back_ns;
	this->_backend = other._backend;
	this->_scaling_factor = other._scaling_factor;
	this->_conn.database_name = other._conn.database_name;
	this->_conn.every_x_days = other._conn.every_x_days;
	this->_conn.host = other._conn.host;
	this->_conn.password = other._conn.password;
	this->_conn.port = other._conn.port;
	this->_conn.rotation = other._conn.rotation;
	this->_conn.user = other._conn.user;
	this->_property_id = other._property_id;
92
	this->_persystdb = other._persystdb;
93
94
}

95
/**
 * Logs the configuration of this operator.
 *
 * The base-class configuration is printed first. Connection details are only
 * printed for the MariaDB backend; the password is never logged. The
 * every_x_days value is only printed when the rotation mode is EVERY_XDAYS.
 *
 * @param ll Log level at which all lines are emitted.
 */
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
	OperatorTemplate<AggregatorSensorBase>::printConfig(ll);

	LOG_VAR(ll) << "====================================";
	LOG_VAR(ll) << "PerSystSQL Operator " << _name;
	LOG_VAR(ll) << "====================================";
	LOG_VAR(ll) << "backend=" << _backend;
	// _go_back_ns is stored in nanoseconds and reported in milliseconds
	LOG_VAR(ll) << "go_back_ms=" << _go_back_ns/1e6;
	LOG_VAR(ll) << "_scaling_factor=" << _scaling_factor;

	if (_backend == MARIADB) {
		LOG_VAR(ll) << "PerSystSQL Operator Connection information:";
		LOG_VAR(ll) << "\tHost=" << _conn.host;
		LOG_VAR(ll) << "\tUser=" << _conn.user;
		LOG_VAR(ll) << "\tDatabase=" << _conn.database_name;
		LOG_VAR(ll) << "\tPort=" << _conn.port;
		LOG_VAR(ll) << "\tRotation=" << _conn.rotation;
		if (_conn.rotation == MariaDB::EVERY_XDAYS) {
			LOG_VAR(ll) << "\tEvery_X_days=" << _conn.every_x_days;
		}
	}

	LOG_VAR(ll) << "Property Configuration:";
	LOG_VAR(ll) << "\tnumber_of_even_quantiles=" << _number_of_even_quantiles;
	LOG_VAR(ll) << "\tproperty_id=" << _property_id;

	LOG_VAR(ll) << "Severity Configuration:";
	LOG_VAR(ll) << "\tseverity_formula=" << _severity_formula;
	LOG_VAR(ll) << "\tseverity_exponent=" << _severity_exponent;
	LOG_VAR(ll) << "\tseverity_threshold=" << _severity_threshold;
	LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

/**
 * Start hook: opens the database connection when the MariaDB backend is used.
 *
 * @return true if no connection is needed or the connection was initialized,
 *         false if initializing the connection failed.
 */
bool PerSystSqlOperator::execOnStart(){
	// Other backends need no connection set-up here
	if (_backend != MARIADB) {
		return true;
	}

	const bool connected = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password,
			_conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
	if (!connected) {
		LOG(error) << "Database not initialized";
	}
	return connected;
}

/**
 * Stop hook: closes the database connection when the MariaDB backend is used.
 */
void PerSystSqlOperator::execOnStop(){
	if (_backend != MARIADB) {
		return;
	}
	_persystdb->finalizeConnection();
}

141
/**
 * Computes the aggregates for one job unit and, for the MariaDB backend,
 * writes them to the database.
 *
 * Steps:
 *  1. Collect the readings of all inputs of all sub-units into _buffer, all
 *     queried at the same timestamp (now minus _go_back_ns).
 *  2. Run compute_internal() on the collected readings.
 *  3. MariaDB backend only: make sure the connection is up, resolve the table
 *     suffix and the database job id (inserting the job if it is unknown),
 *     then store the aggregate record.
 *
 * NOTE(review): the readings are queried at a single timestamp and are not
 * clamped to jobData.startTime/endTime. The previous code computed such
 * bounds but never used them (and the computation underflowed), so they were
 * removed. Confirm whether clamping is wanted.
 *
 * @param unit    Job unit whose sub-units provide the input sensors.
 * @param jobData Job information (job id, user id, node list).
 */
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
	// Clearing the buffer, if already allocated
	_buffer.clear();
	uint64_t my_timestamp = getTimestamp() - _go_back_ns;

	// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
	for (const auto& subUnit : unit->getSubUnits()) {
		// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
		for (const auto& in : subUnit->getInputs()) {
			// The scaling factor is looked up as long as it still has its initial value of 1
			if (_scaling_factor == 1) {
				SensorMetadata metadata;
				if (_queryEngine.queryMetadata(in->getName(), metadata) && metadata.getScale()) {
					_scaling_factor = *metadata.getScale();
					LOG(debug) << "PerSystSql Operator " << _name << " using scaling factor of " << _scaling_factor;
				}
			}
			if (!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
				// Sensors without data are skipped silently on purpose
				//LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
			}
		}
	}

	if (_buffer.empty()) {
		LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
		return;
	}

	Aggregate_info_t agg_info;
	compute_internal(unit, _buffer, agg_info);

	if (_backend != MARIADB) {
		return;
	}

	// Re-establish the connection if it is not available
	if (!_persystdb->isInitialized()
			&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name,
					_conn.rotation, _conn.port, _conn.every_x_days)) {
		LOG(error) << "Database not initialized";
		return;
	}

	std::string table_suffix;
	if (!_persystdb->getTableSuffix(table_suffix)) {
		LOG(error) << "Failed to create Aggregate table!";
		return;
	}

	// Look the job up first; only when it is unknown try to insert it
	if (!_persystdb->getDBJobID(jobData.jobId, agg_info.job_id_db, jobData.userId, jobData.nodes.size())
			&& !_persystdb->insertIntoJob(jobData.jobId, jobData.userId, agg_info.job_id_db, table_suffix,
					jobData.nodes.size())) {
		LOG(error) << "Job insertion not possible, no job id db available for slurm job id " << jobData.jobId;
		return;
	}

	_persystdb->updateJobsLastSuffix(jobData.jobId, jobData.userId, jobData.nodes.size(), agg_info.job_id_db, table_suffix);

	// Nanoseconds to seconds
	agg_info.timestamp = (my_timestamp/1e9);
	_persystdb->insertInAggregateTable(table_suffix, agg_info);
}

200
201
void PerSystSqlOperator::convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer){
	for(auto &reading: buffer){
202
		double value = reading.value * _scaling_factor;
203
204
205
206
207
		douBuffer.push_back(value);
	}
}


208
/**
 * Performs the aggregation operations requested by the outputs of a unit.
 *
 * The readings are first converted to scaled doubles. For the CASSANDRA
 * backend the results are stored as readings in the output sensors, with
 * averages and quantiles divided by _scaling_factor again. For the other
 * backends the results are written into agg_info instead.
 *
 * Outputs with the QTL operation are collected in _quantileSensors and
 * handled together after all other operations.
 *
 * @param unit     Unit whose outputs define the operations to perform.
 * @param buffer   Raw readings to aggregate; expected to be non-empty,
 *                 since the average divides by the number of readings.
 * @param agg_info Aggregate record filled for non-CASSANDRA backends;
 *                 property_type_id is always set.
 */
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t & agg_info) {
	_quantileSensors.clear();

	reading_t reading;
	reading.timestamp = getTimestamp() - _go_back_ns;

	std::vector<double> douBuffer;
	convertToDoubles(buffer, douBuffer);

	// Performing the actual aggregation operation
	for (const auto& out : unit->getOutputs()) {
		const AggregatorSensorBase::aggregationOps_t op = out->getOperation();

		// Quantile outputs are processed in one go further below
		if (op == AggregatorSensorBase::QTL) {
			_quantileSensors.push_back(out);
			continue;
		}

		switch (op) {
		case AggregatorSensorBase::AVG: {
			const double sum = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0);
			if (_backend == CASSANDRA) {
				// douBuffer is already scaled; undo the scaling for the stored reading
				reading.value = sum/(douBuffer.size() * _scaling_factor);
			} else {
				agg_info.average = sum/douBuffer.size();
			}
			break;
		}
		case AggregatorSensorBase::OBS: {
			// Computed once and used for both destinations
			const auto observations = computeObs(buffer);
			reading.value = observations;
			agg_info.num_of_observations = observations;
			break;
		}
		case AggregatorSensorBase::AVG_SEV:
			if (_backend == CASSANDRA) {
				reading.value = computeSeverityAverage(douBuffer) * SCALING_FACTOR_SEVERITY;
			} else {
				agg_info.severity_average = computeSeverityAverage(douBuffer);
			}
			break;
		default:
			LOG(warning)<< _name << ": Operation " << op << " not supported!";
			reading.value = 0;
			break;
		}

		if (_backend == CASSANDRA) {
			out->storeReading(reading);
		}
	}

	if (!_quantileSensors.empty()) {
		std::vector<double> quantiles;
		computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
		if (_backend == CASSANDRA) {
			// computeEvenQuantiles yields _number_of_even_quantiles + 1 values; never
			// index past the quantile sensors that are actually configured.
			for (size_t idx = 0; idx < quantiles.size() && idx < _quantileSensors.size(); idx++) {
				reading.value = quantiles[idx] / _scaling_factor;
				_quantileSensors[idx]->storeReading(reading);
			}
		} else {
			for (const double q : quantiles) {
				agg_info.quantiles.push_back(static_cast<float>(q));
			}
		}
	}
	agg_info.property_type_id = _property_id;
}

/**
 * Unit-only variant of compute(); intentionally does nothing. All work is
 * done in the overload that also receives the job data.
 */
void PerSystSqlOperator::compute(U_Ptr unit) {
	//nothing here!
}
Carla Guillen Carias's avatar
Carla Guillen Carias committed
274

Carla Guillen's avatar
Carla Guillen committed
275
/**
 * Severity formula 1: (metric - threshold)^exponent, limited to [0, 1].
 *
 * @param metric    Measured value.
 * @param threshold Value up to which the severity is 0.
 * @param exponent  Exponent applied to the excess over the threshold.
 * @return 0 if metric does not exceed threshold, otherwise the powered
 *         excess capped at 1.
 */
double severity_formula1(double metric, double threshold, double exponent) {
	const double excess = metric - threshold;
	if (!(excess > 0)) {
		return 0;
	}
	const double severity = std::pow(excess, exponent);
	return severity > 1 ? 1.0 : severity;
}

Carla Guillen's avatar
Carla Guillen committed
287
288
/**
 * Severity formula 2: (metric / threshold - 1)^exponent, limited to [0, 1].
 *
 * @param metric    Measured value.
 * @param threshold Reference value; must be non-zero.
 * @param exponent  Exponent applied to the relative excess.
 * @return -1 if threshold is 0 (severity cannot be computed), 0 if the
 *         relative excess is not positive, otherwise the powered relative
 *         excess capped at 1.
 */
double severity_formula2(double metric, double threshold, double exponent) {
	if (threshold == 0) {
		return -1;
	}
	const double relativeExcess = metric / threshold - 1;
	if (!(relativeExcess > 0)) {
		return 0;
	}
	const double severity = std::pow(relativeExcess, exponent);
	return severity > 1 ? 1.0 : severity;
}

Carla Guillen's avatar
Carla Guillen committed
302
/**
 * Severity formula 3: 1 - (metric / threshold)^exponent, limited to [0, 1].
 *
 * @param metric    Measured value.
 * @param threshold Reference value; must be non-zero.
 * @param exponent  Exponent applied to the ratio metric / threshold.
 * @return -1 if threshold is 0 (severity cannot be computed), 0 if the ratio
 *         is not positive or the result would be negative, otherwise
 *         1 - ratio^exponent.
 */
double severity_formula3(double metric, double threshold, double exponent) {
	if (threshold == 0) {
		return -1;
	}
	const double ratio = metric / threshold;
	if (!(ratio > 0)) {
		return 0;
	}
	// A positive base never yields a negative power, so the result cannot
	// exceed 1 and only the lower bound has to be enforced.
	const double severity = 1 - std::pow(ratio, exponent);
	return severity < 0 ? 0.0 : severity;
}

Carla Guillen's avatar
Carla Guillen committed
320
/**
 * Memory severity: position of metric between threshold and max_memory,
 * i.e. (metric - threshold) / (max_memory - threshold), limited to [0, 1].
 *
 * @param metric     Measured memory value.
 * @param threshold  Value at which the severity is 0.
 * @param max_memory Value at which the severity reaches 1.
 * @return -1 if max_memory equals threshold (severity cannot be computed),
 *         otherwise the severity in [0, 1].
 */
double severity_memory(double metric, double threshold, double max_memory) {
	const double denominator = max_memory - threshold;
	if (denominator == 0) {
		return -1;
	}
	// The difference must be parenthesised: division binds tighter than
	// subtraction, so "metric - threshold / denominator" is a different value.
	const double severity = (metric - threshold) / denominator;
	if (severity > 1) {
		return 1;
	}
	if (severity < 0) {
		return 0;
	}
	return severity;
}
333

Carla Guillen's avatar
Carla Guillen committed
334
335
double PerSystSqlOperator::computeSeverityAverage(
		std::vector<double> & buffer) {
336
	std::vector<double> severities;
Carla Guillen's avatar
Carla Guillen committed
337
338
339
	switch (_severity_formula) {
	case (FORMULA1):
		for (auto val : buffer) {
340
			auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
341
342
			severities.push_back(severity);
		}
343
		break;
Carla Guillen's avatar
Carla Guillen committed
344
345
	case (FORMULA2):
		for (auto val : buffer) {
346
			auto severity = severity_formula2(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
347
348
			severities.push_back(severity);
		}
349
		break;
Carla Guillen's avatar
Carla Guillen committed
350
351
	case (FORMULA3):
		for (auto val : buffer) {
352
			auto severity = severity_formula3(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
353
354
			severities.push_back(severity);
		}
355
		break;
Carla Guillen's avatar
Carla Guillen committed
356
357
	case (MEMORY_FORMULA):
		for (auto val : buffer) {
358
			auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
Carla Guillen's avatar
Carla Guillen committed
359
360
			severities.push_back(severity);
		}
361
		break;
Carla Guillen's avatar
Carla Guillen committed
362
363
364
365
366
367
368
	case (NOFORMULA):
		for (auto val : buffer) {
			severities.push_back(severity_noformula());
		}
		break;
	default:
		return 0.0;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
369
		break;
370
	}
Carla Guillen's avatar
Carla Guillen committed
371
372
373
	if (severities.size()) {
		return (std::accumulate(severities.begin(), severities.end(), 0.0)
				/ severities.size());
374
375
	}
	return 0.0;
376
}
377

378
/**
 * Computes evenly spaced quantiles of data.
 *
 * data is sorted in place. On return quantiles holds NUMBER_QUANTILES + 1
 * values: the minimum at index 0, the maximum at index NUMBER_QUANTILES and
 * the intermediate quantiles in between, linearly interpolated between
 * neighbouring elements of the sorted data.
 *
 * If data is empty or NUMBER_QUANTILES is 0, nothing is done and both
 * vectors are left untouched.
 *
 * @param data             Values to evaluate; sorted ascending as a side effect.
 * @param NUMBER_QUANTILES Number of intervals the data is divided into.
 * @param quantiles        Output vector, resized to NUMBER_QUANTILES + 1.
 */
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
	if (data.empty() || NUMBER_QUANTILES == 0) {
		return;
	}

	std::sort(data.begin(), data.end());
	const std::size_t elementNumber = data.size();

	quantiles.resize(NUMBER_QUANTILES + 1); //+min
	quantiles[0] = data.front(); //minimum
	quantiles[NUMBER_QUANTILES] = data.back(); //maximum

	// With a single element every quantile equals that element
	if (elementNumber == 1) {
		std::fill(quantiles.begin() + 1, quantiles.begin() + NUMBER_QUANTILES, data.front());
		return;
	}

	const double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
	for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
		const double position = i * factor;
		// position is below elementNumber for i < NUMBER_QUANTILES; the clamp only
		// guards against floating-point rounding at the upper end.
		const std::size_t idx = std::min(static_cast<std::size_t>(std::floor(position)), elementNumber - 1);
		if (idx == 0) {
			quantiles[i] = data[0];
		} else {
			const double rest = position - idx;
			quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
		}
	}
}