MariaDB.cpp 14.2 KB
Newer Older
1
//================================================================================
Carla Guillen Carias's avatar
Carla Guillen Carias committed
2
// Name        : MariaDB.cpp
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include <string>
#include <sstream>
#include <iostream>
#include <vector>
#include <map>
#include <sys/types.h>
#include <sys/types.h> // uid_t
35
36
37
38
#include "../../../common/include/timestamp.h"
#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/local_time/local_time.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"
Carla Guillen Carias's avatar
Carla Guillen Carias committed
39
40
#include "MariaDB.h"
#include "MariaDB.h"
41

42
/**************SQLResult****************/
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63

class SQLResult {
private:
        MYSQL_RES * _result;
public:
        SQLResult(MYSQL * mysql){
                _result= mysql_store_result(mysql);
        }
        ~SQLResult(){
                mysql_free_result(_result);
        }

        MYSQL_RES * get(){
                return _result;
        }

        MYSQL_ROW fetch_row(){
                return mysql_fetch_row(_result);
        }
};

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/**************JobCache****************/

const std::string DELIMITER = "|";
std::string createIdJobCache(const std::string uid, int number_of_nodes, const std::string &job_id_string){
	std::stringstream id;
	id << job_id_string << DELIMITER << uid << DELIMITER << number_of_nodes;
	return id.str();
}

void JobCache::addJobToCache(const std::string uid, int number_of_nodes, const std::string &job_id_string, const std::string & job_id_db){
	//remove one element before inserting (the last condition (_jobCache.size() > JOB_CACHE_MAX_SIZE) shouldn't really happen...
	if(_jobCacheMap.size() == JOB_CACHE_MAX_SIZE || _jobCacheMap.size() > JOB_CACHE_MAX_SIZE){
		using MyPairType = std::pair<std::string, Job_info_t>;
		auto smallest = std::min_element(_jobCacheMap.begin(), _jobCacheMap.end(),
				[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
		_jobCacheMap.erase(smallest);
	}
	Job_info_t ji;
	ji.job_id_db = job_id_db;
	ji.last_seen_timestamp = getTimestamp();
	_jobCacheMap[createIdJobCache(uid, number_of_nodes, job_id_string)] = ji;
}

Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
87
Job_info_t * JobCache::find(const std::string uid, int number_of_nodes, const std::string &job_id_string){
88
89
	auto found = _jobCacheMap.find(createIdJobCache(uid, number_of_nodes, job_id_string));
	if(found != _jobCacheMap.end()){ //found
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
90
91
		found->second.last_seen_timestamp = getTimestamp();
		return &(found->second);
92
	}
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
93
	return nullptr;
94
95
96
97
}

/**************MariaDB****************/

Carla Guillen Carias's avatar
Carla Guillen Carias committed
98
99
MariaDB * MariaDB::instance = nullptr;
std::mutex MariaDB::mut;
100
std::once_flag MariaDB::init_once;
101

Carla Guillen Carias's avatar
Carla Guillen Carias committed
102
void MariaDB::print_error(){
103
104
    	LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
	if(mysql_errno(_mysql) == 2006){
105
106
		mysql_close(_mysql);
		_initialized=false;
107
	}
108
109
}

Carla Guillen Carias's avatar
Carla Guillen Carias committed
110
MariaDB::MariaDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _end_aggregate_timestamp(0), _initialized(false) {
111
112
}

113
114
115
void MariaDB::initialize(){
	instance = new MariaDB();
}
116

117
118
119
MariaDB * MariaDB::getInstance(){
	std::call_once(init_once, &MariaDB::initialize);
	return instance;
120
121
}

Carla Guillen Carias's avatar
Carla Guillen Carias committed
122
bool MariaDB::initializeConnection(const std::string & host, const std::string & user,
123
124
125
126
127
128
129
130
131
132
133
	const std::string & password, const std::string & database_name,
	Rotation_t rotation, int port, unsigned int every_x_days) {
	std::lock_guard<std::mutex>  lock(mut);
	if (!_initialized) {
		_mysql = mysql_init(NULL);
		if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
			print_error();
			return false;
		}
		LOG(debug) << "Successfully connected to mariadb";
		_initialized = true;
Carla Guillen's avatar
Carla Guillen committed
134
135
136
137
	}
	return true;
}

Carla Guillen Carias's avatar
Carla Guillen Carias committed
138
bool MariaDB::finalizeConnection(){
139
140
141
142
143
144
	std::lock_guard<std::mutex>  lock(mut);
	if(_initialized){
		mysql_close(_mysql);
		LOG(debug) << "Closed mariadb";
		_initialized = false;
	}
Carla Guillen's avatar
Carla Guillen committed
145
146
147
	return true;
}

148

Carla Guillen Carias's avatar
Carla Guillen Carias committed
149
MariaDB::~MariaDB(){
150

151
152
}

Carla Guillen's avatar
Carla Guillen committed
153
bool MariaDB::getDBJobID(const std::string & job_id_string, std::string& job_db_id, const std::string & user, int number_nodes, int batch_domain) {
154
	std::lock_guard<std::mutex>  lock(mut);
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
155
156
	Job_info_t *job_info=_jobCache.find(user, number_nodes, job_id_string);
	if(job_info){ //found
157
158
		job_db_id = job_info->job_id_db;
		return true; //job found
159
	}
160

161
	std::stringstream build_query;
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
162
	build_query	<< "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string='" << job_id_string << "' AND user='" << user << "'";
Carla Guillen's avatar
Carla Guillen committed
163
	build_query << " AND nodes=" << number_nodes << " AND batch_domain=" << batch_domain;
164
165
166
167
168
169
	auto query = build_query.str();
	LOG(debug)<< query;
	if (mysql_real_query(_mysql, query.c_str(), query.size())) {
		print_error();
		return false;
	}
170

171
172
173
174
175
	SQLResult result(_mysql);
	if (result.get()) {
		MYSQL_ROW row;
		while ((row = result.fetch_row())) {
			if (row[0]) {
176
				job_db_id = row[0];
177
				std::string job_id_string = std::string(row[1]);
178
179
				_jobCache.addJobToCache(user, number_nodes, job_id_string, job_db_id);
				return true; //found
180
181
182
			}
		}
	}
183
	return false;
184
}
185

Carla Guillen Carias's avatar
Carla Guillen Carias committed
186
bool MariaDB::getCurrentSuffixAggregateTable(std::string & suffix){
187
188
	if(_end_aggregate_timestamp){
		auto now_uts = getTimestamp();
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
       	if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
       		suffix = _current_table_suffix;
            return true;
       	}
    }
    auto right_now = boost::posix_time::second_clock::local_time();
    auto date_time =  boost::posix_time::to_iso_extended_string(right_now);
    std::replace( date_time.begin(), date_time.end(), 'T', ' ');

    std::stringstream build_query;
    build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
    build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
    auto query = build_query.str();
    LOG(debug) << query;

    if(mysql_real_query(_mysql, query.c_str(), query.size())){
    	print_error();
206
    	return false;
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    }
    SQLResult result(_mysql);
    if(result.get()){
    	MYSQL_ROW row;
       	while((row = result.fetch_row())){
       		if(row[0] && row[1]){
       			suffix = std::string(row[0]);
                _current_table_suffix = suffix;
                std::string row1(row[1]);
                _end_aggregate_timestamp = std::stoull(row1) * 1e9;
                return true;
            } else {
            	return false;
            }
        }
    }
    return false;
224
225
}

226

Carla Guillen's avatar
Carla Guillen committed
227
bool MariaDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, std::string & job_id_db, const std::string & suffix, int number_nodes, int batch_domain){
228
	std::lock_guard<std::mutex>  lock(mut);
229
	//maybe another thread did this for us
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
230
231
	Job_info_t *job_info=_jobCache.find(uid,number_nodes, job_id_string);
	if(job_info){
232
		job_id_db = job_info->job_id_db;
233
234
235
		return true;
	}

236
	//Also check that job was not inserted by another collector (shouldn't really happen but "sicher ist sicher"
237
238
	std::stringstream build_query;
	build_query	<< "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string ='";
Carla Guillen's avatar
Carla Guillen committed
239
	build_query << job_id_string << "' AND user='" << uid <<"' AND nodes=" << number_nodes << " AND batch_domain=" << batch_domain;
240
241
242
243
	auto select_query = build_query.str();
	LOG(debug) << select_query;

	if (mysql_real_query(_mysql, select_query.c_str(), select_query.size())) {
244
245
246
		print_error();
		return false;
	}
247
248
249
250
251
252
253

	bool job_found_in_db = false;
	SQLResult result(_mysql);
	if (result.get()) {
		MYSQL_ROW row;
		while ((row = result.fetch_row())) {
			if (row[0]) {
254
255
				job_id_db = row[0];
				_jobCache.addJobToCache(uid, number_nodes, job_id_string, job_id_db);
256
257
258
259
260
261
262
				job_found_in_db=true;
			}
		}
	}

	if(!job_found_in_db) {
		std::stringstream build_insert;
Carla Guillen's avatar
Carla Guillen committed
263
		build_insert << "INSERT IGNORE INTO Accounting (job_id_string, user, nodes, aggregate_first_suffix, aggregate_last_suffix, batch_domain) VALUES (\'" << job_id_string << "\',\'";
264
		build_insert << uid << "\',\'";
265
		build_insert << number_nodes << "\',\'";
Carla Guillen's avatar
Carla Guillen committed
266
267
		build_insert << suffix << "\',\'" << suffix << "\',\'";
		build_insert << batch_domain << "\')"; 
268
269
270
271
272
273
274
		std::string query = build_insert.str();
		LOG(debug)<< query;

		if (mysql_real_query(_mysql, query.c_str(), query.size())) {
			print_error();
			return false;
		}
275
		job_id_db = std::to_string(mysql_insert_id(_mysql));
276
	}
277
	return true;
278
279
280
}


Carla Guillen Carias's avatar
Carla Guillen Carias committed
281
bool MariaDB::insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg){
282
        std::stringstream build_insert;
Carla Guillen's avatar
Carla Guillen committed
283
284
        build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES ( FROM_UNIXTIME(\'" << agg.timestamp;
        build_insert << "\'), \'" << agg.job_id_db;
285
286
287
288
289
290
291
292
        build_insert << "\', \'" << agg.property_type_id;
        build_insert << "\', \'" << agg.num_of_observations;
        build_insert << "\', \'" << agg.average;
        for(auto quant: agg.quantiles){
                build_insert << "\', \'" << quant;
        }
        build_insert << "\', \'" << agg.severity_average << "\')";
        std::string query = build_insert.str();
293
        LOG(debug) << query;
294

295
	std::lock_guard<std::mutex>  lock(mut);
296
297
298
299
300
301
302
        if(mysql_real_query(_mysql, query.c_str(), query.size())){
                print_error();
                return false;
        }
        return true;
}

Carla Guillen Carias's avatar
Carla Guillen Carias committed
303
bool MariaDB::createNewAggregate(std::string& new_suffix){
304
305
306
307
308
309
310
311
312
    std::string select = "SELECT suffix, end_timestamp FROM SuffixToAggregateTable ORDER BY end_timestamp DESC LIMIT 1";
    if (mysql_real_query(_mysql, select.c_str(), select.size())){
        print_error();
    }
    std::string last_suffix = "0";
    std::string end_timestamp = "";
    SQLResult result(_mysql);
    if(result.get()){
        MYSQL_ROW row;
313
        while ((row = result.fetch_row())){
314
315
316
317
318
319
320
321
322
                if(row[0]){
                        last_suffix = std::string(row[0]);
                }
                if(row[1]){
                        end_timestamp = std::string(row[1]);
                }
        }
    }
    if(end_timestamp.size() == 0){
323
324
325
        boost::gregorian::date today = boost::gregorian::day_clock::local_day();
        auto today_iso = to_iso_extended_string(today);
        end_timestamp = today_iso + " 00:00:00";
326
    }
327
    int new_suff = std::stoi(last_suffix) + 1;	
328
329
330
331
332
333
334
335

    std::string new_begin_timestamp, new_end_timestamp;
    getNewDates(end_timestamp, new_begin_timestamp, new_end_timestamp);

    std::stringstream build_insert;
    build_insert << "INSERT INTO SuffixToAggregateTable VALUES(\'" << new_suff;
    build_insert << "\', \'" << new_begin_timestamp << "\', \'" << new_end_timestamp << "\')";
    auto query = build_insert.str();
336
    LOG(debug) << query;
337

338
339
340
341
342
343
344
    if (mysql_real_query(_mysql, query.c_str(), query.size())){
        print_error();
        return false;
    }
    std::stringstream build_create;
    build_create << "CREATE TABLE Aggregate_" << new_suff << " LIKE Aggregate";
    auto query2 = build_create.str();
345
    LOG(debug) << query2;
346

347
    if(mysql_real_query(_mysql, query2.c_str(), query2.size() )){
Carla Guillen's avatar
Carla Guillen committed
348
        if(mysql_errno(_mysql) == 1050){
349
        	return true; //table exists!
350
        }
351
352
353
354
        print_error();
        return false;
    }
    new_suffix = std::to_string(new_suff);
355
    _current_table_suffix = new_suffix; 
356
357
358
359
    return true;
}


360
bool MariaDB::updateJobsLastSuffix(const std::string & job_id_string, const std::string & user, int number_nodes, const std::string& job_id_db, std::string & suffix){
361
	std::lock_guard<std::mutex> lock(mut);
Carla Guillen Carias's avatar
Bugfix    
Carla Guillen Carias committed
362
363
364
	Job_info_t* job_info = _jobCache.find(user, number_nodes, job_id_string);
	if(job_info){ //found
		if(job_info->job_current_table_suffix.empty() || job_info->job_current_table_suffix != suffix){
365
366
367
368
369
			job_info->job_current_table_suffix = suffix; //set new suffix
			//must update;
		} else {
			//no need to update in databse
			return true;
370
		}
371
	} //not found must update
372

373
	std::stringstream build_update;
374
	build_update << "UPDATE Accounting SET aggregate_last_suffix=\'" << suffix << "\' WHERE job_id=" << job_id_db;
375
376
	auto query = build_update.str();
	LOG(debug)<< query;
377

378
379
380
381
382
	if (mysql_real_query(_mysql, query.c_str(), query.size())) {
		print_error();
		return false;
	}
	return true;
383
384
385
}


Carla Guillen Carias's avatar
Carla Guillen Carias committed
386
bool MariaDB::getTableSuffix(std::string & table_suffix){
387
	std::lock_guard<std::mutex> lock(mut);
388
	if (!getCurrentSuffixAggregateTable(table_suffix) && !createNewAggregate(table_suffix)) {
389
390
391
		return false;
	}
	return true;
392
393
}

Carla Guillen Carias's avatar
Carla Guillen Carias committed
394
void MariaDB::getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp){
Carla Guillen Carias's avatar
Carla Guillen Carias committed
395
396
397
398
	begin_timestamp = last_end_timestamp;
	std::tm tm = { };
	std::stringstream ss(last_end_timestamp);
	ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S");
399
400
401

	boost::gregorian::date d = boost::gregorian::date_from_tm(tm);

Carla Guillen Carias's avatar
Carla Guillen Carias committed
402
	switch (_rotation) {
403
404
405
406
407
408
409
410
411
412
413
414
	case EVERY_YEAR:
		d += boost::gregorian::months(12);
		break;
	case EVERY_MONTH:
		d += boost::gregorian::months(1);
		break;
	case EVERY_XDAYS:
		d += boost::gregorian::days(_every_x_days);
		break;
	default:
		d += boost::gregorian::months(1);
		break;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
415
416
	};

417
418
419
420
421
422
	boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
	boost::posix_time::ptime end_ts(d);
	boost::posix_time::time_duration diff = end_ts - epoch;

	_end_aggregate_timestamp = diff.total_seconds() * 1e9;
	end_timestamp = to_iso_extended_string(d) + " 00:00:00";
423
424
	LOG(debug) << "_end_aggregate_timestamp =" << _end_aggregate_timestamp;
	LOG(debug) << "end_timestamp =" << end_timestamp;
425
426
427
}