PerSystDB.cpp 12.6 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
//================================================================================
// Name        : PerSystDB.cpp
// Author      : Carla Guillen
// Contact     : info@dcdb.it
// Copyright   : Leibniz Supercomputing Centre
// Description : Template implementing features to use Units in Operators.
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2018-2019 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================

#include "PerSystDB.h"

#include <string>
#include <sstream>
#include <iostream>
#include <vector>
#include <map>
#include <sys/types.h>
#include <sys/types.h> // uid_t
#include <pwd.h> // getpwuid
38
#include "../../../common/include/timestamp.h"
39
40
#include "PerSystDB.h"

41
42
43
44
45
#include "boost/date_time/gregorian/gregorian.hpp"
#include "boost/date_time/local_time/local_time.hpp"
#include "boost/date_time/posix_time/posix_time.hpp"


46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66

class SQLResult {
private:
        MYSQL_RES * _result;
public:
        SQLResult(MYSQL * mysql){
                _result= mysql_store_result(mysql);
        }
        ~SQLResult(){
                mysql_free_result(_result);
        }

        MYSQL_RES * get(){
                return _result;
        }

        MYSQL_ROW fetch_row(){
                return mysql_fetch_row(_result);
        }
};

67
68
69
PerSystDB * PerSystDB::instance = nullptr;
std::mutex PerSystDB::mut;

70
void PerSystDB::print_error(){
71
72
    	LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
	if(mysql_errno(_mysql) == 2006){
73
74
		mysql_close(_mysql);
		_initialized=false;
75
	}
76
77
}

78
PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _end_aggregate_timestamp(0), _initialized(false) {
79
80
81
82
83
}

PerSystDB * PerSystDB::getInstance(){
		// no lock here
		if (instance) return instance;
84

85
86
87
88
		std::lock_guard<std::mutex>  lock(mut);
		if (instance) return instance;

		return instance = new PerSystDB();
89
90
}

Carla Guillen's avatar
Carla Guillen committed
91
bool PerSystDB::initializeConnection(const std::string & host, const std::string & user,
92
93
94
95
96
97
98
99
100
101
102
	const std::string & password, const std::string & database_name,
	Rotation_t rotation, int port, unsigned int every_x_days) {
	std::lock_guard<std::mutex>  lock(mut);
	if (!_initialized) {
		_mysql = mysql_init(NULL);
		if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
			print_error();
			return false;
		}
		LOG(debug) << "Successfully connected to mariadb";
		_initialized = true;
Carla Guillen's avatar
Carla Guillen committed
103
104
105
106
107
	}
	return true;
}

bool PerSystDB::finalizeConnection(){
108
109
110
111
112
113
	std::lock_guard<std::mutex>  lock(mut);
	if(_initialized){
		mysql_close(_mysql);
		LOG(debug) << "Closed mariadb";
		_initialized = false;
	}
Carla Guillen's avatar
Carla Guillen committed
114
115
116
	return true;
}

117

118
PerSystDB::~PerSystDB(){
119

120
121
}

122
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings,	std::map<std::string, std::string>& job_id_map) {
123
	std::lock_guard<std::mutex>  lock(mut);
124
125
126
127
128
129
130
131
132
133
134
135
136
	std::vector<std::string> notfound;
	for(auto & job_id_str: job_id_strings){
		auto found = _jobCache.find(job_id_str);
		if(found != _jobCache.end()){
			job_id_map[job_id_str] = found->second.job_id_db;
			found->second.last_seen_timestamp = getTimestamp();
		} else {
			notfound.push_back(job_id_str);
		}
	}
	if(!notfound.size()){ //every job was found
		return true;
	}
137
138
	std::stringstream build_query;
	build_query	<< "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
139
140
141
	for (std::vector<std::string>::size_type i = 0; i < notfound.size(); ++i) {
		build_query << "'" << notfound[i] << "'";
		if (i != notfound.size() - 1) { //not last element
142
143
144
145
146
147
148
149
150
151
			build_query << ",";
		}
	}
	build_query << ")";
	auto query = build_query.str();
	LOG(debug)<< query;
	if (mysql_real_query(_mysql, query.c_str(), query.size())) {
		print_error();
		return false;
	}
152

153
154
155
156
157
	SQLResult result(_mysql);
	if (result.get()) {
		MYSQL_ROW row;
		while ((row = result.fetch_row())) {
			if (row[0]) {
158
159
160
161
				std::string job_id_db = row[0];
				std::string job_id_string = std::string(row[1]);
				job_id_map[job_id_string] = job_id_db;
				addJobToCache(job_id_string, job_id_db);
162
163
164
165
			}
		}
	}
	return true;
166
167
}

168
169
170
171
172
173
174
175
176
177
178
179
void PerSystDB::addJobToCache(std::string &job_id_string, std::string & job_id_db){
	if(_jobCache.size() == JOB_CACHE_MAX_SIZE){ //remove one element before inserting
		using MyPairType = std::pair<std::string, PerSystDB::Job_info_t>;
		auto smallest = std::min_element(_jobCache.begin(), _jobCache.end(),
				[](const MyPairType& l, const MyPairType& r) -> bool {return l.second.last_seen_timestamp < r.second.last_seen_timestamp;});
		_jobCache.erase(smallest);
	}
	Job_info_t ji;
	ji.job_id_db = job_id_db;
	ji.last_seen_timestamp = getTimestamp();
	_jobCache[job_id_string] = ji;
}
180
181

bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
	if(_end_aggregate_timestamp){
		auto now_uts = getTimestamp();
        	if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
			suffix = _current_table_suffix;
                	return true;
        	}
    	}
    	auto right_now = boost::posix_time::second_clock::local_time();
    	auto date_time =  boost::posix_time::to_iso_extended_string(right_now);
    	std::replace( date_time.begin(), date_time.end(), 'T', ' ');

    	std::stringstream build_query;
    	build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
    	build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
    	auto query = build_query.str();
    	LOG(debug) << query;

    	if(mysql_real_query(_mysql, query.c_str(), query.size())){
        	print_error();
        	return false;
    	}
    	SQLResult result(_mysql);
    	if(result.get()){
       	 	MYSQL_ROW row;
       		 while((row = result.fetch_row())){
            		if(row[0] && row[1]){
                		suffix = std::string(row[0]);
				_current_table_suffix = suffix;
                		std::string row1(row[1]);
                		_end_aggregate_timestamp = std::stoull(row1) * 1e9;
                		return true;
            		} else {
                		return false;
            		}
        	}
    	}
    	return false;
219
220
}

221

222
bool PerSystDB::insertIntoJob(const std::string& job_id_string, const std::string& uid, int & job_id_db, const std::string & suffix){
223
	std::lock_guard<std::mutex>  lock(mut);
224
225
	std::stringstream build_insert;
	build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
226
	auto* pass = getpwuid(static_cast<uid_t>(std::stoull(uid)));
227
228
229
230
231
232
233
234
	if (pass == nullptr) {
		LOG(error)<< "User " << uid << " not found in system.";
		return false;
	}
	build_insert << pass->pw_name << "\',\'";
	build_insert << suffix << "\',\'" << suffix << "\')";
	std::string query = build_insert.str();
	LOG(debug)<< query;
235

236
237
238
239
240
241
	if (mysql_real_query(_mysql, query.c_str(), query.size())) {
		print_error();
		return false;
	}
	job_id_db = mysql_insert_id(_mysql);
	return true;
242
243
244
245
}


bool PerSystDB::insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg){
246
	std::lock_guard<std::mutex>  lock(mut);
247
        std::stringstream build_insert;
Carla Guillen's avatar
Carla Guillen committed
248
249
        build_insert << "INSERT INTO Aggregate_" << suffix << " VALUES ( FROM_UNIXTIME(\'" << agg.timestamp;
        build_insert << "\'), \'" << agg.job_id_db;
250
251
252
253
254
255
256
257
        build_insert << "\', \'" << agg.property_type_id;
        build_insert << "\', \'" << agg.num_of_observations;
        build_insert << "\', \'" << agg.average;
        for(auto quant: agg.quantiles){
                build_insert << "\', \'" << quant;
        }
        build_insert << "\', \'" << agg.severity_average << "\')";
        std::string query = build_insert.str();
258
        LOG(debug) << query;
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276

        if(mysql_real_query(_mysql, query.c_str(), query.size())){
                print_error();
                return false;
        }
        return true;
}

bool PerSystDB::createNewAggregate(std::string& new_suffix){
    std::string select = "SELECT suffix, end_timestamp FROM SuffixToAggregateTable ORDER BY end_timestamp DESC LIMIT 1";
    if (mysql_real_query(_mysql, select.c_str(), select.size())){
        print_error();
    }
    std::string last_suffix = "0";
    std::string end_timestamp = "";
    SQLResult result(_mysql);
    if(result.get()){
        MYSQL_ROW row;
277
        while ((row = result.fetch_row())){
278
279
280
281
282
283
284
285
286
                if(row[0]){
                        last_suffix = std::string(row[0]);
                }
                if(row[1]){
                        end_timestamp = std::string(row[1]);
                }
        }
    }
    if(end_timestamp.size() == 0){
287
288
289
        boost::gregorian::date today = boost::gregorian::day_clock::local_day();
        auto today_iso = to_iso_extended_string(today);
        end_timestamp = today_iso + " 00:00:00";
290
    }
291
    int new_suff = std::stoi(last_suffix) + 1;	
292
293
294
295
296
297
298
299

    std::string new_begin_timestamp, new_end_timestamp;
    getNewDates(end_timestamp, new_begin_timestamp, new_end_timestamp);

    std::stringstream build_insert;
    build_insert << "INSERT INTO SuffixToAggregateTable VALUES(\'" << new_suff;
    build_insert << "\', \'" << new_begin_timestamp << "\', \'" << new_end_timestamp << "\')";
    auto query = build_insert.str();
300
    LOG(debug) << query;
301
302
303
304
305
306
307
308
    if (mysql_real_query(_mysql, query.c_str(), query.size())){
        print_error();
        return false;
    }

    std::stringstream build_create;
    build_create << "CREATE TABLE Aggregate_" << new_suff << " LIKE Aggregate";
    auto query2 = build_create.str();
309
    LOG(debug) << query2;
310
    if(mysql_real_query(_mysql, query2.c_str(), query2.size() )){
Carla Guillen's avatar
Carla Guillen committed
311
312
313
        if(mysql_errno(_mysql) == 1050){
		return true; //table exists!
	}
314
315
316
317
        print_error();
        return false;
    }
    new_suffix = std::to_string(new_suff);
318
    _current_table_suffix = new_suffix; 
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
    return true;
}


bool PerSystDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix){
        std::stringstream build_update;
        build_update << "UPDATE Accounting SET aggregate_last_suffix=\'" << suffix << "\' WHERE job_id IN (";
        unsigned int count = 0;
        for(auto & kv: job_map){
                build_update << kv.second;
                if(count + 1 != job_map.size() ){
                        build_update << ",";
                } else {
                        build_update << ")";
                }
                count++;
        }
        auto query = build_update.str();
337
        LOG(debug) << query;
338
339
340

        if (mysql_real_query(_mysql, query.c_str(), query.size())){
                print_error();
341
                return false;
342
        }
343
        return true;
344
345
346
347
}


bool PerSystDB::getTableSuffix(std::string & table_suffix){
348
349
350
351
352
	std::lock_guard<std::mutex>  lock(mut);
    	if(!getCurrentSuffixAggregateTable(table_suffix) && !createNewAggregate(table_suffix)){
            	return false;
    	}
    	return true;
353
354
}

355
void PerSystDB::getNewDates(const std::string& last_end_timestamp, std::string & begin_timestamp, std::string & end_timestamp){
356

Carla Guillen Carias's avatar
Carla Guillen Carias committed
357
358
359
360
	begin_timestamp = last_end_timestamp;
	std::tm tm = { };
	std::stringstream ss(last_end_timestamp);
	ss >> std::get_time(&tm, "%Y-%m-%d %H:%M:%S");
361
362
363

	boost::gregorian::date d = boost::gregorian::date_from_tm(tm);

Carla Guillen Carias's avatar
Carla Guillen Carias committed
364
	switch (_rotation) {
365
366
367
368
369
370
371
372
373
374
375
376
	case EVERY_YEAR:
		d += boost::gregorian::months(12);
		break;
	case EVERY_MONTH:
		d += boost::gregorian::months(1);
		break;
	case EVERY_XDAYS:
		d += boost::gregorian::days(_every_x_days);
		break;
	default:
		d += boost::gregorian::months(1);
		break;
Carla Guillen Carias's avatar
Carla Guillen Carias committed
377
378
	};

379
380
381
382
383
384
	boost::posix_time::ptime epoch(boost::gregorian::date(1970, 1, 1));
	boost::posix_time::ptime end_ts(d);
	boost::posix_time::time_duration diff = end_ts - epoch;

	_end_aggregate_timestamp = diff.total_seconds() * 1e9;
	end_timestamp = to_iso_extended_string(d) + " 00:00:00";
385
386
	LOG(debug) << "_end_aggregate_timestamp =" << _end_aggregate_timestamp;
	LOG(debug) << "end_timestamp =" << end_timestamp;
387
388
389
}