//================================================================================
// Name        : PerSystSqlOperator.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

28
29
#include "PerSystSqlOperator.h"

#include <stddef.h>

#include <algorithm>
#include <cmath>
#include <cstdint>
#include <map>
#include <memory>
#include <numeric>
#include <sstream>
#include <string>
#include <vector>

#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>

#include "../../../common/include/logging.h"
#include "../../../common/include/sensorbase.h"
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
#include "../../includes/UnitTemplate.h"
48

Carla Guillen's avatar
Carla Guillen committed
49

50
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
Carla Guillen's avatar
Carla Guillen committed
51
52
53
		OperatorTemplate(name), JobOperatorTemplate(name), _number_of_even_quantiles(
				0), _severity_formula(NOFORMULA), _severity_threshold(0), _severity_exponent(
				0), _severity_max_memory(0), _go_back_ns(0), _backend(DEFAULT), _scaling_factor(
Carla Guillen's avatar
Carla Guillen committed
54
				1), _property_id(0) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
55
	_persystdb = MariaDB::getInstance();
56
57
}

58
PerSystSqlOperator::~PerSystSqlOperator() {
59
60
}

61

62
PerSystSqlOperator::PerSystSqlOperator(const PerSystSqlOperator& other) : OperatorTemplate(other._name), JobOperatorTemplate<AggregatorSensorBase>(other._name){
63
	copy(other);
64
65
66
}

PerSystSqlOperator& PerSystSqlOperator::operator=(const PerSystSqlOperator& other){
67
68
69
	JobOperatorTemplate::operator=(other);
	copy(other);
	return *this;
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
}

void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
	this->_buffer = other._buffer;
	this->_quantileSensors = other._quantileSensors;
	this->_number_of_even_quantiles = other._number_of_even_quantiles;
	this->_severity_formula = other._severity_formula;
	this->_severity_threshold = other._severity_threshold;
	this->_severity_exponent = other._severity_exponent;
	this->_severity_max_memory = other._severity_max_memory;
	this->_severities = other._severities;
	this->_go_back_ns = other._go_back_ns;
	this->_backend = other._backend;
	this->_scaling_factor = other._scaling_factor;
	this->_conn.database_name = other._conn.database_name;
	this->_conn.every_x_days = other._conn.every_x_days;
	this->_conn.host = other._conn.host;
	this->_conn.password = other._conn.password;
	this->_conn.port = other._conn.port;
	this->_conn.rotation = other._conn.rotation;
	this->_conn.user = other._conn.user;
	this->_property_id = other._property_id;
92
	this->_persystdb = other._persystdb;
93
94
}

95
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
96
	OperatorTemplate<AggregatorSensorBase>::printConfig(ll);
97
	LOG_VAR(ll) << "====================================";
98
	LOG_VAR(ll) << "PerSystSQL Operator " << _name;
99
	LOG_VAR(ll) << "====================================";
100
101
	LOG_VAR(ll) << "backend=" << _backend;
	LOG_VAR(ll) << "go_back_ms=" << _go_back_ns/1e6;
Carla Guillen's avatar
Carla Guillen committed
102
	LOG_VAR(ll) << "_scaling_factor=" << _scaling_factor;
103
104
105
106
107
108
109
	if(_backend == MARIADB){
		LOG_VAR(ll) << "PerSystSQL Operator Connection information:";
		LOG_VAR(ll) << "\tHost=" << _conn.host;
		LOG_VAR(ll) << "\tUser=" << _conn.user;
		LOG_VAR(ll) << "\tDatabase=" << _conn.database_name;
		LOG_VAR(ll) << "\tPort=" << _conn.port;
		LOG_VAR(ll) << "\tRotation=" << _conn.rotation;
110
111
112
		if(_conn.rotation == MariaDB::EVERY_XDAYS){
			LOG_VAR(ll) << "\tEvery_X_days=" << _conn.every_x_days;
		}
113
114
115
116
117
118
119
120
121
122
123
	}
	LOG_VAR(ll) << "Property Configuration:";
	LOG_VAR(ll) << "\tnumber_of_even_quantiles=" << _number_of_even_quantiles;
	LOG_VAR(ll) << "\tproperty_id=" << _property_id;
	LOG_VAR(ll) << "Severity Configuration:";
	LOG_VAR(ll) << "\tseverity_formula=" << _severity_formula;
	LOG_VAR(ll) << "\tseverity_exponent=" << _severity_exponent;
	LOG_VAR(ll) << "\tseverity_threshold=" << _severity_threshold;
	LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}

124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

bool PerSystSqlOperator::execOnStart(){
	if( _backend == MARIADB ) {
		if(!_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days)){
			LOG(error) << "Database not initialized";
			return false;
		}
	}
	return true;
}

void PerSystSqlOperator::execOnStop(){
	if( _backend == MARIADB ) {
		_persystdb->finalizeConnection();
	}
}

141
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
Carla Guillen's avatar
Carla Guillen committed
142
	// Clearing the buffer, if already allocated
143
	_buffer.clear();
Carla Guillen's avatar
Carla Guillen committed
144
145
146
	size_t elCtr = 0;
	uint64_t my_timestamp = getTimestamp() - _go_back_ns;
	// Making sure that the aggregation boundaries do not go past the job start/end time
147
148
	uint64_t jobEnd = jobData.endTime != 0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
	uint64_t jobStart =	jobEnd - my_timestamp < jobData.startTime ?	jobData.startTime : jobEnd - my_timestamp;
Carla Guillen's avatar
Carla Guillen committed
149
150
151
152
	// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
	for (const auto& subUnit : unit->getSubUnits()) {
		// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
		for (const auto& in : subUnit->getInputs()) {
153
154
155
156
			if( _scaling_factor == 1){
				SensorMetadata buffer;
				if(_queryEngine.queryMetadata(in->getName(), buffer)){
					_scaling_factor = buffer.scale;
Carla Guillen's avatar
Carla Guillen committed
157
					LOG(debug) << "PerSystSql Operator " << _name << " using scaling factor of " << _scaling_factor;
158
159
				}
			}
160
			if (!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
161
				//LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
Carla Guillen's avatar
Carla Guillen committed
162
163
164
			}
		}
	}
165
166
167
168

	if(_buffer.size() == 0){
		LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
		return;
Carla Guillen's avatar
Carla Guillen committed
169
	}
170

Carla Guillen's avatar
Carla Guillen committed
171
	Aggregate_info_t agg_info;
172
173
174
	compute_internal(unit, _buffer, agg_info);

	if( _backend == MARIADB ) {
175
176
177
		if (!_persystdb->isInitialized()
				&& !_persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation,
						_conn.port, _conn.every_x_days)) {
178
			LOG(error) << "Database not initialized";
179
180
181
			return;
		}

182
		std::vector<std::string> job_ids = {jobData.jobId};
183
184

		std::map<std::string, std::string> job_map;
185
186
187
188
189
190
191
192
		std::string table_suffix;
		if(!_persystdb->getTableSuffix(table_suffix)){
			LOG(error) << "failed to create table!";
			return;
		}
		if(!_persystdb->getDBJobIDs(job_ids, job_map)){
			return;
		}
193

194

195
196
		// handle jobs which are not present
		for(auto &job_id_string : job_ids ){
197
198
199
       			auto search = job_map.find(job_id_string);
       			if(search == job_map.end()){ //Not found
          		 	int job_id_db;
200
	            	if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
201
           				agg_info.job_id_db = std::to_string(job_id_db);
202
203
        	    	} else {
        	    		LOG(error) << "Job insertion not possible, no job id db found for slurm job id" << job_id_string;
204
205
            			continue;
            		}
206
        		} else { //found
207
208
        			agg_info.job_id_db = search->second;
        		}
209
		}
210
		_persystdb->updateJobsLastSuffix(job_map, table_suffix);
Carla Guillen's avatar
Carla Guillen committed
211

212
		agg_info.timestamp = (my_timestamp/1e9);
213
		_persystdb->insertInAggregateTable(table_suffix, agg_info);
Carla Guillen's avatar
Carla Guillen committed
214
	}
215
216
}

217
218
void PerSystSqlOperator::convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer){
	for(auto &reading: buffer){
219
		double value = reading.value * _scaling_factor;
220
221
222
223
224
		douBuffer.push_back(value);
	}
}


225
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t & agg_info) {
Carla Guillen's avatar
Carla Guillen committed
226
227
228
229
230
231
232
	_quantileSensors.clear();

	reading_t reading;
	AggregatorSensorBase::aggregationOps_t op;
	reading.timestamp = getTimestamp() - _go_back_ns;

	std::vector<double> douBuffer;
233
	convertToDoubles(buffer, douBuffer);
Carla Guillen's avatar
Carla Guillen committed
234
235
236
237
238
239
240
	// Performing the actual aggregation operation
	for (const auto& out : unit->getOutputs()) {
		op = out->getOperation();
		if (op != AggregatorSensorBase::QTL) {
			switch (op) {
			case AggregatorSensorBase::AVG:
				if (_backend == CASSANDRA) {
241
					reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/(douBuffer.size() * _scaling_factor);
Carla Guillen's avatar
Carla Guillen committed
242
				} else {
243
					agg_info.average = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size();
Carla Guillen's avatar
Carla Guillen committed
244
245
246
247
248
249
250
251
				}
				break;
			case AggregatorSensorBase::OBS:
				reading.value = computeObs(buffer);
				agg_info.num_of_observations = computeObs(buffer);
				break;
			case AggregatorSensorBase::AVG_SEV:
				if (_backend == CASSANDRA) {
252
					reading.value = computeSeverityAverage(douBuffer) * SCALING_FACTOR_SEVERITY;
Carla Guillen's avatar
Carla Guillen committed
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
				} else {
					agg_info.severity_average = computeSeverityAverage(douBuffer);
				}
				break;
			default:
				LOG(warning)<< _name << ": Operation " << op << " not supported!";
				reading.value = 0;
				break;
			}
			if(_backend == CASSANDRA) {
				out->storeReading(reading);
			}
		} else {
			_quantileSensors.push_back(out);
		}
	}

	if (!_quantileSensors.empty()) {
		vector<double> quantiles;
		computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
		if (_backend == CASSANDRA) {
			for (unsigned idx = 0; idx < quantiles.size(); idx++) {
275
				reading.value = quantiles[idx] / _scaling_factor;
Carla Guillen's avatar
Carla Guillen committed
276
277
278
279
280
281
282
283
284
285
286
287
288
				_quantileSensors[idx]->storeReading(reading);
			}
		} else {
			for(auto q: quantiles){
				agg_info.quantiles.push_back(static_cast<float>(q));
			}
		}
	}
	agg_info.property_type_id = _property_id;

}

void PerSystSqlOperator::compute(U_Ptr unit) {
289
290
//nothing here!
}
Carla Guillen Carias's avatar
Carla Guillen Carias committed
291

Carla Guillen's avatar
Carla Guillen committed
292
double severity_formula1(double metric, double threshold, double exponent) {
293
	double val = metric - threshold;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
294
	if (val > 0) {
295
		double ret = (pow(val, exponent));
Carla Guillen's avatar
Carla Guillen committed
296
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
297
298
299
300
301
302
303
			return 1;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
304
305
double severity_formula2(double metric, double threshold, double exponent) {
	if (!threshold) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
306
307
		return -1;
	}
308
	double val = metric / threshold - 1;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
309
	if (val > 0) {
Carla Guillen's avatar
Carla Guillen committed
310
311
		double ret = (pow(val, exponent));
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
312
313
314
315
316
317
318
			return 1;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
319
double severity_formula3(double metric, double threshold, double exponent) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
320
321
322
	if (!threshold) {
		return -1;
	}
323
	double val = metric / threshold;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
324
	if (val > 0) {
Carla Guillen's avatar
Carla Guillen committed
325
326
		double ret = (1 - pow(val, exponent));
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
327
328
			return 1;
		}
Carla Guillen's avatar
Carla Guillen committed
329
		if (ret < 0) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
330
331
332
333
334
335
336
			return 0;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
337
double severity_memory(double metric, double threshold, double max_memory) {
338
339
	double denominator = max_memory - threshold;
	double severity = -1;
Carla Guillen's avatar
Carla Guillen committed
340
341
342
	if (denominator) {
		severity = metric - threshold / (max_memory - threshold);
		if (severity > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
343
			severity = 1;
Carla Guillen's avatar
Carla Guillen committed
344
		} else if (severity < 0) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
345
346
347
348
349
			severity = 0;
		}
	}
	return severity;
}
350

Carla Guillen's avatar
Carla Guillen committed
351
352
double PerSystSqlOperator::computeSeverityAverage(
		std::vector<double> & buffer) {
353
	std::vector<double> severities;
Carla Guillen's avatar
Carla Guillen committed
354
355
356
	switch (_severity_formula) {
	case (FORMULA1):
		for (auto val : buffer) {
357
			auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
358
359
			severities.push_back(severity);
		}
360
		break;
Carla Guillen's avatar
Carla Guillen committed
361
362
	case (FORMULA2):
		for (auto val : buffer) {
363
			auto severity = severity_formula2(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
364
365
			severities.push_back(severity);
		}
366
		break;
Carla Guillen's avatar
Carla Guillen committed
367
368
	case (FORMULA3):
		for (auto val : buffer) {
369
			auto severity = severity_formula3(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
370
371
			severities.push_back(severity);
		}
372
		break;
Carla Guillen's avatar
Carla Guillen committed
373
374
	case (MEMORY_FORMULA):
		for (auto val : buffer) {
375
			auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
Carla Guillen's avatar
Carla Guillen committed
376
377
			severities.push_back(severity);
		}
378
		break;
Carla Guillen's avatar
Carla Guillen committed
379
380
381
382
383
384
385
	case (NOFORMULA):
		for (auto val : buffer) {
			severities.push_back(severity_noformula());
		}
		break;
	default:
		return 0.0;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
386
		break;
387
	}
Carla Guillen's avatar
Carla Guillen committed
388
389
390
	if (severities.size()) {
		return (std::accumulate(severities.begin(), severities.end(), 0.0)
				/ severities.size());
391
392
	}
	return 0.0;
393
}
394

395
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
Carla Guillen's avatar
Carla Guillen committed
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
	if (data.empty() || NUMBER_QUANTILES == 0) {
		return;
	}
	std::sort(data.begin(), data.end());
	int elementNumber = data.size();
	quantiles.resize(NUMBER_QUANTILES + 1); //+min
	double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
	quantiles[0] = data[0]; //minimum
	quantiles[NUMBER_QUANTILES] = data[data.size() - 1]; //maximum
	for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
		if (elementNumber > 1) {
			int idx = static_cast<int>(std::floor(i * factor));
			if (idx == 0) {
				quantiles[i] = data[0];
			} else {
				double rest = (i * factor) - idx;
412
				quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
Carla Guillen's avatar
Carla Guillen committed
413
414
415
416
417
			}
		} else { //optimization, we don't need to calculate all the quantiles
			quantiles[i] = data[0];
		}
	}
418
419
}