Commit 2344db72 authored by Micha Mueller's avatar Micha Mueller
Browse files

Prepare inserts beforehand for faster actual inserts into Cassandra

parent 6cfb09d3
......@@ -59,10 +59,10 @@ namespace DCDB {
class JobData {
public:
JobId jobId; /**< SLURM job id of the job. */
UserId userId; /**< Id of the user who submitted the job. */
TimeStamp startTime;/**< Time when the job started (started != submitted) */
TimeStamp endTime; /**< Time when the job finished. */
NodeList nodes; /**< List of nodes the job occupied. */
UserId userId; /**< Id of the user who submitted the job. */
//TODO
};
......@@ -84,13 +84,13 @@ namespace DCDB {
/**
* @brief This function inserts a single job into the database.
* @param jid Id of the job.
* @param uid Id of the user the job belongs to.
* @param startTs The starting timestamp of the job.
* @param endTs The end timestamp of the job.
* @param nodes List of nodes which were used by the job.
* @param uid Id of the user the job belongs to.
*/
JDError insertJob(JobId jid, TimeStamp startTs, TimeStamp endTs,
NodeList nodes, UserId uid);
JDError insertJob(JobId jid, UserId uid, TimeStamp startTs,
TimeStamp endTs, NodeList nodes);
/**
* @brief This function inserts a single job which was submitted to SLURM
......
......@@ -52,11 +52,18 @@ namespace DCDB {
protected:
Connection* connection; /**< The Connection object that does the low-level stuff for us. */
CassSession* session; /**< The CassSession object given by the connection. */
const CassPrepared* preparedInsert; /**< The prepared statement for fast insertions. */
/**
* @brief Prepare for insertions.
* @param ttl A TTL that will be set for newly inserted values. Set to 0 to insert without TTL.
*/
void prepareInsert(uint64_t ttl);
public:
/* See jobdatastore.h for documentation */
JDError insertJob(JobId jid, TimeStamp startTs, TimeStamp endTs,
NodeList nodes, UserId uid);
JDError insertJob(JobId jid, UserId uid, TimeStamp startTs,
TimeStamp endTs, NodeList nodes);
JDError insertSubmittedJob(JobId jid, UserId uid);
JDError removeJob(JobId jid);
......
......@@ -7,7 +7,7 @@
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
// Copyright (C) 2011-2018 Leibniz Supercomputing Centre
//
// This library is free software; you can redistribute it and/or
// modify it under the terms of the GNU Lesser General Public
......@@ -40,12 +40,57 @@
using namespace DCDB;
/**
* @details
* Since we want high-performance inserts, we prepare the insert CQL query in
* advance and only bind it on the actual insert.
*/
void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
CassError rc = CASS_OK;
CassFuture* future = NULL;
const char* query;
/*
* Free the old prepared if necessary.
*/
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
char *queryBuf = NULL;
if (ttl == 0) {
query = "INSERT INTO dcdb.jobdata (jid, uid, startTs, endTs, nodes) "
"VALUES (?, ?, ?, ?, ?);";
}
else {
queryBuf = (char*)malloc(256);
snprintf(queryBuf, 256, "INSERT INTO dcdb.jobdata (jid, uid, startTs, endTs, nodes) "
"VALUES (?, ?, ?, ?, ?) USING TTL %" PRIu64 " ;", ttl);
query = queryBuf;
}
future = cass_session_prepare(session, query);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
} else {
preparedInsert = cass_future_get_prepared(future);
}
cass_future_free(future);
if (queryBuf) {
free(queryBuf);
}
}
/**
* @details
* //TODO
*/
JDError JobDataStoreImpl::insertJob(JobId jid, TimeStamp startTs, TimeStamp endTs,
NodeList nodes, UserId uid) {
JDError JobDataStoreImpl::insertJob(JobId jid, UserId uid, TimeStamp startTs,
TimeStamp endTs, NodeList nodes) {
//TODO
return JD_UNKNOWNERROR;
}
......@@ -61,7 +106,7 @@ JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) {
/**
* @details
* //TODO
* //TODO unnecessary
*/
JDError JobDataStoreImpl::removeJob(JobId jid) {
//TODO
......@@ -70,7 +115,7 @@ JDError JobDataStoreImpl::removeJob(JobId jid) {
/**
* @details
* //TODO
* //TODO unnecessary. Merge with getJobsByUser to getJobsByTime
*/
JDError JobDataStoreImpl::getJobs(std::list<JobData>& jobs) {
//TODO
......@@ -104,6 +149,8 @@ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) {
return JD_UNKNOWNERROR;
}
//TODO merge setter into one update method
/**
* @details
* //TODO
......@@ -140,16 +187,22 @@ JDError JobDataStoreImpl::setNodeList(JobId jid, NodeList nodes) {
JobDataStoreImpl::JobDataStoreImpl(Connection* conn) {
connection = conn;
session = connection->getSessionHandle();
preparedInsert = nullptr;
prepareInsert(0);
}
/**
* @details
* The destructor just resets the internal pointers. Deletion of the pointers
* is not our responsibility.
* (except preparedInsert) is not our responsibility.
*/
JobDataStoreImpl::~JobDataStoreImpl() {
connection = nullptr;
session = nullptr;
if (preparedInsert) {
cass_prepared_free(preparedInsert);
}
}
/* ########################################################################## */
......@@ -159,9 +212,9 @@ JobDataStoreImpl::~JobDataStoreImpl() {
* Instead of doing the actual work, this function simply forwards to the
* corresponding function of the JobDataStoreImpl class.
*/
JDError JobDataStore::insertJob(JobId jid, TimeStamp startTs, TimeStamp endTs,
NodeList nodes, UserId uid) {
return impl->insertJob(jid, startTs, endTs, nodes, uid);
JDError JobDataStore::insertJob(JobId jid, UserId uid, TimeStamp startTs,
TimeStamp endTs, NodeList nodes) {
return impl->insertJob(jid, uid, startTs, endTs, nodes);
}
/**
......@@ -261,7 +314,7 @@ JobDataStore::JobDataStore(Connection* conn) {
*/
JobDataStore::~JobDataStore() {
/* Clean up... */
if (impl) {
delete impl;
}
if (impl) {
delete impl;
}
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment