Commit a7f9b57c authored by Alessio Netti's avatar Alessio Netti

Configurable Cassandra Driver

- Added configuration options to tune the Cassandra driver for
optimal performance
- Added a "debugLog" configuration switch that enables logging of
errors in asynchronous inserts in the Cassandra driver
parent 05e98971
......@@ -408,6 +408,11 @@ int main(int argc, char* const argv[]) {
//Allocate and initialize connection to Cassandra.
dcdbConn = new DCDB::Connection(cassandraHost, atoi(cassandraPort.c_str()), settings.cassandraSettings.username, settings.cassandraSettings.password);
dcdbConn->setNumThreadsIo(settings.cassandraSettings.numThreadsIo);
dcdbConn->setQueueSizeIo(settings.cassandraSettings.queueSizeIo);
uint32_t params[3] = {settings.cassandraSettings.coreConnPerHost, settings.cassandraSettings.maxConnPerHost, settings.cassandraSettings.maxConcRequests};
dcdbConn->setBackendParams(params);
if (!dcdbConn->connect()) {
LOG(fatal) << "Cannot connect to Cassandra!";
......@@ -419,6 +424,7 @@ int main(int argc, char* const argv[]) {
*/
dcdbConn->initSchema();
/*
* Allocate the SensorDataStore.
*/
......@@ -430,6 +436,7 @@ int main(int argc, char* const argv[]) {
*/
if (settings.cassandraSettings.ttl > 0)
mySensorDataStore->setTTL(settings.cassandraSettings.ttl);
mySensorDataStore->setDebugLog(settings.cassandraSettings.debugLog);
/*
* Start the MQTT Message Server.
......
......@@ -19,5 +19,11 @@ cassandra {
username
password
ttl 0
numThreadsIo 1
queueSizeIo 4096
coreConnPerHost 1
maxConnPerHost 2
maxConcRequests 100
debugLog false
}
......@@ -29,6 +29,12 @@ Configuration::Configuration(const std::string& cfgFilePath) :
_global.cassandraSettings.username = "";
_global.cassandraSettings.password = "";
_global.cassandraSettings.ttl = 0;
_global.cassandraSettings.numThreadsIo = 1;
_global.cassandraSettings.queueSizeIo = 4096;
_global.cassandraSettings.coreConnPerHost = 1;
_global.cassandraSettings.maxConnPerHost = 2;
_global.cassandraSettings.maxConcRequests = 100;
_global.cassandraSettings.debugLog = false;
}
Configuration::~Configuration() {}
......@@ -85,6 +91,18 @@ bool Configuration::readGlobal() {
_global.cassandraSettings.password = global.second.data();
} else if (boost::iequals(global.first, "ttl")) {
_global.cassandraSettings.ttl = stoul(global.second.data());
} else if (boost::iequals(global.first, "numThreadsIo")) {
_global.cassandraSettings.numThreadsIo = stoul(global.second.data());
} else if (boost::iequals(global.first, "queueSizeIo")) {
_global.cassandraSettings.queueSizeIo = stoul(global.second.data());
} else if (boost::iequals(global.first, "coreConnPerHost")) {
_global.cassandraSettings.coreConnPerHost = stoul(global.second.data());
} else if (boost::iequals(global.first, "maxConnPerHost")) {
_global.cassandraSettings.maxConnPerHost = stoul(global.second.data());
} else if (boost::iequals(global.first, "maxConcRequests")) {
_global.cassandraSettings.maxConcRequests = stoul(global.second.data());
} else if (boost::iequals(global.first, "debugLog")) {
_global.cassandraSettings.debugLog = global.second.data() == "true";
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting";
}
......
......@@ -27,6 +27,12 @@ typedef struct {
std::string username;
std::string password;
uint32_t ttl;
uint32_t numThreadsIo;
uint32_t queueSizeIo;
uint32_t coreConnPerHost;
uint32_t maxConnPerHost;
uint32_t maxConcRequests;
bool debugLog;
} cassandra_t;
typedef struct {
......
......@@ -66,6 +66,25 @@ public:
*/
void printError(CassFuture* future);
/**
* @brief Sets the number of IO threads that are spawned
* @param n The number of IO threads
*/
void setNumThreadsIo(uint32_t n);
/**
* @brief Sets the maximum size of the outbound requests queue
* @param s The maximum size of the requests queue
*/
void setQueueSizeIo(uint32_t s);
/**
* @brief Sets implementation-specific parameters
* @param p A vector of unsigned integers containing a specific number of
* implementation-specific parameters
*/
void setBackendParams(uint32_t* p);
/**
* @brief Set the hostname for the connection.
* @param hostname Hostname of a Cassandra front end node.
......
......@@ -125,6 +125,12 @@ public:
*/
void setTTL(uint64_t ttl);
/**
* @brief Enables or disables logging of Cassandra insert errors
* @param dl true to enable logging, false otherwise
*/
void setDebugLog(bool dl);
/**
* @brief This function queries a sensor's values in
* the given time range.
......
......@@ -55,6 +55,12 @@ protected:
const CassSchemaMeta* schema; /**< The schema object containing the current database schema information */
std::string currentKeyspace; /**< The name of the active keyspace */
uint32_t numThreadsIo;
uint32_t queueSizeIo;
uint32_t coreConnPerHost;
uint32_t maxConnPerHost;
uint32_t maxConcRequests;
/**
* @brief This function validates a name to ensure that
* it only consists of alphanumeric characters.
......@@ -119,6 +125,31 @@ public:
*/
void printError(CassFuture* future);
/**
* @brief Sets the number of IO threads that are spawned
* @param n The number of IO threads
*/
void setNumThreadsIo(uint32_t n);
/**
* @brief Sets the maximum size of the outbound requests queue
* @param s The maximum size of the requests queue
*/
void setQueueSizeIo(uint32_t s);
/**
* @brief Sets implementation-specific parameters
*
* Input is an array of size three of unsigned integers:
* - p[0] contains the number of connections that are associated by default to each IO thread
* - p[1] contains the max number of connections that can be spawned by an IO thread
* - p[2] specifies the number of concurrent requests that triggers a new connection creation
*
* @param p An array of unsigned integers containing three Cassandra-related
* configuration parameters
*/
void setBackendParams(uint32_t* p);
/**
* @brief The implementation function of Connection::setHostname().
*/
......
......@@ -41,7 +41,23 @@
namespace DCDB {
static std::string const AggregateString[] = {"", "min", "max", "avg", "sum", "count"};
//Definition of callback function
void DataStoreImpl_on_result(CassFuture_* future, void* data) {
/* This result will now return immediately */
static CassError rcPrev = CASS_OK;
static uint32_t ctr = 0;
CassError rc = cass_future_error_code(future);
if(rc != CASS_OK) {
if(rc != rcPrev) {
std::cout << "Cassandra Backend Error: " << cass_error_desc(rc) << std::endl;
ctr = 0;
rcPrev = rc;
} else if(++ctr%10000 == 0)
std::cout << "Cassandra Backend Error: " << cass_error_desc(rc) << " (" << ctr << " more)" << std::endl;
}
}
/**
* @brief The SensorDataStoreImpl class contains all protected
* functions belonging to SensorDataStore which are
......@@ -53,6 +69,7 @@ protected:
Connection* connection; /**< The Connection object that does the low-level stuff for us. */
CassSession* session; /**< The CassSession object given by the connection. */
const CassPrepared* preparedInsert; /**< The prepared statement for fast insertions. */
bool debugLog;
/**
* @brief Prepare for insertions.
......@@ -89,6 +106,12 @@ public:
*/
void setTTL(uint64_t ttl);
/**
* @brief Enables or disables logging of Cassandra insert errors
* @param dl true to enable logging, false otherwise
*/
void setDebugLog(bool dl);
/**
* @brief This function querie a sensor's values in
* the given time range.
......
......@@ -46,6 +46,18 @@ void Connection::printError(CassFuture* future) {
impl->printError(future);
}
void Connection::setNumThreadsIo(uint32_t n) {
impl->setNumThreadsIo(n);
}
void Connection::setQueueSizeIo(uint32_t s) {
impl->setQueueSizeIo(s);
}
void Connection::setBackendParams(uint32_t* p) {
impl->setBackendParams(p);
}
void Connection::setHostname(std::string hostname) {
impl->setHostname(hostname);
}
......@@ -283,6 +295,24 @@ void ConnectionImpl::printError(CassFuture* future)
std::cerr << "Cassandra Backend Error: " << std::string(message, length) << std::endl;
}
void ConnectionImpl::setNumThreadsIo(uint32_t n) {
if(!connected)
numThreadsIo = n;
}
void ConnectionImpl::setQueueSizeIo(uint32_t s) {
if(!connected)
queueSizeIo = s;
}
void ConnectionImpl::setBackendParams(uint32_t* p) {
if(!connected) {
coreConnPerHost = p[0];
maxConnPerHost = p[1];
maxConcRequests = p[2];
}
}
void ConnectionImpl::setHostname(std::string hostname) {
if (!connected)
hostname_ = hostname;
......@@ -335,6 +365,12 @@ bool ConnectionImpl::connect() {
cass_cluster_set_credentials(cluster, username_.c_str(), password_.c_str());
}
cass_cluster_set_num_threads_io(cluster, numThreadsIo);
cass_cluster_set_queue_size_io(cluster, queueSizeIo);
cass_cluster_set_core_connections_per_host(cluster, coreConnPerHost);
cass_cluster_set_max_connections_per_host(cluster, maxConnPerHost);
cass_cluster_set_max_concurrent_requests_threshold(cluster, maxConcRequests);
/* Force protcol version to 1 */
cass_cluster_set_protocol_version(cluster, 1);
......@@ -468,6 +504,12 @@ ConnectionImpl::ConnectionImpl() {
session = nullptr;
schema = nullptr;
numThreadsIo = 1;
queueSizeIo = 4096;
coreConnPerHost = 1;
maxConnPerHost = 2;
maxConcRequests = 100;
hostname_ = "localhost";
port_ = 9042;
connected = false;
......
......@@ -185,6 +185,9 @@ void SensorDataStoreImpl::insertBatch(std::list<SensorDataStoreReading>& reading
/* Execute batch */
CassFuture *future = cass_session_execute_batch(session, batch);
cass_batch_free(batch);
if(debugLog)
cass_future_set_callback(future, DataStoreImpl_on_result, NULL);
/* Don't wait for the future, just free it to make the call truly asynchronous */
cass_future_free(future);
......@@ -200,6 +203,15 @@ void SensorDataStoreImpl::setTTL(uint64_t ttl)
prepareInsert(ttl);
}
/**
* @brief Enables or disables logging of Cassandra insert errors
* @param dl true to enable logging, false otherwise
*/
void SensorDataStoreImpl::setDebugLog(bool dl)
{
debugLog = dl;
}
/**
* @details
* This function issues a regular query to the data store
......@@ -512,6 +524,7 @@ SensorDataStoreImpl::SensorDataStoreImpl(Connection* conn)
{
connection = conn;
session = connection->getSessionHandle();
debugLog = false;
preparedInsert = nullptr;
prepareInsert(0);
......@@ -561,6 +574,15 @@ void SensorDataStore::setTTL(uint64_t ttl)
impl->setTTL(ttl);
}
/**
* @brief Enables or disables logging of Cassandra insert errors
* @param dl true to enable logging, false otherwise
*/
void SensorDataStore::setDebugLog(bool dl)
{
impl->setDebugLog(dl);
}
/**
* @details
* Instead of doing the actual work, this function simply
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment