Commit 67b196e4 authored by Axel Auweter's avatar Axel Auweter
Browse files

Adressing issue #43, you can now insert without TTL and occasionally

use the new dcdbconfig db fuzzytrunc command to remove entire rows
of data that are getting too old.
parent 90f40c8a
......@@ -98,6 +98,13 @@ public:
*/
SDSQueryResult querySum(int64_t& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end);
/**
* @brief This function truncates all sensor data that is older than
* the specified week.
* @param weekStamp The 16-bit weekstamp generated from a cut-off date
*/
void truncBeforeWeek(uint16_t weekStamp);
/**
* @brief A shortcut constructor for a SensorDataStore object
* that allows accessing the data store through a
......
......@@ -74,6 +74,19 @@ public:
*/
SDSQueryResult querySum(int64_t& result, SensorId sid, DCDBTimeStamp start, DCDBTimeStamp end);
/**
* @brief This function truncates all sensor data that is older than
* the specified week.
* @param weekStamp The 16-bit weekstamp generated from a cut-off date
*/
void truncBeforeWeek(uint16_t weekStamp);
/**
* @brief This function deletes a row from the sensordatastore.
* @param sid SensorId object that identifies the row to be deleted.
*/
void deleteRow(SensorId& sid);
/**
* @brief This is the standard constructor of the SensorDataStoreImpl class.
* @param csb A CassandraBackend object to do the raw database access.
......
......@@ -275,6 +275,104 @@ SDSQueryResult SensorDataStoreImpl::querySum(int64_t& result, SensorId sid, DCDB
return SDS_OK;
}
/**
* @details
* This function deletes all data from the sensordata store
* that is older than weekStamp-1 weeks.
*/
void SensorDataStoreImpl::truncBeforeWeek(uint16_t weekStamp)
{
/* List of rows that should be deleted */
std::list<SensorId> deleteList;
/* Query the database to collect all rows */
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT DISTINCT sid FROM " KEYSPACE_NAME "." CF_SENSORDATA ";";
statement = cass_statement_new(query, 0);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
return;
}
const CassResult* result = cass_future_get_result(future);
cass_future_free(future);
CassIterator* iterator = cass_iterator_from_result(result);
/* Iterate over all rows and filter out those, that are too old */
while (cass_iterator_next(iterator)) {
const CassRow* row = cass_iterator_get_row(iterator);
const cass_byte_t* res;
size_t res_len;
cass_value_get_bytes(cass_row_get_column_by_name(row, "sid"), &res, &res_len);
uint64_t raw[2];
raw[0] = Endian::BEToHost(((uint64_t*)res)[0]);
raw[1] = Endian::BEToHost(((uint64_t*)res)[1]);
SensorId sensor;
sensor.setRaw(raw);
/* Check if the sensorId's rsvd field is smaller than the weekStamp */
if (sensor.getRsvd() < weekStamp) {
deleteList.push_back(sensor);
}
}
cass_result_free(result);
cass_iterator_free(iterator);
cass_statement_free(statement);
/* Now iterate over all entries in the deleteList and delete them */
for (std::list<SensorId>::iterator it = deleteList.begin(); it != deleteList.end(); it++) {
deleteRow(*it);
}
}
/**
* @details
* Deleting entire rows is rather efficient compared to deleting individual columns.
*/
void SensorDataStoreImpl::deleteRow(SensorId& sid)
{
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
const CassPrepared* prepared = nullptr;
const char* query = "DELETE FROM " KEYSPACE_NAME "." CF_SENSORDATA " WHERE sid = ?;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
return;
}
prepared = cass_future_get_prepared(future);
cass_future_free(future);
std::string key = sid.serialize();
statement = cass_prepared_bind(prepared);
cass_statement_bind_bytes(statement, 0, (const cass_byte_t*)(key.c_str()), 16);
future = cass_session_execute(session, statement);
cass_future_wait(future);
cass_statement_free(statement);
cass_future_free(future);
cass_prepared_free(prepared);
}
/**
* @details
* This constructor sets the internal connection variable to
......@@ -347,6 +445,17 @@ SDSQueryResult SensorDataStore::querySum(int64_t& result, SensorId sid, DCDBTime
return impl->querySum(result, sid, start, end);
}
/**
* @details
* Instead of doing the actual work, this function simply
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::truncBeforeWeek(uint16_t weekStamp)
{
return impl->truncBeforeWeek(weekStamp);
}
/**
* @details
* This constructor allocates the implementation class which
......
include ../../config.mk
CXXFLAGS = -O2 -ggdb --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-unknown-warning-option -fmessage-length=0 -I$(DCDBDEPLOYPATH)/include/ -I$(DCDBBASEPATH)/include/
OBJS = dcdbconfig.o sensoraction.o useraction.o
OBJS = dcdbconfig.o sensoraction.o dbaction.o useraction.o
LIBS = -L$(DCDBDEPLOYPATH)/lib/ -ldcdb -lcassandra -luv -lboost_random -lboost_system -lboost_date_time -lboost_regex -lssl -lcrypto
# GCC 4.8 is broken
ifeq ($(findstring 4.8, $(shell $(CXX) --version)), 4.8)
......
/*
* sensoraction.h
*
* Created on: Jan 25, 2015
* Author: Axel Auweter
*/
#include <iostream>
#include <string>
#include <boost/lexical_cast.hpp>
#include "dbaction.h"
/*
* Print the help for the SENSOR command
*/
void DBAction::printHelp(int argc, char* argv[])
{
/* 01234567890123456789012345678901234567890123456789012345678901234567890123456789 */
std::cout << "DB command help" << std::endl;
std::cout << "The DB command has the following options:" << std::endl;
std::cout << " INSERT <sid> <time> <value> - Insert test data into the data store" << std::endl;
std::cout << " FUZZYTRUNC <time> - Truncate data that is older than <time>" << std::endl;
}
/*
* Execute any of the DB commands
*/
int DBAction::executeCommand(int argc, char* argv[], int argvidx, const char* hostname)
{
/* Independent from the command, we need to connect to the server */
connection = new DCDBConnection();
connection->setHostname(hostname);
if (!connection->connect()) {
std::cerr << "Cannot connect to Cassandra database." << std::endl;
return EXIT_FAILURE;
}
/* Check what we need to do (argv[argvidx] contains "DB") */
argvidx++;
if (argvidx >= argc) {
std::cout << "The DB command needs at least two parameters." << std::endl;
std::cout << "Run with 'HELP DB' to see the list of possible DB commands." << std::endl;
goto executeCommandError;
}
if (strcasecmp(argv[argvidx], "INSERT") == 0) {
/* INSERT needs two more parameters */
if (argvidx+3 >= argc) {
std::cout << "INSERT needs three more parameters!" << std::endl;
goto executeCommandError;
}
doInsert(argv[argvidx+1], argv[argvidx+2], argv[argvidx+3]);
}
else if (strcasecmp(argv[argvidx], "FUZZYTRUNC") == 0) {
/* FUZZYTRUNC needs one more parameter */
if (argvidx+1 >= argc) {
std::cout << "FUZZYTRUNC needs one more parameter!" << std::endl;
goto executeCommandError;
}
doFuzzyTrunc(argv[argvidx+1]);
}
else {
std::cout << "Invalid DB command: " << argv[argvidx] << std::endl;
goto executeCommandError;
}
/* Clean up */
connection->disconnect();
delete connection;
return EXIT_SUCCESS;
executeCommandError:
connection->disconnect();
delete connection;
return EXIT_FAILURE;
}
/*
* Insert a single sensor reading into the database
*/
void DBAction::doInsert(std::string sidstr, std::string timestr, std::string valuestr)
{
SensorDataStore ds(connection);
SensorId sid;
DCDBTimeStamp ts;
int64_t value;
if (!sid.mqttTopicConvert(sidstr)) {
std::cout << "Invalid SID: " << sidstr << std::endl;
return;
}
try {
ts = DCDBTimeStamp(timestr);
}
catch (std::exception& e) {
std::cout << "Wrong time format." << std::endl;
return;
}
try {
value = boost::lexical_cast<int64_t>(valuestr);
}
catch (std::exception& e) {
std::cout << "Wrong value format." << std::endl;
return;
}
ds.insert(&sid, ts.getRaw(), value);
}
/*
* Fuzzy delete sensor data older than timestr
* The goal of this is to kill entire cassandra rows, so we get the weekstamp of timestr,
* subtract 1 and delete everything prior to that.
*/
void DBAction::doFuzzyTrunc(std::string timestr)
{
SensorDataStore ds(connection);
DCDBTimeStamp ts;
try {
ts = DCDBTimeStamp(timestr);
}
catch (std::exception& e) {
std::cout << "Wrong time format." << std::endl;
return;
}
ds.truncBeforeWeek(ts.getWeekstamp());
}
/*
* dbaction.h
*
* Created on: Sep 01, 2015
* Author: Axel Auweter
*/
#include <dcdb/connection.h>
#include <dcdb/sensordatastore.h>
#include <dcdb/timestamp.h>
#include <string>
#include "useraction.h"
#ifndef DBACTION_H
#define DBACTION_H
class DBAction : public UserAction
{
public:
void printHelp(int argc, char* argv[]);
int executeCommand(int argc, char* argv[], int argvidx, const char* hostname);
DBAction() {};
virtual ~DBAction() {};
protected:
DCDBConnection* connection;
void doInsert(std::string sidstr, std::string timestr, std::string valuestr);
void doFuzzyTrunc(std::string timestr);
};
#endif
......@@ -21,6 +21,7 @@ void usage(int argc, char* argv[])
std::cout << "Usage: " << argv[0] << " [-h host] <command> [<arguments> ... ]" << std::endl << std::endl;
std::cout << "Valid commands are: " << std::endl;
std::cout << " HELP <command name> - print help for given command" << std::endl;
std::cout << " DB - perform low-level database functions" << std::endl;
std::cout << " SENSOR - list and configure sensors" << std::endl;
}
......
......@@ -7,6 +7,7 @@
#include "useraction.h"
#include "sensoraction.h"
#include "dbaction.h"
#include <cstring>
#include <memory>
......@@ -23,6 +24,9 @@ std::shared_ptr<UserAction> UserActionFactory::getAction(const char *actionStr)
if (strcasecmp(actionStr, "sensor") == 0) {
action = new SensorAction();
}
else if (strcasecmp(actionStr, "db") == 0) {
action = new DBAction();
}
if (action != nullptr) {
return std::shared_ptr<UserAction>(action);
......
......@@ -37,11 +37,11 @@ _dcdbconfig_options()
fi
if [ "${num_args}" -le "$((${toplevel_command_at}+1))" ]; then
comrep+="help sensor "
comrep+="help sensor db "
else
if [ "${COMP_WORDS[${toplevel_command_at}]}" = "help" ]; then
if [ ! "${COMP_WORDS[$((${COMP_CWORD}-1))]}" = "sensor" ]; then
comrep="sensor"
comrep="sensor db "
fi
elif [ "${COMP_WORDS[${toplevel_command_at}]}" = "sensor" ]; then
if [ "${num_args}" -eq "$((${toplevel_command_at}+2))" ]; then
......@@ -65,6 +65,10 @@ _dcdbconfig_options()
if [ "${COMP_WORDS[$((${toplevel_command_at}+1))]}" = "unpublish" ] && [ "${num_args}" -eq "$((${toplevel_command_at}+3))" ]; then
comrep=$(dcdbconfig ${hostname_str} sensor list 2> /dev/null)
fi
elif [ "${COMP_WORDS[${toplevel_command_at}]}" = "db" ]; then
if [ "${num_args}" -eq "$((${toplevel_command_at}+2))" ]; then
comrep="insert fuzzytrunc"
fi
fi
fi
......
......@@ -80,7 +80,10 @@ case $1 in
wait_for_listen_port 9160
# Start CollectAgent (TTL for the data is 100 days)
env LD_LIBRARY_PATH=$LD_LIBRARY_PATH collectagent -D -l 0.0.0.0 -h 127.0.0.1 -t 8640000
#env LD_LIBRARY_PATH=$LD_LIBRARY_PATH collectagent -D -l 0.0.0.0 -h 127.0.0.1 -t 8640000
# Start CollectAgent (no TTL)
env LD_LIBRARY_PATH=$LD_LIBRARY_PATH collectagent -D -l 0.0.0.0 -h 127.0.0.1
# Wait till CollectAgent is up
wait_for_listen_port 1883
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment