Commit 471fd55a authored by Micha Müller's avatar Micha Müller
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents b9c4cf89 ca1364e0
......@@ -30,6 +30,7 @@
#include "sensornavigator.h"
#include "sensorbase.h"
#include "metadatastore.h"
#include <atomic>
using namespace std;
......@@ -46,6 +47,8 @@ struct qeJobData {
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback
typedef bool (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
//Typedef for the metadata retrieval callback
typedef bool (*QueryEngineMetadataCallback)(const string&, SensorMetadata&);
/**
* @brief Class that grants query access to local and remote sensors
......@@ -135,7 +138,7 @@ public:
*
* @param cb Pointer to a function of type QueryEngineCallback
*/
void setQueryCallback(QueryEngineCallback cb) { _callback = cb; }
void setQueryCallback(QueryEngineCallback cb) { _callback = cb; }
/**
* @brief Sets the internal callback to retrieve job data
......@@ -148,6 +151,17 @@ public:
*/
void setJobQueryCallback(QueryEngineJobCallback jcb) { _jCallback = jcb; }
/**
* @brief Sets the internal callback to retrieve sensor metadata data
*
* This method sets the internal callback that will be used by the QueryEngine to retrieve sensor
* metadata and thus implement an abstraction layer. Behavior of the callback must be identical to
* that specified in setQueryCallback.
*
* @param mcb Pointer to a function of type QueryEngineMetadataCallback
*/
void setMetadataQueryCallback(QueryEngineMetadataCallback mcb) { _mCallback = mcb; }
/**
* @brief Returns the internal SensorNavigator object
*
......@@ -251,6 +265,24 @@ public:
return _jCallback(jobId, startTs, endTs, buffer, rel, range);
}
/**
* @brief Perform a sensor metadata query
*
* This method allows to retrieve the metadata of available sensors. The input "buffer" object
* allows to re-use memory over successive readings. Note that in order to use this method, a
* callback must have been set through the setMetadataQueryCallback method. If not, this
* method will throw an exception.
*
* @param name Name of the sensor to be queried
* @param buffer SensorMetadata object in which to store the result
* @return True if successful, false otherwise
*/
bool queryMetadata(const string& name, SensorMetadata& buffer) {
if(!_mCallback)
throw runtime_error("Query Engine: sensor metadata callback not set!");
return _mCallback(name, buffer);
}
/**
* @brief Locks access to the QueryEngine
*
......@@ -288,6 +320,7 @@ private:
_sensorMap = NULL;
_callback = NULL;
_jCallback = NULL;
_mCallback = NULL;
updating.store(false);
access.store(0);
}
......@@ -310,6 +343,8 @@ private:
QueryEngineCallback _callback;
// Callback used to retrieve job data
QueryEngineJobCallback _jCallback;
// Callback used to retrieve metadata
QueryEngineMetadataCallback _mCallback;
// String storing the current sensor hierarchy, used for convenience
string _sensorHierarchy;
// String storing the filter to be used when building a sensor navigator
......
......@@ -64,10 +64,11 @@ public:
}
};
void PerSystDB::print_error(){
PerSystDB * PerSystDB::instance = nullptr;
std::mutex PerSystDB::mut;
std::cout << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" << std::endl;
//mysql_close(mysql);
void PerSystDB::print_error(){
LOG(error) << "Error(" << mysql_errno(_mysql) << ") [" << mysql_sqlstate(_mysql) << "] \""<< mysql_error(_mysql) << "\"" ;
}
PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0), _internal_connection(false), _end_aggregate_timestamp(0) {
......@@ -76,7 +77,16 @@ PerSystDB::PerSystDB(): _mysql(NULL), _rotation(EVERY_MONTH), _every_x_days(0),
PerSystDB::PerSystDB(MYSQL *mysql, Rotation_t rotation, unsigned int every_x_days) :
_mysql(mysql), _rotation(rotation), _every_x_days(every_x_days), _internal_connection(
false), _end_aggregate_timestamp(0) {
}
PerSystDB * PerSystDB::getInstance(){
// no lock here
if (instance) return instance;
std::lock_guard<std::mutex> lock(mut);
if (instance) return instance;
return instance = new PerSystDB();
}
bool PerSystDB::initializeConnection(const std::string & host, const std::string & user,
......@@ -87,11 +97,13 @@ bool PerSystDB::initializeConnection(const std::string & host, const std::string
print_error();
return false;
}
LOG(debug) << "Successfully connected to mariadb";
return true;
}
bool PerSystDB::finalizeConnection(){
mysql_close(_mysql);
LOG(debug) << "Closed mariadb";
return true;
}
......@@ -103,6 +115,8 @@ PerSystDB::PerSystDB(const std::string & host, const std::string & user,
_mysql = mysql_init(NULL);
if(!mysql_real_connect(_mysql, host.c_str(), user.c_str(), password.c_str(), database_name.c_str(), port, NULL, 0)){
print_error();
} else {
LOG(debug) << "Internal Connection: Successfully connected to mariadb";
}
}
......@@ -110,43 +124,42 @@ PerSystDB::PerSystDB(const std::string & host, const std::string & user,
PerSystDB::~PerSystDB(){
if(_internal_connection){
mysql_close(_mysql);
LOG(debug) << "Internal Connection: Disconnected from mariadb";
}
}
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map ){
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for(std::vector<std::string>::size_type i = 0; i < job_id_strings.size(); ++i){
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size()-1) { //not last element
build_query << ",";
}
}
build_query << ")";
auto query = build_query.str();
#if DEBUG
std::cout << query << std::endl;
#endif
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
bool PerSystDB::getDBJobIDs(std::vector<std::string> & job_id_strings, std::map<std::string, std::string>& job_id_map) {
std::stringstream build_query;
build_query << "SELECT job_id, job_id_string FROM Accounting WHERE job_id_string IN (";
for (std::vector<std::string>::size_type i = 0; i < job_id_strings.size();
++i) {
build_query << "'" << job_id_strings[i] << "'";
if (i != job_id_strings.size() - 1) { //not last element
build_query << ",";
}
}
build_query << ")";
auto query = build_query.str();
LOG(debug)<< query;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
}
SQLResult result(_mysql);
if(result.get()){
MYSQL_ROW row;
while((row = result.fetch_row())){
if(row[0]){
job_id_map[std::string(row[1])]= row[0];
}
}
}
return true;
SQLResult result(_mysql);
if (result.get()) {
MYSQL_ROW row;
while ((row = result.fetch_row())) {
if (row[0]) {
job_id_map[std::string(row[1])] = row[0];
}
}
}
return true;
}
bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
if(_end_aggregate_timestamp){
auto now_uts = getTimestamp();
if(now_uts < _end_aggregate_timestamp) { //suffix found, don't do anything
......@@ -162,9 +175,7 @@ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
build_query << "SELECT suffix, UNIX_TIMESTAMP(end_timestamp) FROM SuffixToAggregateTable WHERE begin_timestamp < \'";
build_query << date_time << "\' AND end_timestamp > \'" << date_time << "\'";
auto query = build_query.str();
#if DEBUG
std::cout << query << std::endl;
#endif
LOG(debug) << query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
......@@ -189,22 +200,24 @@ bool PerSystDB::getCurrentSuffixAggregateTable(std::string & suffix){
bool PerSystDB::insertIntoJob(const std::string& job_id_string, unsigned long long uid, int & job_id_db, const std::string & suffix){
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
build_insert << pass->pw_name << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
std::stringstream build_insert;
build_insert << "INSERT INTO Accounting (job_id_string, user, aggregate_first_suffix, aggregate_last_suffix) VALUES (\'" << job_id_string << "\',\'";
auto* pass = getpwuid(static_cast<uid_t>(uid));
if (pass == nullptr) {
LOG(error)<< "User " << uid << " not found in system.";
return false;
}
build_insert << pass->pw_name << "\',\'";
build_insert << suffix << "\',\'" << suffix << "\')";
std::string query = build_insert.str();
LOG(debug)<< query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
return true;
if (mysql_real_query(_mysql, query.c_str(), query.size())) {
print_error();
return false;
}
job_id_db = mysql_insert_id(_mysql);
return true;
}
......@@ -220,9 +233,7 @@ bool PerSystDB::insertInAggregateTable(const std::string& suffix, Aggregate_info
}
build_insert << "\', \'" << agg.severity_average << "\')";
std::string query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
LOG(debug) << query;
if(mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
......@@ -264,9 +275,7 @@ bool PerSystDB::createNewAggregate(std::string& new_suffix){
build_insert << "INSERT INTO SuffixToAggregateTable VALUES(\'" << new_suff;
build_insert << "\', \'" << new_begin_timestamp << "\', \'" << new_end_timestamp << "\')";
auto query = build_insert.str();
#if DEBUG
std::cout << query << std::endl;
#endif
LOG(debug) << query;
if (mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
return false;
......@@ -275,9 +284,7 @@ bool PerSystDB::createNewAggregate(std::string& new_suffix){
std::stringstream build_create;
build_create << "CREATE TABLE Aggregate_" << new_suff << " LIKE Aggregate";
auto query2 = build_create.str();
#if DEBUG
std::cout << query2 << std::endl;
#endif
LOG(debug) << query2;
if(mysql_real_query(_mysql, query2.c_str(), query2.size() )){
if(mysql_errno(_mysql) == 1050){
return true; //table exists!
......@@ -304,9 +311,7 @@ bool PerSystDB::updateJobsLastSuffix(std::map<std::string, std::string>& job_map
count++;
}
auto query = build_update.str();
#if DEBUG
std::cout << query << std::endl;
#endif
LOG(debug) << query;
if (mysql_real_query(_mysql, query.c_str(), query.size())){
print_error();
......@@ -354,10 +359,8 @@ void PerSystDB::getNewDates(const std::string& last_end_timestamp, std::string &
_end_aggregate_timestamp = diff.total_seconds() * 1e9;
end_timestamp = to_iso_extended_string(d) + " 00:00:00";
#if DEBUG
std::cout << "boost_end_timestamp =" << _end_aggregate_timestamp << std::endl;
std::cout << "boost_end_aggregate_timestamp=" << end_timestamp << std::endl;
#endif
LOG(debug) << "_end_aggregate_timestamp =" << _end_aggregate_timestamp;
LOG(debug) << "end_timestamp =" << end_timestamp;
}
......@@ -29,9 +29,11 @@
#define ANALYTICS_OPERATORS_PERSYSTSQL_PERSYSTDB_H_
#include "mariadb/mysql.h"
#include "../../../common/include/logging.h"
#include <vector>
#include <string>
#include <map>
#include <mutex>
struct Aggregate_info_t {
std::string job_id_db;
......@@ -60,9 +62,11 @@ protected:
bool _internal_connection;
unsigned long long _end_aggregate_timestamp;
std::string _current_table_suffix;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
void print_error();
static PerSystDB * instance;
static std::mutex mut;
public:
PerSystDB(); //Used for later initialization of mysql
bool initializeConnection(const std::string & host,
......@@ -89,6 +93,7 @@ public:
bool insertInAggregateTable(const std::string& suffix, Aggregate_info_t & agg_info);
bool updateJobsLastSuffix(std::map<std::string, std::string>& job_map, std::string & suffix);
bool getTableSuffix(std::string & table_suffix);
static PerSystDB * getInstance();
};
......
......@@ -40,33 +40,43 @@
PerSystSqlConfigurator::PerSystSqlConfigurator(): JobOperatorConfiguratorTemplate() {
_operatorName = "persystsql";
_baseName = "sensor";
_rotation_map["EVERY_YEAR"] = PerSystDB::EVERY_YEAR;
_rotation_map["EVERY_MONTH"] = PerSystDB::EVERY_MONTH;
_rotation_map["EVERY_XDAYS"] = PerSystDB::EVERY_XDAYS;
}
PerSystSqlConfigurator::~PerSystSqlConfigurator() {
}
void PerSystSqlConfigurator::sensorBase(AggregatorSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if (opName == "average"){
s.setOperation(AggregatorSensorBase::AVG);
} else if (opName == "deciles" || opName == "percentiles" || opName == "quantile"){
s.setOperation(AggregatorSensorBase::QTL);
} else if (opName == "observations" || opName == "numobs") {
s.setOperation(AggregatorSensorBase::OBS);
} else if (opName == "average_severity"){
s.setOperation(AggregatorSensorBase::AVG_SEV);
} else {
LOG(error) << "PerSystSqlConfigurator operation " << opName << " not supported!" ;
}
}
}
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config)
{
if (boost::iequals(val.first, "operation")) {
std::string opName = val.second.data();
if (opName == "average") {
s.setOperation(AggregatorSensorBase::AVG);
} else if (opName == "deciles" || opName == "percentiles" || opName == "quantile") {
s.setOperation(AggregatorSensorBase::QTL);
} else if (opName == "observations" || opName == "numobs") {
s.setOperation(AggregatorSensorBase::OBS);
} else if (opName == "average_severity") {
s.setOperation(AggregatorSensorBase::AVG_SEV);
} else {
LOG(error) << "PerSystSqlConfigurator operation " << opName << " not supported!";
}
}
}
}
void PerSystSqlConfigurator::operatorAttributes(PerSystSqlOperator& op, CFG_VAL config) {
std::string host;
std::string user;
std::string password;
std::string database_name;
int port = -1;
PerSystDB::Rotation_t rotation = PerSystDB::EVERY_MONTH;
PerSystSqlOperator::Backend_t backend = PerSystSqlOperator::DEFAULT;
unsigned int every_x_days = 0;
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "number_quantiles")) {
unsigned int num_quantiles = std::stoul(val.second.data());
......@@ -96,15 +106,39 @@ void PerSystSqlConfigurator::operatorAttributes(PerSystSqlOperator& op, CFG_VAL
op.setSeverityMaxMemory(max_memory);
} else if (boost::iequals(val.first, "backend")) {
if(val.second.data() == "cassandra") {
backend = PerSystSqlOperator::CASSANDRA;
op.setBackend(PerSystSqlOperator::CASSANDRA);
} else if(val.second.data() == "mariadb") {
backend = PerSystSqlOperator::MARIADB;
op.setBackend(PerSystSqlOperator::MARIADB);
}
} else if(boost::iequals(val.first, "property_id")){
} else if(boost::iequals(val.first, "property_id")){
auto property_id = std::stoi(val.second.data());
op.setPropertyId(property_id);
} else if (boost::iequals(val.first, "mariadb_host")) {
host = val.second.data();
} else if (boost::iequals(val.first, "mariadb_user")){
user = val.second.data();
} else if (boost::iequals(val.first, "mariadb_password")){
password = val.second.data();
} else if (boost::iequals(val.first, "mariadb_database_name")){
database_name = val.second.data();
} else if (boost::iequals(val.first, "mariadb_port")){
port = std::stoi(val.second.data());
} else if (boost::iequals(val.first, "mariadb_rotation")){
auto found = _rotation_map.find(val.second.data());
if (found != _rotation_map.end()){
rotation = found->second;
} else {
LOG(error) << " Rotation strategy (" << val.second.data() << ") not found.";
}
} else if (boost::iequals(val.first, "mariadb_every_x_days")){
every_x_days = std::stoi(val.second.data());
}
}
if(backend == PerSystSqlOperator::MARIADB) {
op.setMariaDBConnection(host, user, password, database_name, port, rotation, every_x_days);
}
}
bool PerSystSqlConfigurator::unit(UnitTemplate<AggregatorSensorBase>& u) {
......
......@@ -40,6 +40,7 @@ public:
PerSystSqlConfigurator();
virtual ~PerSystSqlConfigurator();
private:
std::map<std::string, PerSystDB::Rotation_t> _rotation_map;
void sensorBase(AggregatorSensorBase& s, CFG_VAL config) override;
void operatorAttributes(PerSystSqlOperator& op, CFG_VAL config) override;
bool unit(UnitTemplate<AggregatorSensorBase>& u) override;
......
......@@ -47,13 +47,15 @@
#include "../../includes/UnitTemplate.h"
int PerSystSqlOperator::_number_of_calls = 0;
PerSystDB PerSystSqlOperator::persystdb;
bool PerSystSqlOperator::persystdb_initialized = false;
std::mutex PerSystSqlOperator::mut;
PerSystSqlOperator::PerSystSqlOperator(const std::string& name) :
OperatorTemplate(name), JobOperatorTemplate(name), _number_of_even_quantiles(
0), _severity_formula(NOFORMULA), _severity_threshold(0), _severity_exponent(
0), _severity_max_memory(0), _go_back_ns(0), _backend(DEFAULT), _scaling_factor(
1), _property_id(0) {
_persystdb = PerSystDB::getInstance();
}
PerSystSqlOperator::~PerSystSqlOperator() {
......@@ -91,6 +93,7 @@ void PerSystSqlOperator::copy(const PerSystSqlOperator& other){
this->_conn.rotation = other._conn.rotation;
this->_conn.user = other._conn.user;
this->_property_id = other._property_id;
this->_persystdb = other._persystdb;
}
void PerSystSqlOperator::printConfig(LOG_LEVEL ll) {
......@@ -127,23 +130,35 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
for (const auto& subUnit : unit->getSubUnits()) {
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for (const auto& in : subUnit->getInputs()) {
_scaling_factor = in->getMetadata()->scale;
if( _scaling_factor == 1){
SensorMetadata buffer;
if(_queryEngine.queryMetadata(in->getName(), buffer)){
_scaling_factor = buffer.scale;
}
}
if (!_queryEngine.querySensor(in->getName(), my_timestamp, my_timestamp, _buffer, false)) {
LOG(debug)<< "PerSystSql Operator " << _name << " cannot read from sensor " << in->getName() << "!";
}
}
}
static bool persystdb_initialized = false;
if ( _backend == MARIADB && !persystdb_initialized) {
bool persystdb_initialized = persystdb.initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
if(_buffer.size() == 0){
LOG(error) << "PerSystSql Operator " << _name << ": no data in queryEngine found!";
return;
}
Aggregate_info_t agg_info;
std::string table_suffix;
if(_backend == MARIADB){
compute_internal(unit, _buffer, agg_info);
if( _backend == MARIADB ) {
std::lock_guard<std::mutex> lk(mut);
if (!persystdb_initialized) {
bool persystdb_initialized = _persystdb->initializeConnection(_conn.host, _conn.user, _conn.password, _conn.database_name, _conn.rotation, _conn.port, _conn.every_x_days);
if(!persystdb_initialized) {
LOG(error) << "Unable to establish connection to database";
return;
}
}
std::stringstream jobidBuilder;
jobidBuilder << jobData.jobId;
......@@ -151,35 +166,32 @@ void PerSystSqlOperator::compute(U_Ptr unit, qeJobData& jobData) {
job_ids.push_back(jobidBuilder.str());
std::map<std::string, std::string> job_map;
if(!persystdb.getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
return;
}
if(!persystdb.getDBJobIDs(job_ids, job_map)){
return;
}
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(persystdb.insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
continue;
}
}
}
agg_info.timestamp = (my_timestamp/1e9);
}
std::string table_suffix;
if(!_persystdb->getTableSuffix(table_suffix)){
LOG(error) << "failed to create table!";
return;
}
if(!_persystdb->getDBJobIDs(job_ids, job_map)){
return;
}
compute_internal(unit, _buffer, agg_info);
// handle jobs which are not present
for(auto &job_id_string : job_ids ){
auto search = job_map.find(job_id_string);
if(search == job_map.end()){ //Not found
int job_id_db;
if(_persystdb->insertIntoJob(job_id_string, jobData.userId, job_id_db, table_suffix)){
agg_info.job_id_db = std::to_string(job_id_db);
} else {
continue;
}
}
}
agg_info.timestamp = (my_timestamp/1e9);
if(_backend == MARIADB){
persystdb.insertInAggregateTable(table_suffix, agg_info);
_persystdb->insertInAggregateTable(table_suffix, agg_info);
if(_number_of_calls % 10 == 0 && persystdb_initialized){
persystdb.finalizeConnection();
_persystdb->finalizeConnection();
persystdb_initialized = false;
}
_number_of_calls++;
......