Commit 2508c48f authored by Axel Auweter's avatar Axel Auweter
Browse files

Cleanups...

parent e14c491c
......@@ -265,13 +265,6 @@ bool DCDBConnectionImpl::connect() {
if (connected)
return false;
/* Create a new cluster object */
if (!cluster)
cluster = cass_cluster_new();
if (!session)
session = cass_session_new();
/* Set hostname and port */
cass_cluster_set_contact_points(cluster, hostname_.c_str());
cass_cluster_set_port(cluster, port_);
......@@ -392,17 +385,28 @@ DCDBConnectionImpl::DCDBConnectionImpl() {
hostname_ = "localhost";
port_ = 9042;
connected = false;
/* Set loglevel to errors since our token() queries will result in unnecessary warnings by the driver */
cass_log_set_level(CASS_LOG_ERROR);
/* Create a new cluster object */
if (!cluster)
cluster = cass_cluster_new();
if (!session)
session = cass_session_new();
}
DCDBConnectionImpl::~DCDBConnectionImpl() {
/* Clean up... */
disconnect();
if (schema)
cass_schema_free(schema);
if (session)
cass_session_free(session);
if (cluster)
cass_cluster_free(cluster);
if (schema)
cass_schema_free(schema);
}
......@@ -17,7 +17,6 @@
#include "dcdbendian.h"
#include "query.h"
#include "casshelper.h"
void DCDBQuery::setLocalTimeEnabled(bool enable) {
useLocalTime = enable;
......@@ -126,340 +125,9 @@ void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DC
}
#if 0
/* Lovely spaghetti code coming up next. Be aware... */
void DCDBQuery::doQuery(const char* hostname, std::list<std::string> sensors, DCDBTimeStamp start, DCDBTimeStamp end, bool raw, bool localtime)
DCDBQuery::DCDBQuery()
{
/* Connect to db */
CassCluster *cluster = CassHelper::create_cluster(hostname);
CassSession* session = cass_session_new();
if (CassHelper::connect_session(session, cluster) != CASS_OK) {
cass_session_free(session);
cass_cluster_free(cluster);
std::cout << "Cannot connect to Cassandra database." << std::endl;
exit(EXIT_FAILURE);
}
/* Print the CSV header */
std::cout << "Sensor,Time,Value" << std::endl;
/* Iterate over list of sensors requested by the user */
for (std::list<std::string>::iterator it = sensors.begin(); it != sensors.end(); it++) {
/* Lookup the sensor in the published sensors table */
std::string sensorPattern;
lookupPublishedSensorPattern(session, *it,sensorPattern);
std::list<SensorId> sidList;
expandSensorPattern(session, sensorPattern, start, end, sidList);
#if 0
for (std::list<SensorId>::iterator sit = sidList.begin(); sit != sidList.end(); sit++) {
std::cout << std::hex << std::setfill('0') << std::setw(16) << (*sit).raw[0]
<< " " << std::hex << std::setfill('0') << std::setw(16) << (*sit).raw[1] << std::endl;
}
#endif
querySensorsCSV(session, *it, sidList, start, end, raw, localtime);
}
/* Clean up */
cass_session_free(session);
cass_cluster_free(cluster);
}
void DCDBQuery::lookupPublishedSensorPattern(CassSession* session, std::string name, std::string& pattern)
{
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("SELECT pattern FROM dcdb_config.publishedsensors WHERE name = ? ;");
CassString pattern_cstr;
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
prepared = cass_future_get_prepared(future);
}
statement = cass_prepared_bind(prepared);
cass_statement_bind_string_by_name(statement, "name", cass_string_init(name.c_str()));
cass_future_free(future);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
if (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
cass_value_get_string(cass_row_get_column_by_name(row, "pattern"), &pattern_cstr);
pattern = std::string(pattern_cstr.data, pattern_cstr.length);
}
else {
std::cout << "Unknown sensor: " << name << std::endl;
exit(EXIT_FAILURE);
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
}
void DCDBQuery::expandSensorPattern(CassSession* session, std::string sensorPattern, DCDBTimeStamp start, DCDBTimeStamp end, std::list<SensorId>& sensorIds)
{
/* Clear the list of sensorIds */
sensorIds.clear();
/* Strip all slashes from publishedSensorName */
sensorPattern.erase(std::remove(sensorPattern.begin(), sensorPattern.end(), '/'), sensorPattern.end());
/* Calculate lower and upper boundaries for the expansion of the pattern */
std::string low = sensorPattern;
std::string high = sensorPattern;
if (sensorPattern.find("*") != std::string::npos) {
low.replace(sensorPattern.find("*"), 1, 33-sensorPattern.length(), '0');
high.replace(sensorPattern.find("*"), 1, 33-sensorPattern.length(), 'F');
}
SensorId lowId, highId;
if (!topicToSid(lowId, low)) {
std::cout << "Internal error" << std::endl;
exit(EXIT_FAILURE);
}
if (!topicToSid(highId, high)) {
std::cout << "Internal error" << std::endl;
exit(EXIT_FAILURE);
}
lowId.dsid.rsvd = start.getWeekstamp();
highId.dsid.rsvd = end.getWeekstamp();
low = sidConvert(lowId);
high = sidConvert(highId);
// std::cout << "Lower boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << lowId.raw[1] << std::endl;
// std::cout << "Upper boundary for sensor scan: " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[0] << " " << std::hex << std::setfill('0') << std::setw(16) << highId.raw[1] << std::endl;
/* Query the database to see which raw sensors actually exist in the interval between low and high */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const CassPrepared* prepared = nullptr;
CassString query = cass_string_init("SELECT DISTINCT sid FROM dcdb.sensordata WHERE TOKEN(sid) >= ? and TOKEN(sid) <= ?;");
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
prepared = cass_future_get_prepared(future);
}
statement = cass_prepared_bind(prepared);
CassBytes lowCb = cass_bytes_init((cass_byte_t*)(low.c_str()), 16);
CassBytes highCb = cass_bytes_init((cass_byte_t*)(high.c_str()), 16);
cass_statement_bind_bytes(statement, 0, lowCb);
cass_statement_bind_bytes(statement, 1, highCb);
cass_future_free(future);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
CassHelper::print_error(future);
} else {
const CassResult* result = cass_future_get_result(future);
CassIterator* iterator = cass_iterator_from_result(result);
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
CassBytes res;
cass_value_get_bytes(cass_row_get_column_by_name(row, "sid"), &res);
SensorId sensor;
sensor.raw[0] = Endian::BEToHost(((uint64_t*)res.data)[0]);
sensor.raw[1] = Endian::BEToHost(((uint64_t*)res.data)[1]);
/* Check if the sensorId matches the pattern */
if (sidPatternMatch(sensor, sensorPattern)) {
sensorIds.push_back(sensor);
}
}
cass_result_free(result);
cass_iterator_free(iterator);
}
cass_future_free(future);
cass_statement_free(statement);
cass_prepared_free(prepared);
}
/* Ugly copy of DCDBLib's topicToSid - It's really time to clean up the API */
bool DCDBQuery::topicToSid(SensorId& sid, std::string topic)
{
uint64_t pos = 0;
const char* buf = topic.c_str();
sid.raw[0] = 0;
sid.raw[1] = 0;
while (*buf && pos < 128) {
if (*buf >= '0' && *buf <= '9') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - '0')) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'A' && *buf <= 'F') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - 'A' + 0xa)) << (60-(pos%64)));
pos += 4;
}
else if (*buf >= 'a' && *buf <= 'f') {
sid.raw[pos / 64] |= (((uint64_t)(*buf - 'a' + 0xa)) << (60-(pos%64)));
pos += 4;
}
buf++;
}
return pos == 128;
}
/* Ugly copy of DCDBLib's sidConvert - It's really time to clean up the API */
std::string DCDBQuery::sidConvert(SensorId& sid)
{
uint64_t ll[2];
ll[0] = Endian::hostToBE(sid.raw[0]);
ll[1] = Endian::hostToBE(sid.raw[1]);
return std::string((char*)ll, 16);
}
bool DCDBQuery::sidPatternMatch(SensorId& sid, std::string pattern)
{
/* Strip all slashes from pattern */
pattern.erase(std::remove(pattern.begin(), pattern.end(), '/'), pattern.end());
/* Convert to lower case */
boost::algorithm::to_lower(pattern);
/* Calculate the wildcard length */
int wl = (pattern.find("*") != std::string::npos) ? 33 - pattern.length() : 0;
/* Do a character by character comparison */
int posP = 0;
int posS = 0;
while (posS < 32) {
char p, cs[2];
uint64_t s;
p = pattern.c_str()[posP];
s = sid.raw[posS / 16];
s >>= (60-((4*posS)%64));
s &= 0xf;
snprintf(cs, 2, "%" PRIx64, s);
if (p == '*') {
/* Jump over the wildcard */
posS += wl;
posP++;
continue;
}
else if ((posS >= 24 && posS <= 27) || (p==cs[0])) {
posS++;
posP++;
continue;
}
else {
return false;
}
}
return true;
}
void DCDBQuery::querySensorsCSV(CassSession* session, std::string sensorName, std::list<SensorId>& sidList, DCDBTimeStamp& start, DCDBTimeStamp& end, bool raw, bool localtime)
{
/* Since everything has been mangled until here, we assume parameter safety and do a non-prepared statement... Oh oh... */
std::stringstream query;
query << "SELECT * FROM dcdb.sensordata WHERE sid IN (";
for (std::list<SensorId>::iterator it = sidList.begin(); it != sidList.end(); ++it) {
if (it != sidList.begin()) {
query << ", ";
}
query << "0x";
query << std::hex << std::setfill('0') << std::setw(16) << (*it).raw[0]
<< std::hex << std::setfill('0') << std::setw(16) << (*it).raw[1];
}
query << ") AND ts >= ";
query << std::dec << start.getRaw();
query << " AND ts <= ";
query << std::dec << end.getRaw();
query << ";";
CassString csQuery = cass_string_init(query.str().c_str());
// HACK = sometimes the query fails for unknown reasons
int retries_left = 10;
while (retries_left) {
CassStatement* statement = cass_statement_new(csQuery, 0);
CassFuture* future = cass_session_execute(session, statement);
cass_future_wait(future);
if (cass_future_error_code(future) == CASS_OK) {
const CassResult* result = cass_future_get_result(future);
CassIterator* rows = cass_iterator_from_result(result);
while (cass_iterator_next(rows)) {
const CassRow* row = cass_iterator_get_row(rows);
cass_int64_t ts, value;
cass_value_get_int64(cass_row_get_column_by_name(row, "ts"), &ts);
cass_value_get_int64(cass_row_get_column_by_name(row, "value"), &value);
DCDBTimeStamp t((uint64_t)ts);
if (localtime) {
t.convertToLocal();
}
if (raw) {
std::cout << sensorName << "," << std::dec << t.getRaw() << "," << std::dec << value << std::endl;
}
else {
std::cout << sensorName << "," << t.getString() << "," << std::dec << value << std::endl;
}
}
cass_iterator_free(rows);
cass_statement_free(statement);
cass_future_free(future);
break;
}
else {
retries_left--;
sleep(1);
// CassString message = cass_future_error_message(future);
// fprintf(stderr, "Error: %.*s\n", (int)message.length, message.data);
}
cass_statement_free(statement);
cass_future_free(future);
}
if (retries_left == 0) {
fprintf(stderr, "Error reading data. Please try again later.\n");
}
connection = nullptr;
useLocalTime = false;
useRawOutput = false;
}
#endif
......@@ -31,7 +31,7 @@ public:
bool getRawOutputEnabled();
void doQuery(const char* hostname, std::list<std::string> sensors, DCDBTimeStamp start, DCDBTimeStamp end);
DCDBQuery() {};
DCDBQuery();
virtual ~DCDBQuery() {};
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment