Commit c1bd5deb authored by Axel Auweter's avatar Axel Auweter
Browse files

Add a simple benchmark application and optimized versions of C-API's querySum().

parent a0441418
......@@ -44,6 +44,28 @@ DCDB_C_RESULT dcdbQuerySum(
DCDB_C_OPTIONS options
);
DCDB_C_RESULT dcdbQuerySumMultiple(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options
);
DCDB_C_RESULT dcdbQuerySumMultipleThreaded(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options,
int numThreads
);
#ifdef __cplusplus
}
......
......@@ -9,6 +9,8 @@
#include <iostream>
#include <list>
#include <pthread.h>
#include "c_api.h"
#include "sensorid.h"
......@@ -114,3 +116,272 @@ DCDB_C_RESULT dcdbQuerySum(
delete connection;
return DCDB_C_OK;
}
DCDB_C_RESULT dcdbQuerySumMultiple(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options
)
{
unsigned int i;
DCDBTimeStamp startTs(start), endTs(end);
*result = 0;
/* Convert start and end to UTC if they're specified in local time. */
if(options & DCDB_C_LOCALTIME) {
startTs.convertFromLocal();
endTs.convertFromLocal();
}
/* Create a new connection to the database */
DCDBConnection* connection;
connection = new DCDBConnection();
connection->setHostname(hostname);
connection->setPort(port);
if (!connection->connect()) {
delete connection;
return DCDB_C_CONNERR;
}
/* Initialize the SensorConfig interface */
SensorConfig sensorConfig(connection);
SensorDataStore sensorDataStore(connection);
/* Check if all the sensors are marked integrable */
for (i=0; i<sensorPublicNameLength; i++) {
DCDBPublicSensor sensorProperties;
switch (sensorConfig.getPublicSensorByName(sensorProperties, sensorPublicName[i])) {
case SC_OK:
if (!sensorProperties.integrable) {
delete connection;
return DCDB_C_NOTINTEGRABLE;
}
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
}
/* Iterate and sum up over all sensor names */
for (i=0; i<sensorPublicNameLength; i++) {
/* Look up the pattern for the sensor's public name */
std::string pattern;
switch (sensorConfig.getSensorPattern(pattern, sensorPublicName[i])) {
case SC_OK:
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Expand the pattern into a list of existing sensors in the time range */
std::list<SensorId> sensorIds;
switch (sensorConfig.getSensorListForPattern(sensorIds, pattern, startTs, endTs)) {
case SC_OK:
break;
case SC_INVALIDPATTERN:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
/* Iterate over the expanded list of sensorIds and sum up the results */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
int64_t tmp_result = 0;
if (sensorDataStore.querySum(tmp_result, *sit, start, end) == SDS_EMPTYSET) {
/* In case there is no reading for this sensor, invalidate everything */
*result = 0;
return DCDB_C_EMPTYSET;
}
*result += tmp_result;
}
}
/* Clean up */
connection->disconnect();
delete connection;
return DCDB_C_OK;
}
typedef struct dcdbQuerySumMultipleThreadData {
int sensorIndexLow;
int sensorIndexHigh;
SensorConfig* sensorConfig;
SensorDataStore* sensorDataStore;
const char** sensorPublicName;
DCDBTimeStamp startTs;
DCDBTimeStamp endTs;
int64_t result;
DCDB_C_RESULT error;
} DCDBQuerySumMultipleThreadData;
static void* dcdbQuerySumMultipleThread(void* data)
{
DCDBQuerySumMultipleThreadData* td = (DCDBQuerySumMultipleThreadData*)data;
/* Iterate and sum up over all sensor names assigned to this thread */
for (int i=td->sensorIndexLow; i<td->sensorIndexHigh; i++) {
/* Look up the pattern for the sensor's public name */
std::string pattern;
switch (td->sensorConfig->getSensorPattern(pattern, td->sensorPublicName[i])) {
case SC_OK:
break;
case SC_INVALIDSESSION:
td->error = DCDB_C_CONNERR;
return NULL;
case SC_UNKNOWNSENSOR:
td->error = DCDB_C_SENSORNOTFOUND;
return NULL;
default:
td->error = DCDB_C_UNKNOWN;
return NULL;
}
/* Expand the pattern into a list of existing sensors in the time range */
std::list<SensorId> sensorIds;
switch (td->sensorConfig->getSensorListForPattern(sensorIds, pattern, td->startTs, td->endTs)) {
case SC_OK:
break;
case SC_INVALIDPATTERN:
td->error = DCDB_C_SENSORNOTFOUND;
return NULL;
default:
td->error = DCDB_C_UNKNOWN;
return NULL;
}
/* Iterate over the expanded list of sensorIds and sum up the results */
for (std::list<SensorId>::iterator sit = sensorIds.begin(); sit != sensorIds.end(); sit++) {
int64_t tmp_result = 0;
if (td->sensorDataStore->querySum(tmp_result, *sit, td->startTs, td->endTs) == SDS_EMPTYSET) {
/* In case there is no reading for this sensor, invalidate everything */
td->result = 0;
td->error = DCDB_C_EMPTYSET;
return NULL;
}
td->result += tmp_result;
}
}
td->error = DCDB_C_OK;
return NULL;
}
DCDB_C_RESULT dcdbQuerySumMultipleThreaded(
int64_t* result,
const char* hostname,
uint16_t port,
const char* sensorPublicName[],
unsigned int sensorPublicNameLength,
time_t start,
time_t end,
DCDB_C_OPTIONS options,
int numThreads
)
{
unsigned int i;
DCDBTimeStamp startTs(start), endTs(end);
*result = 0;
/* Convert start and end to UTC if they're specified in local time. */
if(options & DCDB_C_LOCALTIME) {
startTs.convertFromLocal();
endTs.convertFromLocal();
}
/* Create a new connection to the database */
DCDBConnection* connection;
connection = new DCDBConnection();
connection->setHostname(hostname);
connection->setPort(port);
if (!connection->connect()) {
delete connection;
return DCDB_C_CONNERR;
}
/* Initialize the SensorConfig interface */
SensorConfig sensorConfig(connection);
SensorDataStore sensorDataStore(connection);
/* Check if all the sensors are marked integrable */
for (i=0; i<sensorPublicNameLength; i++) {
DCDBPublicSensor sensorProperties;
switch (sensorConfig.getPublicSensorByName(sensorProperties, sensorPublicName[i])) {
case SC_OK:
if (!sensorProperties.integrable) {
delete connection;
return DCDB_C_NOTINTEGRABLE;
}
break;
case SC_INVALIDSESSION:
delete connection;
return DCDB_C_CONNERR;
case SC_UNKNOWNSENSOR:
delete connection;
return DCDB_C_SENSORNOTFOUND;
default:
delete connection;
return DCDB_C_UNKNOWN;
}
}
DCDBQuerySumMultipleThreadData* td = (DCDBQuerySumMultipleThreadData*)malloc(numThreads * sizeof(DCDBQuerySumMultipleThreadData));
pthread_t* threads = (pthread_t*)malloc(numThreads * sizeof(pthread_t));
int sensorsPerThread = sensorPublicNameLength / numThreads;
DCDB_C_RESULT error = DCDB_C_OK;
for (i=0; i<numThreads; i++) {
td[i].sensorIndexLow = i * sensorsPerThread;
td[i].sensorIndexHigh = (i<numThreads-1)?((i+1) * sensorsPerThread):sensorPublicNameLength;
td[i].sensorConfig = &sensorConfig;
td[i].sensorDataStore = &sensorDataStore;
td[i].sensorPublicName = sensorPublicName;
td[i].startTs = startTs;
td[i].endTs = endTs;
td[i].result = 0;
pthread_create(&threads[i], NULL, dcdbQuerySumMultipleThread, &td[i]);
}
for (i=0; i<numThreads; i++) {
pthread_join(threads[i], NULL);
if (error == DCDB_C_OK && td[i].error == DCDB_C_OK) {
*result += td[i].result;
}
else {
error = td[i].error;
}
}
free(threads);
free(td);
/* Clean up */
connection->disconnect();
delete connection;
return error;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment