Commit 76aaf386 authored by Michael Ott's avatar Michael Ott
Browse files

Provide SensorDataStore::insertBatch() function for batch inserts to reduce overhead

parent 39b38cea
......@@ -81,6 +81,9 @@ public:
(value == rhs.value);}
#endif
SensorDataStoreReading();
SensorDataStoreReading(SensorId& sid, uint64_t ts, int64_t value);
virtual ~SensorDataStoreReading();
};
/**
......@@ -109,6 +112,13 @@ public:
*/
void insert(SensorDataStoreReading& reading);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param readings A list of SensorDataStoreReading object.
*/
void insertBatch(std::list<SensorDataStoreReading>& readings);
/**
* @brief Set the TTL for newly inserted sensor data.
* @param ttl The TTL for the sensor data in seconds.
......
......@@ -76,6 +76,13 @@ public:
*/
void insert(SensorDataStoreReading& reading);
/**
* @brief This function inserts a single sensor reading into
* the database.
* @param readings A list of SensorDataStoreReading object.
*/
void insertBatch(std::list<SensorDataStoreReading>& readings);
/**
* @brief This function sets the TTL of newly inserted readings.
* @param ttl The TTL to be used for new inserts in seconds.
......
......@@ -64,6 +64,18 @@
using namespace DCDB;
SensorDataStoreReading::SensorDataStoreReading() {
}
SensorDataStoreReading::SensorDataStoreReading(SensorId& sid, uint64_t ts, int64_t value) {
this->sensorId = sid;
this->timeStamp = TimeStamp(ts);
this->value = value;
}
SensorDataStoreReading::~SensorDataStoreReading() {
}
/**
* @details
* Since we want high-performance inserts, we prepare the
......@@ -162,6 +174,35 @@ void SensorDataStoreImpl::insert(SensorDataStoreReading& reading) {
insert(&reading.sensorId, reading.timeStamp.getRaw(), reading.value);
}
void SensorDataStoreImpl::insertBatch(std::list<SensorDataStoreReading>& readings) {
CassBatch* batch = cass_batch_new(CASS_BATCH_TYPE_UNLOGGED);
for (auto r: readings) {
/* Calculate and insert week number */
uint16_t week = r.timeStamp.getRaw() / 604800000000000;
r.sensorId.setRsvd(week);
/* Add insert statement to batch */
CassStatement* statement = cass_prepared_bind(preparedInsert);
cass_statement_bind_bytes_by_name(statement, "sid", (cass_byte_t*)(r.sensorId.serialize().c_str()), 16);
cass_statement_bind_int64_by_name(statement, "ts", r.timeStamp.getRaw());
cass_statement_bind_int64_by_name(statement, "value", r.value);
cass_batch_add_statement(batch, statement);
cass_statement_free(statement);
}
/* Execute batch */
CassFuture *future = cass_session_execute_batch(session, batch);
cass_batch_free(batch);
CassError rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
}
cass_future_free(future);
}
/**
* @details
* This function updates the prepared statement for inserts
......@@ -518,6 +559,10 @@ void SensorDataStore::insert(SensorDataStoreReading& reading)
impl->insert(reading);
}
void SensorDataStore::insertBatch(std::list<SensorDataStoreReading>& readings) {
impl->insertBatch(readings);
}
/**
* @details
* Instead of doing the actual work, this function simply
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment