//================================================================================ // Name : jobdatastore.cpp // Author : Axel Auweter, Micha Mueller // Copyright : Leibniz Supercomputing Centre // Description : C++ API implementation for inserting and querying DCDB job data. //================================================================================ //================================================================================ // This file is part of DCDB (DataCenter DataBase) // Copyright (C) 2011-2018 Leibniz Supercomputing Centre // // This library is free software; you can redistribute it and/or // modify it under the terms of the GNU Lesser General Public // License as published by the Free Software Foundation; either // version 2.1 of the License, or (at your option) any later version. // // This library is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU // Lesser General Public License for more details. // // You should have received a copy of the GNU Lesser General Public // License along with this library; if not, write to the Free Software // Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA //================================================================================ /** * @file * @brief This file contains actual implementations of the jobdatastore * interface. The interface itself forwards all functions to the internal * JobDataStoreImpl. The real logic is implemented in the JobDataStoreImpl. */ #include #include #include #include "cassandra.h" #include "dcdb/jobdatastore.h" #include "jobdatastore_internal.h" #include "dcdb/connection.h" #include "dcdbglobals.h" using namespace DCDB; /** * @details * Since we want high-performance inserts, we prepare the insert CQL query in * advance and only bind it on the actual insert. */ void JobDataStoreImpl::prepareInsert(uint64_t ttl) { CassError rc = CASS_OK; CassFuture* future = NULL; const char* query; /* * Free the old prepared if necessary. */ if (preparedInsert) { cass_prepared_free(preparedInsert); } char *queryBuf = NULL; if (ttl == 0) { query = "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?);"; } else { queryBuf = (char*)malloc(256); snprintf(queryBuf, 256, "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA " (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?) " "USING TTL %" PRIu64 " ;", ttl); query = queryBuf; } future = cass_session_prepare(session, query); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); } else { preparedInsert = cass_future_get_prepared(future); } cass_future_free(future); if (queryBuf) { free(queryBuf); } } /** * @details * Extract all data from the JobData object and push it into the data store. */ JDError JobDataStoreImpl::insertJob(JobData& jdata) { /* Insert into Cassandra */ CassError rc = CASS_OK; CassStatement* statement = NULL; CassFuture *future = NULL; statement = cass_prepared_bind(preparedInsert); cass_statement_bind_int64_by_name(statement, "jid", jdata.jobId); cass_statement_bind_int64_by_name(statement, "uid", jdata.userId); cass_statement_bind_int64_by_name(statement, "start_ts", jdata.startTime.getRaw()); cass_statement_bind_int64_by_name(statement, "end_ts", jdata.endTime.getRaw()); /* Bind the string node list to a varchar set */ CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET, jdata.nodes.size()); for (auto& s : jdata.nodes) { cass_collection_append_string(set, s.c_str()); } cass_statement_bind_collection_by_name(statement, "nodes", set); /* The collection can be freed after binding */ cass_collection_free(set); future = cass_session_execute(session, statement); cass_future_wait(future); rc = cass_future_error_code(future); if (rc != CASS_OK) { connection->printError(future); cass_future_free(future); cass_statement_free(statement); return JD_UNKNOWNERROR; } cass_future_free(future); cass_statement_free(statement); return JD_OK; } /** * @details * //TODO */ JDError JobDataStoreImpl::insertSubmittedJob(JobId jid, UserId uid) { //TODO return JD_UNKNOWNERROR; } /** * @details * //TODO */ JDError JobDataStoreImpl::updateJob(JobData& jdata) { //TODO return JD_UNKNOWNERROR; } /** * @details * //unnecessary but kept for completeness */ JDError JobDataStoreImpl::removeJob(JobId jid) { //TODO return JD_UNKNOWNERROR; } /** * @details * //TODO */ JDError JobDataStoreImpl::getJobById(JobData& job, JobId jid) { //TODO return JD_UNKNOWNERROR; } /** * @details * //TODO */ JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { //TODO return JD_UNKNOWNERROR; } /** * @details * //TODO */ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { //TODO return JD_UNKNOWNERROR; } /** * @details * //TODO */ JDError JobDataStoreImpl::getNodeList(NodeList& nodes, JobId jid) { //TODO return JD_UNKNOWNERROR; } /** * @details * This constructor sets the internal connection variable to * the externally provided Connection object and also * retrieves the CassSession pointer of the connection. */ JobDataStoreImpl::JobDataStoreImpl(Connection* conn) { connection = conn; session = connection->getSessionHandle(); preparedInsert = nullptr; prepareInsert(0); } /** * @details * The destructor just resets the internal pointers. Deletion of the pointers * (except preparedInsert) is not our responsibility. */ JobDataStoreImpl::~JobDataStoreImpl() { connection = nullptr; session = nullptr; if (preparedInsert) { cass_prepared_free(preparedInsert); } } /* ########################################################################## */ /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::insertJob(JobData& jdata) { return impl->insertJob(jdata); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::insertSubmittedJob(JobId jid, UserId uid) { return impl->insertSubmittedJob(jid, uid); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::updateJob(JobData& jdata) { return impl->updateJob(jdata); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::removeJob(JobId jid) { return impl->removeJob(jid); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobById(JobData& job, JobId jid) { return impl->getJobById(job, jid); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobsInIntervalExcl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { return impl->getJobsInIntervalExcl(jobs, intervalStart, intervalEnd); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getJobsInIntervalIncl(std::list& jobs, TimeStamp intervalStart, TimeStamp intervalEnd) { return impl->getJobsInIntervalIncl(jobs, intervalStart, intervalEnd); } /** * @details * Instead of doing the actual work, this function simply forwards to the * corresponding function of the JobDataStoreImpl class. */ JDError JobDataStore::getNodeList(NodeList& nodes, JobId jid) { return impl->getNodeList(nodes, jid); } /** * @details * This constructor allocates the implementation class which * holds the actual implementation of the class functionality. */ JobDataStore::JobDataStore(Connection* conn) { impl = new JobDataStoreImpl(conn); } /** * @details * The JobDataStore destructor deallocates the * JobDataStoreImpl and CassandraBackend objects. */ JobDataStore::~JobDataStore() { /* Clean up... */ if (impl) { delete impl; } }