Commit 52e562be authored by Axel Auweter's avatar Axel Auweter
Browse files

First implementation of dcdbquery (related to #13).

parent 0beec2ce
......@@ -73,6 +73,12 @@ public:
*/
uint64_t getRaw(void);
/**
* @brief Returns the "weekstamp" corresponding to the object's value
* @return The week number of the timestamp.
*/
uint16_t getWeekstamp(void);
/* Overloaded operators (compare raw values) */
inline bool operator == (const DCDBTimeStamp& rhs) const {return raw == rhs.raw;}
inline bool operator != (const DCDBTimeStamp& rhs) const {return raw != rhs.raw;}
......
......@@ -168,7 +168,7 @@ void SensorDataStoreImpl::init(std::string hostname, int port) {
*/
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, uint64_t value)
{
#if 1
#if 0
std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
#endif
......
......@@ -98,4 +98,12 @@ uint64_t DCDBTimeStamp::getRaw(void)
return raw;
}
/**
*
*/
uint16_t DCDBTimeStamp::getWeekstamp(void)
{
uint16_t week = raw / 604800000000;
return week;
}
......@@ -13,6 +13,9 @@ void CassHelper::print_error(CassFuture* future) {
}
CassCluster* CassHelper::create_cluster(const char* hostname) {
/* Set loglevel to error only (otherwise we'll get a warning on the TOKEN() queries) */
cass_log_set_level(CASS_LOG_ERROR);
CassCluster* cluster = cass_cluster_new();
cass_cluster_set_contact_points(cluster, hostname);
......
......@@ -6,12 +6,18 @@
*/
#include <iostream>
#include <iomanip>
#include <sstream>
#include <list>
#include <string>
#include <algorithm>
#include <cstdlib>
#include <cinttypes>
#include <boost/algorithm/string.hpp>
#include "dcdbendian.h"
#include "query.h"
#include "casshelper.h"
......@@ -35,11 +41,21 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
std::string aliasPattern;
lookupAlias(session, *it, aliasPattern);
std::cout << "Found alias: " << aliasPattern << std::endl;
// std::cout << "Found alias: " << aliasPattern << std::endl;
std::list<SensorId> sidList;
expandAlias(session, aliasPattern, sidList);
expandAlias(session, aliasPattern, start, end, sidList);
#if 0
for (std::list<SensorId>::iterator sit = sidList.begin(); sit != sidList.end(); sit++) {
std::cout << std::hex << std::setfill('0') << std::setw(16) << (*sit).raw[0]
<< " " << std::hex << std::setfill('0') << std::setw(16) << (*sit).raw[1] << std::endl;
}
#endif
/* Print the CSV header */
std::cout << *it << ",Time,Value" << std::endl;
querySensorsCSV(session, *it, sidList, start, end);
}
/* Clean up */
......@@ -96,7 +112,7 @@ void DCDBQuery::lookupAlias(CassSession* session, std::string name, std::string&
}
}
void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, std::list<SensorId>& sensorIds)
void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, DCDBTimeStamp start, DCDBTimeStamp end, std::list<SensorId>& sensorIds)
{
/* Clear the list of sensorIds */
sensorIds.clear();
......@@ -112,15 +128,31 @@ void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, std::li
high.replace(aliasName.find("*"), 1, 33-aliasName.length(), 'F');
}
SensorId lowId, highId;
if (!topicToSid(lowId, low)) {
std::cout << "Internal error" << std::endl;
exit(EXIT_FAILURE);
}
if (!topicToSid(highId, high)) {
std::cout << "Internal error" << std::endl;
exit(EXIT_FAILURE);
}
lowId.dsid.rsvd = start.getWeekstamp();
highId.dsid.rsvd = end.getWeekstamp();
low = sidConvert(lowId);
high = sidConvert(highId);
// std::cout << "Lower boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[1] << std::endl;
// std::cout << "Upper boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[1] << std::endl;
/* Query the database to see which raw sensors actually exist in the interval between low and high */
/*
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("SELECT DISTINCT sid FROM dcdb.sensordata WHERE key(sid) >= ? and key(sid) <= ?;");
CassString pattern_cstr;
CassString query = cass_string_init("SELECT DISTINCT sid FROM dcdb.sensordata WHERE TOKEN(sid) >= ? and TOKEN(sid) <= ?;");
future = cass_session_prepare(session, query);
cass_future_wait(future);
......@@ -133,7 +165,12 @@ void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, std::li
}
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", cass_string_init(name.c_str()));
CassBytes lowCb = cass_bytes_init((cass_byte_t*)(low.c_str()), 16);
CassBytes highCb = cass_bytes_init((cass_byte_t*)(high.c_str()), 16);
cass_statement_bind_bytes(statement, 0, lowCb);
cass_statement_bind_bytes(statement, 1, highCb);
future = cass_session_execute(session, statement);
cass_future_wait(future);
......@@ -145,18 +182,141 @@ void DCDBQuery::expandAlias(CassSession* session, std::string aliasName, std::li
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr);
pattern = std::string(pattern_cstr.data, pattern_cstr.length);
}
else {
std::cout << "Unknown sensor: " << name << std::endl;
exit(EXIT_FAILURE);
CassBytes res;
cass_value_get_bytes(cass_row_get_column_by_name(row, "sid"), &res);
SensorId sensor;
sensor.raw[0] = Endian::BEToHost(((uint64_t*)res.data)[0]);
sensor.raw[1] = Endian::BEToHost(((uint64_t*)res.data)[1]);
/* Check if the sensorId matches the pattern */
if (sidPatternMatch(sensor, aliasName)) {
sensorIds.push_back(sensor);
}
}
cass_result_free(result);
cass_iterator_free(iterator);
}
*/
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
}
/* Ugly copy of DCDBLib's topicToSid - It's really time to clean up the API */
bool DCDBQuery::topicToSid(SensorId& sid, std::string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
sid.raw[0] = 0;
sid.raw[1] = 0;
while (*buf && pos < 128) {
if (*buf >= '0' && *buf <= '9') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'A' && *buf <= 'F') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'a' && *buf <= 'f') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
pos += 4;
}
buf++;
}
return pos == 128;
}
/* Ugly copy of DCDBLib's sidConvert - It's really time to clean up the API */
std::string DCDBQuery::sidConvert(SensorId& sid)
{
uint64_t ll[2];
ll[0] = Endian::hostToBE(sid.raw[0]);
ll[1] = Endian::hostToBE(sid.raw[1]);
return std::string((char*)ll, 16);
}
bool DCDBQuery::sidPatternMatch(SensorId& sid, std::string pattern)
{
/* Strip all slashes from pattern */
pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end());
/* Convert to lower case */
boost::algorithm::to_lower(pattern);
/* Calculate the wildcard length */
int wl = (pattern.find("*") != std::string::npos) ? 33 - pattern.length() : 0;
/* Do a character by character comparison */
int posP = 0;
int posS = 0;
while (posS < 32) {
char p, cs[2];
uint64_t s;
p = pattern.c_str()[posP];
s = sid.raw[posS / 16];
s >>= (60-((4*posS)%64));
s &= 0xf;
snprintf(cs, 2, "%" PRIx64, s);
if (p == '*') {
/* Jump over the wildcard */
posS += wl;
posP++;
continue;
}
else if ((posS >= 24 && posS <= 27) || (p==cs[0])) {
posS++;
posP++;
continue;
}
else {
return false;
}
}
return true;
}
void DCDBQuery::querySensorsCSV(CassSession* session, std::string sensorName, std::list<SensorId>& sidList, DCDBTimeStamp& start, DCDBTimeStamp& end)
{
/* Since everything has been mangled until here, we assume parameter safety and do a non-prepared statement... Oh oh... */
std::stringstream query;
query << "SELECT * FROM dcdb.sensordata WHERE sid IN (";
for (std::list<SensorId>::iterator it = sidList.begin(); it != sidList.end(); ++it) {
if (it != sidList.begin()) {
query << ", ";
}
query << "0x";
query << std::hex << std::setfill('0') << std::setw(16) << (*it).raw[0]
<< std::hex << std::setfill('0') << std::setw(16) << (*it).raw[1];
}
query << ") AND ts >= ";
query << std::dec << start.getRaw();
query << " AND ts <= ";
query << std::dec << end.getRaw();
query << ";";
CassString csQuery = cass_string_init(query.str().c_str());
CassStatement* statement = cass_statement_new(csQuery, 0);
CassFuture* future = cass_session_execute(session, statement);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(result);
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
std::cout << sensorName << "," << std::dec << ts << "," << std::dec << value << std::endl;
}
}
cass_statement_free(statement);
cass_future_free(future);
}
......@@ -24,7 +24,11 @@ public:
protected:
static void lookupAlias(CassSession* session, std::string name, std::string& pattern);
static void expandAlias(CassSession* session, std::string aliasName, std::list<SensorId>& sensorIds);
static void expandAlias(CassSession* session, std::string aliasName, DCDBTimeStamp start, DCDBTimeStamp end, std::list<SensorId>& sensorIds);
static bool topicToSid(SensorId& sid, std::string topic);
static std::string sidConvert(SensorId& sid);
static bool sidPatternMatch(SensorId& sid, std::string pattern);
static void querySensorsCSV(CassSession* session, std::string sensorName, std::list<SensorId>& sidList, DCDBTimeStamp& start, DCDBTimeStamp& end);
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment