Commit 0c7355bc authored by Alessio Netti's avatar Alessio Netti
Browse files

Analytics: staleness checks in QueryEngine and thread safety

- Staleness checks when performing sensor queries in the cache are
now enforced both in relative and absolute modes
- Fixed a race condition causing wrong results when performing queries
in absolute mode
- Changed logging in the TesterAnalyzer plugin
parent 7beb1869
......@@ -55,6 +55,7 @@ void TesterAnalyzer::printConfig(LOG_LEVEL ll) {
void TesterAnalyzer::compute(U_Ptr unit) {
uint64_t elCtr=0, queryCtr=0;
bool errorLog=false;
reading_t outR;
outR.timestamp = getTimestamp();
// Looping to the desired number of queries
......@@ -68,12 +69,15 @@ void TesterAnalyzer::compute(U_Ptr unit) {
else
_buffer = _queryEngine.querySensor(in->getName(), outR.timestamp-_window-TESTERAN_OFFSET, outR.timestamp-TESTERAN_OFFSET, _buffer, false);
if (!_buffer || _buffer->empty())
throw std::runtime_error("Analyzer " + _name + ": cannot read from sensor " + in->getName() + "!");
elCtr += _buffer->size();
errorLog = true;
else
elCtr += _buffer->size();
if(++queryCtr >= _numQueries)
break;
}
}
if(errorLog)
LOG(error) << "Analyzer " << _name << ": could not read from one or more sensors!";
outR.value = (int64_t)elCtr;
unit->getOutputs()[0]->storeReading(outR);
}
......@@ -175,8 +175,7 @@ public:
if(startIdx < 0 || endIdx < 0)
return buffer;
//Managing obsolete data
//TODO: Consider making the staleness check cache-wide and not only for the returned view
if(startTsInt > _cache[startIdx].timestamp + staleThreshold || endTsInt > _cache[endIdx].timestamp + staleThreshold)
if(tsAbs(startTsInt, _cache[startIdx].timestamp) > staleThreshold || tsAbs(endTsInt, _cache[endIdx].timestamp) > staleThreshold)
return buffer;
if(startIdx <= endIdx)
buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.begin() + endIdx + 1);
......@@ -263,35 +262,35 @@ public:
// Cache is empty or has only one element
if(!_stable || _cache.empty())
return -1;
// Target timestamp (relative or absolute) is outside of the time frame contained in the cache
else if(t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
return -1;
// Target timestamp (relative or absolute) is outside of the time frame contained in the cache
//else if(t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
// return -1;
int64_t pivot = 0, pivotReal = 0, aPoint = 0, bPoint = _cache.size();
int64_t pivot = 0, pivotReal = 0, aPoint = 0, bPoint = _cache.size(), fixIndex=_cacheIndex;
if(leftmost) {
// Attention! aPoint and bPoint are linearized indexes, and do not take into account the presence of cacheIndex
// When computing the position of the pivot, we map it to the actual index in the circular array
// Standard binary search algorithm below
while (aPoint < bPoint) {
pivot = (aPoint + bPoint) / 2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
pivotReal = (fixIndex + 1 + pivot) % _cache.size();
if (t <= _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
}
return (_cacheIndex + 1 + aPoint) % _cache.size();
return (fixIndex + 1 + aPoint) % _cache.size();
}
else {
while (aPoint < bPoint) {
pivot = (aPoint + bPoint) / 2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
pivotReal = (fixIndex + 1 + pivot) % _cache.size();
if (t < _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
}
return (_cacheIndex + aPoint) % _cache.size();
return (fixIndex + aPoint) % _cache.size();
}
}
......@@ -342,6 +341,11 @@ public:
}
private:
inline
uint64_t tsAbs(uint64_t a, uint64_t b) const {
return a<=b ? b-a : a-b;
}
/**
* @brief Returns the index of the immediately newer element with respect to input index "ind".
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment