Commit 31d0c28b authored by Carla Guillen Carias's avatar Carla Guillen Carias
Browse files

Merge branch 'development' of https://gitlab.lrz.de/dcdb/dcdb into development

parents 031c2621 874f06ce
......@@ -58,8 +58,7 @@ public:
* @param name Name of the operator
*/
JobOperatorTemplate(const string name) :
OperatorTemplate<S>(name),
_jobDataVec(nullptr) {
OperatorTemplate<S>(name) {
_unitAccess.store(false);
this->_dynamic = true;
......@@ -72,8 +71,7 @@ public:
*
*/
JobOperatorTemplate(const JobOperatorTemplate& other) :
OperatorTemplate<S>(other),
_jobDataVec(nullptr) {
OperatorTemplate<S>(other) {
_unitAccess.store(false);
this->_dynamic = true;
......@@ -87,7 +85,6 @@ public:
*/
JobOperatorTemplate& operator=(const JobOperatorTemplate& other) {
OperatorTemplate<S>::operator=(other);
_jobDataVec = nullptr;
this->_dynamic = true;
this->_jobFilterStr = QueryEngine::getInstance().getJobFilter();
this->_jobFilter = boost::regex(this->_jobFilterStr);
......@@ -97,10 +94,7 @@ public:
/**
* @brief Class destructor
*/
virtual ~JobOperatorTemplate() {
if(_jobDataVec)
delete _jobDataVec;
}
virtual ~JobOperatorTemplate() {}
/**
* @brief Returns the units of this operator
......@@ -145,15 +139,12 @@ public:
// Getting exclusive access to the operator
while( this->_onDemandLock.exchange(true) ) {}
uint32_t jobId = MQTTChecker::topicToJob(node);
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false);
if(buf) _jobDataVec = buf;
if(buf && !buf->empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec->at(0));
_jobDataVec.clear();
if(this->_queryEngine.queryJob(jobId, 0, 0, _jobDataVec, true, false) && !_jobDataVec.empty()) {
U_Ptr jobUnit = jobDataToUnit(_jobDataVec[0]);
if(!jobUnit)
throw std::runtime_error("Job " + node + " not in the domain of operator " + this->_name + "!");
this->compute(jobUnit, _jobDataVec->at(0));
this->compute(jobUnit, _jobDataVec[0]);
for (const auto &o : jobUnit->getOutputs()) {
outMap.insert(make_pair(o->getName(), o->getLatestValue()));
o->clearReadingQueue();
......@@ -294,14 +285,11 @@ protected:
}
try {
if(_jobDataVec)
_jobDataVec->clear();
vector<qeJobData>* buf = this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true);
if(buf) {
_jobDataVec = buf;
_jobDataVec.clear();
if(this->_queryEngine.queryJob(0, this->_interval * 1000000, 0, _jobDataVec, true, true)) {
_tempUnits.clear();
// Producing units from the job data, discarding invalid jobs in the process
for(auto& job : *_jobDataVec) {
for(auto& job : _jobDataVec) {
try {
_tempUnits.push_back(jobDataToUnit(job));
} catch(const invalid_argument& e2) {
......@@ -313,7 +301,7 @@ protected:
// Performing actual computation on each unit
for(size_t idx=0; idx<_tempUnits.size(); idx++)
if(_tempUnits[idx])
this->compute(_tempUnits[idx], _jobDataVec->at(idx));
this->compute(_tempUnits[idx], _jobDataVec[idx]);
// Acquiring the spinlock to refresh the exposed units
while(_unitAccess.exchange(true)) {}
this->clearUnits();
......@@ -348,7 +336,7 @@ protected:
// Spinlock used to regulate access to the internal units map, for "visualization" purposes
atomic<bool> _unitAccess;
// Vector of job data structures used to retrieve job data at runtime
vector<qeJobData>* _jobDataVec;
vector<qeJobData> _jobDataVec;
// Regex object used to filter out jobs
string _jobFilterStr;
boost::regex _jobFilter;
......
......@@ -43,9 +43,9 @@ struct qeJobData {
};
//Typedef for the callback used to retrieve sensors
typedef vector<reading_t>* (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>*, const bool);
typedef bool (*QueryEngineCallback)(const string&, const uint64_t, const uint64_t, vector<reading_t>&, const bool);
//Typedef for the job retrieval callback
typedef vector<qeJobData>* (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>*, const bool, const bool);
typedef bool (*QueryEngineJobCallback)(const uint32_t, const uint64_t, const uint64_t, vector<qeJobData>&, const bool, const bool);
/**
* @brief Class that grants query access to local and remote sensors
......@@ -207,12 +207,11 @@ public:
* @param name Name of the sensor to be queried
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
* @param endTs End timestamp (in nanoseconds) of the time range for the query. Must be >= startTs
* @param buffer Vector in which readings must be stored. If NULL, a new vector will be allocated
* @param buffer Reference to a vector in which readings must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @return Pointer to a vector containing readings for the given query
* @return True if successful, false otherwise
*/
//TODO: consider switching to a double pointer buffer input and boolean/int output
vector<reading_t>* querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>* buffer, const bool rel=true) {
bool querySensor(const string& name, const uint64_t startTs, const uint64_t endTs, vector<reading_t>& buffer, const bool rel=true) {
if(!_callback)
throw runtime_error("Query Engine: callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
......@@ -239,12 +238,12 @@ public:
* @param jobId ID of the job to be retrieved (only if range=false)
* @param startTs Start timestamp (in nanoseconds) of the time range for the query (only if range=true)
* @param endTs End timestamp (in nanoseconds) of the time range for the query. (only if range=true)
* @param buffer Vector in which job info must be stored. If NULL, a new vector will be allocated
* @param buffer Reference to a vector in which job info must be stored.
* @param rel If true, the input timestamps are considered to be relative offset against "now"
* @param range If true, the jobId parameter is ignored, and all jobs in the given time range are returned
* @return Pointer to a vector containing job information for the given query
* @return True if successful, false otherwise
*/
vector<qeJobData>* queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel=true, const bool range=false) {
bool queryJob(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel=true, const bool range=false) {
if(!_jCallback)
throw runtime_error("Query Engine: job callback not set!");
if((startTs > endTs && !rel) || (startTs < endTs && rel))
......
......@@ -31,19 +31,14 @@
AggregatorOperator::AggregatorOperator(const std::string& name) : OperatorTemplate(name) {
_window = 0;
_relative = true;
_buffer = nullptr;
}
AggregatorOperator::AggregatorOperator(const AggregatorOperator& other) : OperatorTemplate(other) {
_window = other._window;
_relative = other._relative;
_buffer = nullptr;
}
AggregatorOperator::~AggregatorOperator() {
if(_buffer)
delete _buffer;
}
AggregatorOperator::~AggregatorOperator() {}
void AggregatorOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
......@@ -52,9 +47,8 @@ void AggregatorOperator::printConfig(LOG_LEVEL ll) {
}
void AggregatorOperator::compute(U_Ptr unit) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t startTs=0, endTs=0, now=getTimestamp();
startTs = _relative ? _window : now - _window;
......@@ -62,15 +56,14 @@ void AggregatorOperator::compute(U_Ptr unit) {
for(const auto& in : unit->getInputs()) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
elCtr = _buffer==nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative);
if(!_buffer || _buffer->size()<=elCtr)
elCtr = _buffer.size();
if(!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.size()<=elCtr)
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + in->getName() + "!");
}
compute_internal(unit, _buffer);
}
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer) {
void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t>& buffer) {
_percentileSensors.clear();
_percentiles.clear();
reading_t reading;
......@@ -82,22 +75,22 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
if(op!=AggregatorSensorBase::QTL) {
switch (op) {
case AggregatorSensorBase::SUM:
reading.value = computeSum(*buffer);
reading.value = computeSum(buffer);
break;
case AggregatorSensorBase::AVG:
reading.value = computeAvg(*buffer);
reading.value = computeAvg(buffer);
break;
case AggregatorSensorBase::MIN:
reading.value = computeMin(*buffer);
reading.value = computeMin(buffer);
break;
case AggregatorSensorBase::MAX:
reading.value = computeMax(*buffer);
reading.value = computeMax(buffer);
break;
case AggregatorSensorBase::STD:
reading.value = computeStd(*buffer);
reading.value = computeStd(buffer);
break;
case AggregatorSensorBase::OBS:
reading.value = computeObs(*buffer);
reading.value = computeObs(buffer);
break;
default:
LOG(warning) << _name << ": Encountered unknown operation!";
......@@ -111,7 +104,7 @@ void AggregatorOperator::compute_internal(U_Ptr unit, vector<reading_t> *buffer)
}
}
if(!_percentileSensors.empty()) {
computePercentiles(*buffer, _percentiles, _percentileResult);
computePercentiles(buffer, _percentiles, _percentileResult);
for(unsigned idx=0; idx<_percentileResult.size(); idx++) {
reading.value = _percentileResult[idx];
_percentileSensors[idx]->storeReading(reading);
......
......@@ -59,9 +59,9 @@ protected:
virtual void compute(U_Ptr unit) override;
// Internal method containing the actual logic of the operator
void compute_internal(U_Ptr unit, vector<reading_t> *buffer);
void compute_internal(U_Ptr unit, vector<reading_t>& buffer);
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
vector<AggregatorSBPtr> _percentileSensors;
vector<size_t> _percentiles;
vector<int64_t> _percentileResult;
......
......@@ -37,9 +37,8 @@ JobAggregatorOperator::JobAggregatorOperator(const JobAggregatorOperator& other)
JobAggregatorOperator::~JobAggregatorOperator() {}
void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
// Clearing the buffer
_buffer.clear();
size_t elCtr=0;
uint64_t now = getTimestamp();
// Making sure that the aggregation boundaries do not go past the job start/end time
......@@ -50,9 +49,8 @@ void JobAggregatorOperator::compute(U_Ptr unit, qeJobData& jobData) {
// Getting the most recent values as specified in _window
// Since we do not clear the internal buffer, all sensor readings will be accumulated in the same vector
for(const auto& in : subUnit->getInputs()) {
elCtr = _buffer == nullptr ? 0 : _buffer->size();
_buffer = _queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false);
if (!_buffer || _buffer->size() <= elCtr) {
elCtr = _buffer.size();
if (!_queryEngine.querySensor(in->getName(), jobStart, jobEnd, _buffer, false) || _buffer.size() <= elCtr) {
LOG(debug) << "Job Operator " << _name << " cannot read from sensor " << in->getName() << "!";
return;
}
......
......@@ -29,18 +29,13 @@
FilesinkOperator::FilesinkOperator(const std::string& name) : OperatorTemplate(name) {
_autoName = false;
_buffer = nullptr;
}
FilesinkOperator::FilesinkOperator(const FilesinkOperator& other) : OperatorTemplate(other) {
_autoName = other._autoName;
_buffer = nullptr;
}
FilesinkOperator::~FilesinkOperator() {
if(_buffer)
delete _buffer;
}
FilesinkOperator::~FilesinkOperator() {}
void FilesinkOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Auto naming: " << (_autoName ? "enabled" : "disabled");
......@@ -63,13 +58,11 @@ void FilesinkOperator::compute(U_Ptr unit) {
}
}
// Clearing the buffer, if already allocated
if(_buffer)
_buffer->clear();
_buffer = _queryEngine.querySensor(in->getName(), 0, 0, _buffer);
if(!_buffer || _buffer->empty())
// Clearing the buffer
_buffer.clear();
if(!_queryEngine.querySensor(in->getName(), 0, 0, _buffer) || _buffer.empty())
LOG(error) << "Operator " + _name + ": cannot read from sensor " + in->getName() + "!";
else if(!in->writeFile(_buffer->at(_buffer->size()-1)))
else if(!in->writeFile(_buffer[_buffer.size()-1]))
LOG(error) << "Operator " + _name + ": failed file write for sensor " << in->getName() << "!";
}
}
......
......@@ -61,7 +61,7 @@ protected:
std::string adjustPath(FilesinkSBPtr s);
bool _autoName;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
};
......
......@@ -38,7 +38,6 @@ RegressorOperator::RegressorOperator(const std::string& name) : OperatorTemplate
_trainingSet = nullptr;
_responseSet = nullptr;
_currentfVector = nullptr;
_buffer = nullptr;
}
RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorTemplate(other) {
......@@ -52,7 +51,6 @@ RegressorOperator::RegressorOperator(const RegressorOperator& other) : OperatorT
_trainingSet = nullptr;
_responseSet = nullptr;
_currentfVector = nullptr;
_buffer = nullptr;
}
RegressorOperator::~RegressorOperator() {
......@@ -62,10 +60,7 @@ RegressorOperator::~RegressorOperator() {
delete _responseSet;
if(_currentfVector)
delete _currentfVector;
if(_buffer)
delete _buffer;
_rForest.release();
_buffer = nullptr;
_currentfVector = nullptr;
_trainingSet = nullptr;
_responseSet = nullptr;
......@@ -193,41 +188,39 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
std::vector<RegressorSBPtr>& inputs = unit->getInputs();
for(idx=0; idx<inputs.size(); idx++) {
_mean=0; _std=0; _diffsum=0; _qtl25=0; _qtl75=0;
if(_buffer)
_buffer->clear();
_buffer = _queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer);
if(!_buffer || _buffer->empty())
_buffer.clear();
if(!_queryEngine.querySensor(inputs[idx]->getName(), _aggregationWindow, 0, _buffer) || _buffer.empty())
throw std::runtime_error("Operator " + _name + ": cannot read from sensor " + inputs[idx]->getName() + "!");
if (inputs[idx]->getTrainingTarget())
_currentTarget = (float)_buffer->back().value;
_currentTarget = (float)_buffer.back().value;
// Computing MEAN and SUM OF DIFFERENCES
val = _buffer->front().value;
for(const auto& v : *_buffer) {
val = _buffer.front().value;
for(const auto& v : _buffer) {
_mean += v.value;
_diffsum += v.value - val;
val = v.value;
}
_mean /= _buffer->size();
_mean /= _buffer.size();
// Computing STD
for(const auto& v : *_buffer) {
for(const auto& v : _buffer) {
val = v.value - _mean;
_std += val*val;
}
_std = sqrt(_std/_buffer->size());
_std = sqrt(_std/_buffer.size());
// I know, sorting is costly; here, we assume that the aggregation window of sensor data is going to be relatively
// small, in which case the O(log(N)) complexity of the std::sort implementation converges to O(N)
std::sort(_buffer->begin(), _buffer->end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
std::sort(_buffer.begin(), _buffer.end(), [ ](const reading_t& lhs, const reading_t& rhs) { return lhs.value < rhs.value; });
// Computing 25th PERCENTILE
qId = (_buffer->size() * 25) / 100;
qMod = (_buffer->size() * 25) % 100;
_qtl25 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
qId = (_buffer.size() * 25) / 100;
qMod = (_buffer.size() * 25) % 100;
_qtl25 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
// Computing 75th PERCENTILE
qId = (_buffer->size() * 75) / 100;
qMod = (_buffer->size() * 75) % 100;
_qtl75 = (qMod==0 || qId==_buffer->size()-1) ? _buffer->at(qId).value : (_buffer->at(qId).value + _buffer->at(qId+1).value)/2;
qId = (_buffer.size() * 75) / 100;
qMod = (_buffer.size() * 75) % 100;
_qtl75 = (qMod==0 || qId==_buffer.size()-1) ? _buffer[qId].value : (_buffer[qId].value + _buffer[qId+1].value)/2;
fIdx = idx * REG_NUMFEATURES;
// Casting and storing the statistical features
......@@ -236,7 +229,7 @@ void RegressorOperator::computeFeatureVector(U_Ptr unit) {
_currentfVector->at<float>(fIdx+2) = (float)_diffsum;
_currentfVector->at<float>(fIdx+3) = (float)_qtl25;
_currentfVector->at<float>(fIdx+4) = (float)_qtl75;
_currentfVector->at<float>(fIdx+5) = (float)_buffer->at(_buffer->size()-1).value;
_currentfVector->at<float>(fIdx+5) = (float)_buffer[_buffer.size()-1].value;
}
//LOG(error) << "Target: " << _currentTarget;
//LOG(error) << "Vector: ";
......
......@@ -89,7 +89,7 @@ protected:
bool _trainingPending;
bool _importances;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
cv::Ptr<cv::ml::RTrees> _rForest;
cv::Mat *_trainingSet;
cv::Mat *_responseSet;
......
......@@ -61,19 +61,18 @@ void SMUCNGPerfOperator::compute(U_Ptr unit) {
if( outSensor->getPosition() == _metricToPosition[SMUCSensorBase::CPI]) {
std::vector<reading_t> & instructions = _buffers[0];
std::vector<reading_t> & clocks = _buffers[1];
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::INSTRUCTIONS]]->getName(), timestamp, timestamp, &instructions, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, &clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::INSTRUCTIONS]]->getName(), timestamp, timestamp, instructions, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, clocks, false); //use absolute timestamp
reading_t cpi;
bool wascalced = false;
if (instructions.size() > 0 && clocks.size() > 0 && calculateMetricRatio(clocks[0], instructions[0],
outSensor->getScalingFactor(), cpi)) {
if (instructions.size() > 0 && clocks.size() > 0 && calculateMetricRatio(clocks[0], instructions[0], outSensor->getScalingFactor(), cpi)) {
outSensor->storeReading(cpi);
}
} else if (outSensor->getPosition() == _metricToPosition[SMUCSensorBase::FREQUENCY]) {
std::vector<reading_t> & clocks = _buffers[0];
std::vector<reading_t> & clocks_ref = _buffers[1];
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, &clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS_REF]]->getName(), timestamp, timestamp, &clocks_ref, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS]]->getName(), timestamp, timestamp, clocks, false); //use absolute timestamp
_queryEngine.querySensor(inputs[_metricToPosition[SMUCSensorBase::CLOCKS_REF]]->getName(), timestamp, timestamp, clocks_ref, false); //use absolute timestamp
reading_t frequency;
if( clocks.size() > 0 && clocks_ref.size() > 0 && calculateFrequency(clocks_ref[0],clocks[0], MIN_FREQ_MHZ, MAX_FREQ_MHZ, frequency)) {
outSensor->storeReading(frequency);
......
......@@ -31,20 +31,15 @@ TesterOperator::TesterOperator(const std::string& name) : OperatorTemplate(name)
_window = 0;
_numQueries = 1;
_relative = true;
_buffer = nullptr;
}
TesterOperator::TesterOperator(const TesterOperator& other) : OperatorTemplate(other) {
_numQueries = other._numQueries;
_window = other._window;
_relative = other._relative;
_buffer = nullptr;
}
TesterOperator::~TesterOperator() {
if(_buffer)
delete _buffer;
}
TesterOperator::~TesterOperator() {}
void TesterOperator::printConfig(LOG_LEVEL ll) {
LOG_VAR(ll) << " Window: " << _window;
......@@ -54,24 +49,21 @@ void TesterOperator::printConfig(LOG_LEVEL ll) {
}
void TesterOperator::compute(U_Ptr unit) {
uint64_t elCtr=0, queryCtr=0;
bool errorLog=false;
reading_t outR;
outR.timestamp = getTimestamp();
uint64_t elCtr=0, queryCtr=0;
uint64_t startTs = _relative ? _window : outR.timestamp-_window-TESTERAN_OFFSET;
uint64_t endTs = _relative ? 0 : outR.timestamp-TESTERAN_OFFSET;
// Looping to the desired number of queries
while(queryCtr < _numQueries) {
for (const auto &in : unit->getInputs()) {
// Clearing the buffer, if already allocated
if (_buffer)
_buffer->clear();
if(_relative)
_buffer = _queryEngine.querySensor(in->getName(), _window, 0, _buffer, true);
else
_buffer = _queryEngine.querySensor(in->getName(), outR.timestamp-_window-TESTERAN_OFFSET, outR.timestamp-TESTERAN_OFFSET, _buffer, false);
if (!_buffer || _buffer->empty())
// Clearing the buffer
_buffer.clear();
if (!_queryEngine.querySensor(in->getName(), startTs, endTs, _buffer, _relative) || _buffer.empty())
errorLog = true;
else
elCtr += _buffer->size();
elCtr += _buffer.size();
if(++queryCtr >= _numQueries)
break;
}
......
......@@ -61,7 +61,7 @@ protected:
virtual void compute(U_Ptr unit) override;
vector<reading_t> *_buffer;
vector<reading_t> _buffer;
unsigned long long _window;
unsigned long long _numQueries;
bool _relative;
......
......@@ -89,7 +89,7 @@ DCDB::SCError err;
QueryEngine& queryEngine = QueryEngine::getInstance();
logger_t lg;
std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>* buffer, const bool rel, const bool range) {
bool jobQueryCallback(const uint32_t jobId, const uint64_t startTs, const uint64_t endTs, vector<qeJobData>& buffer, const bool rel, const bool range) {
std::list<JobData> tempList;
JobData tempData;
qeJobData tempQeData;
......@@ -101,47 +101,40 @@ std::vector<qeJobData>* jobQueryCallback(const uint32_t jobId, const uint64_t st
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
err = myJobDataStore->getJobsInIntervalRunning(tempList, start, end);
if(err != JD_OK) return buffer;
if(err != JD_OK) return false;
} else {
// Getting a single job by id
err = myJobDataStore->getJobById(tempData, jobId);
if(err != JD_OK) return buffer;
if(err != JD_OK) return false;
tempList.push_back(tempData);
}
if(!buffer)
buffer = new std::vector<qeJobData>();
//buffer->clear();
for(auto& jd : tempList) {
tempQeData.jobId = jd.jobId;
tempQeData.userId = jd.userId;
tempQeData.startTime = jd.startTime.getRaw();
tempQeData.endTime = jd.endTime.getRaw();
tempQeData.nodes = jd.nodes;
buffer->push_back(tempQeData);
buffer.push_back(tempQeData);
}
return buffer;
return true;
}
std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>* buffer, const bool rel) {
bool sensorQueryCallback(const string& name, const uint64_t startTs, const uint64_t endTs, std::vector<reading_t>& buffer, const bool rel) {
std::string topic;
// Getting the topic of the queried sensor from the Navigator
try {
topic = queryEngine.getNavigator()->getNodeTopic(name);
} catch(const std::domain_error& e) {
return buffer;
return false;
}
std::vector <reading_t> *output = NULL;
DCDB::SensorId sid;
// Creating a SID to perform the query
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
// Counting the number of elements in the buffer before accessing the cache
size_t elCtr = (buffer==nullptr) ? 0 : buffer->size();
output = entry.getView(startTs, endTs, buffer, rel);
if (output->size() > elCtr)
return output;
if (entry.getView(startTs, endTs, buffer, rel))
return true;
}
// If we are here then the sensor was not found in the cache - we need to fetch data from Cassandra
try {
......@@ -156,22 +149,19 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
uint64_t endTsInt = rel ? now - endTs : endTs;
DCDB::TimeStamp start(startTsInt), end(endTsInt);
sensor.query(results, start, end, DCDB::AGGREGATE_NONE, 10000000000);
// Dealing with allocations that may have been performed by the cache search
if(!output)