//================================================================================ // Name : jobdatastore.cpp // Author : Axel Auweter, Micha Mueller // Copyright : Leibniz Supercomputing Centre // Description : C++ API implementation for inserting and querying DCDB job data. //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2018 Leibniz Supercomputing Centre // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA //================================================================================ /** * @file * @brief This file contains actual implementations of the jobdatastore * interface. The interface itself forwards all functions to the internal * JobDataStoreImpl. The real logic is implemented in the JobDataStoreImpl. */ #include #include #include #include "cassandra.h" #include "dcdb/jobdatastore.h" #include "jobdatastore_internal.h" #include "dcdb/connection.h" #include "dcdbglobals.h" using namespace DCDB; /** * @details * Since we want high-performance inserts, we prepare the insert CQL query in * advance and only bind it on the actual insert. */ void JobDataStoreImpl::prepareInsert(uint64_t ttl) { CassError rc = CASS_OK; CassFuture* future = NULL; const char* query; /* Free the old prepared if necessary. */ if (preparedInsert) { cass_prepared_free(preparedInsert); } char *queryBuf = NULL; if (ttl == 0) { query = "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?);"; } else { queryBuf = (char*)malloc(256); snprintf(queryBuf, 256, "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?) " "USING TTL %" PRIu64 " ;", ttl); query = queryBuf; } future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } else { preparedInsert = cass_future_get_prepared(future); } cass_future_free(future); if (queryBuf) { free(queryBuf); } } /** * @details * Extract all data from the JobData object and push it into the data store. */ JDError JobDataStoreImpl::insertJob(JobData& jdata) { /* Check if the input for the primary key is valid and reasonable */ if (jdata.startTime.getRaw() == 0) { return JD_BADPARAMS; } JDError error = JD_UNKNOWNERROR; /* Insert into Cassandra */ CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; statement = cass_prepared_bind(preparedInsert); cass_statement_bind_int64_by_name(statement, "jid", jdata.jobId); cass_statement_bind_int64_by_name(statement, "uid", jdata.userId); cass_statement_bind_int64_by_name(statement, "start_ts", jdata.startTime.getRaw()); cass_statement_bind_int64_by_name(statement, "end_ts", jdata.endTime.getRaw()); /* Copy the string node list to a varchar set */ CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, jdata.nodes.size()); for (auto& s : jdata.nodes) { cass_collection_append_string(set, s.c_str()); } cass_statement_bind_collection_by_name(statement, "nodes", set); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Clean up in the meantime */ cass_collection_free(set); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Update the job with matching JobId and StartTs in the data store with the * values provided by the given JobData object. If no such job exists in the * data store yet, it is inserted. * The JobData object is expected to be complete. Partial * updates of only selected fields are not supported. Instead, one has to * retrieve the other JobData information via getJobById() or * getJobByPrimaryKey() first and complete its JobData object for the update. */ JDError JobDataStoreImpl::updateJob(JobData& jdata) { /* Check if the input for the primary key is valid and reasonable */ if (jdata.startTime.getRaw() == 0) { return JD_BADPARAMS; } JDError error = JD_UNKNOWNERROR; /* Update entry in Cassandra (actually upserts) */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA " SET uid = ?, end_ts = ?, nodes = ? WHERE jid = ? AND start_ts = ? ;"; statement = cass_statement_new(query, 5); cass_statement_bind_int64(statement, 3, jdata.jobId); cass_statement_bind_int64(statement, 0, jdata.userId); cass_statement_bind_int64(statement, 4, jdata.startTime.getRaw()); cass_statement_bind_int64(statement, 1, jdata.endTime.getRaw()); /* Copy the string node list to a varchar set */ CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, jdata.nodes.size()); for (auto& s : jdata.nodes) { cass_collection_append_string(set, s.c_str()); } cass_statement_bind_collection(statement, 2, set); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Clean up in the meantime */ cass_collection_free(set); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Update the job with matching JobId and StartTs in the data store with the * provided end time. If no such job exists in the data store yet, it is * inserted. */ JDError JobDataStoreImpl::updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime) { /* Check if the input for the primary key is valid and reasonable */ if (startTs.getRaw() == 0) { return JD_BADPARAMS; } JDError error = JD_UNKNOWNERROR; /* Update entry in Cassandra (actually upserts) */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "UPDATE " JD_KEYSPACE_NAME "." CF_JOBDATA " SET end_ts = ? WHERE jid = ?, start_ts = ? ;"; statement = cass_statement_new(query, 3); cass_statement_bind_int64(statement, 1, jobId); cass_statement_bind_int64(statement, 2, startTs.getRaw()); cass_statement_bind_int64(statement, 0, endTime.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Delete the entry with matching JobId and start TimeStamp from the data store. */ JDError JobDataStoreImpl::deleteJob(JobId jid, TimeStamp startTs) { JDError error = JD_UNKNOWNERROR; /* Remove entry from Cassandra */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "DELETE FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE jid = ? AND start_ts = ?;"; statement = cass_statement_new(query, 2); cass_statement_bind_int64(statement, 0, jid); cass_statement_bind_int64(statement, 1, startTs.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Find the entry in the data store with matching JobId and start_ts and store * the corresponding values in the JobData object. */ JDError JobDataStoreImpl::getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs) { JDError error = JD_UNKNOWNERROR; /* Select entry from Cassandra */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE jid = ? AND start_ts = ?;"; statement = cass_statement_new(query, 2); cass_statement_bind_int64(statement, 0, jid); cass_statement_bind_int64(statement, 1, startTs.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; const CassResult* cresult = cass_future_get_result(future); size_t rowCnt = cass_result_row_count(cresult); /* Check if the returned data is reasonable */ if (rowCnt == 0) { error = JD_JOBKEYNOTFOUND; } else { /* Retrieve data from result */ const CassRow* row = cass_result_first_row(cresult); cass_int64_t jobId, userId, startTs, endTs; /* jid and start_ts are always set. Other values should be checked */ cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId); cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs); if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId) != CASS_OK) { userId = 0; error = JD_PARSINGERROR; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) { endTs = 0; error = JD_PARSINGERROR; } /* Copy the data in the JobData object */ job.jobId = (JobId) jobId; job.userId = (UserId) userId; job.startTime = (uint64_t) startTs; job.endTime = (uint64_t) endTs; /* Do not forget about the nodes... */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); job.nodes.emplace_back(nodeStr, nodeStr_len); } cass_iterator_free(setIt); } cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Find the entry in the data store with matching JobId and highest start_ts * value (= most recent job) and store the * corresponding values in the JobData object. */ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) { JDError error = JD_UNKNOWNERROR; /* Select entry from Cassandra */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE jid = ? ORDER BY start_ts DESC LIMIT 1;"; statement = cass_statement_new(query, 1); cass_statement_bind_int64(statement, 0, jid); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; const CassResult* cresult = cass_future_get_result(future); size_t rowCnt = cass_result_row_count(cresult); /* Check if the returned data is reasonable */ if (rowCnt == 0) { error = JD_JOBIDNOTFOUND; } else { /* Retrieve data from result */ const CassRow* row = cass_result_first_row(cresult); cass_int64_t jobId, userId, startTs, endTs; /* jid and start_ts are always set. Other values should be checked */ cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId); cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs); if (cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId) != CASS_OK) { userId = 0; error = JD_PARSINGERROR; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) { endTs = 0; error = JD_PARSINGERROR; } /* Copy the data in the JobData object */ job.jobId = (JobId) jobId; job.userId = (UserId) userId; job.startTime = (uint64_t) startTs; job.endTime = (uint64_t) endTs; /* Do not forget about the nodes... */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); job.nodes.emplace_back(nodeStr, nodeStr_len); } cass_iterator_free(setIt); } cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Find all entries in the data store whose start_ts AND end_ts lay within * the specified interval. Store the found entries in the JobData list. */ JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { /* Check if the input is valid and reasonable */ if (intervalEnd.getRaw() == 0) { return JD_BADPARAMS; } if (intervalStart >= intervalEnd) { return JD_BADPARAMS; } JDError error = JD_UNKNOWNERROR; /* Select entries from Cassandra */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE start_ts >= ? AND end_ts <= ? ;"; statement = cass_statement_new(query, 2); cass_statement_bind_int64(statement, 0, intervalStart.getRaw()); cass_statement_bind_int64(statement, 1, intervalEnd.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; /* Retrieve data from result */ const CassResult* cresult = cass_future_get_result(future); CassIterator* rowIt = cass_iterator_from_result(cresult); JobData job; while (cass_iterator_next(rowIt)) { const CassRow* row = cass_iterator_get_row(rowIt); cass_int64_t jobId, userId, startTs, endTs; /* jid and uid should always be set. Other values should be checked */ cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId); cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId); if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) { startTs = 0; error = JD_PARSINGERROR; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) { endTs = 0; error = JD_PARSINGERROR; } /* Copy the data into job object */ job.jobId = (JobId) jobId; job.userId = (UserId) userId; job.startTime = (uint64_t) startTs; job.endTime = (uint64_t) endTs; /* Do not forget about the nodes... */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); job.nodes.emplace_back(nodeStr, nodeStr_len); } //TODO job.nodes list deep copied? jobs.push_back(job); job.nodes.clear(); cass_iterator_free(setIt); } cass_iterator_free(rowIt); cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Find all entries in the data store whose start_ts OR end_ts lays within * the specified interval. Store the found entries in the JobData list. * Cassandra only supports AND conditions in its query language. Therefore * we cannot SELECT directly the required jobs. Instead we have to do two * selects and manually deduplicate the results. * TODO Doing two request successively may tangle up error codes. */ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { /* Check if the input is valid and reasonable */ if (intervalEnd.getRaw() == 0) { return JD_BADPARAMS; } if (intervalStart >= intervalEnd) { return JD_BADPARAMS; } JDError error = JD_UNKNOWNERROR; /* +++ First SELECT +++ */ /* Select entries from Cassandra where start_ts lays within the interval */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE start_ts >= ? AND start_ts <= ? ;"; statement = cass_statement_new(query, 2); cass_statement_bind_int64(statement, 0, intervalStart.getRaw()); cass_statement_bind_int64(statement, 1, intervalEnd.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; /* Retrieve data from result */ const CassResult* cresult = cass_future_get_result(future); CassIterator* rowIt = cass_iterator_from_result(cresult); JobData job; while (cass_iterator_next(rowIt)) { const CassRow* row = cass_iterator_get_row(rowIt); cass_int64_t jobId, userId, startTs, endTs; /* jid and uid should always be set. Other values should be checked */ cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId); cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId); if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) { startTs = 0; error = JD_PARSINGERROR; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) { endTs = 0; error = JD_PARSINGERROR; } /* Copy the data into job object */ job.jobId = (JobId) jobId; job.userId = (UserId) userId; job.startTime = (uint64_t) startTs; job.endTime = (uint64_t) endTs; /* Do not forget about the nodes... */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); job.nodes.emplace_back(nodeStr, nodeStr_len); } //TODO job.nodes list deep copied? jobs.push_back(job); job.nodes.clear(); cass_iterator_free(setIt); } cass_iterator_free(rowIt); cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); /* +++ Second SELECT +++ */ /* Select entries from Cassandra where end_ts lays within the interval */ query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE end_ts >= ? AND end_ts <= ? ;"; statement = cass_statement_new(query, 2); cass_statement_bind_int64(statement, 0, intervalStart.getRaw()); cass_statement_bind_int64(statement, 1, intervalEnd.getRaw()); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { /* Retrieve data from result */ const CassResult* cresult = cass_future_get_result(future); CassIterator* rowIt = cass_iterator_from_result(cresult); JobData job; while (cass_iterator_next(rowIt)) { const CassRow* row = cass_iterator_get_row(rowIt); cass_int64_t jobId, userId, startTs, endTs; /* jid and uid should always be set. Other values should be checked */ cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId); cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId); if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) { startTs = 0; error = JD_PARSINGERROR; } if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) { endTs = 0; error = JD_PARSINGERROR; } /* Copy the data into job object */ job.jobId = (JobId) jobId; /* Manual "deduplication" */ bool alreadyPresent = false; /* TODO Possible optimization: iterate only jobs from first SELECT */ for (const auto& j : jobs) { if (j.jobId == job.jobId) { alreadyPresent = true; } } if (!alreadyPresent) { job.userId = (UserId) userId; job.startTime = (uint64_t) startTs; job.endTime = (uint64_t) endTs; /* Do not forget about the nodes... */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); job.nodes.emplace_back(nodeStr, nodeStr_len); } //TODO job.nodes list deep copied? jobs.push_back(job); job.nodes.clear(); cass_iterator_free(setIt); } } cass_iterator_free(rowIt); cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * Find the entry in the data store with matching JobId and highest start_ts * value (= most recent job) and store the * corresponding nodes in the NodeList. */ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs) { JDError error = JD_UNKNOWNERROR; /* Select entry from Cassandra */ CassError rc = CASS_OK; CassStatement* statement = nullptr; CassFuture* future = nullptr; const char* query = "SELECT nodes FROM " JD_KEYSPACE_NAME "." CF_JOBDATA " WHERE jid = ? ORDER BY start_ts LIMIT 1;"; statement = cass_statement_new(query, 1); cass_statement_bind_int64(statement, 0, jid); /* All parameters bound. Now execute the statement asynchronously */ future = cass_session_execute(session, statement); /* Wait for the statement to finish */ cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); error = JD_UNKNOWNERROR; } else { error = JD_OK; const CassResult* cresult = cass_future_get_result(future); size_t rowCnt = cass_result_row_count(cresult); /* Check if the returned data is reasonable */ if (rowCnt == 0) { error = JD_JOBIDNOTFOUND; } else { /* Retrieve data from result */ const CassRow* row = cass_result_first_row(cresult); /* Copy the nodes in the NodeList */ const char* nodeStr; size_t nodeStr_len; const CassValue* set = cass_row_get_column_by_name(row, "nodes"); CassIterator* setIt = cass_iterator_from_collection(set); while (cass_iterator_next(setIt)) { cass_value_get_string(cass_iterator_get_value(setIt), &nodeStr, &nodeStr_len); nodes.emplace_back(nodeStr, nodeStr_len); } cass_iterator_free(setIt); } cass_result_free(cresult); } cass_future_free(future); cass_statement_free(statement); return error; } /** * @details * This constructor sets the internal connection variable to * the externally provided Connection object and also * retrieves the CassSession pointer of the connection. */ JobDataStoreImpl::JobDataStoreImpl(Connection* conn) { connection = conn; session = connection->getSessionHandle(); preparedInsert = nullptr; prepareInsert(0); } /** * @details * The destructor just resets the internal pointers. Deletion of the pointers * (except preparedInsert) is not our responsibility. */ JobDataStoreImpl::~JobDataStoreImpl() { connection = nullptr; session = nullptr; if (preparedInsert) { cass_prepared_free(preparedInsert); } } /* ########################################################################## */ /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::insertJob(JobData& jdata) { return impl->insertJob(jdata); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::updateJob(JobData& jdata) { return impl->updateJob(jdata); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::updateEndtime(JobId jobId, TimeStamp startTs, TimeStamp endTime) { return impl->updateEndtime(jobId, startTs, endTime); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::deleteJob(JobId jid, TimeStamp startTs) { return impl->deleteJob(jid, startTs); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobByPrimaryKey(JobData& job, JobId jid, TimeStamp startTs) { return impl->getJobByPrimaryKey(job, jid, startTs); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobById(JobData& job, JobId jid) { return impl->getJobById(job, jid); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobsInIntervalExcl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { return impl->getJobsInIntervalExcl(jobs, intervalStart, intervalEnd); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobsInIntervalIncl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { return impl->getJobsInIntervalIncl(jobs, intervalStart, intervalEnd); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getNodeList(NodeList& nodes, JobId jid, TimeStamp startTs) { return impl->getNodeList(nodes, jid, startTs); } /** * @details * This constructor allocates the implementation class which * holds the actual implementation of the class functionality. */ JobDataStore::JobDataStore(Connection* conn) { impl = new JobDataStoreImpl(conn); } /** * @details * The JobDataStore destructor deallocates the * JobDataStoreImpl and CassandraBackend objects. */ JobDataStore::~JobDataStore() { /* Clean up... */ if (impl) { delete impl; } }