Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 9a2bfd2f authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: refactored QueryEngine interface

- Now using references instead of pointers
- Return type is now bool
parent a40e9c93
......@@ -58,8 +58,7 @@ public:
* @param name Name of the operator
*/
JobOperatorTemplate(const string name) :
OperatorTemplate<S>(name),
_jobDataVec(nullptr) {
OperatorTemplate<S>(name) {
_unitAccess.store(false);
this->_dynamic = true;
......@@ -72,8 +71,7 @@ public:
*
*/
JobOperatorTemplate(const JobOperatorTemplate& other) :
OperatorTemplate<S>(other),
_jobDataVec(nullptr) {
OperatorTemplate<S>(other) {
_unitAccess.store(false);
this->_dynamic = true;
......@@ -87,7 +85,6 @@ public:
*/
JobOperatorTemplate& operator=(const JobOperatorTemplate& other) {
OperatorTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
......@@ -97,10 +94,7 @@ public:
/**
* @brief Class destructor
*/
virtual ~JobOperatorTemplate() {
if(_jobDataVec)
delete _jobDataVec;
}
virtual ~JobOperatorTemplate() {}
/**
* @brief Returns the units of this operator
......@@ -145,15 +139,12 @@ public:
// Getting exclusive access to the operator
while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node);
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec->at(0));
_jobDataVec.clear();
if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) && !_jobDataVec.empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]);
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
this->compute(jobUnit, _jobDataVec->at(0));
this->compute(jobUnit, _jobDataVec[0]);
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
......@@ -294,14 +285,11 @@ protected:
}
try {
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
if(buf) {
_jobDataVec = buf;
_jobDataVec.clear();
if(this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true)) {
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(auto& job : *_jobDataVec) {
for(auto& job : _jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) {
......@@ -313,7 +301,7 @@ protected:
// Performing actual computation on each unit
for(size_t idx=0; idx<_tempUnits.size(); idx++)
if(_tempUnits[idx])
this->compute(_tempUnits[idx], _jobDataVec->at(idx));
this->compute(_tempUnits[idx], _jobDataVec[idx]);
// Acquiring the spinlock to refresh the exposed units
while(_unitAccess.exchange(true)) {}
this->clearUnits();
......@@ -348,7 +336,7 @@ protected:
// Spinlock used to regulate access to the internal units map, for "visualization" purposes
atomic<bool> _unitAccess;
// Vector of job data structures used to retrieve job data at runtime
vector<qeJobData>* _jobDataVec;
vector<qeJobData> _jobDataVec;
// Regex object used to filter out jobs
string _jobFilterStr;
boost::regex _jobFilter;
......
......@@ -43,9 +43,9 @@ struct qeJobData {
};
//Typedef for the callback used to retrieve sensors
typedef vector<reading_t>* (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>*, const bool);
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback
typedef vector<qeJobData>* (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>*, const bool, const bool);
typedef bool (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
/**
* @brief Class that grants query access to local and remote sensors
......@@ -207,12 +207,11 @@ public:
* @param name Name of the sensor to be queried
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
* @param endTs End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
* @param buffer Vector in which readings must be stored. If NULL, a new vector will be allocated
* @param buffer Reference to a vector in which readings must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @return Pointer to a vector containing readings for the given query
* @return True if successful, false otherwise
*/
//TODO: consider switching to a double pointer buffer input and boolean/int output
vector<reading_t>* querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>* buffer, const bool rel=true) {
bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
if(!_callback)
throw runtime_error("Query Engine: callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
......@@ -239,12 +238,12 @@ public:
* @param jobId ID of the job to be retrieved (only if range=false)
* @param startTs Start timestamp (in nanoseconds) of the time range for the query (only if range=true)
* @param endTs End timestamp (in nanoseconds) of the time range for the query. (only if range=true)
* @param buffer Vector in which job info must be stored. If NULL, a new vector will be allocated
* @param buffer Reference to a vector in which job info must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned
* @return Pointer to a vector containing job information for the given query
* @return True if successful, false otherwise
*/
vector<qeJobData>* queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel=true, const bool range=false) {
bool queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
if(!_jCallback)
throw runtime_error("Query Engine: job callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
......
......@@ -31,19 +31,14 @@
AggregatorOperator::AggregatorOperator(const std::string& name) : OperatorTemplate(name) {
_window = 0;
_relative = true;
_buffer = nullptr;
}
AggregatorOperator::AggregatorOperator(const AggregatorOperator& other) : OperatorTemplate(other) {
_window = other._window;
_relative = other._relative;
_buffer = nullptr;
}
AggregatorOperator::~AggregatorOperator() {
if(_buffer)
delete _buffer;
}
AggregatorOperator::~AggregatorOperator() {}
void AggregatorOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
......@@ -52,9 +47,8 @@ void AggregatorOperator::printConfig(LOG_LEVEL ll) {
}
void AggregatorOperator::compute(U_Ptr unit) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t startTs=0, endTs=0, now=getTimestamp();
startTs = _relative ? _window : now - _window;
......@@ -62,15 +56,14 @@ void AggregatorOperator::compute(U_Ptr unit) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
elCtr = _buffer==nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative);
if(!_buffer || _buffer->size()<=elCtr)
elCtr = _buffer.size();
if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr)
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + in->getName() + "!");
}
compute_internal(unit, _buffer);
}
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer) {
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t>& buffer) {
_percentileSensors.clear();
_percentiles.clear();
reading_t reading;
......@@ -82,22 +75,22 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
if(op!=AggregatorSensorBase::QTL) {
switch (op) {
case AggregatorSensorBase::SUM:
reading.value = computeSum(*buffer);
reading.value = computeSum(buffer);
break;
case AggregatorSensorBase::AVG:
reading.value = computeAvg(*buffer);
reading.value = computeAvg(buffer);
break;
case AggregatorSensorBase::MIN:
reading.value = computeMin(*buffer);
reading.value = computeMin(buffer);
break;
case AggregatorSensorBase::MAX:
reading.value = computeMax(*buffer);
reading.value = computeMax(buffer);
break;
case AggregatorSensorBase::STD:
reading.value = computeStd(*buffer);
reading.value = computeStd(buffer);
break;
case AggregatorSensorBase::OBS:
reading.value = computeObs(*buffer);
reading.value = computeObs(buffer);
break;
default:
LOG(warning) << _name << ": Encountered unknown operation!";
......@@ -111,7 +104,7 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
}
}
if(!_percentileSensors.empty()) {
computePercentiles(*buffer, _percentiles, _percentileResult);
computePercentiles(buffer, _percentiles, _percentileResult);
for(unsigned idx=0; idx<_percentileResult.size(); idx++) {
reading.value = _percentileResult[idx];
_percentileSensors[idx]->storeReading(reading);
......
......@@ -59,9 +59,9 @@ protected:
virtual void compute(U_Ptr unit) override;
// Internal method containing the actual logic of the operator
void compute_internal(U_Ptr unit, vector<reading_t> *buffer);
void compute_internal(U_Ptr unit, vector<reading_t>& buffer);
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
vector<AggregatorSBPtr> _percentileSensors;
vector<size_t> _percentiles;
vector<int64_t> _percentileResult;
......
......@@ -37,9 +37,8 @@ JobAggregatorOperator::JobAggregatorOperator(const JobAggregatorOperator& other)
JobAggregatorOperator::~JobAggregatorOperator() {}
void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t now = getTimestamp();
// Making sure that the aggregation boundaries do not go past the job start/end time
......@@ -50,9 +49,8 @@ void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer == nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false);
if (!_buffer || _buffer->size() <= elCtr) {
elCtr = _buffer.size();
if (!_queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false) || _buffer.size() <= elCtr) {
LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......
......@@ -29,18 +29,13 @@
FilesinkOperator::FilesinkOperator(const std::string& name) : OperatorTemplate(name) {
_autoName = false;
_buffer = nullptr;
}
FilesinkOperator::FilesinkOperator(const FilesinkOperator& other) : OperatorTemplate(other) {
_autoName = other._autoName;
_buffer = nullptr;
}
FilesinkOperator::~FilesinkOperator() {
if(_buffer)
delete _buffer;
}
FilesinkOperator::~FilesinkOperator() {}
void FilesinkOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Auto naming: " << (_autoName ? "enabled" : "disabled");
......@@ -63,13 +58,11 @@ void FilesinkOperator::compute(U_Ptr unit) {
}
}
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
_buffer = _queryEngine.querySensor(in->getName(), 0, 0, _buffer);
if(!_buffer || _buffer->empty())
// Clearing the buffer
_buffer.clear();
if(!_queryEngine.querySensor(in->getName(), 0, 0, _buffer) || _buffer.empty())
LOG(error) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!";
else if(!in->writeFile(_buffer->at(_buffer->size()-1)))
else if(!in->writeFile(_buffer[_buffer.size()-1]))
LOG(error) << "Operator " + _name + ": failed file write for sensor " << in->getName() << "!";
}
}
......
......@@ -61,7 +61,7 @@ protected:
std::string adjustPath(FilesinkSBPtr s);
bool _autoName;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
};
......
......@@ -38,7 +38,6 @@ RegressorOperator::RegressorOperator(const std::string& name) : OperatorTemplate
_trainingSet = nullptr;
_responseSet = nullptr;
_currentfVector = nullptr;
_buffer = nullptr;
}
RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorTemplate(other) {
......@@ -52,7 +51,6 @@ RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorT
_trainingSet = nullptr;
_responseSet = nullptr;
_currentfVector = nullptr;
_buffer = nullptr;
}
RegressorOperator::~RegressorOperator() {
......@@ -62,10 +60,7 @@ RegressorOperator::~RegressorOperator() {
delete _responseSet;
if(_currentfVector)
delete _currentfVector;
if(_buffer)
delete _buffer;
_rForest.release();
_buffer = nullptr;
_currentfVector = nullptr;
_trainingSet = nullptr;
_responseSet = nullptr;
......@@ -193,41 +188,39 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
std::vector<RegressorSBPtr>& inputs = unit->getInputs();
for(idx=0; idx<inputs.size(); idx++) {
_mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0;
if(_buffer)
_buffer->clear();
_buffer = _queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer);
if(!_buffer || _buffer->empty())
_buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer) || _buffer.empty())
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
if (inputs[idx]->getTrainingTarget())
_currentTarget = (float)_buffer->back().value;
_currentTarget = (float)_buffer.back().value;
// Computing MEAN and SUM OF DIFFERENCES
val = _buffer->front().value;
for(const auto& v : *_buffer) {
val = _buffer.front().value;
for(const auto& v : _buffer) {
_mean += v.value;
_diffsum += v.value - val;
val = v.value;
}
_mean /= _buffer->size();
_mean /= _buffer.size();
// Computing STD
for(const auto& v : *_buffer) {
for(const auto& v : _buffer) {
val = v.value - _mean;
_std += val*val;
}
_std = sqrt(_std/_buffer->size());
_std = sqrt(_std/_buffer.size());
// I know, sorting is costly; here, we assume that the aggregation window of sensor data is going to be relatively
// small, in which case the O(log(N)) complexity of the std::sort implementation converges to O(N)
std::sort(_buffer->begin(), _buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
std::sort(_buffer.begin(), _buffer.end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
// Computing 25th PERCENTILE
qId = (_buffer->size() * 25) / 100;
qMod = (_buffer->size() * 25) % 100;
_qtl25 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
qId = (_buffer.size() * 25) / 100;
qMod = (_buffer.size() * 25) % 100;
_qtl25 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
// Computing 75th PERCENTILE
qId = (_buffer->size() * 75) / 100;
qMod = (_buffer->size() * 75) % 100;
_qtl75 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
qId = (_buffer.size() * 75) / 100;
qMod = (_buffer.size() * 75) % 100;
_qtl75 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
fIdx = idx * REG_NUMFEATURES;
// Casting and storing the statistical features
......@@ -236,7 +229,7 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
_currentfVector->at<float>(fIdx+2) = (float)_diffsum;
_currentfVector->at<float>(fIdx+3) = (float)_qtl25;
_currentfVector->at<float>(fIdx+4) = (float)_qtl75;
_currentfVector->at<float>(fIdx+5) = (float)_buffer->at(_buffer->size()-1).value;
_currentfVector->at<float>(fIdx+5) = (float)_buffer[_buffer.size()-1].value;
}
//LOG(error) << "Target: " << _currentTarget;
//LOG(error) << "Vector: ";
......
......@@ -89,7 +89,7 @@ protected:
bool _trainingPending;
bool _importances;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
cv::Ptr<cv::ml::RTrees> _rForest;
cv::Mat *_trainingSet;
cv::Mat *_responseSet;
......
......@@ -41,19 +41,18 @@ void SMUCNGPerfOperator::compute(U_Ptr unit) {
if( outSensor->getPosition() == _metricToPosition[SMUCSensorBase::CPI]) {
std::vector<reading_t> & instructions = _buffers[0];
std::vector<reading_t> & clocks = _buffers[1];
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::INSTRUCTIONS]]->getName(), timestamp, timestamp, &instructions, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, &clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::INSTRUCTIONS]]->getName(), timestamp, timestamp, instructions, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, clocks, false); //use absolute timestamp
reading_t cpi;
bool wascalced = false;
if (instructions.size() > 0 && clocks.size() > 0 && calculateMetricRatio(clocks[0], instructions[0],
outSensor->getScalingFactor(), cpi)) {
if (instructions.size() > 0 && clocks.size() > 0 && calculateMetricRatio(clocks[0], instructions[0], outSensor->getScalingFactor(), cpi)) {
outSensor->storeReading(cpi);
}
} else if (outSensor->getPosition() == _metricToPosition[SMUCSensorBase::FREQUENCY]) {
std::vector<reading_t> & clocks = _buffers[0];
std::vector<reading_t> & clocks_ref = _buffers[1];
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, &clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS_REF]]->getName(), timestamp, timestamp, &clocks_ref, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS_REF]]->getName(), timestamp, timestamp, clocks_ref, false); //use absolute timestamp
reading_t frequency;
bool wascalced = false;
if( clocks.size() > 0 && clocks_ref.size() > 0 && calculateFrequency(clocks_ref[0],clocks[0], MIN_FREQ_MHZ, MAX_FREQ_MHZ, frequency)) {
......
......@@ -31,20 +31,15 @@ TesterOperator::TesterOperator(const std::string& name) : OperatorTemplate(name)
_window = 0;
_numQueries = 1;
_relative = true;
_buffer = nullptr;
}
TesterOperator::TesterOperator(const TesterOperator& other) : OperatorTemplate(other) {
_numQueries = other._numQueries;
_window = other._window;
_relative = other._relative;
_buffer = nullptr;
}
TesterOperator::~TesterOperator() {
if(_buffer)
delete _buffer;
}
TesterOperator::~TesterOperator() {}
void TesterOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
......@@ -54,24 +49,21 @@ void TesterOperator::printConfig(LOG_LEVEL ll) {
}
void TesterOperator::compute(U_Ptr unit) {
uint64_t elCtr=0, queryCtr=0;
bool errorLog=false;
reading_t outR;
outR.timestamp = getTimestamp();
uint64_t elCtr=0, queryCtr=0;
uint64_t startTs = _relative ? _window : outR.timestamp-_window-TESTERAN_OFFSET;
uint64_t endTs = _relative ? 0 : outR.timestamp-TESTERAN_OFFSET;
// Looping to the desired number of queries
while(queryCtr < _numQueries) {
for (const auto &in : unit->getInputs()) {
// Clearing the buffer, if already allocated
if (_buffer)
_buffer->clear();
if(_relative)
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer, true);
else
_buffer = _queryEngine.querySensor(in->getName(), outR.timestamp-_window-TESTERAN_OFFSET, outR.timestamp-TESTERAN_OFFSET, _buffer, false);
if (!_buffer || _buffer->empty())
// Clearing the buffer
_buffer.clear();
if (!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.empty())
errorLog = true;
else
elCtr += _buffer->size();
elCtr += _buffer.size();
if(++queryCtr >= _numQueries)
break;
}
......
......@@ -61,7 +61,7 @@ protected:
virtual void compute(U_Ptr unit) override;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
unsigned long long _window;
unsigned long long _numQueries;
bool _relative;
......
......@@ -89,7 +89,7 @@ DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
std::list<JobData> tempList;
JobData tempData;
qeJobData tempQeData;
......@@ -101,47 +101,40 @@ std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t st
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
if(err != JD_OK) return buffer;
if(err != JD_OK) return false;
} else {
// Getting a single job by id
err = myJobDataStore->getJobById(tempData, jobId);
if(err != JD_OK) return buffer;
if(err != JD_OK) return false;
tempList.push_back(tempData);
}
if(!buffer)
buffer = new std::vector<qeJobData>();
//buffer->clear();
for(auto& jd : tempList) {
tempQeData.jobId = jd.jobId;
tempQeData.userId = jd.userId;
tempQeData.startTime = jd.startTime.getRaw();
tempQeData.endTime = jd.endTime.getRaw();
tempQeData.nodes = jd.nodes;
buffer->push_back(tempQeData);
buffer.push_back(tempQeData);
}
return buffer;
return true;
}
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
std::string topic;
// Getting the topic of the queried sensor from the Navigator
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {
return buffer;
return false;
}
std::vector <reading_t> *output = NULL;
DCDB::SensorId sid;
// Creating a SID to perform the query
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
// Counting the number of elements in the buffer before accessing the cache
size_t elCtr = (buffer==nullptr) ? 0 : buffer->size();
output = entry.getView(startTs, endTs, buffer, rel);
if (output->size() > elCtr)
return output;
if (entry.getView(startTs, endTs, buffer, rel))
return true;
}
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
......@@ -156,22 +149,19 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 10000000000);
// Dealing with allocations that may have been performed by the cache search
if(!output)