Commit 2564afba authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: bugfixes for job analyzers

- Fixed a fair amount of bugs related to job analyzers
- Added a "getJobsInIntervalRunning" method to JobDataStore, to get
the list of jobs that were running in a certain time interval
- Some code refactoring for the JobDataStore class to reduce duplication
- Job analyzers have been tested and are working properly as of now
parent 91e3805b
......@@ -519,12 +519,12 @@ protected:
}
for (auto &u: *units) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (an.getStreaming()) {
if(!constructSensorTopics(*u, an)) {
an.clearUnits();
delete units;
return false;
}
if (!unit(*u)) {
LOG(error) << " Unit " << u->getName() << " did not pass the final check!";
an.clearUnits();
......
......@@ -113,8 +113,8 @@ protected:
delete units;
an.clearUnits();
if(!this->constructSensorTopics(*jobUnit, an))
return false;
//if(!this->constructSensorTopics(*jobUnit, an))
// return false;
if (this->unit(*jobUnit)) {
an.addToUnitCache(jobUnit);
LOG(debug) << " Template job unit " + jobUnit->getName() + " generated.";
......
......@@ -216,12 +216,14 @@ protected:
if (this->_unitCache->count(jobTopic)) {
jobUnit = this->_unitCache->at(jobTopic);
LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
if(!this->_streaming)
LOG(debug) << "Analyzer " << this->_name << ": cache hit for unit " << jobTopic << ".";
} else {
if (!this->_unitCache->count(SensorNavigator::templateKey))
throw std::runtime_error("No template unit in analyzer " + this->_name + "!");
LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
if(!this->_streaming)
LOG(debug) << "Analyzer " << this->_name << ": cache miss for unit " << jobTopic << ".";
U_Ptr uTemplate = this->_unitCache->at(SensorNavigator::templateKey);
shared_ptr<SensorNavigator> navi = this->_queryEngine.getNavigator();
UnitGenerator<S> unitGen(navi);
......@@ -251,7 +253,7 @@ protected:
* @param n Raw node hostname
* @return Converted sensor navigator-friendly node name
*/
virtual string translateNodeName(string n) { return n; }
virtual string translateNodeName(string n) { return MQTTChecker::formatTopic(n) + std::string(1, MQTT_SEP); }
/**
* @brief Performs a compute task
......@@ -283,7 +285,10 @@ protected:
for(const auto& job : *_jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) { _tempUnits.push_back(nullptr); continue; }
} catch(const invalid_argument& e2) {
LOG(error) << e2.what();
_tempUnits.push_back(nullptr);
continue; }
}
// Performing actual computation on each unit
......
......@@ -178,7 +178,7 @@ public:
if(!boost::regex_search(s.c_str(), _match, _blockRx))
throw invalid_argument("JobUnitGenerator: sensor string is incorrectly formatted!");
else
boost::regex_replace(newName, _blockRx, job);
newName = boost::regex_replace(newName, _blockRx, job);
return newName;
}
......@@ -330,7 +330,7 @@ public:
for(const auto& out : outputs) {
shared_ptr<SBase> uOut = make_shared<SBase>(*out);
uOut->setName(resolveJobString(uOut->getName(), j));
uOut->setMqtt(_navi->buildTopicForNode(j, uOut->getMqtt()));
uOut->setMqtt(MQTTChecker::formatTopic(j) + MQTTChecker::formatTopic(uOut->getMqtt()));
// Duplicating the file sink adding the name of each unit to the path
if(uOut->getSinkPath()!="")
uOut->setSinkPath(MQTTChecker::topicToFile(uOut->getMqtt(), uOut->getSinkPath()));
......
......@@ -99,7 +99,7 @@ std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t st
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
err = myJobDataStore->getJobsInIntervalExcl(tempList, start, end);
err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
if(err != JD_OK) return NULL;
} else {
// Getting a single job by id
......
......@@ -68,7 +68,7 @@ public:
* @return The processed MQTT topic
*/
static std::string jobToTopic(uint32_t jobId) {
return "/job" + std::to_string(jobId);
return "/job" + std::to_string(jobId) + "/";
}
/**
......
......@@ -179,6 +179,24 @@ namespace DCDB {
JDError getJobsInIntervalExcl(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
/**
* @brief Retrieve a list of jobs that were running in the given
* time interval.
*
* @details Find all entries in the data store corresponding to jobs that were running in
* the queried time interval, i.e., their start time is less than the queried intervalEnd,
* and their end time is 0 or greater than intervalStart.
*
* @param jobs Reference to a list of JobData that will be
* populated with the jobs.
* @param intervalStart Start time of the interval.
* @param intervalEnd End time of the interval.
* @return See JDError.
*/
JDError getJobsInIntervalRunning(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
/**
* @brief Retrieve an inclusive list of jobs which were run in the given
......
......@@ -35,6 +35,7 @@
#define DCDB_JOBDATASTORE_INTERNAL_H
#include <list>
#include <unordered_set>
#include <string>
#include "dcdb/jobdatastore.h"
......@@ -73,6 +74,9 @@ namespace DCDB {
JDError getJobsInIntervalExcl(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
JDError getJobsInIntervalRunning(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
JDError getJobsInIntervalIncl(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd);
......@@ -80,6 +84,11 @@ namespace DCDB {
JobDataStoreImpl(Connection* conn);
virtual ~JobDataStoreImpl();
private:
// Private utility method to avoid code duplication
JDError parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<uint32_t>* jobIds);
};
} /* End of namespace DCDB */
......
......@@ -502,7 +502,7 @@ JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE start_ts >= ? AND end_ts <= ? ;";
" WHERE start_ts >= ? AND end_ts <= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
......@@ -525,61 +525,113 @@ JDError JobDataStoreImpl::getJobsInIntervalExcl(std::list<JobData>& jobs,
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, NULL);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
JobData job;
cass_future_free(future);
cass_statement_free(statement);
while (cass_iterator_next(rowIt)) {
const CassRow* row = cass_iterator_get_row(rowIt);
return error;
}
cass_int64_t jobId, userId, startTs, endTs;
/**
* @details
* Find all entries in the data store corresponding to jobs that were running
* in the queried time interval, i.e., their start time is less than the queried
* intervalEnd, and their end time is 0 or greater than intervalStart.
*/
JDError JobDataStoreImpl::getJobsInIntervalRunning(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd) {
/* Check if the input is valid and reasonable */
if (intervalEnd.getRaw() == 0) {
return JD_BADPARAMS;
}
if (intervalStart >= intervalEnd) {
return JD_BADPARAMS;
}
/* jid and uid should always be set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs) != CASS_OK) {
startTs = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
JDError error = JD_UNKNOWNERROR;
/* Copy the data into job object */
job.jobId = (JobId) jobId;
job.userId = (UserId) userId;
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
/* +++ First SELECT +++ */
/* Select entries from Cassandra where start_ts lays within the interval */
std::unordered_set<uint32_t> jobIds;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
const char* query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE start_ts <= ? AND end_ts = ? ALLOW FILTERING;";
/* Do not forget about the nodes... */
const char* nodeStr;
size_t nodeStr_len;
statement = cass_statement_new(query, 2);
const CassValue* set = cass_row_get_column_by_name(row, "nodes");
CassIterator* setIt = cass_iterator_from_collection(set);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, (int64_t)0);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
//TODO job.nodes list deep copied?
jobs.push_back(job);
job.nodes.clear();
cass_iterator_free(setIt);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
error = JD_OK;
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
cass_statement_free(statement);
cass_future_free(future);
cass_statement_free(statement);
/* +++ Second SELECT +++ */
/* Select entries from Cassandra where end_ts lays within the interval */
query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE start_ts <= ? AND end_ts >= ? ALLOW FILTERING;";
return error;
statement = cass_statement_new(query, 2);
cass_statement_bind_int64(statement, 0, intervalEnd.getRaw());
cass_statement_bind_int64(statement, 1, intervalStart.getRaw());
/* All parameters bound. Now execute the statement asynchronously */
future = cass_session_execute(session, statement);
/* Wait for the statement to finish */
cass_future_wait(future);
rc = cass_future_error_code(future);
if (rc != CASS_OK) {
connection->printError(future);
error = JD_UNKNOWNERROR;
} else {
/* Retrieve data from result */
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
error = parseJobs(rowIt, jobs, &jobIds);
cass_iterator_free(rowIt);
cass_result_free(cresult);
}
cass_future_free(future);
cass_statement_free(statement);
return error;
}
/**
......@@ -606,6 +658,7 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
/* +++ First SELECT +++ */
/* Select entries from Cassandra where start_ts lays within the interval */
std::unordered_set<uint32_t> jobIds;
CassError rc = CASS_OK;
CassStatement* statement = nullptr;
CassFuture* future = nullptr;
......@@ -634,51 +687,7 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
JobData job;
while (cass_iterator_next(rowIt)) {
const CassRow* row = cass_iterator_get_row(rowIt);
cass_int64_t jobId, userId, startTs, endTs;
/* jid and uid should always be set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs) != CASS_OK) {
startTs = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data into job object */
job.jobId = (JobId) jobId;
job.userId = (UserId) userId;
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
/* Do not forget about the nodes... */
const char* nodeStr;
size_t nodeStr_len;
const CassValue* set = cass_row_get_column_by_name(row, "nodes");
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
//TODO job.nodes list deep copied?
jobs.push_back(job);
job.nodes.clear();
cass_iterator_free(setIt);
}
error = parseJobs(rowIt, jobs, &jobIds);
cass_iterator_free(rowIt);
cass_result_free(cresult);
......@@ -690,7 +699,7 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
/* +++ Second SELECT +++ */
/* Select entries from Cassandra where end_ts lays within the interval */
query = "SELECT * FROM " JD_KEYSPACE_NAME "." CF_JOBDATA
" WHERE end_ts >= ? AND end_ts <= ? ;";
" WHERE end_ts >= ? AND end_ts <= ? ALLOW FILTERING;";
statement = cass_statement_new(query, 2);
......@@ -712,63 +721,7 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
const CassResult* cresult = cass_future_get_result(future);
CassIterator* rowIt = cass_iterator_from_result(cresult);
JobData job;
while (cass_iterator_next(rowIt)) {
const CassRow* row = cass_iterator_get_row(rowIt);
cass_int64_t jobId, userId, startTs, endTs;
/* jid and uid should always be set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"),
&startTs) != CASS_OK) {
startTs = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"),
&endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data into job object */
job.jobId = (JobId) jobId;
/* Manual "deduplication" */
bool alreadyPresent = false;
/* TODO Possible optimization: iterate only jobs from first SELECT */
for (const auto& j : jobs) {
if (j.jobId == job.jobId) {
alreadyPresent = true;
}
}
if (!alreadyPresent) {
job.userId = (UserId) userId;
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
/* Do not forget about the nodes... */
const char* nodeStr;
size_t nodeStr_len;
const CassValue* set = cass_row_get_column_by_name(row, "nodes");
CassIterator* setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
//TODO job.nodes list deep copied?
jobs.push_back(job);
job.nodes.clear();
cass_iterator_free(setIt);
}
}
error = parseJobs(rowIt, jobs, &jobIds);
cass_iterator_free(rowIt);
cass_result_free(cresult);
......@@ -780,6 +733,55 @@ JDError JobDataStoreImpl::getJobsInIntervalIncl(std::list<JobData>& jobs,
return error;
}
/**
 * @brief Parses job rows from a Cassandra result iterator into JobData objects.
 *
 * @details Shared helper used by the getJobsInInterval* queries to avoid code
 * duplication. For every row it reads the jid and uid columns (assumed to be
 * always present), the start_ts and end_ts columns (a missing/unreadable value
 * is substituted with 0 and JD_PARSINGERROR is recorded, but parsing of the
 * remaining rows continues), and the "nodes" set collection. Parsed entries
 * are appended to the caller-supplied jobs list.
 *
 * @param rowIt  Iterator over the rows of a Cassandra query result; consumed
 *               by this call. The caller retains ownership and must free it.
 * @param jobs   List to which the parsed JobData entries are appended.
 * @param jobIds Optional deduplication set: when non-NULL, a row whose job id
 *               is already contained in the set is skipped, and newly seen ids
 *               are inserted. Pass NULL to disable deduplication. This allows
 *               callers that issue multiple SELECTs to merge results without
 *               duplicates.
 * @return JD_OK on success, or JD_PARSINGERROR if any timestamp column of any
 *         row could not be read.
 */
JDError JobDataStoreImpl::parseJobs(CassIterator* rowIt, std::list<JobData>& jobs, std::unordered_set<uint32_t>* jobIds) {
JDError error = JD_OK;
cass_int64_t jobId, userId, startTs, endTs;
JobData job;
while (cass_iterator_next(rowIt)) {
const CassRow *row = cass_iterator_get_row(rowIt);
/* jid and uid should always be set. Other values should be checked */
cass_value_get_int64(cass_row_get_column_by_name(row, "jid"), &jobId);
cass_value_get_int64(cass_row_get_column_by_name(row, "uid"), &userId);
if (cass_value_get_int64(cass_row_get_column_by_name(row, "start_ts"), &startTs) != CASS_OK) {
startTs = 0;
error = JD_PARSINGERROR;
}
if (cass_value_get_int64(cass_row_get_column_by_name(row, "end_ts"), &endTs) != CASS_OK) {
endTs = 0;
error = JD_PARSINGERROR;
}
/* Copy the data into job object */
job.jobId = (JobId) jobId;
/* Set-based deduplication: insert().second is true only for ids not seen before */
if (jobIds==nullptr || jobIds->insert(job.jobId).second) {
job.userId = (UserId) userId;
job.startTime = (uint64_t) startTs;
job.endTime = (uint64_t) endTs;
/* Do not forget about the nodes... */
const char *nodeStr;
size_t nodeStr_len;
const CassValue *set = cass_row_get_column_by_name(row, "nodes");
CassIterator *setIt = cass_iterator_from_collection(set);
while (cass_iterator_next(setIt)) {
cass_value_get_string(cass_iterator_get_value(setIt),
&nodeStr, &nodeStr_len);
/* emplace_back copies the (non-terminated) string data into the list */
job.nodes.emplace_back(nodeStr, nodeStr_len);
}
//TODO job.nodes list deep copied?
jobs.push_back(job);
/* job is reused across iterations; clear the node list for the next row */
job.nodes.clear();
cass_iterator_free(setIt);
}
}
return error;
}
/**
* @details
* Find the entry in the data store with matching JobId and highest start_ts
......@@ -945,6 +947,17 @@ JDError JobDataStore::getJobsInIntervalExcl(std::list<JobData>& jobs,
return impl->getJobsInIntervalExcl(jobs, intervalStart, intervalEnd);
}
/**
 * @brief Public API entry point for retrieving jobs running in a time interval.
 *
 * @details
 * Instead of doing the actual work, this function simply forwards to the
 * corresponding function of the JobDataStoreImpl class. See the header
 * documentation of getJobsInIntervalRunning for the interval semantics.
 */
JDError JobDataStore::getJobsInIntervalRunning(std::list<JobData>& jobs,
TimeStamp intervalStart,
TimeStamp intervalEnd) {
return impl->getJobsInIntervalRunning(jobs, intervalStart, intervalEnd);
}
/**
* @details
* Instead of doing the actual work, this function simply forwards to the
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment