Commit 777a70c2 authored by Michael Ott's avatar Michael Ott
Browse files

Allow for querying sensors of all nodes in a job

parent 31009e8c
......@@ -45,9 +45,11 @@ void usage(void)
if (isatty(fileno(stdin))) {
/* 0---------1---------2---------3---------4---------5---------6---------7--------- */
std::cout << "Usage:" << std::endl;
std::cout << " dcdbquery [-h <host>] [-d] [-r] [-l] <Sensor 1> [<Sensor 2> ...] <Start> <End>" << std::endl;
std::cout << " dcdbquery [-h <host>] [-r] [-l] <Sensor 1> [<Sensor 2> ...] <Start> <End>" << std::endl;
std::cout << " dcdbquery [-h <host>] [-r] [-l] -j <jobId> <Sensor 1> [<Sensor 2> ...]" << std::endl;
std::cout << std::endl;
std::cout << "Parameters:" << std::endl;
std::cout << " <jobId> a job to query sensors for" << std::endl;
std::cout << " <Sensor n> a sensor name" << std::endl;
std::cout << " <Start> start of time series" << std::endl;
std::cout << " <End> end of time series" << std::endl;
......@@ -115,8 +117,9 @@ int main(int argc, char * const argv[])
if (!host) {
host = "localhost";
}
std::string jobId;
while ((ret=getopt(argcReal, argvReal, "+h:rlf"))!=-1) {
while ((ret=getopt(argcReal, argvReal, "+h:rlj:"))!=-1) {
switch(ret) {
case 'h':
host = optarg;
......@@ -127,6 +130,9 @@ int main(int argc, char * const argv[])
case 'l':
myQuery->setLocalTimeEnabled(true);
break;
case 'j':
jobId = optarg;
break;
default:
usage();
exit(EXIT_FAILURE);
......@@ -135,29 +141,38 @@ int main(int argc, char * const argv[])
/* Try to create TimeStamp objects from the arguments */
DCDB::TimeStamp start, end;
try {
bool local = myQuery->getLocalTimeEnabled();
start = DCDB::TimeStamp(argvReal[argcReal-2], local);
end = DCDB::TimeStamp(argvReal[argcReal-1], local);
}
catch (std::exception& e) {
std::cout << "Wrong time format." << std::endl;
exit(EXIT_FAILURE);
}
/* Ensure start < end */
if(start > end) {
std::cout << "Start time must be earlier than end time." << std::endl;
exit(EXIT_FAILURE);
if (jobId.size() == 0) {
try {
bool local = myQuery->getLocalTimeEnabled();
start = DCDB::TimeStamp(argvReal[argcReal-2], local);
end = DCDB::TimeStamp(argvReal[argcReal-1], local);
} catch (std::exception& e) {
std::cout << "Wrong time format." << std::endl;
exit(EXIT_FAILURE);
}
/* Ensure start < end */
if(start > end) {
std::cout << "Start time must be earlier than end time." << std::endl;
exit(EXIT_FAILURE);
}
argcReal-= 2;
}
/* Build a list of sensornames */
std::list<std::string> sensors;
for (int arg = optind; arg < argcReal-2; arg++) {
for (int arg = optind; arg < argcReal; arg++) {
sensors.push_back(argvReal[arg]);
}
myQuery->doQuery(host, sensors, start, end);
if (myQuery->connect(host) == 0) {
if (jobId.size() == 0) {
myQuery->doQuery(sensors, start, end);
} else {
myQuery->dojobQuery(sensors, jobId);
}
myQuery->disconnect();
}
delete myQuery;
......
......@@ -39,6 +39,8 @@
#include "dcdbendian.h"
#include "query.h"
#include "dcdb/sensoroperations.h"
#include <dcdb/jobdatastore.h>
void DCDBQuery::setLocalTimeEnabled(bool enable) {
useLocalTime = enable;
......@@ -56,6 +58,25 @@ bool DCDBQuery::getRawOutputEnabled() {
return useRawOutput;
}
int DCDBQuery::connect(const char* hostname) {
if (connection != nullptr) {
return 0;
}
connection = new DCDB::Connection();
connection->setHostname(hostname);
if (!connection->connect()) {
std::cout << "Cannot connect to database." << std::endl;
return 1;
}
return 0;
}
void DCDBQuery::disconnect() {
connection->disconnect();
delete connection;
connection = nullptr;
}
bool scaleAndConvert(int64_t &value, double baseScalingFactor, double scalingFactor, DCDB::Unit baseUnit, DCDB::Unit unit) {
if(scalingFactor != 1.0 || baseScalingFactor != 1.0) {
if( DCDB::scale(&value, scalingFactor, baseScalingFactor) == DCDB::DCDB_OP_OVERFLOW)
......@@ -205,69 +226,69 @@ void DCDBQuery::genOutput(std::list<DCDB::SensorDataStoreReading> &results, quer
}
}
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end)
{
/* Create a new connection to the database */
connection = new DCDB::Connection();
connection->setHostname(hostname);
if (!connection->connect()) {
std::cout << "Cannot connect to database." << std::endl;
exit(EXIT_FAILURE);
void DCDBQuery::setInterval(DCDB::TimeStamp start, DCDB::TimeStamp end) {
start_ts = start;
end_ts = end;
}
void DCDBQuery::parseSensorSpecification(const std::string sensor, std::string& sensorName, queryConfig_t& queryCfg) {
std::string s = sensor;
/* Check for function first */
boost::regex functRegex("^([^\\(\\)]+)\\(([^\\(\\)]+)\\)$", boost::regex::extended);
boost::smatch match;
std::string functName;
if(boost::regex_search(s, match, functRegex)) {
functName = match[1].str();
s = match[2].str();
}
/* Split into sensor name and potential modifier, i.e. unit conversion or scaling factor */
boost::regex sensorRegex("([^\\@]+)\\@?([^\\@]*)", boost::regex::extended);
std::string modifierStr;
if(boost::regex_search(s, match, sensorRegex)) {
sensorName = match[1].str();
modifierStr = match[2].str();
}
queryCfg = { 1.0, DCDB::Unit_None, DCDB_OP_NONE};
if (functName.length() == 0) {
queryCfg.operation = DCDB_OP_NONE;
} else if (boost::iequals(functName, "delta")) {
queryCfg.operation = DCDB_OP_DELTA;
} else if (boost::iequals(functName, "delta_t")) {
queryCfg.operation = DCDB_OP_DELTAT;
} else if (boost::iequals(functName, "derivative")) {
queryCfg.operation = DCDB_OP_DERIVATIVE;
} else if (boost::iequals(functName, "integral")) {
queryCfg.operation = DCDB_OP_INTEGRAL;
} else {
queryCfg.operation = DCDB_OP_UNKNOWN;
std::cerr << "Unknown sensor operation: " << functName << std::endl;
}
if (queryCfg.operation != DCDB_OP_UNKNOWN) {
if (modifierStr.length() > 0) {
boost::regex e("[0-9]*\\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e)) {
queryCfg.scalingFactor = atof(modifierStr.c_str());
} else {
queryCfg.unit = DCDB::UnitConv::fromString(modifierStr);
}
}
}
}
void DCDBQuery::prepareQuery(std::list<std::string> sensors) {
/* Initialize the SensorConfig interface */
DCDB::SensorConfig sensorConfig(connection);
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
notifyOverflow = false;
std::string sensorName;
std::string functName;
std::string modifierStr;
//double scalingFactor = 1.0;
//DCDB::Unit unit = DCDB::Unit_None;
/* Retrieve sensor object first */
std::string str = *it;
boost::regex functRegex("^([^\\(\\)]+)\\(([^\\(\\)]+)\\)$", boost::regex::extended);
boost::smatch match;
if(boost::regex_search(str, match, functRegex)) {
functName = match[1].str();
str = match[2].str();
}
boost::regex sensorRegex("([^\\@]+)\\@?([^\\@]*)", boost::regex::extended);
if(boost::regex_search(str, match, sensorRegex)) {
sensorName = match[1].str();
modifierStr = match[2].str();
}
queryConfig_t queryCfg = { 1.0, DCDB::Unit_None, DCDB_OP_NONE};
if (functName.length() == 0) {
queryCfg.operation = DCDB_OP_NONE;
} else if (boost::iequals(functName, "delta")) {
queryCfg.operation = DCDB_OP_DELTA;
} else if (boost::iequals(functName, "delta_t")) {
queryCfg.operation = DCDB_OP_DELTAT;
} else if (boost::iequals(functName, "derivative")) {
queryCfg.operation = DCDB_OP_DERIVATIVE;
} else if (boost::iequals(functName, "integral")) {
queryCfg.operation = DCDB_OP_INTEGRAL;
} else {
queryCfg.operation = DCDB_OP_UNKNOWN;
std::cerr << "Unknown sensor operation: " << functName << std::endl;
}
queryConfig_t queryCfg;
parseSensorSpecification(*it, sensorName, queryCfg);
if (queryCfg.operation != DCDB_OP_UNKNOWN) {
if (modifierStr.length() > 0) {
boost::regex e("[0-9]*\\.?[0-9]*", boost::regex::extended);
if (boost::regex_match(modifierStr, e)) {
queryCfg.scalingFactor = atof(modifierStr.c_str());
} else {
queryCfg.unit = DCDB::UnitConv::fromString(modifierStr);
}
}
std::list <DCDB::PublicSensor> publicSensors;
sensorConfig.getPublicSensorsByWildcard(publicSensors, sensorName.c_str());
if (publicSensors.size() > 0) {
......@@ -282,33 +303,81 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
}
}
}
}
void DCDBQuery::prepareQuery(std::list<std::string> sensors, std::list<std::string> prefixes) {
/* Initialize the SensorConfig interface */
DCDB::SensorConfig sensorConfig(connection);
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
std::string sensorName;
queryConfig_t queryCfg;
parseSensorSpecification(*it, sensorName, queryCfg);
for (auto p: prefixes) {
std::string s = p;
if (s.back() != '/') {
s.push_back('/');
}
s.append(sensorName);
if (queryCfg.operation != DCDB_OP_UNKNOWN) {
std::list <DCDB::PublicSensor> publicSensors;
sensorConfig.getPublicSensorsByWildcard(publicSensors, s.c_str());
if (publicSensors.size() > 0) {
for (auto sen: publicSensors) {
queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(sen, queryCfg));
}
} else {
DCDB::PublicSensor pS;
pS.name = s;
pS.pattern = s;
queries.insert(std::pair<DCDB::PublicSensor, queryConfig_t>(pS, queryCfg));
}
}
}
}
}
void DCDBQuery::execute() {
std::string prevSensorName;
for (auto q: queries) {
if (q.first.name != prevSensorName) {
std::pair<queryMap_t::iterator, queryMap_t::iterator> range = queries.equal_range(q.first);
/* Base scaling factor and unit of the public sensor */
baseUnit = DCDB::UnitConv::fromString(q.first.unit);
baseScalingFactor = q.first.scaling_factor;
std::list<DCDB::SensorDataStoreReading> results;
DCDB::Sensor sensor(connection, q.first);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE);
genOutput(results, range.first, range.second);
if(notifyOverflow)
std::cout << "Overflow detected." << std::endl;
/* Base scaling factor and unit of the public sensor */
baseUnit = DCDB::UnitConv::fromString(q.first.unit);
baseScalingFactor = q.first.scaling_factor;
std::list<DCDB::SensorDataStoreReading> results;
DCDB::Sensor sensor(connection, q.first);
sensor.query(results, start_ts, end_ts, DCDB::AGGREGATE_NONE);
genOutput(results, range.first, range.second);
prevSensorName = q.first.name;
}
}
}
void DCDBQuery::doQuery(std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end) {
setInterval(start, end);
prepareQuery(sensors);
execute();
}
void DCDBQuery::dojobQuery(std::list<std::string> sensors, std::string jobId) {
DCDB::JobDataStore jobDataStore(connection);
DCDB::JobData jobData;
DCDB::JDError err = jobDataStore.getJobById(jobData, jobId);
/*
* Clean up
*/
connection->disconnect();
delete connection;
if (err == DCDB::JD_OK) {
setInterval(jobData.startTime, jobData.endTime);
prepareQuery(sensors, jobData.nodes);
execute();
} else {
std::cerr << "Job not found: " << jobId << std::endl;
}
}
DCDBQuery::DCDBQuery()
......
......@@ -66,7 +66,15 @@ typedef enum {
CONVERT_ERR,
} CONVERT_RESULT;
protected:
private:
void genOutput(std::list<DCDB::SensorDataStoreReading> &results, queryMap_t::iterator start, queryMap_t::iterator stop);
void setInterval(DCDB::TimeStamp start, DCDB::TimeStamp end);
void parseSensorSpecification(const std::string sensor, std::string& sensorName, queryConfig_t& queryCfg);
void prepareQuery(std::list<std::string> sensors);
void prepareQuery(std::list<std::string> sensors, std::list<std::string> prefixes);
void execute();
DCDB::Connection* connection;
bool useLocalTime;
bool useRawOutput;
......@@ -74,8 +82,8 @@ protected:
queryMap_t queries;
double baseScalingFactor;
DCDB::Unit baseUnit;
bool notifyOverflow;
DCDB::TimeStamp start_ts;
DCDB::TimeStamp end_ts;
public:
void setLocalTimeEnabled(bool enable);
......@@ -83,12 +91,11 @@ public:
void setRawOutputEnabled(bool enable);
bool getRawOutputEnabled();
void genOutput(std::list<DCDB::SensorDataStoreReading> &results, queryMap_t::iterator start, queryMap_t::iterator stop);
void check(std::list<std::string>::iterator it , double* scalingFactor);
void checkModifier(std::list<std::string>::iterator it, struct outputFormat *format);
int connect(const char* hostname);
void disconnect();
void doQuery(std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end);
void dojobQuery(std::list<std::string> sensors, std::string jobId);
void doQuery(const char* hostname, std::list<std::string> sensors, DCDB::TimeStamp start, DCDB::TimeStamp end);
DCDBQuery();
virtual ~DCDBQuery() {};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment