Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

PerSystSqlOperator.cpp 13.3 KB
Newer Older
//================================================================================
// Name        : PerSystSqlOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : PerSystSql operator: aggregates the sensor readings of a job
//               (average, observations, severity, quantiles) and stores the
//               result in the Cassandra or MariaDB backend.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

28
29
#include "PerSystSqlOperator.h"

#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>
#include <stddef.h>
#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"
48

Carla Guillen's avatar
Carla Guillen committed
49

50
// Creates an operator with the given name.
// Every numeric setting starts at 0 except the scaling factor, which starts at 1;
// no severity formula is selected and the backend is DEFAULT. The database handle
// is obtained from MariaDB::getInstance().
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
		OperatorTemplate(name),
		JobOperatorTemplate(name),
		_number_of_even_quantiles(0),
		_severity_formula(NOFORMULA),
		_severity_threshold(0),
		_severity_exponent(0),
		_severity_max_memory(0),
		_go_back_ns(0),
		_backend(DEFAULT),
		_scaling_factor(1),
		_property_id(0) {
	_persystdb = MariaDB::getInstance();
}

58
// Nothing to release here: the destructor body is intentionally empty.
PerSystSqlOperator::~PerSystSqlOperator() {
}

61

62
// Copy constructor: both base templates are constructed from the other operator's
// name, then every PerSystSql-specific member is taken over through copy().
PerSystSqlOperator::PerSystSqlOperator(const PerSystSqlOperator& other) :
		OperatorTemplate(other._name),
		JobOperatorTemplate<AggregatorSensorBase>(other._name) {
	copy(other);
}

// Copy assignment: assigns the JobOperatorTemplate part first, then the
// PerSystSql-specific members through copy(). Returns *this.
PerSystSqlOperator& PerSystSqlOperator::operator=(const PerSystSqlOperator& other){
	// Self-assignment needs no work; skip the member-by-member copy
	if (this != &other) {
		JobOperatorTemplate::operator=(other);
		copy(other);
	}
	return *this;
}

void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
	this->_buffer = other._buffer;
	this->_quantileSensors = other._quantileSensors;
	this->_number_of_even_quantiles = other._number_of_even_quantiles;
	this->_severity_formula = other._severity_formula;
	this->_severity_threshold = other._severity_threshold;
	this->_severity_exponent = other._severity_exponent;
	this->_severity_max_memory = other._severity_max_memory;
	this->_severities = other._severities;
	this->_go_back_ns = other._go_back_ns;
	this->_backend = other._backend;
	this->_scaling_factor = other._scaling_factor;
	this->_conn.database_name = other._conn.database_name;
	this->_conn.every_x_days = other._conn.every_x_days;
	this->_conn.host = other._conn.host;
	this->_conn.password = other._conn.password;
	this->_conn.port = other._conn.port;
	this->_conn.rotation = other._conn.rotation;
	this->_conn.user = other._conn.user;
	this->_property_id = other._property_id;
92
	this->_persystdb = other._persystdb;
93
94
}

95
// Logs the operator configuration at log level `ll`: first the generic
// OperatorTemplate configuration, then backend, timing and scaling settings,
// the connection details (MariaDB backend only; the password is not logged),
// and finally the property and severity settings.
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
	OperatorTemplate<AggregatorSensorBase>::printConfig(ll);
	LOG_VAR(ll) << "====================================";
	LOG_VAR(ll) << "PerSystSQL Operator " << _name;
	LOG_VAR(ll) << "====================================";
	LOG_VAR(ll) << "backend=" << _backend;
	// Stored in nanoseconds, reported in milliseconds
	LOG_VAR(ll) << "go_back_ms=" << _go_back_ns/1e6;
	LOG_VAR(ll) << "_scaling_factor=" << _scaling_factor;
	if(_backend == MARIADB){
		LOG_VAR(ll) << "PerSystSQL Operator Connection information:";
		LOG_VAR(ll) << "\tHost=" << _conn.host;
		LOG_VAR(ll) << "\tUser=" << _conn.user;
		LOG_VAR(ll) << "\tDatabase=" << _conn.database_name;
		LOG_VAR(ll) << "\tPort=" << _conn.port;
		LOG_VAR(ll) << "\tRotation=" << _conn.rotation;
		// The day interval is only printed for the EVERY_XDAYS rotation
		if(_conn.rotation == MariaDB::EVERY_XDAYS){
			LOG_VAR(ll) << "\tEvery_X_days=" << _conn.every_x_days;
		}
	}
	LOG_VAR(ll) << "Property Configuration:";
	LOG_VAR(ll) << "\tnumber_of_even_quantiles=" << _number_of_even_quantiles;
	LOG_VAR(ll) << "\tproperty_id=" << _property_id;
	LOG_VAR(ll) << "Severity Configuration:";
	LOG_VAR(ll) << "\tseverity_formula=" << _severity_formula;
	LOG_VAR(ll) << "\tseverity_exponent=" << _severity_exponent;
	LOG_VAR(ll) << "\tseverity_threshold=" << _severity_threshold;
	LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

// Start hook. Opens the database connection when the MariaDB backend is in use.
// Returns false (after logging an error) if the connection could not be
// initialized, true in every other case.
bool PerSystSqlOperator::execOnStart(){
	// Other backends need no connection set-up here
	if (_backend != MARIADB) {
		return true;
	}

	const bool connected = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password,
			_conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
	if (!connected) {
		LOG(error) << "Database not initialized";
		return false;
	}
	return true;
}

// Stop hook. Finalizes the database connection when the MariaDB backend is in
// use; does nothing for any other backend.
void PerSystSqlOperator::execOnStop(){
	if (_backend != MARIADB) {
		return;
	}
	_persystdb->finalizeConnection();
}

141
// Computes the aggregates of one job unit.
// The readings of all inputs of all sub-units are collected into _buffer at one
// single point in time (_go_back_ns before now), aggregated by compute_internal()
// and, for the MariaDB backend, written to the aggregate table of the job.
// Returns early (after logging an error) if no reading was found or if any of
// the database steps fails.
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
	// Clearing the buffer, if already allocated
	_buffer.clear();
	const uint64_t my_timestamp = getTimestamp() - _go_back_ns;

	// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
	for (const auto& subUnit : unit->getSubUnits()) {
		// The buffer is not cleared in between, so all sensor readings are accumulated in the same vector
		for (const auto& in : subUnit->getInputs()) {
			// While the scaling factor still has its initial value of 1, try to pick it up from the sensor metadata
			if (_scaling_factor == 1) {
				SensorMetadata buffer;
				if (_queryEngine.queryMetadata(in->getName(), buffer) && buffer.getScale()) {
					_scaling_factor = *buffer.getScale();
					LOG(debug) << "PerSystSql Operator " << _name << " using scaling factor of " << _scaling_factor;
				}
			}
			if (!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
				// A sensor without data is skipped on purpose; the empty-buffer check below reports a total failure
				//LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
			}
		}
	}

	if (_buffer.size() == 0) {
		LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
		return;
	}
	// The timestamp of the first reading is used for the whole aggregate
	const uint64_t measurement_ts = _buffer[0].timestamp;

	// Value-initialized so that fields not written by any configured output are not left indeterminate
	// NOTE(review): assumes Aggregate_info_t is default-constructible via {} - confirm against its declaration
	Aggregate_info_t agg_info{};
	// Nanoseconds to seconds
	agg_info.timestamp = (measurement_ts/1e9);
	compute_internal(unit, _buffer, agg_info, measurement_ts);

	if (_backend == MARIADB) {
		// Connect lazily in case the connection is not (or no longer) initialized
		if (!_persystdb->isInitialized()
				&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation,
						_conn.port, _conn.every_x_days)) {
			LOG(error) << "Database not initialized";
			return;
		}

		std::string table_suffix;
		if (!_persystdb->getTableSuffix(table_suffix)) {
			LOG(error) << "Failed to create Aggregate table!";
			return;
		}

		// Look the job up first; only if that fails, try to insert it
		if (!_persystdb->getDBJobID(jobData.jobId, agg_info.job_id_db, jobData.userId, jobData.nodes.size()) &&
				!_persystdb->insertIntoJob(jobData.jobId, jobData.userId, agg_info.job_id_db, table_suffix, jobData.nodes.size())) {
			LOG(error) << "Job insertion not possible, no job id db available for slurm job id" << jobData.jobId;
			return;
		}

		_persystdb->updateJobsLastSuffix(jobData.jobId, jobData.userId, jobData.nodes.size(), agg_info.job_id_db, table_suffix);

		_persystdb->insertInAggregateTable(table_suffix, agg_info);
	}
}

202
203
void PerSystSqlOperator::convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer){
	for(auto &reading: buffer){
204
		double value = reading.value * _scaling_factor;
205
206
207
208
209
		douBuffer.push_back(value);
	}
}


210
// Runs every aggregation configured in the outputs of `unit` over `buffer`.
//  - CASSANDRA backend: each result is stored as a reading (timestamp
//    `measurement_ts`) in the corresponding output sensor. Averages and
//    quantiles are divided by the scaling factor again; the severity average is
//    multiplied by SCALING_FACTOR_SEVERITY.
//  - any other backend: the results are written to `agg_info` instead.
// Quantile outputs (QTL) are collected first and handled together at the end.
// agg_info.property_type_id is always set.
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t & agg_info, uint64_t measurement_ts) {
	_quantileSensors.clear();

	reading_t reading;
	reading.timestamp = measurement_ts;

	// Scaled copy of the readings; all floating point statistics work on this
	std::vector<double> douBuffer;
	convertToDoubles(buffer, douBuffer);

	// Performing the actual aggregation operation
	for (const auto& out : unit->getOutputs()) {
		const AggregatorSensorBase::aggregationOps_t op = out->getOperation();
		if (op == AggregatorSensorBase::QTL) {
			// Quantiles are computed once for all quantile sensors below
			_quantileSensors.push_back(out);
			continue;
		}

		switch (op) {
		case AggregatorSensorBase::AVG:
			if (_backend == CASSANDRA) {
				reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/(douBuffer.size() * _scaling_factor);
			} else {
				agg_info.average = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size();
			}
			break;
		case AggregatorSensorBase::OBS: {
			// Computed once and used for both destinations
			const auto observations = computeObs(buffer);
			reading.value = observations;
			agg_info.num_of_observations = observations;
			break;
		}
		case AggregatorSensorBase::AVG_SEV:
			if (_backend == CASSANDRA) {
				reading.value = computeSeverityAverage(douBuffer) * SCALING_FACTOR_SEVERITY;
			} else {
				agg_info.severity_average = computeSeverityAverage(douBuffer);
			}
			break;
		default:
			LOG(warning)<< _name << ": Operation " << op << " not supported!";
			reading.value = 0;
			break;
		}

		if (_backend == CASSANDRA) {
			out->storeReading(reading);
		}
	}

	if (!_quantileSensors.empty()) {
		std::vector<double> quantiles;
		computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
		if (_backend == CASSANDRA) {
			// There may be fewer quantile outputs than computed quantiles; never index past the sensors
			if (quantiles.size() > _quantileSensors.size()) {
				LOG(warning) << _name << ": " << quantiles.size() << " quantiles computed but only "
						<< _quantileSensors.size() << " quantile outputs configured, the remaining quantiles are dropped!";
			}
			for (size_t idx = 0; idx < quantiles.size() && idx < _quantileSensors.size(); idx++) {
				reading.value = quantiles[idx] / _scaling_factor;
				_quantileSensors[idx]->storeReading(reading);
			}
		} else {
			for (const double q : quantiles) {
				agg_info.quantiles.push_back(static_cast<float>(q));
			}
		}
	}
	agg_info.property_type_id = _property_id;
}

// Overload without job data: intentionally does nothing. The aggregation is
// done in the compute() overload that takes the job data.
void PerSystSqlOperator::compute(U_Ptr unit) {
//nothing here!
}
Carla Guillen Carias's avatar
Carla Guillen Carias committed
276

Carla Guillen's avatar
Carla Guillen committed
277
// Severity formula 1: (metric - threshold)^exponent, capped at 1.
// Returns 0 when the metric does not exceed the threshold.
double severity_formula1(double metric, double threshold, double exponent) {
	const double val = metric - threshold;
	if (val > 0) {
		const double ret = std::pow(val, exponent);
		return ret > 1 ? 1 : ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
289
290
// Severity formula 2: (metric / threshold - 1)^exponent, capped at 1.
// Returns 0 when the metric does not exceed the threshold and -1 when the
// threshold is zero (the ratio is undefined then).
double severity_formula2(double metric, double threshold, double exponent) {
	if (threshold == 0.0) {
		return -1;
	}
	const double val = metric / threshold - 1;
	if (val > 0) {
		const double ret = std::pow(val, exponent);
		return ret > 1 ? 1 : ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
304
// Severity formula 3: 1 - (metric / threshold)^exponent, limited to [0, 1].
// Returns -1 when the threshold is zero and 0 when the ratio is not positive.
// NOTE(review): a ratio of exactly 0 yields 0 although ratios just above 0
// yield values close to 1 - confirm that this is intended.
double severity_formula3(double metric, double threshold, double exponent) {
	if (threshold == 0.0) {
		return -1;
	}
	const double val = metric / threshold;
	if (val > 0) {
		// A power of a positive base is never negative, so the result cannot
		// exceed 1 and only the lower bound needs clamping
		const double ret = 1 - std::pow(val, exponent);
		return ret < 0 ? 0 : ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
322
// Memory severity: (metric - threshold) / (max_memory - threshold), limited to
// [0, 1]. Returns -1 when max_memory equals the threshold, since the ratio is
// undefined then.
double severity_memory(double metric, double threshold, double max_memory) {
	const double denominator = max_memory - threshold;
	if (denominator == 0.0) {
		return -1;
	}
	// The difference must be formed before dividing; without the parentheses
	// only the threshold would be divided by the denominator
	const double severity = (metric - threshold) / denominator;
	if (severity > 1) {
		return 1;
	}
	if (severity < 0) {
		return 0;
	}
	return severity;
}
335

Carla Guillen's avatar
Carla Guillen committed
336
337
double PerSystSqlOperator::computeSeverityAverage(
		std::vector<double> & buffer) {
338
	std::vector<double> severities;
Carla Guillen's avatar
Carla Guillen committed
339
340
341
	switch (_severity_formula) {
	case (FORMULA1):
		for (auto val : buffer) {
342
			auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
343
344
			severities.push_back(severity);
		}
345
		break;
Carla Guillen's avatar
Carla Guillen committed
346
347
	case (FORMULA2):
		for (auto val : buffer) {
348
			auto severity = severity_formula2(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
349
350
			severities.push_back(severity);
		}
351
		break;
Carla Guillen's avatar
Carla Guillen committed
352
353
	case (FORMULA3):
		for (auto val : buffer) {
354
			auto severity = severity_formula3(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
355
356
			severities.push_back(severity);
		}
357
		break;
Carla Guillen's avatar
Carla Guillen committed
358
359
	case (MEMORY_FORMULA):
		for (auto val : buffer) {
360
			auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
Carla Guillen's avatar
Carla Guillen committed
361
362
			severities.push_back(severity);
		}
363
		break;
Carla Guillen's avatar
Carla Guillen committed
364
365
366
367
368
369
370
	case (NOFORMULA):
		for (auto val : buffer) {
			severities.push_back(severity_noformula());
		}
		break;
	default:
		return 0.0;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
371
		break;
372
	}
Carla Guillen's avatar
Carla Guillen committed
373
374
375
	if (severities.size()) {
		return (std::accumulate(severities.begin(), severities.end(), 0.0)
				/ severities.size());
376
377
	}
	return 0.0;
378
}
379

380
// Computes NUMBER_QUANTILES + 1 evenly spaced quantiles of `data`.
//  data             values to analyze; sorted in place (ascending) as a side effect
//  NUMBER_QUANTILES number of intervals; 0 leaves `quantiles` untouched
//  quantiles        output, resized to NUMBER_QUANTILES + 1: entry 0 is the
//                   minimum, the last entry the maximum, the entries in between
//                   are linearly interpolated between neighbouring values
// An empty `data` leaves `quantiles` untouched as well.
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
	if (data.empty() || NUMBER_QUANTILES == 0) {
		return;
	}
	std::sort(data.begin(), data.end());
	const size_t elementNumber = data.size();
	quantiles.resize(static_cast<size_t>(NUMBER_QUANTILES) + 1); //+min

	if (elementNumber == 1) {
		// Optimization: with one single value every quantile is that value
		std::fill(quantiles.begin(), quantiles.end(), data[0]);
		return;
	}

	quantiles[0] = data[0]; //minimum
	quantiles[NUMBER_QUANTILES] = data[elementNumber - 1]; //maximum

	const double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
	for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
		// Position of the i-th quantile in the sorted data; always below elementNumber
		const double position = i * factor;
		const size_t idx = static_cast<size_t>(std::floor(position));
		if (idx == 0) {
			quantiles[i] = data[0];
		} else {
			// Interpolate between the two neighbouring values by the fractional part
			const double rest = position - idx;
			quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
		}
	}
}