Commit 898e2bc1 authored by Micha Mueller's avatar Micha Mueller
Browse files

WIP: Fill jobdatastore interface with live

parent a042e498
......@@ -143,7 +143,7 @@ namespace DCDB {
*
* @param jobs Reference to a list of JobData that will be
* populated with the jobs.
* @param intervalStart Starting time of the interval.
* @param intervalStart Start time of the interval.
* @param intervalEnd End time of the interval.
* @return See JDError.
*/
......@@ -155,13 +155,13 @@ namespace DCDB {
* @brief Retrieve an inclusive list of jobs which were run in the given
* time interval.
*
* @details INCLUSIVE version; all jobs whose start OR end time lay
* @details INCLUSIVE version; all jobs whose start OR end time lays
* within the interval are returned. See also
* getJobsInIntervalExcl().
*
* @param jobs Reference to a list of JobData that will be
* populated with the jobs.
* @param intervalStart Starting time of the interval.
* @param intervalStart Start time of the interval.
* @param intervalEnd End time of the interval.
* @return See JDError.
*/
......
......@@ -33,6 +33,11 @@
#ifndef DCDB_GLOBALS_H
#define DCDB_GLOBALS_H
/* Legend:
* CF = Column Family
* JD = Job Data
*/
#define KEYSPACE_NAME "dcdb"
#define CF_SENSORDATA "sensordata"
#define SENSORDATA_GC_GRACE_SECONDS "600"
......@@ -42,5 +47,6 @@
#define CF_VIRTUALSENSORS "virtualsensors"
#define JD_KEYSPACE_NAME KEYSPACE_NAME "_jobdata"
#define CF_JOBDATA "jobdata"
#endif /* DCDB_GLOBALS_H */
......@@ -460,6 +460,32 @@ bool ConnectionImpl::initSchema() {
"COMPACT STORAGE AND gc_grace_seconds = " SENSORDATA_GC_GRACE_SECONDS );
}
/* Keyspace and column family for job data */
if (!existsKeyspace(JD_KEYSPACE_NAME)) {
std::cout << "Creating Keyspace " << JD_KEYSPACE_NAME << "...\n";
createKeyspace(JD_KEYSPACE_NAME, 1);
}
selectKeyspace(JD_KEYSPACE_NAME);
if (!(getActiveKeyspace().compare(JD_KEYSPACE_NAME) == 0)) {
std::cout << "Cannot select keyspace " << JD_KEYSPACE_NAME << "\n";
return false;
}
if (!existsColumnFamily(CF_JOBDATA)) {
std::cout << "Creating Column Familiy " CF_JOBDATA "...\n";
createColumnFamily(CF_JOBDATA,
"jid bigint, " /* Job Id */
"uid bigint, " /* User Id */
"start_ts bigint, " /* Start timestamp of the job */
"end_ts bigint, " /* End timestamp of the job */
"nodes set<varchar>", /* Set of nodes used by the job */
"jid", /* Make the "jid" column the primary key */
""); /* No further options required */
}
return true;
}
......
......@@ -25,9 +25,13 @@
//================================================================================
/**
* //TODO documentary
* @file
* @brief This file contains actual implementations of the jobdatastore
* interface. The interface itself forwards all functions to the internal
* JobDataStoreImpl. The real logic is implemented in the JobDataStoreImpl.
*/
#include <cinttypes>
#include <list>
#include <string>
......@@ -59,13 +63,14 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
char *queryBuf = NULL;
if (ttl == 0) {
query = "INSERT INTO dcdb.jobdata (jid, uid, startTs, endTs, nodes) "
"VALUES (?, ?, ?, ?, ?);";
query = "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
" (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?);";
}
else {
queryBuf = (char*)malloc(256);
snprintf(queryBuf, 256, "INSERT INTO dcdb.jobdata (jid, uid, startTs, endTs, nodes) "
"VALUES (?, ?, ?, ?, ?) USING TTL %" PRIu64 " ;", ttl);
snprintf(queryBuf, 256, "INSERT INTO " JD_KEYSPACE_NAME "." CF_JOBDATA
" (jid, uid, start_ts, end_ts, nodes) VALUES (?, ?, ?, ?, ?) "
"USING TTL %" PRIu64 " ;", ttl);
query = queryBuf;
}
......@@ -87,11 +92,50 @@ void JobDataStoreImpl::prepareInsert(uint64_t ttl) {
/**
* @details
* //TODO
* Extract all data from the JobData object and push it into the data store.
*/
JDError JobDataStoreImpl::insertJob(JobData& jdata) {
//TODO
return JD_UNKNOWNERROR;
/* Insert into Cassandra */
CassError rc = CASS_OK;
CassStatement* statement = NULL;
CassFuture *future = NULL;
statement = cass_prepared_bind(preparedInsert);
cass_statement_bind_int64_by_name(statement, "jid", jdata.jobId);
cass_statement_bind_int64_by_name(statement, "uid", jdata.userId);
cass_statement_bind_int64_by_name(statement, "start_ts",
jdata.startTime.getRaw());
cass_statement_bind_int64_by_name(statement, "end_ts",
jdata.endTime.getRaw());
/* Bind the string node list to a varchar set */
CassCollection* set = cass_collection_new(CASS_COLLECTION_TYPE_SET,
jdata.nodes.size());
for (auto& s : jdata.nodes) {
cass_collection_append_string(set, s.c_str());
}
cass_statement_bind_collection_by_name(statement, "nodes", set);
/* The collection can be freed after binding */
cass_collection_free(set);
future = cass_session_execute(session, statement);
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
cass_future_free(future);
cass_statement_free(statement);
return JD_UNKNOWNERROR;
}
cass_future_free(future);
cass_statement_free(statement);
return JD_OK;
}
/**
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment