//================================================================================
// Name        : PerSystSqlOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : PerSyst SQL job operator: aggregates job sensor readings
//               (average, observations, severity, quantiles) and stores the
//               results as sensor readings or in a MariaDB database.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

28
29
#include "PerSystSqlOperator.h"

#include <stddef.h>

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>

#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>

#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"
48

Carla Guillen's avatar
Carla Guillen committed
49

50
/**
 * Creates an operator with the given name.
 *
 * All configuration members start from neutral defaults (no severity formula,
 * default backend, scaling factor of 1) and the shared MariaDB handle is
 * obtained from MariaDB::getInstance().
 *
 * @param name  Name forwarded to both operator template base classes.
 */
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
		OperatorTemplate(name),
		JobOperatorTemplate(name),
		_number_of_even_quantiles(0),
		_batch_domain(-1),
		_severity_formula(NOFORMULA),
		_severity_threshold(0),
		_severity_exponent(0),
		_severity_max_memory(0),
		_go_back_ns(0),
		_backend(DEFAULT),
		_scaling_factor(1),
		_searchedOnceForMetaData(false),
		_property_id(0) {
	_persystdb = MariaDB::getInstance();
}

58
/// Destructor: this class performs no cleanup of its own.
PerSystSqlOperator::~PerSystSqlOperator() = default;

61

62
/**
 * Copy constructor.
 *
 * The base classes are constructed from the name of the source operator and
 * the remaining members are taken over through copy().
 *
 * @param other  Operator to copy from.
 */
PerSystSqlOperator::PerSystSqlOperator(const PerSystSqlOperator& other) :
		OperatorTemplate(other._name),
		JobOperatorTemplate<AggregatorSensorBase>(other._name) {
	this->copy(other);
}

/**
 * Copy assignment.
 *
 * @param other  Operator to copy from.
 * @return       Reference to this operator.
 */
PerSystSqlOperator& PerSystSqlOperator::operator=(const PerSystSqlOperator& other){
	// Base-class part first, then the members owned by this class.
	JobOperatorTemplate::operator=(other);
	this->copy(other);
	return *this;
}

void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
	this->_buffer = other._buffer;
	this->_quantileSensors = other._quantileSensors;
	this->_number_of_even_quantiles = other._number_of_even_quantiles;
Carla Guillen's avatar
Carla Guillen committed
76
	this->_batch_domain = other._batch_domain;
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
	this->_severity_formula = other._severity_formula;
	this->_severity_threshold = other._severity_threshold;
	this->_severity_exponent = other._severity_exponent;
	this->_severity_max_memory = other._severity_max_memory;
	this->_severities = other._severities;
	this->_go_back_ns = other._go_back_ns;
	this->_backend = other._backend;
	this->_scaling_factor = other._scaling_factor;
	this->_conn.database_name = other._conn.database_name;
	this->_conn.every_x_days = other._conn.every_x_days;
	this->_conn.host = other._conn.host;
	this->_conn.password = other._conn.password;
	this->_conn.port = other._conn.port;
	this->_conn.rotation = other._conn.rotation;
	this->_conn.user = other._conn.user;
	this->_property_id = other._property_id;
93
	this->_persystdb = other._persystdb;
94
	this->_searchedOnceForMetaData = other._searchedOnceForMetaData;
95
96
}

97
/**
 * Logs the configuration of this operator.
 *
 * The generic operator configuration is printed first, followed by the
 * PerSystSQL specific settings. Connection details are only printed when the
 * MariaDB backend is selected; the password is never printed.
 *
 * @param ll  Log level used for every emitted line.
 */
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
	const char* const separator = "====================================";

	OperatorTemplate<AggregatorSensorBase>::printConfig(ll);

	LOG_VAR(ll) << separator;
	LOG_VAR(ll) << "PerSystSQL Operator " << _name;
	LOG_VAR(ll) << separator;

	// General settings
	LOG_VAR(ll) << "backend=" << _backend;
	LOG_VAR(ll) << "go_back_ms=" << _go_back_ns/1e6;
	LOG_VAR(ll) << "scaling_factor=" << _scaling_factor;
	LOG_VAR(ll) << "batch_domain=" << _batch_domain;

	// Connection settings are only relevant for the MariaDB backend
	if (_backend == MARIADB) {
		LOG_VAR(ll) << "PerSystSQL Operator Connection information:";
		LOG_VAR(ll) << "\tHost=" << _conn.host;
		LOG_VAR(ll) << "\tUser=" << _conn.user;
		LOG_VAR(ll) << "\tDatabase=" << _conn.database_name;
		LOG_VAR(ll) << "\tPort=" << _conn.port;
		LOG_VAR(ll) << "\tRotation=" << _conn.rotation;
		if (_conn.rotation == MariaDB::EVERY_XDAYS) {
			LOG_VAR(ll) << "\tEvery_X_days=" << _conn.every_x_days;
		}
	}

	LOG_VAR(ll) << "Property Configuration:";
	LOG_VAR(ll) << "\tnumber_of_even_quantiles=" << _number_of_even_quantiles;
	LOG_VAR(ll) << "\tproperty_id=" << _property_id;

	LOG_VAR(ll) << "Severity Configuration:";
	LOG_VAR(ll) << "\tseverity_formula=" << _severity_formula;
	LOG_VAR(ll) << "\tseverity_exponent=" << _severity_exponent;
	LOG_VAR(ll) << "\tseverity_threshold=" << _severity_threshold;
	LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}

127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143

/**
 * Start hook: opens the database connection when the MariaDB backend is used.
 *
 * @return true if no connection is needed or the connection was initialized,
 *         false if initializing the connection failed.
 */
bool PerSystSqlOperator::execOnStart(){
	// Other backends need no preparation here.
	if (_backend != MARIADB) {
		return true;
	}

	const bool connected = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password,
			_conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
	if (!connected) {
		LOG(error) << "Database not initialized";
	}
	return connected;
}

/**
 * Stop hook: finalizes the database connection when the MariaDB backend is used.
 */
void PerSystSqlOperator::execOnStop(){
	if (_backend != MARIADB) {
		return;
	}
	_persystdb->finalizeConnection();
}

144
/**
 * Computes the aggregates of one job unit and, for the MariaDB backend,
 * writes them to the database.
 *
 * Steps:
 *  1. Collect the names of all input sensors of all sub-units of the job unit.
 *  2. Query the readings of these sensors at (now - _go_back_ns) into _buffer.
 *  3. Run compute_internal() on the readings.
 *  4. MariaDB backend only: make sure the connection is up, resolve the job id
 *     used in the database and insert the aggregate record.
 *
 * The function returns early (after logging) when there are no input sensors,
 * no readings, or any of the database steps fails.
 *
 * @param unit     Job unit whose sub-units provide the input sensors.
 * @param jobData  Job information (job id, user id, node list) used for the
 *                 database bookkeeping.
 */
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
	// The member buffer is reused between invocations, start from a clean state
	_buffer.clear();
	const uint64_t my_timestamp = getTimestamp() - _go_back_ns;

	// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
	std::vector<std::string> vectorOfSensorNames;
	for (const auto& subUnit : unit->getSubUnits()) {
		for (const auto& in : subUnit->getInputs()) {
			// The scaling factor is taken from the first input sensor that
			// provides one in its metadata; afterwards the lookup is skipped.
			if (!_searchedOnceForMetaData) {
				SensorMetadata buffer;
				if (_queryEngine.queryMetadata(in->getName(), buffer) && buffer.getScale()) {
					_scaling_factor = *buffer.getScale();
					LOG(debug) << "PerSystSql Operator " << _name << " using scaling factor of " << _scaling_factor;
					_searchedOnceForMetaData = true;
				}
			}
			vectorOfSensorNames.push_back(in->getName());
		}
	}

	if (vectorOfSensorNames.empty()) {
		LOG(debug) << "PerSystSql Operator: No names found for vectorOfSensorNames ";
		return;
	}

	// All sensors are queried in a single call; their readings end up in the same vector
	if (!_queryEngine.querySensor(vectorOfSensorNames, my_timestamp, my_timestamp, _buffer, false)) {
		LOG(debug)<< "PerSystSql Operator " << _name << " cannot read vector sensor " << (*vectorOfSensorNames.begin()) << "!";
	}

	if (_buffer.empty()) {
		LOG(debug) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
		return;
	}
	// The timestamp of the first reading is used for the whole aggregate
	const uint64_t measurement_ts = _buffer[0].timestamp;

	Aggregate_info_t agg_info;
	agg_info.timestamp = (measurement_ts/1e9);
	compute_internal(unit, _buffer, agg_info, measurement_ts);

	if (_backend != MARIADB) {
		return;
	}

	// The connection may not be available (anymore); try to (re-)initialize it
	if (!_persystdb->isInitialized()
			&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation,
					_conn.port, _conn.every_x_days)) {
		LOG(error) << "Database not initialized";
		return;
	}

	std::string table_suffix;
	if (!_persystdb->getTableSuffix(table_suffix)) {
		LOG(error) << "Failed to create Aggregate table!";
		return;
	}

	// Look up the job id used in the database; if the lookup fails, try to insert the job
	if (!_persystdb->getDBJobID(jobData.jobId, agg_info.job_id_db, jobData.userId, jobData.nodes.size(), _batch_domain) &&
			!_persystdb->insertIntoJob(jobData.jobId, jobData.userId, agg_info.job_id_db, table_suffix, jobData.nodes.size(), _batch_domain)) {
		LOG(error) << "Job insertion not possible, no job id db available for slurm job id " << jobData.jobId;
		return;
	}

	_persystdb->updateJobsLastSuffix(jobData.jobId, jobData.userId, jobData.nodes.size(), agg_info.job_id_db, table_suffix);

	_persystdb->insertInAggregateTable(table_suffix, agg_info);
}

209
210
void PerSystSqlOperator::convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer){
	for(auto &reading: buffer){
211
		double value = reading.value * _scaling_factor;
212
213
214
215
216
		douBuffer.push_back(value);
	}
}


217
/**
 * Performs the aggregation for all output sensors of a unit.
 *
 * The readings are first converted to scaled doubles. Every non-quantile
 * output is then evaluated according to its operation:
 *  - CASSANDRA backend: the result is stored as a reading of the output sensor
 *    (averages and quantiles are divided by the scaling factor again, severity
 *    averages are multiplied by SCALING_FACTOR_SEVERITY).
 *  - any other backend: the result is written into agg_info.
 * Quantile outputs are collected and evaluated together at the end.
 *
 * @param unit            Unit whose outputs are evaluated.
 * @param buffer          Raw readings to aggregate.
 * @param agg_info        Aggregate record filled for the non-Cassandra path;
 *                        property_type_id is always set.
 * @param measurement_ts  Timestamp assigned to the stored readings.
 */
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t & agg_info, uint64_t measurement_ts) {
	_quantileSensors.clear();

	reading_t reading;
	reading.value = 0;
	reading.timestamp = measurement_ts;

	std::vector<double> douBuffer;
	convertToDoubles(buffer, douBuffer);

	// Performing the actual aggregation operation
	for (const auto& out : unit->getOutputs()) {
		const AggregatorSensorBase::aggregationOps_t op = out->getOperation();

		// Quantile sensors are only collected here and handled in one go below
		if (op == AggregatorSensorBase::QTL) {
			_quantileSensors.push_back(out);
			continue;
		}

		switch (op) {
		case AggregatorSensorBase::AVG:
			if (_backend == CASSANDRA) {
				// Dividing by the scaling factor undoes the scaling applied in convertToDoubles()
				reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/(douBuffer.size() * _scaling_factor);
			} else {
				agg_info.average = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size();
			}
			break;
		case AggregatorSensorBase::OBS:
			reading.value = computeObs(buffer);
			agg_info.num_of_observations = computeObs(buffer);
			break;
		case AggregatorSensorBase::AVG_SEV:
			if (_backend == CASSANDRA) {
				reading.value = computeSeverityAverage(douBuffer) * SCALING_FACTOR_SEVERITY;
			} else {
				agg_info.severity_average = computeSeverityAverage(douBuffer);
			}
			break;
		default:
			LOG(warning)<< _name << ": Operation " << op << " not supported!";
			reading.value = 0;
			break;
		}

		if (_backend == CASSANDRA) {
			out->storeReading(reading);
		}
	}

	if (!_quantileSensors.empty()) {
		std::vector<double> quantiles;
		computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
		if (_backend == CASSANDRA) {
			// The number of computed quantiles (_number_of_even_quantiles + 1) is
			// configured independently of the number of quantile outputs, so only
			// indices valid for both containers may be used.
			const size_t storable = quantiles.size() < _quantileSensors.size() ? quantiles.size() : _quantileSensors.size();
			for (size_t idx = 0; idx < storable; ++idx) {
				reading.value = quantiles[idx] / _scaling_factor;
				_quantileSensors[idx]->storeReading(reading);
			}
		} else {
			for (const double q : quantiles) {
				agg_info.quantiles.push_back(static_cast<float>(q));
			}
		}
	}
	agg_info.property_type_id = _property_id;
}

/**
 * Intentionally empty: the actual work is done in the overload that also
 * receives the job data.
 *
 * @param unit  Ignored.
 */
void PerSystSqlOperator::compute(U_Ptr unit) {
	// nothing here!
}
Carla Guillen Carias's avatar
Carla Guillen Carias committed
283

Carla Guillen's avatar
Carla Guillen committed
284
/**
 * Severity formula 1: (metric - threshold)^exponent, capped at 1.
 *
 * @param metric     Measured value.
 * @param threshold  Value above which the severity becomes non-zero.
 * @param exponent   Exponent applied to the excess over the threshold.
 * @return 0 if the metric does not exceed the threshold, otherwise the
 *         powered excess limited to at most 1.
 */
double severity_formula1(double metric, double threshold, double exponent) {
	const double excess = metric - threshold;
	if (!(excess > 0)) {
		return 0;
	}
	const double raised = std::pow(excess, exponent);
	return (raised > 1) ? 1 : raised;
}

Carla Guillen's avatar
Carla Guillen committed
296
297
/**
 * Severity formula 2: (metric / threshold - 1)^exponent, capped at 1.
 *
 * @param metric     Measured value.
 * @param threshold  Reference value the metric is divided by.
 * @param exponent   Exponent applied to the relative excess.
 * @return -1 if the threshold is zero, 0 if the relative excess is not
 *         positive, otherwise the powered relative excess limited to at most 1.
 */
double severity_formula2(double metric, double threshold, double exponent) {
	if (threshold == 0) {
		return -1;
	}
	const double relativeExcess = metric / threshold - 1;
	if (!(relativeExcess > 0)) {
		return 0;
	}
	const double raised = std::pow(relativeExcess, exponent);
	return (raised > 1) ? 1 : raised;
}

Carla Guillen's avatar
Carla Guillen committed
311
/**
 * Severity formula 3: 1 - (metric / threshold)^exponent, limited to [0, 1].
 *
 * @param metric     Measured value.
 * @param threshold  Reference value the metric is divided by.
 * @param exponent   Exponent applied to the ratio.
 * @return -1 if the threshold is zero, 0 if the ratio is not positive,
 *         otherwise the clamped result of the formula.
 */
double severity_formula3(double metric, double threshold, double exponent) {
	if (threshold == 0) {
		return -1;
	}
	const double ratio = metric / threshold;
	if (!(ratio > 0)) {
		return 0;
	}
	const double severity = 1 - std::pow(ratio, exponent);
	if (severity > 1) {
		return 1;
	}
	if (severity < 0) {
		return 0;
	}
	return severity;
}

Carla Guillen's avatar
Carla Guillen committed
329
/**
 * Memory severity: position of the metric between the threshold and the
 * maximum memory, i.e. (metric - threshold) / (max_memory - threshold),
 * limited to [0, 1].
 *
 * @param metric      Measured value.
 * @param threshold   Value at which the severity starts to rise from 0.
 * @param max_memory  Value at which the severity reaches 1.
 * @return -1 if max_memory equals threshold (formula undefined), otherwise
 *         the clamped severity.
 */
double severity_memory(double metric, double threshold, double max_memory) {
	const double denominator = max_memory - threshold;
	if (!denominator) {
		return -1;
	}
	// The subtraction has to be parenthesised: without it only the threshold
	// is divided by the denominator.
	const double severity = (metric - threshold) / denominator;
	if (severity > 1) {
		return 1;
	}
	if (severity < 0) {
		return 0;
	}
	return severity;
}
342

Carla Guillen's avatar
Carla Guillen committed
343
344
double PerSystSqlOperator::computeSeverityAverage(
		std::vector<double> & buffer) {
345
	std::vector<double> severities;
Carla Guillen's avatar
Carla Guillen committed
346
347
348
	switch (_severity_formula) {
	case (FORMULA1):
		for (auto val : buffer) {
349
			auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
350
351
			severities.push_back(severity);
		}
352
		break;
Carla Guillen's avatar
Carla Guillen committed
353
354
	case (FORMULA2):
		for (auto val : buffer) {
355
			auto severity = severity_formula2(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
356
357
			severities.push_back(severity);
		}
358
		break;
Carla Guillen's avatar
Carla Guillen committed
359
360
	case (FORMULA3):
		for (auto val : buffer) {
361
			auto severity = severity_formula3(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
362
363
			severities.push_back(severity);
		}
364
		break;
Carla Guillen's avatar
Carla Guillen committed
365
366
	case (MEMORY_FORMULA):
		for (auto val : buffer) {
367
			auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
Carla Guillen's avatar
Carla Guillen committed
368
369
			severities.push_back(severity);
		}
370
		break;
Carla Guillen's avatar
Carla Guillen committed
371
372
373
374
375
376
377
	case (NOFORMULA):
		for (auto val : buffer) {
			severities.push_back(severity_noformula());
		}
		break;
	default:
		return 0.0;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
378
		break;
379
	}
Carla Guillen's avatar
Carla Guillen committed
380
381
382
	if (severities.size()) {
		return (std::accumulate(severities.begin(), severities.end(), 0.0)
				/ severities.size());
383
384
	}
	return 0.0;
385
}
386

387
/**
 * Computes evenly spaced quantiles of a data set.
 *
 * The data is sorted in place. On return, quantiles holds
 * NUMBER_QUANTILES + 1 values: index 0 is the minimum, index NUMBER_QUANTILES
 * the maximum, and index i in between is taken at position
 * i * size / NUMBER_QUANTILES, linearly interpolated between the two
 * neighbouring sorted elements.
 *
 * @param data              Values to evaluate; sorted ascending as a side effect.
 * @param NUMBER_QUANTILES  Number of intervals the data is divided into.
 * @param quantiles         Output vector; left untouched if data is empty or
 *                          NUMBER_QUANTILES is 0.
 */
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
	if (data.empty() || NUMBER_QUANTILES == 0) {
		return;
	}
	std::sort(data.begin(), data.end());

	const size_t elementNumber = data.size();
	const size_t resultSize = static_cast<size_t>(NUMBER_QUANTILES) + 1; //+min

	// With a single element every quantile equals that element
	if (elementNumber == 1) {
		quantiles.assign(resultSize, data[0]);
		return;
	}

	quantiles.resize(resultSize);
	quantiles[0] = data.front(); //minimum
	quantiles[NUMBER_QUANTILES] = data.back(); //maximum

	const double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
	for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
		const double position = i * factor;
		// position < elementNumber because i < NUMBER_QUANTILES, so idx is a valid index
		const size_t idx = static_cast<size_t>(std::floor(position));
		if (idx == 0) {
			quantiles[i] = data[0];
		} else {
			const double rest = position - idx;
			quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
		}
	}
}