Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

query.cpp 12 KB
Newer Older
Axel Auweter's avatar
Axel Auweter committed
1
2
//================================================================================
// Name        : query.cpp
3
// Author      : Axel Auweter, Daniele Tafani
Micha Müller's avatar
Micha Müller committed
4
// Contact     : info@dcdb.it
Axel Auweter's avatar
Axel Auweter committed
5
6
7
8
9
10
// Copyright   : Leibniz Supercomputing Centre
// Description : Implementation of query class of dcdbquery
//================================================================================

//================================================================================
// This file is part of DCDB (DataCenter DataBase)
11
// Copyright (C) 2011-2019 Leibniz Supercomputing Centre
Axel Auweter's avatar
Axel Auweter committed
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA.
//================================================================================
Axel Auweter's avatar
Axel Auweter committed
27
28
29
30
31
32
33

#include <iostream>
#include <list>
#include <string>
#include <algorithm>

#include <cstdlib>
34
#include <cinttypes>
Axel Auweter's avatar
Axel Auweter committed
35

36
#include <boost/algorithm/string.hpp>
37
#include <boost/regex.hpp>
38
39

#include "dcdbendian.h"
Axel Auweter's avatar
Axel Auweter committed
40
#include "query.h"
Daniele Tafani's avatar
Daniele Tafani committed
41
#include "dcdb/sensoroperations.h"
42
43
#include <dcdb/jobdatastore.h>

Axel Auweter's avatar
Axel Auweter committed
44

Axel Auweter's avatar
Axel Auweter committed
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
void DCDBQuery::setLocalTimeEnabled(bool enable) {
  useLocalTime = enable;
}

bool DCDBQuery::getLocalTimeEnabled() {
  return useLocalTime;
}

void DCDBQuery::setRawOutputEnabled(bool enable) {
  useRawOutput = enable;
}

bool DCDBQuery::getRawOutputEnabled() {
  return useRawOutput;
}

61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/*
 * Create the database connection used by all subsequent queries.
 *
 * If a connection object already exists, nothing is done and success is
 * reported.
 *
 * hostname: host name handed to DCDB::Connection::setHostname().
 * Returns 0 on success (or if a connection already exists), 1 if the
 * connection could not be established.
 */
int DCDBQuery::connect(const char* hostname) {
    if (connection != nullptr) {
        return 0;
    }

    connection = new DCDB::Connection();
    connection->setHostname(hostname);
    if (!connection->connect()) {
        std::cout << "Cannot connect to database." << std::endl;
        /* Release the unusable connection object. Keeping it around would
         * make the next connect() call return 0 without ever connecting. */
        delete connection;
        connection = nullptr;
        return 1;
    }
    return 0;
}

/*
 * Close and release the database connection.
 *
 * Safe to call when no connection exists (never connected, connect() failed,
 * or disconnect() was already called); in that case it does nothing.
 */
void DCDBQuery::disconnect() {
    if (connection == nullptr) {
        return;
    }
    connection->disconnect();
    delete connection;
    connection = nullptr;
}

80
81
82
83
bool scaleAndConvert(int64_t &value, double baseScalingFactor, double scalingFactor, DCDB::Unit baseUnit, DCDB::Unit unit) {
    if(scalingFactor != 1.0 || baseScalingFactor != 1.0) {
	if( DCDB::scale(&value, scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
	    return false;
84
85
    }
    
86
87
88
89
90
91
92
93
94
    /* Convert the unit if requested */
    if ((unit != DCDB::Unit_None) && (unit != baseUnit)) {
	if (!DCDB::UnitConv::convert(value, baseUnit, unit)) {
	    std::cerr << "Warning, cannot convert units ("
	    << DCDB::UnitConv::toString(baseUnit) << " -> "
	    << DCDB::UnitConv::toString(unit) << ")"
	    << std::endl;
	    return false;
	}
95
    }
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
	    
    return true;
}

/*
 * Print the readings of one sensor as comma-separated text on stdout.
 *
 * The first line is a header ("Sensor,Time" followed by one column per query
 * in [start, stop), optionally annotated with a unit). Every reading in
 * results then produces one line: sensor name, time stamp and one value per
 * query. A value that cannot be computed (no previous reading yet, overflow,
 * failed unit conversion, unknown operation) is printed as an empty field.
 *
 * results: readings of the sensor, in the order in which they are printed.
 * start:   first query configuration to evaluate for this sensor.
 * stop:    one past the last query configuration to evaluate.
 *
 * Uses the members baseUnit and baseScalingFactor, which execute() sets for
 * the sensor before calling this function.
 */
void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results, queryMap_t::iterator start, queryMap_t::iterator stop) {
    /* Print Header */
    std::cout << "Sensor,Time";
    for (queryMap_t::iterator it = start; it != stop; ++it) {
        switch (it->second.operation) {
            case DCDB_OP_NONE:
                std::cout << ",Value";
                break;
            case DCDB_OP_DELTA:
                std::cout << ",Delta";
                break;
            case DCDB_OP_DELTAT:
                std::cout << ",Delta_t";
                break;
            case DCDB_OP_DERIVATIVE:
                std::cout << ",Derivative";
                break;
            case DCDB_OP_INTEGRAL:
                std::cout << ",Integral";
                break;
            default:
                break;
        }

        /* The requested unit takes precedence over the sensor's base unit */
        std::string unitStr;
        if (it->second.unit != DCDB::Unit_None) {
            unitStr = DCDB::UnitConv::toString(it->second.unit);
        } else if (baseUnit != DCDB::Unit_None) {
            unitStr = DCDB::UnitConv::toString(baseUnit);
        }

        /* Adjust the unit label for derived quantities. back()/pop_back() are
         * undefined on an empty string, so this is skipped when no unit is known. */
        if (!unitStr.empty()) {
            if (it->second.operation == DCDB_OP_DERIVATIVE) {
                if ((unitStr.back() == 's') || (unitStr.back() == 'h')) {
                    unitStr.pop_back();
                } else if (unitStr.back() == 'J') {
                    unitStr.pop_back();
                    unitStr.append("W");
                }
            } else if (it->second.operation == DCDB_OP_INTEGRAL) {
                if (unitStr.back() == 'W') {
                    unitStr.append("s");
                }
            }
        }
        /* Re-check: stripping the suffix may have left nothing to print */
        if (!unitStr.empty()) {
            std::cout << " (" << unitStr << ")";
        }
    }
    std::cout << std::endl;

    /* Raw (unscaled, unconverted) value and time stamp of the previous reading.
     * The raw value is kept so that every query can apply its own scaling
     * exactly once, independently of the other queries on the same sensor. */
    int64_t prevReading = 0;
    DCDB::TimeStamp prevT((uint64_t) 0);
    for (std::list<DCDB::SensorDataStoreReading>::iterator reading = results.begin(); reading != results.end(); ++reading) {
        DCDB::TimeStamp ts = reading->timeStamp;

        /* Print the sensor's public name */
        std::cout << start->first.name << ",";

        /* Print the time stamp */
        if (useLocalTime)
            ts.convertToLocal();

        if (useRawOutput)
            std::cout << ts.getRaw();
        else
            std::cout << ts.getString();

        /* Operations that need a predecessor produce no value for the first reading */
        const bool havePrev = (prevT > (uint64_t) 0);

        /* Print the sensor value */
        for (queryMap_t::iterator it = start; it != stop; ++it) {
            DCDB::Unit unit;
            if (it->second.unit != DCDB::Unit_None) {
                unit = it->second.unit;
            } else {
                unit = baseUnit;
            }

            /* Each query works on its own copies of the raw values */
            int64_t value = reading->value;
            int64_t prevValue = prevReading;
            int64_t result = 0;

            bool resultOk = false;
            switch (it->second.operation) {
                case DCDB_OP_NONE:
                    if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
                        result = value;
                        resultOk = true;
                    }
                    break;
                case DCDB_OP_DELTA:
                    if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
                        if (havePrev
                            && scaleAndConvert(prevValue, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)
                            && (DCDB::delta(value, prevValue, &result) == DCDB::DCDB_OP_SUCCESS)) {
                            resultOk = true;
                        }
                    }
                    break;
                case DCDB_OP_DELTAT:
                    if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
                        if (havePrev && (DCDB::delta(ts.getRaw(), prevT.getRaw(), &result) == DCDB::DCDB_OP_SUCCESS)) {
                            resultOk = true;
                        }
                    }
                    break;
                case DCDB_OP_DERIVATIVE:
                    if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)
                        && scaleAndConvert(prevValue, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
                        if (havePrev && DCDB::derivative(value, prevValue, ts.getRaw(), prevT.getRaw(), &result, unit) == DCDB::DCDB_OP_SUCCESS) {
                            resultOk = true;
                        }
                    }
                    break;
                case DCDB_OP_INTEGRAL:
                    if (scaleAndConvert(prevValue, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) {
                        if (havePrev && DCDB::integral(prevValue, ts.getRaw(), prevT.getRaw(), &result, unit) == DCDB::DCDB_OP_SUCCESS) {
                            resultOk = true;
                        }
                    }
                    break;
                default:
                    break;
            }

            if (resultOk) {
                std::cout << "," << result;
            } else {
                std::cout << ",";
            }
        }

        /* Remember the raw reading, not a value already scaled by one of the queries */
        prevReading = reading->value;
        prevT = ts;
        std::cout << std::endl;
    }
}

229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
/*
 * Store the time interval that execute() passes to the sensor query.
 *
 * start: beginning of the interval (stored in start_ts).
 * end:   end of the interval (stored in end_ts).
 */
void DCDBQuery::setInterval(DCDB::TimeStamp start, DCDB::TimeStamp end) {
    this->start_ts = start;
    this->end_ts = end;
}

void DCDBQuery::parseSensorSpecification(const std::string sensor, std::string& sensorName, queryConfig_t& queryCfg) {
    
    std::string s = sensor;
    /* Check for function first */
    boost::regex functRegex("^([^\\(\\)]+)\\(([^\\(\\)]+)\\)$", boost::regex::extended);
    boost::smatch match;
    std::string functName;
    if(boost::regex_search(s, match, functRegex)) {
	functName = match[1].str();
	s = match[2].str();
    }
    
    /* Split into sensor name and potential modifier, i.e. unit conversion or scaling factor */
    boost::regex sensorRegex("([^\\@]+)\\@?([^\\@]*)", boost::regex::extended);
    std::string modifierStr;
    if(boost::regex_search(s, match, sensorRegex)) {
	sensorName = match[1].str();
	modifierStr = match[2].str();
252
    }
253
    
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
    queryCfg = { 1.0, DCDB::Unit_None, DCDB_OP_NONE};
    if (functName.length() == 0) {
	queryCfg.operation = DCDB_OP_NONE;
    } else if (boost::iequals(functName, "delta")) {
	queryCfg.operation = DCDB_OP_DELTA;
    } else if (boost::iequals(functName, "delta_t")) {
	queryCfg.operation = DCDB_OP_DELTAT;
    } else if (boost::iequals(functName, "derivative")) {
	queryCfg.operation = DCDB_OP_DERIVATIVE;
    } else if (boost::iequals(functName, "integral")) {
	queryCfg.operation = DCDB_OP_INTEGRAL;
    } else {
	queryCfg.operation = DCDB_OP_UNKNOWN;
	std::cerr << "Unknown sensor operation: " << functName << std::endl;
    }
    
    if (queryCfg.operation != DCDB_OP_UNKNOWN) {
	if (modifierStr.length() > 0) {
	    boost::regex e("[0-9]*\\.?[0-9]*", boost::regex::extended);
	    if (boost::regex_match(modifierStr, e)) {
		queryCfg.scalingFactor = atof(modifierStr.c_str());
	    } else {
		queryCfg.unit = DCDB::UnitConv::fromString(modifierStr);
	    }
	}
    }
}

void DCDBQuery::prepareQuery(std::list<std::string> sensors) {
283
284
285
    /* Initialize the SensorConfig interface */
    DCDB::SensorConfig sensorConfig(connection);
    
286
287
288
    /* Iterate over list of sensors requested by the user */
    for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
	std::string sensorName;
289
290
	queryConfig_t queryCfg;
	parseSensorSpecification(*it, sensorName, queryCfg);
291
	if (queryCfg.operation != DCDB_OP_UNKNOWN) {
Michael Ott's avatar
Michael Ott committed
292
293
294
295
296
297
298
299
300
301
302
	    std::list <DCDB::PublicSensor> publicSensors;
	    sensorConfig.getPublicSensorsByWildcard(publicSensors, sensorName.c_str());
	    if (publicSensors.size() > 0) {
		for (auto sen: publicSensors) {
		    queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(sen, queryCfg));
		}
	    } else {
		DCDB::PublicSensor pS;
		pS.name = sensorName;
		pS.pattern = sensorName;
		queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(pS, queryCfg));
303
	    }
304
305
	}
    }
306
307
308
309
310
}

void DCDBQuery::prepareQuery(std::list<std::string> sensors, std::list<std::string> prefixes) {
    /* Initialize the SensorConfig interface */
    DCDB::SensorConfig sensorConfig(connection);
311
    
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
    /* Iterate over list of sensors requested by the user */
    for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
	std::string sensorName;
	queryConfig_t queryCfg;
	parseSensorSpecification(*it, sensorName, queryCfg);
	for (auto p: prefixes) {
	    std::string s = p;
	    if (s.back() != '/') {
		s.push_back('/');
	    }
	    s.append(sensorName);
	    if (queryCfg.operation != DCDB_OP_UNKNOWN) {
		std::list <DCDB::PublicSensor> publicSensors;
		sensorConfig.getPublicSensorsByWildcard(publicSensors, s.c_str());
		if (publicSensors.size() > 0) {
		    for (auto sen: publicSensors) {
			queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(sen, queryCfg));
		    }
		} else {
		    DCDB::PublicSensor pS;
		    pS.name = s;
		    pS.pattern = s;
		    queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(pS, queryCfg));
		}
	    }
	}
    }
}

void DCDBQuery::execute() {
342
343
    std::string prevSensorName;
    for (auto q: queries) {
344
	if (q.first.name != prevSensorName) {
345
	    std::pair<queryMap_t::iterator, queryMap_t::iterator> range = queries.equal_range(q.first);
346
347
348
349
350
351
352
353
354
355
	    
	    /* Base scaling factor and unit of the public sensor */
	    baseUnit = DCDB::UnitConv::fromString(q.first.unit);
	    baseScalingFactor = q.first.scaling_factor;
	    
	    std::list<DCDB::SensorDataStoreReading> results;
	    DCDB::Sensor sensor(connection, q.first);
	    sensor.query(results, start_ts, end_ts, DCDB::AGGREGATE_NONE);
	    
	    genOutput(results, range.first, range.second);
356
	    
357
	    prevSensorName = q.first.name;
358
359
	}
    }
360
361
362
363
364
365
366
367
368
369
370
371
}

void DCDBQuery::doQuery(std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end) {
    setInterval(start, end);
    prepareQuery(sensors);
    execute();
}

void DCDBQuery::dojobQuery(std::list<std::string> sensors, std::string jobId) {
    DCDB::JobDataStore jobDataStore(connection);
    DCDB::JobData jobData;
    DCDB::JDError err = jobDataStore.getJobById(jobData, jobId);
372
    
373
374
375
376
377
378
379
380
    if (err == DCDB::JD_OK) {
	setInterval(jobData.startTime, jobData.endTime);
	prepareQuery(sensors, jobData.nodes);
	execute();
    } else {
	std::cerr << "Job not found: " << jobId << std::endl;
    }

Axel Auweter's avatar
Axel Auweter committed
381
382
}

Axel Auweter's avatar
Axel Auweter committed
383
/*
 * Create a query object without a database connection; local-time
 * conversion and raw time stamp output are both switched off.
 */
DCDBQuery::DCDBQuery()
{
  this->useRawOutput = false;
  this->useLocalTime = false;
  this->connection = nullptr;
}