Commit e01274d8 authored by Michael Ott's avatar Michael Ott
Browse files

Remove querySum functions in libdcdb

parent 31db7e35
......@@ -54,7 +54,6 @@ typedef enum {
DCDB_C_CONNERR, /**< The connection to the database could not be made. */
DCDB_C_SENSORNOTFOUND, /**< The requested sensor cannot be found in the database's list of public sensors. */
DCDB_C_EMPTYSET, /**< The query into the database resulted in an empty set. */
DCDB_C_NOTINTEGRABLE, /**< One of the QuerySum() functions was called on a sensor that is not marked as integrable. */
DCDB_C_NOSENSOR, /**< The caller did not specify a sensor to be queried. */
DCDB_C_BADPARAMS, /**< The provided function parameters are malformed or incomplete */
DCDB_C_UNKNOWN /**< An unknown error occurred. */
......@@ -68,102 +67,6 @@ typedef enum {
typedef uint32_t DCDB_C_OPTIONS;
#define DCDB_C_LOCALTIME 0x1 /**< Treat time stamps passed to the query as being in local time instead of UTC */
/**
* @brief This function integrates a given senor's data series over time.
*
* @param result Pointer to a int64_t variable which will hold the result of the operation.
* @param hostname Hostname of a database node.
* @param port TCP port to use for connecting to the database node (for Cassandra, this is usually 9042).
* @param sensorPublicName Public name of the sensor whose values should be integrated.
* @param start time_t denoting the start of the time series.
* @param end time_t denoting the end of the timer series.
* @param options Bitmask of DCDB_C_OPTIONS.
*
* @details
* When calling this function, libdcdb integrates the values of the given
* sensor over time (in seconds). For example, if you have a sensor which
* stores power data in Watts, the resulting value will be energy in Joules
* (Watts * seconds).
*
* If you intend to sum up the results of multiple sensors, consider the
* use of the dcdbQuerySumMultiple() or dcdbQuerySumMultipleThreaded()
* functions.
*/
DCDB_C_RESULT dcdbQuerySum(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName,
time_t start,
time_t end,
DCDB_C_OPTIONS options
);
/**
* @brief This function integrates multiple data series over time.
*
* @param result Pointer to a int64_t variable which will hold the result of the operation.
* @param hostname Hostname of a database node.
* @param port TCP port to use for connecting to the database node (for Cassandra, this is usually 9042).
* @param sensorPublicName Array of public names of the sensors whose values should be integrated.
* @param sensorPublicNameLength Number of sensors within sensorPublicName.
* @param start time_t denoting the start of the time series.
* @param end time_t denoting the end of the timer series.
* @param options Bitmask of DCDB_C_OPTIONS.
*
* @details
* When calling this function, libdcdb integrates the values of the given
* sensors over time (in seconds). For example, if you have a sensor which
* stores power data in Watts, the resulting value will be energy in Joules
* (Watts * seconds).
*/
DCDB_C_RESULT dcdbQuerySumMultiple(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options
);
/**
* @brief This function is a threaded (this faster) implementation of dcdbQuerySumMultiple()
*
* @param result Pointer to a int64_t variable which will hold the result of the operation.
* @param hostname Hostname of a database node.
* @param port TCP port to use for connecting to the database node (for Cassandra, this is usually 9042).
* @param sensorPublicName Array of public names of the sensors whose values should be integrated.
* @param sensorPublicNameLength Number of sensors within sensorPublicName.
* @param start time_t denoting the start of the time series.
* @param end time_t denoting the end of the timer series.
* @param options Bitmask of DCDB_C_OPTIONS.
* @param numThreads Number of threads to spawn for the calculation.
*
* @details
* When calling this function, libdcdb integrates the values of the given
* sensors over time (in seconds). For example, if you have a sensor which
* stores power data in Watts, the resulting value will be energy in Joules
* (Watts * seconds).
*
* Please be aware that it makes no sense to spawn more threads than the
* sensorPublicName array holds since the implementation will spawn at most
* one thread per sensor.
*/
DCDB_C_RESULT dcdbQuerySumMultipleThreaded(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options,
unsigned int numThreads
);
/*****************************************************************************/
/* Following are C-API functions to insert job information. */
/* Intended to be called from python-script. */
......
......@@ -206,17 +206,6 @@ public:
*/
void queryCB(QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate=AGGREGATE_NONE);
/**
* @brief This function queries the integrated value (val * sec)
* of a sensor for the given time range.
* @param result The resulting integrated value.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
* @return SDS_OK if ok, SDS_EMPTYSET if not at least 2 readings in interval.
*/
SDSQueryResult querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end);
/**
* @brief This function truncates all sensor data that is older than
* the specified week.
......
......@@ -177,17 +177,6 @@ public:
*/
void queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userData, SensorId& sid, TimeStamp& start, TimeStamp& end, QueryAggregate aggregate);
/**
* @brief This function queries the integrated value
* of a sensor for the given time range.
* @param result The resulting integrated value.
* @param sid The SensorId to query.
* @param start Start of the time series.
* @param end End of the time series.
* @return SDS_OK if ok, SDS_EMPTYSET if not at least 2 readings in interval.
*/
SDSQueryResult querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end);
/**
* @brief This function truncates all sensor data that is older than
* the specified week.
......
......@@ -42,400 +42,6 @@
using namespace DCDB;
/* FIXME:
* All these functions suffer from a minor logical error in handling
* week-stamp transitions. When calling query sum in a time range
* that contains a week stamp transition, this will result in multiple
* calls to SensorDataStoreImpl::querySum() with their results being
* summed up. When transitioning from week stamp w to w+1 in the
* evaluation, this results in the algorithm ignoring the interval
* between the last reading within w and the first reading within w+1.
*
* To fix this, one would have to replicate the implementation of
* integrals from SensorDataStoreImpl::querySum for the last reading
* in w and the first reading in w+1.
*
* An alternative fix would be to shift the whole querySum logic
* including the expansion of the public sensor name into patterns
* and subsequent querying of multiple SensorIDs for forming a
* continuous (but week stamp separated) time series into a function
* that is part of the libdcdb C++ API (i.e. rework the current
* SensorDataStoreImpl::querySum() function.
*/
DCDB_C_RESULT dcdbQuerySum(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName,
time_t start,
time_t end,
DCDB_C_OPTIONS options)
{
TimeStamp startTs(start), endTs(end);
*result = 0;
/* Convert start and end to UTC if they're specified in local time. */
if(options & DCDB_C_LOCALTIME) {
startTs.convertFromLocal();
endTs.convertFromLocal();
}
/* Create a new connection to the database */
Connection* connection;
connection = new Connection();
connection->setHostname(hostname);
connection->setPort(port);
if (!connection->connect()) {
delete connection;
return DCDB_C_CONNERR;
}
/* Initialize the SensorConfig interface */
SensorConfig sensorConfig(connection);
SensorDataStore sensorDataStore(connection);
/* Check if the sensor is marked integrable */
PublicSensor sensorProperties;
switch (sensorConfig.getPublicSensorByName(sensorProperties, sensorPublicName)) {
case SC_OK:
if ((sensorProperties.sensor_mask & INTEGRABLE) == INTEGRABLE) {
delete connection;
return DCDB_C_NOTINTEGRABLE;
}
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Look up the pattern for the sensor's public name */
std::string pattern;
switch (sensorConfig.getSensorPattern(pattern, sensorPublicName)) {
case SC_OK:
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Expand the pattern into a list of existing sensors in the time range */
std::list<SensorId> sensorIds;
switch (sensorConfig.getSensorListForPattern(sensorIds, pattern, startTs, endTs)) {
case SC_OK:
break;
case SC_INVALIDPATTERN:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Iterate over the expanded list of sensorIds and sum up the results */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
int64_t tmp_result = 0;
TimeStamp startTs(start), endTs(end);
if (sensorDataStore.querySum(tmp_result, *sit, startTs, endTs) == SDS_EMPTYSET) {
/* In case there is no reading for this sensor, invalidate everything */
*result = 0;
return DCDB_C_EMPTYSET;
}
*result += tmp_result;
}
/* Clean up */
connection->disconnect();
delete connection;
return DCDB_C_OK;
}
DCDB_C_RESULT dcdbQuerySumMultiple(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options
)
{
unsigned int i;
TimeStamp startTs(start), endTs(end);
*result = 0;
/* Convert start and end to UTC if they're specified in local time. */
if(options & DCDB_C_LOCALTIME) {
startTs.convertFromLocal();
endTs.convertFromLocal();
}
/* Create a new connection to the database */
Connection* connection;
connection = new Connection();
connection->setHostname(hostname);
connection->setPort(port);
if (!connection->connect()) {
delete connection;
return DCDB_C_CONNERR;
}
/* Initialize the SensorConfig interface */
SensorConfig sensorConfig(connection);
SensorDataStore sensorDataStore(connection);
/* Check if all the sensors are marked integrable */
for (i=0; i<sensorPublicNameLength; i++) {
PublicSensor sensorProperties;
switch (sensorConfig.getPublicSensorByName(sensorProperties, sensorPublicName[i])) {
case SC_OK:
if ((sensorProperties.sensor_mask & INTEGRABLE) == INTEGRABLE) {
delete connection;
return DCDB_C_NOTINTEGRABLE;
}
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
}
/* Iterate and sum up over all sensor names */
for (i=0; i<sensorPublicNameLength; i++) {
/* Look up the pattern for the sensor's public name */
std::string pattern;
switch (sensorConfig.getSensorPattern(pattern, sensorPublicName[i])) {
case SC_OK:
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Expand the pattern into a list of existing sensors in the time range */
std::list<SensorId> sensorIds;
switch (sensorConfig.getSensorListForPattern(sensorIds, pattern, startTs, endTs)) {
case SC_OK:
break;
case SC_INVALIDPATTERN:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Iterate over the expanded list of sensorIds and sum up the results */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
int64_t tmp_result = 0;
TimeStamp startTs(start), endTs(end);
if (sensorDataStore.querySum(tmp_result, *sit, startTs, endTs) == SDS_EMPTYSET) {
/* In case there is no reading for this sensor, invalidate everything */
*result = 0;
return DCDB_C_EMPTYSET;
}
*result += tmp_result;
}
}
/* Clean up */
connection->disconnect();
delete connection;
return DCDB_C_OK;
}
typedef struct dcdbQuerySumMultipleThreadData {
int sensorIndexLow;
int sensorIndexHigh;
SensorConfig* sensorConfig;
SensorDataStore* sensorDataStore;
const char** sensorPublicName;
TimeStamp startTs;
TimeStamp endTs;
int64_t result;
DCDB_C_RESULT error;
} DCDBQuerySumMultipleThreadData;
static void* dcdbQuerySumMultipleThread(void* data)
{
DCDBQuerySumMultipleThreadData* td = (DCDBQuerySumMultipleThreadData*)data;
/* Iterate and sum up over all sensor names assigned to this thread */
for (int i=td->sensorIndexLow; i<td->sensorIndexHigh; i++) {
/* Look up the pattern for the sensor's public name */
std::string pattern;
switch (td->sensorConfig->getSensorPattern(pattern, td->sensorPublicName[i])) {
case SC_OK:
break;
case SC_INVALIDSESSION:
td->error = DCDB_C_CONNERR;
return NULL;
case SC_UNKNOWNSENSOR:
td->error = DCDB_C_SENSORNOTFOUND;
return NULL;
default:
td->error = DCDB_C_UNKNOWN;
return NULL;
}
/* Expand the pattern into a list of existing sensors in the time range */
std::list<SensorId> sensorIds;
switch (td->sensorConfig->getSensorListForPattern(sensorIds, pattern, td->startTs, td->endTs)) {
case SC_OK:
break;
case SC_INVALIDPATTERN:
td->error = DCDB_C_SENSORNOTFOUND;
return NULL;
default:
td->error = DCDB_C_UNKNOWN;
return NULL;
}
/* Iterate over the expanded list of sensorIds and sum up the results */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
int64_t tmp_result = 0;
if (td->sensorDataStore->querySum(tmp_result, *sit, td->startTs, td->endTs) == SDS_EMPTYSET) {
/* In case there is no reading for this sensor, invalidate everything */
td->result = 0;
td->error = DCDB_C_EMPTYSET;
return NULL;
}
td->result += tmp_result;
}
}
td->error = DCDB_C_OK;
return NULL;
}
DCDB_C_RESULT dcdbQuerySumMultipleThreaded(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options,
unsigned int numThreads
)
{
unsigned int i;
if (sensorPublicNameLength == 0) {
return DCDB_C_NOSENSOR;
}
TimeStamp startTs(start), endTs(end);
*result = 0;
/* Convert start and end to UTC if they're specified in local time. */
if(options & DCDB_C_LOCALTIME) {
startTs.convertFromLocal();
endTs.convertFromLocal();
}
/* Create a new connection to the database */
Connection* connection;
connection = new Connection();
connection->setHostname(hostname);
connection->setPort(port);
if (!connection->connect()) {
delete connection;
return DCDB_C_CONNERR;
}
/* Initialize the SensorConfig interface */
SensorConfig sensorConfig(connection);
SensorDataStore sensorDataStore(connection);
/* Check if all the sensors are marked integrable */
for (i=0; i<sensorPublicNameLength; i++) {
PublicSensor sensorProperties;
switch (sensorConfig.getPublicSensorByName(sensorProperties, sensorPublicName[i])) {
case SC_OK:
if ((sensorProperties.sensor_mask & INTEGRABLE) == INTEGRABLE) {
delete connection;
return DCDB_C_NOTINTEGRABLE;
}
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
}
DCDBQuerySumMultipleThreadData* td = (DCDBQuerySumMultipleThreadData*)malloc(numThreads * sizeof(DCDBQuerySumMultipleThreadData));
pthread_t* threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
int sensorsPerThread = sensorPublicNameLength / numThreads;
DCDB_C_RESULT error = DCDB_C_OK;
for (i=0; i<numThreads; i++) {
td[i].sensorIndexLow = i * sensorsPerThread;
td[i].sensorIndexHigh = (i<numThreads-1)?((i+1) * sensorsPerThread):sensorPublicNameLength;
td[i].sensorConfig = &sensorConfig;
td[i].sensorDataStore = &sensorDataStore;
td[i].sensorPublicName = sensorPublicName;
td[i].startTs = startTs;
td[i].endTs = endTs;
td[i].result = 0;
pthread_create(&threads[i], NULL, dcdbQuerySumMultipleThread, &td[i]);
}
for (i=0; i<numThreads; i++) {
pthread_join(threads[i], NULL);
if (error == DCDB_C_OK && td[i].error == DCDB_C_OK) {
*result += td[i].result;
}
else {
error = td[i].error;
}
}
free(threads);
free(td);
/* Clean up */
connection->disconnect();
delete connection;
return error;
}
Connection* connectToDatabase(const char* hostname, uint16_t port) {
Connection* conn = new Connection(hostname, port);
......
......@@ -677,54 +677,6 @@ void SensorDataStoreImpl::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* use
cass_prepared_free(prepared);
}
/**
* @details
* This function generates an integrated value of the time series
* by first querying for the result set list using query() and then
* summing up the result.
*/
SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end)
{
std::list<SensorDataStoreReading> queryResult;
/* Issue a standard query */
query(queryResult, sid, start, end, AGGREGATE_NONE);
/* Check if at least 2 readings in result */
if (queryResult.size() < 2)
return SDS_EMPTYSET;
/* Integrate the result */
result = 0;
SensorDataStoreReading prev;
for (std::list<SensorDataStoreReading>::iterator it = queryResult.begin(); it != queryResult.end(); it++) {
if (!(it == queryResult.begin())) {
SensorDataStoreReading cur = *it;
/* Calculate average between two readings */
int64_t avg = (cur.value + prev.value) / 2;
/* Calculate time difference */
uint64_t dt = cur.timeStamp.getRaw() - prev.timeStamp.getRaw();
/* Sum up (with lousy attempt to keep it numerically stable - should probably use double instead) */
if (dt > 10000000000) {
/* dt > 10s => convert dt to s first */
dt /= 1000000000;
result += avg * dt;
}
else {
/* dt < 10s => multiply first */
avg *= dt;
result += avg / 1000000000;
}
}
prev = *it;
}
return SDS_OK;
}
/**
* @details
* This function deletes all data from the sensordata store
......@@ -950,17 +902,6 @@ void SensorDataStore::queryCB(SensorDataStore::QueryCbFunc cbFunc, void* userDat
return impl->queryCB(cbFunc, userData, sid, start, end, aggregate);
}
/**
* @details
* Instead of doing the actual work, this function simply
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
SDSQueryResult SensorDataStore::querySum(int64_t& result, SensorId& sid, TimeStamp& start, TimeStamp& end)
{
return impl->querySum(result, sid, start, end);
}
/**
* @details
* Instead of doing the actual work, this function simply
......