PerSystSqlOperator.cpp 12.9 KB
Newer Older
1
//================================================================================
2
// Name        : PerSystSqlOperator.cpp
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

28
29
#include "PerSystSqlOperator.h"

30
31
32
33
34
#include <boost/log/sources/record_ostream.hpp>
#include <boost/log/trivial.hpp>
#include <boost/log/utility/formatting_ostream.hpp>
#include <boost/parameter/keyword.hpp>
#include <stddef.h>
35
#include <cmath>
36
#include <cstdint>
37
#include <memory>
38
#include <string>
39
#include <numeric>
Carla Guillen's avatar
Carla Guillen committed
40
#include <sstream>
41
42

#include "../../../common/include/logging.h"
43
#include "../../../common/include/sensorbase.h"
44
45
46
#include "../../../common/include/timestamp.h"
#include "../../includes/CommonStatistics.h"
#include "../../includes/QueryEngine.h"
47
#include "../../includes/UnitTemplate.h"
48

Carla Guillen's avatar
Carla Guillen committed
49
int PerSystSqlOperator::_number_of_calls = 0;
50
51
bool PerSystSqlOperator::persystdb_initialized = false;
std::mutex PerSystSqlOperator::mut;
Carla Guillen's avatar
Carla Guillen committed
52

53
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
Carla Guillen's avatar
Carla Guillen committed
54
55
56
		OperatorTemplate(name), JobOperatorTemplate(name), _number_of_even_quantiles(
				0), _severity_formula(NOFORMULA), _severity_threshold(0), _severity_exponent(
				0), _severity_max_memory(0), _go_back_ns(0), _backend(DEFAULT), _scaling_factor(
Carla Guillen's avatar
Carla Guillen committed
57
				1), _property_id(0) {
58
	_persystdb = PerSystDB::getInstance();
59
60
}

61
PerSystSqlOperator::~PerSystSqlOperator() {
62
63
}

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95

PerSystSqlOperator::PerSystSqlOperator(const PerSystSqlOperator& other) : OperatorTemplate(other._name), JobOperatorTemplate(other._name){
    copy(other);
}

PerSystSqlOperator& PerSystSqlOperator::operator=(const PerSystSqlOperator& other){
	OperatorTemplate::operator=(other);
    JobOperatorTemplate::operator=(other);
    copy(other);
    return *this;
}

void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
	this->_buffer = other._buffer;
	this->_quantileSensors = other._quantileSensors;
	this->_number_of_even_quantiles = other._number_of_even_quantiles;
	this->_severity_formula = other._severity_formula;
	this->_severity_threshold = other._severity_threshold;
	this->_severity_exponent = other._severity_exponent;
	this->_severity_max_memory = other._severity_max_memory;
	this->_severities = other._severities;
	this->_go_back_ns = other._go_back_ns;
	this->_backend = other._backend;
	this->_scaling_factor = other._scaling_factor;
	this->_conn.database_name = other._conn.database_name;
	this->_conn.every_x_days = other._conn.every_x_days;
	this->_conn.host = other._conn.host;
	this->_conn.password = other._conn.password;
	this->_conn.port = other._conn.port;
	this->_conn.rotation = other._conn.rotation;
	this->_conn.user = other._conn.user;
	this->_property_id = other._property_id;
96
	this->_persystdb = other._persystdb;
97
98
}

99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
	LOG_VAR(ll) << "backend=" << _backend;
	LOG_VAR(ll) << "go_back_ms=" << _go_back_ns/1e6;
	if(_backend == MARIADB){
		LOG_VAR(ll) << "PerSystSQL Operator Connection information:";
		LOG_VAR(ll) << "\tHost=" << _conn.host;
		LOG_VAR(ll) << "\tUser=" << _conn.user;
		LOG_VAR(ll) << "\tDatabase=" << _conn.database_name;
		LOG_VAR(ll) << "\tPort=" << _conn.port;
		LOG_VAR(ll) << "\tRotation=" << _conn.rotation;
		LOG_VAR(ll) << "\tEvery_X_days=" << _conn.every_x_days;
	}
	LOG_VAR(ll) << "Property Configuration:";
	LOG_VAR(ll) << "\tnumber_of_even_quantiles=" << _number_of_even_quantiles;
	LOG_VAR(ll) << "\tproperty_id=" << _property_id;
	LOG_VAR(ll) << "Severity Configuration:";
	LOG_VAR(ll) << "\tseverity_formula=" << _severity_formula;
	LOG_VAR(ll) << "\tseverity_exponent=" << _severity_exponent;
	LOG_VAR(ll) << "\tseverity_threshold=" << _severity_threshold;
	LOG_VAR(ll) << "\tseverity_max_memory=" << _severity_max_memory;
}

121
void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
Carla Guillen's avatar
Carla Guillen committed
122
	// Clearing the buffer, if already allocated
123
	_buffer.clear();
Carla Guillen's avatar
Carla Guillen committed
124
125
126
	size_t elCtr = 0;
	uint64_t my_timestamp = getTimestamp() - _go_back_ns;
	// Making sure that the aggregation boundaries do not go past the job start/end time
127
128
	uint64_t jobEnd = jobData.endTime != 0 && my_timestamp > jobData.endTime ? jobData.endTime : my_timestamp;
	uint64_t jobStart =	jobEnd - my_timestamp < jobData.startTime ?	jobData.startTime : jobEnd - my_timestamp;
Carla Guillen's avatar
Carla Guillen committed
129
130
131
132
	// Job units are hierarchical, and thus we iterate over all sub-units associated to each single node
	for (const auto& subUnit : unit->getSubUnits()) {
		// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
		for (const auto& in : subUnit->getInputs()) {
133
134
135
136
137
138
			if( _scaling_factor == 1){
				SensorMetadata buffer;
				if(_queryEngine.queryMetadata(in->getName(), buffer)){
					_scaling_factor = buffer.scale;
				}
			}
139
			if (!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
Carla Guillen's avatar
Carla Guillen committed
140
141
142
143
				LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
			}
		}
	}
144
145
146
147

	if(_buffer.size() == 0){
		LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
		return;
Carla Guillen's avatar
Carla Guillen committed
148
	}
149

Carla Guillen's avatar
Carla Guillen committed
150
	Aggregate_info_t agg_info;
151
152
153
154
155
156
157
158
159
160
161
	compute_internal(unit, _buffer, agg_info);

	if( _backend == MARIADB ) {
		std::lock_guard<std::mutex> lk(mut);
		if (!persystdb_initialized) {
			bool persystdb_initialized = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
			if(!persystdb_initialized) {
				LOG(error) << "Unable to establish connection to database";
				return;
			}
		}
162
163
164
165
166
167
168
		std::stringstream jobidBuilder;
		jobidBuilder << jobData.jobId;

		std::vector<std::string> job_ids;
		job_ids.push_back(jobidBuilder.str());

		std::map<std::string, std::string> job_map;
169
170
171
172
173
174
175
176
		std::string table_suffix;
		if(!_persystdb->getTableSuffix(table_suffix)){
			LOG(error) << "failed to create table!";
			return;
		}
		if(!_persystdb->getDBJobIDs(job_ids, job_map)){
			return;
		}
177

178
179
		// handle jobs which are not present
		for(auto &job_id_string : job_ids ){
180
181
182
183
184
185
186
187
188
       			auto search = job_map.find(job_id_string);
       			if(search == job_map.end()){ //Not found
          		 	int job_id_db;
	            		if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
           				agg_info.job_id_db = std::to_string(job_id_db);
        	    		} else {
            		 		continue;
            			}
        		}
189
190
		}
		agg_info.timestamp = (my_timestamp/1e9);
Carla Guillen's avatar
Carla Guillen committed
191

192
		_persystdb->insertInAggregateTable(table_suffix, agg_info);
193
		if(_number_of_calls % 10 == 0  && persystdb_initialized){
194
			_persystdb->finalizeConnection();
195
196
197
			persystdb_initialized = false;
		}
		_number_of_calls++;
Carla Guillen's avatar
Carla Guillen committed
198
	}
199
200
}

201
202
void PerSystSqlOperator::convertToDoubles(std::vector<reading_t> &buffer, std::vector<double> &douBuffer){
	for(auto &reading: buffer){
203
		double value = reading.value * _scaling_factor;
204
205
206
207
208
		douBuffer.push_back(value);
	}
}


209
void PerSystSqlOperator::compute_internal(U_Ptr& unit, vector<reading_t>& buffer, Aggregate_info_t & agg_info) {
Carla Guillen's avatar
Carla Guillen committed
210
211
212
213
214
215
216
	_quantileSensors.clear();

	reading_t reading;
	AggregatorSensorBase::aggregationOps_t op;
	reading.timestamp = getTimestamp() - _go_back_ns;

	std::vector<double> douBuffer;
217
	convertToDoubles(buffer, douBuffer);
Carla Guillen's avatar
Carla Guillen committed
218
219
220
221
222
223
224
	// Performing the actual aggregation operation
	for (const auto& out : unit->getOutputs()) {
		op = out->getOperation();
		if (op != AggregatorSensorBase::QTL) {
			switch (op) {
			case AggregatorSensorBase::AVG:
				if (_backend == CASSANDRA) {
225
					reading.value = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/(douBuffer.size() * _scaling_factor);
Carla Guillen's avatar
Carla Guillen committed
226
				} else {
227
					agg_info.average = std::accumulate(douBuffer.begin(), douBuffer.end(), 0.0)/douBuffer.size();
Carla Guillen's avatar
Carla Guillen committed
228
229
230
231
232
233
234
235
				}
				break;
			case AggregatorSensorBase::OBS:
				reading.value = computeObs(buffer);
				agg_info.num_of_observations = computeObs(buffer);
				break;
			case AggregatorSensorBase::AVG_SEV:
				if (_backend == CASSANDRA) {
236
					reading.value = computeSeverityAverage(douBuffer) * SCALING_FACTOR_SEVERITY;
Carla Guillen's avatar
Carla Guillen committed
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
				} else {
					agg_info.severity_average = computeSeverityAverage(douBuffer);
				}
				break;
			default:
				LOG(warning)<< _name << ": Operation " << op << " not supported!";
				reading.value = 0;
				break;
			}
			if(_backend == CASSANDRA) {
				out->storeReading(reading);
			}
		} else {
			_quantileSensors.push_back(out);
		}
	}

	if (!_quantileSensors.empty()) {
		vector<double> quantiles;
		computeEvenQuantiles(douBuffer, _number_of_even_quantiles, quantiles);
		if (_backend == CASSANDRA) {
			for (unsigned idx = 0; idx < quantiles.size(); idx++) {
259
				reading.value = quantiles[idx] / _scaling_factor;
Carla Guillen's avatar
Carla Guillen committed
260
261
262
263
264
265
266
267
268
269
270
271
272
				_quantileSensors[idx]->storeReading(reading);
			}
		} else {
			for(auto q: quantiles){
				agg_info.quantiles.push_back(static_cast<float>(q));
			}
		}
	}
	agg_info.property_type_id = _property_id;

}

void PerSystSqlOperator::compute(U_Ptr unit) {
273
274
//nothing here!
}
Carla Guillen Carias's avatar
Carla Guillen Carias committed
275

Carla Guillen's avatar
Carla Guillen committed
276
double severity_formula1(double metric, double threshold, double exponent) {
277
	double val = metric - threshold;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
278
	if (val > 0) {
279
		double ret = (pow(val, exponent));
Carla Guillen's avatar
Carla Guillen committed
280
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
281
282
283
284
285
286
287
			return 1;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
288
289
double severity_formula2(double metric, double threshold, double exponent) {
	if (!threshold) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
290
291
		return -1;
	}
292
	double val = metric / threshold - 1;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
293
	if (val > 0) {
Carla Guillen's avatar
Carla Guillen committed
294
295
		double ret = (pow(val, exponent));
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
296
297
298
299
300
301
302
			return 1;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
303
double severity_formula3(double metric, double threshold, double exponent) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
304
305
306
	if (!threshold) {
		return -1;
	}
307
	double val = metric / threshold;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
308
	if (val > 0) {
Carla Guillen's avatar
Carla Guillen committed
309
310
		double ret = (1 - pow(val, exponent));
		if (ret > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
311
312
			return 1;
		}
Carla Guillen's avatar
Carla Guillen committed
313
		if (ret < 0) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
314
315
316
317
318
319
320
			return 0;
		}
		return ret;
	}
	return 0;
}

Carla Guillen's avatar
Carla Guillen committed
321
double severity_memory(double metric, double threshold, double max_memory) {
322
323
	double denominator = max_memory - threshold;
	double severity = -1;
Carla Guillen's avatar
Carla Guillen committed
324
325
326
	if (denominator) {
		severity = metric - threshold / (max_memory - threshold);
		if (severity > 1) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
327
			severity = 1;
Carla Guillen's avatar
Carla Guillen committed
328
		} else if (severity < 0) {
Carla Guillen Carias's avatar
Carla Guillen Carias committed
329
330
331
332
333
			severity = 0;
		}
	}
	return severity;
}
334

Carla Guillen's avatar
Carla Guillen committed
335
336
double PerSystSqlOperator::computeSeverityAverage(
		std::vector<double> & buffer) {
337
	std::vector<double> severities;
Carla Guillen's avatar
Carla Guillen committed
338
339
340
	switch (_severity_formula) {
	case (FORMULA1):
		for (auto val : buffer) {
341
			auto severity = severity_formula1(val, _severity_threshold, _severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
342
343
			severities.push_back(severity);
		}
344
		break;
Carla Guillen's avatar
Carla Guillen committed
345
346
	case (FORMULA2):
		for (auto val : buffer) {
347
			auto severity = severity_formula2(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
348
349
			severities.push_back(severity);
		}
350
		break;
Carla Guillen's avatar
Carla Guillen committed
351
352
	case (FORMULA3):
		for (auto val : buffer) {
353
			auto severity = severity_formula3(val, _severity_threshold,	_severity_exponent);
Carla Guillen's avatar
Carla Guillen committed
354
355
			severities.push_back(severity);
		}
356
		break;
Carla Guillen's avatar
Carla Guillen committed
357
358
	case (MEMORY_FORMULA):
		for (auto val : buffer) {
359
			auto severity = severity_memory(val, _severity_threshold, _severity_max_memory);
Carla Guillen's avatar
Carla Guillen committed
360
361
			severities.push_back(severity);
		}
362
		break;
Carla Guillen's avatar
Carla Guillen committed
363
364
365
366
367
368
369
	case (NOFORMULA):
		for (auto val : buffer) {
			severities.push_back(severity_noformula());
		}
		break;
	default:
		return 0.0;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
370
		break;
371
	}
Carla Guillen's avatar
Carla Guillen committed
372
373
374
	if (severities.size()) {
		return (std::accumulate(severities.begin(), severities.end(), 0.0)
				/ severities.size());
375
376
	}
	return 0.0;
377
}
378

379
void computeEvenQuantiles(std::vector<double> &data, const unsigned int NUMBER_QUANTILES, std::vector<double> &quantiles) {
Carla Guillen's avatar
Carla Guillen committed
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
	if (data.empty() || NUMBER_QUANTILES == 0) {
		return;
	}
	std::sort(data.begin(), data.end());
	int elementNumber = data.size();
	quantiles.resize(NUMBER_QUANTILES + 1); //+min
	double factor = elementNumber / static_cast<double>(NUMBER_QUANTILES);
	quantiles[0] = data[0]; //minimum
	quantiles[NUMBER_QUANTILES] = data[data.size() - 1]; //maximum
	for (unsigned int i = 1; i < NUMBER_QUANTILES; i++) {
		if (elementNumber > 1) {
			int idx = static_cast<int>(std::floor(i * factor));
			if (idx == 0) {
				quantiles[i] = data[0];
			} else {
				double rest = (i * factor) - idx;
396
				quantiles[i] = data[idx - 1] + rest * (data[idx] - data[idx - 1]);
Carla Guillen's avatar
Carla Guillen committed
397
398
399
400
401
			}
		} else { //optimization, we don't need to calculate all the quantiles
			quantiles[i] = data[0];
		}
	}
402
403
}