//================================================================================ // Name : query.cpp // Author : Axel Auweter, Daniele Tafani // Contact : info@dcdb.it // Copyright : Leibniz Supercomputing Centre // Description : Implementation of query class of dcdbquery //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2019 Leibniz Supercomputing Centre // // This program is free software; you can redistribute it and/or // modify it under the terms of the GNU General Public License // as published by the Free Software Foundation; either version 2 // of the License, or (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU General Public License for more details. // // You should have received a copy of the GNU General Public License // along with this program; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA. //================================================================================ #include #include #include #include #include #include #include #include #include "dcdbendian.h" #include "query.h" #include "dcdb/sensoroperations.h" void DCDBQuery::setLocalTimeEnabled(bool enable) { useLocalTime = enable; } bool DCDBQuery::getLocalTimeEnabled() { return useLocalTime; } void DCDBQuery::setRawOutputEnabled(bool enable) { useRawOutput = enable; } bool DCDBQuery::getRawOutputEnabled() { return useRawOutput; } bool scaleAndConvert(int64_t &value, double baseScalingFactor, double scalingFactor, DCDB::Unit baseUnit, DCDB::Unit unit) { if(scalingFactor != 1.0 || baseScalingFactor != 1.0) { if( DCDB::scale(&value, scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW) return false; } /* Convert the unit if requested */ if ((unit != DCDB::Unit_None) && (unit != baseUnit)) { if (!DCDB::UnitConv::convert(value, baseUnit, unit)) { std::cerr << "Warning, cannot convert units (" << DCDB::UnitConv::toString(baseUnit) << " -> " << DCDB::UnitConv::toString(unit) << ")" << std::endl; return false; } } return true; } void DCDBQuery::genOutput(std::list &results, queryMap_t::iterator start, queryMap_t::iterator stop) { /* Print Header */ std::cout << "Sensor,Time"; for (queryMap_t::iterator it=start; it!=stop; it++) { switch(it->second.operation) { case DCDB_OP_NONE: std::cout << ",Value"; break; case DCDB_OP_DELTA: std::cout << ",Delta"; break; case DCDB_OP_DELTAT: std::cout << ",Delta_t"; break; case DCDB_OP_DERIVATIVE: std::cout << ",Derivative"; break; case DCDB_OP_INTEGRAL: std::cout << ",Integral"; break; default: break; } std::string unitStr; if(it->second.unit != DCDB::Unit_None) { unitStr = DCDB::UnitConv::toString(it->second.unit); } else if (baseUnit != DCDB::Unit_None) { unitStr = DCDB::UnitConv::toString(baseUnit); } if (it->second.operation == DCDB_OP_DERIVATIVE) { if ((unitStr.back() == 's') || (unitStr.back() == 'h')) { unitStr.pop_back(); } else if (unitStr.back() == 'J') { unitStr.pop_back(); unitStr.append("W"); } } else if (it->second.operation == DCDB_OP_INTEGRAL) { if (unitStr.back() == 'W') { unitStr.append("s"); } } if (unitStr.size() > 0) { std::cout << " (" << unitStr << ")"; } } std::cout << std::endl; int64_t prevReading=0; DCDB::TimeStamp prevT((uint64_t) 0); for (std::list::iterator reading = results.begin(); reading != results.end(); reading++) { DCDB::TimeStamp ts = (*reading).timeStamp; /* Print the sensor's public name */ std::cout << start->first.name << ","; /* Print the time stamp */ if (useLocalTime) ts.convertToLocal(); if (useRawOutput) std::cout << ts.getRaw(); else std::cout << ts.getString(); int64_t value, result; /* Print the sensor value */ for (queryMap_t::iterator it=start; it!=stop; it++) { DCDB::Unit unit; if (it->second.unit != DCDB::Unit_None) { unit = it->second.unit; } else { unit = baseUnit; } value = (*reading).value; result = 0; bool resultOk = false; switch(it->second.operation) { case DCDB_OP_NONE: if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) { result = value; resultOk = true; } break; case DCDB_OP_DELTA: if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) { if ((prevT > (uint64_t) 0) && (DCDB::delta(value, prevReading, &result) == DCDB::DCDB_OP_SUCCESS)) { resultOk = true; } } break; case DCDB_OP_DELTAT: if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) { if ((prevT > (uint64_t) 0) && (DCDB::delta(ts.getRaw(), prevT.getRaw(), &result) == DCDB::DCDB_OP_SUCCESS)) { resultOk = true; } } break; case DCDB_OP_DERIVATIVE: { int64_t prevValue = prevReading; if (scaleAndConvert(value, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit) && scaleAndConvert(prevValue, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) { if( (prevT > (uint64_t) 0) && DCDB::derivative(value, prevValue, ts.getRaw(), prevT.getRaw(), &result, unit) == DCDB::DCDB_OP_SUCCESS) { resultOk = true; } } break;} case DCDB_OP_INTEGRAL: { int64_t prevValue = prevReading; if (scaleAndConvert(prevValue, baseScalingFactor, it->second.scalingFactor, baseUnit, it->second.unit)) { if( (prevT > (uint64_t) 0) && DCDB::integral(prevValue, ts.getRaw(), prevT.getRaw(), &result, unit) == DCDB::DCDB_OP_SUCCESS) { resultOk = true; } } break;} default: break; } if (resultOk) { std::cout << "," << result; } else { std::cout << ","; } } prevReading = value; prevT = ts; std::cout << std::endl; } } void DCDBQuery::doQuery(const char* hostname, std::list sensors, DCDB::TimeStamp start, DCDB::TimeStamp end) { /* Create a new connection to the database */ connection = new DCDB::Connection(); connection->setHostname(hostname); if (!connection->connect()) { std::cout << "Cannot connect to database." << std::endl; exit(EXIT_FAILURE); } /* Initialize the SensorConfig interface */ DCDB::SensorConfig sensorConfig(connection); /* Iterate over list of sensors requested by the user */ for (std::list::iterator it = sensors.begin(); it != sensors.end(); it++) { notifyOverflow = false; std::string sensorName; std::string functName; std::string modifierStr; //double scalingFactor = 1.0; //DCDB::Unit unit = DCDB::Unit_None; /* Retrieve sensor object first */ std::string str = *it; boost::regex functRegex("^([^\\(\\)]+)\\(([^\\(\\)]+)\\)$", boost::regex::extended); boost::smatch match; if(boost::regex_search(str, match, functRegex)) { functName = match[1].str(); str = match[2].str(); } boost::regex sensorRegex("([^\\@]+)\\@?([^\\@]*)", boost::regex::extended); if(boost::regex_search(str, match, sensorRegex)) { sensorName = match[1].str(); modifierStr = match[2].str(); } queryConfig_t queryCfg = { 1.0, DCDB::Unit_None, DCDB_OP_NONE}; if (functName.length() == 0) { queryCfg.operation = DCDB_OP_NONE; } else if (boost::iequals(functName, "delta")) { queryCfg.operation = DCDB_OP_DELTA; } else if (boost::iequals(functName, "delta_t")) { queryCfg.operation = DCDB_OP_DELTAT; } else if (boost::iequals(functName, "derivative")) { queryCfg.operation = DCDB_OP_DERIVATIVE; } else if (boost::iequals(functName, "integral")) { queryCfg.operation = DCDB_OP_INTEGRAL; } else { queryCfg.operation = DCDB_OP_UNKNOWN; std::cerr << "Unknown sensor operation: " << functName << std::endl; } if (queryCfg.operation != DCDB_OP_UNKNOWN) { if (modifierStr.length() > 0) { boost::regex e("[0-9]*\\.?[0-9]*", boost::regex::extended); if (boost::regex_match(modifierStr, e)) { queryCfg.scalingFactor = atof(modifierStr.c_str()); } else { queryCfg.unit = DCDB::UnitConv::fromString(modifierStr); } } std::list publicSensors; sensorConfig.getPublicSensorsByWildcard(publicSensors, sensorName.c_str()); if (publicSensors.size() > 0) { for (auto sen: publicSensors) { queries.insert(std::pair(sen, queryCfg)); } } else { DCDB::PublicSensor pS; pS.name = sensorName; pS.pattern = sensorName; queries.insert(std::pair(pS, queryCfg)); } } } std::string prevSensorName; for (auto q: queries) { if (q.first.name != prevSensorName) { std::pair range = queries.equal_range(q.first); /* Base scaling factor and unit of the public sensor */ baseUnit = DCDB::UnitConv::fromString(q.first.unit); baseScalingFactor = q.first.scaling_factor; std::list results; DCDB::Sensor sensor(connection, q.first); sensor.query(results, start, end, DCDB::AGGREGATE_NONE); genOutput(results, range.first, range.second); if(notifyOverflow) std::cout << "Overflow detected." << std::endl; prevSensorName = q.first.name; } } /* * Clean up */ connection->disconnect(); delete connection; } DCDBQuery::DCDBQuery() { connection = nullptr; useLocalTime = false; useRawOutput = false; }