Commit 174997aa authored by Alessio Netti's avatar Alessio Netti
Browse files

Per-insert TTL in SensorDataStore

parent 7556fa84
......@@ -113,25 +113,28 @@ public:
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param sid A SensorId object.
* @param ts The timestamp of the sensor reading.
* @param value The value of the sensor reading.
* @param sid A SensorId object
* @param ts The timestamp of the sensor reading
* @param value The value of the sensor reading
* @param ttl Time to live (in seconds) for the inserted reading
*/
void insert(SensorId* sid, uint64_t ts, int64_t value);
void insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl=-1);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param reading A SensorDataStoreReading object.
* @param reading A SensorDataStoreReading object
* @param ttl Time to live (in seconds) for the inserted reading
*/
void insert(SensorDataStoreReading& reading);
void insert(SensorDataStoreReading& reading, int64_t ttl=-1);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param readings A list of SensorDataStoreReading object.
* @param readings A list of SensorDataStoreReading object
* @param ttl Time to live (in seconds) for the inserted readings
*/
void insertBatch(std::list<SensorDataStoreReading>& readings);
void insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl=-1);
/**
* @brief Set the TTL for newly inserted sensor data.
......
......@@ -70,7 +70,9 @@ protected:
Connection* connection; /**< The Connection object that does the low-level stuff for us. */
CassSession* session; /**< The CassSession object given by the connection. */
const CassPrepared* preparedInsert; /**< The prepared statement for fast insertions. */
const CassPrepared* preparedInsert_noTTL; /**< The prepared statement for fast insertions, without TTL. */
bool debugLog;
uint64_t defaultTTL;
/**
* @brief Prepare for insertions.
......@@ -84,22 +86,25 @@ public:
* @param sid The SensorId object representing the sensor (typically obtained from topicToSid)
* @param ts The timestamp of the reading (nanoseconds since Unix epoch)
* @param value The sensor reading as 64-bit integer
* @param ttl Time to live (in seconds) for the inserted reading
*/
void insert(SensorId* sid, uint64_t ts, int64_t value);
void insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl=-1);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param reading A SensorDataStoreReading object.
* @param reading A SensorDataStoreReading object
* @param ttl Time to live (in seconds) for the inserted reading
*/
void insert(SensorDataStoreReading& reading);
void insert(SensorDataStoreReading& reading, int64_t ttl=-1);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param readings A list of SensorDataStoreReading object.
* @param readings A list of SensorDataStoreReading object
* @param ttl Time to live (in seconds) for the inserted readings
*/
void insertBatch(std::list<SensorDataStoreReading>& readings);
void insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl=-1);
/**
* @brief This function sets the TTL of newly inserted readings.
......
......@@ -85,42 +85,47 @@ SensorDataStoreReading::~SensorDataStoreReading() {
*/
void SensorDataStoreImpl::prepareInsert(uint64_t ttl)
{
CassError rc = CASS_OK;
CassFuture* future = NULL;
const char* query;
/*
* Free the old prepared if necessary.
*/
if (preparedInsert) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
const char* query;
/*
* Free the old prepared if necessary.
*/
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
char *queryBuf = NULL;
if (ttl == 0) {
query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?);";
}
else {
queryBuf = (char*)malloc(256);
snprintf(queryBuf, 256, "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL %" PRIu64 " ;", ttl);
query = queryBuf;
}
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
}
query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?) USING TTL ? ;";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
} else {
preparedInsert = cass_future_get_prepared(future);
}
}
cass_future_free(future);
cass_future_free(future);
if (queryBuf) {
free(queryBuf);
}
// Preparing the insert statement without a TTL clause
if (preparedInsert_noTTL) {
cass_prepared_free(preparedInsert_noTTL);
}
query = "INSERT INTO dcdb.sensordata (sid, ws, ts, value) VALUES (?, ?, ?, ?);";
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
preparedInsert_noTTL = cass_future_get_prepared(future);
}
cass_future_free(future);
defaultTTL = ttl;
}
/**
......@@ -137,7 +142,7 @@ void SensorDataStoreImpl::prepareInsert(uint64_t ttl)
* Applications should not call this function directly, but
* use the insert function provided by the SensorDataStore class.
*/
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value)
void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl)
{
#if 0
std::cout << "Inserting@SensorDataStoreImpl (" << sid->raw[0] << " " << sid->raw[1] << ", " << ts << ", " << value << ")" << std::endl;
......@@ -146,12 +151,16 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value)
/* Calculate and insert week number */
uint16_t week = ts / 604800000000000;
sid->setRsvd(week);
int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
CassStatement* statement = cass_prepared_bind(preparedInsert);
CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
cass_statement_bind_string_by_name(statement, "sid", sid->getId().c_str());
cass_statement_bind_int16_by_name(statement, "ws", week);
cass_statement_bind_int64_by_name(statement, "ts", ts);
cass_statement_bind_int64_by_name(statement, "value", value);
if(ttlReal>0)
cass_statement_bind_int32(statement, 4, ttlReal);
CassFuture* future = cass_session_execute(session, statement);
cass_statement_free(statement);
......@@ -160,24 +169,28 @@ void SensorDataStoreImpl::insert(SensorId* sid, uint64_t ts, int64_t value)
cass_future_free(future);
}
void SensorDataStoreImpl::insert(SensorDataStoreReading& reading) {
insert(&reading.sensorId, reading.timeStamp.getRaw(), reading.value);
void SensorDataStoreImpl::insert(SensorDataStoreReading& reading, int64_t ttl) {
insert(&reading.sensorId, reading.timeStamp.getRaw(), reading.value, ttl);
}
void SensorDataStoreImpl::insertBatch(std::list<SensorDataStoreReading>& readings) {
void SensorDataStoreImpl::insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl) {
CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
int64_t ttlReal = (ttl<0 ? defaultTTL : ttl);
for (auto r: readings) {
/* Calculate and insert week number */
uint16_t week = r.timeStamp.getRaw() / 604800000000000;
r.sensorId.setRsvd(week);
/* Add insert statement to batch */
CassStatement* statement = cass_prepared_bind(preparedInsert);
CassStatement* statement = cass_prepared_bind(ttlReal<=0 ? preparedInsert_noTTL : preparedInsert);
cass_statement_bind_string_by_name(statement, "sid", r.sensorId.getId().c_str());
cass_statement_bind_int16_by_name(statement, "ws", week);
cass_statement_bind_int64_by_name(statement, "ts", r.timeStamp.getRaw());
cass_statement_bind_int64_by_name(statement, "value", r.value);
if(ttlReal>0)
cass_statement_bind_int32(statement, 4, ttlReal);
cass_batch_add_statement(batch, statement);
cass_statement_free(statement);
}
......@@ -648,9 +661,11 @@ SensorDataStoreImpl::SensorDataStoreImpl(Connection* conn)
connection = conn;
session = connection->getSessionHandle();
debugLog = false;
defaultTTL = 0;
preparedInsert = nullptr;
prepareInsert(0);
preparedInsert_noTTL = nullptr;
prepareInsert(defaultTTL);
}
/**
......@@ -664,6 +679,9 @@ SensorDataStoreImpl::~SensorDataStoreImpl()
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
if (preparedInsert_noTTL) {
cass_prepared_free(preparedInsert_noTTL);
}
}
/**
......@@ -672,18 +690,18 @@ SensorDataStoreImpl::~SensorDataStoreImpl()
* forwards to the insert function of the SensorDataStoreImpl
* class.
*/
void SensorDataStore::insert(SensorId* sid, uint64_t ts, int64_t value)
void SensorDataStore::insert(SensorId* sid, uint64_t ts, int64_t value, int64_t ttl)
{
impl->insert(sid, ts, value);
impl->insert(sid, ts, value, ttl);
}
void SensorDataStore::insert(SensorDataStoreReading& reading)
void SensorDataStore::insert(SensorDataStoreReading& reading, int64_t ttl)
{
impl->insert(reading);
impl->insert(reading, ttl);
}
void SensorDataStore::insertBatch(std::list<SensorDataStoreReading>& readings) {
impl->insertBatch(readings);
void SensorDataStore::insertBatch(std::list<SensorDataStoreReading>& readings, int64_t ttl) {
impl->insertBatch(readings, ttl);
}
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment