Commit eb186235 authored by Alessio Netti's avatar Alessio Netti
Browse files

DA: search-based sensor queries

- When performing sensor queries with absolute timestamps (rel=false) a
binary search algorithm is used to compute the bounds of the required
sensor cache view
- This is more accurate than the fuzzy method employed with relative
timestamps (rel=true)
parent 6f9f4d9d
......@@ -91,6 +91,14 @@ public:
* "buffer" vector allows to re-use memory over successive readings. Note that in order to use
* this method, a callback must have been set through the setQueryCallback method. If not, this
* method will throw an exception.
*
* The "rel" argument governs how the search is performed in local sensor caches: if set to true,
* startTs and endTs indicate relative offsets against the most recent reading, and the returned
* vector is a view of the cache whose range is computed statically in O(1), and therefore the
* underlying data may be slightly unaligned depending on the sampling rate. If rel is set to
* false, startTs and endTs are interpreted as absolute timestamps, and the cache view is
* determined by performing binary search with O(log(n)) complexity, thus resulting in an accurate
* time range. This parameter does not affect the query method when using the Cassandra datastore.
*
* @param name Name of the sensor to be queried
* @param startTs Start timestamp (in nanoseconds) of the time range for the query
......
......@@ -84,8 +84,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
sid.mqttTopicConvert(topic);
if(mySensorCache.getSensorMap().count(sid) > 0) {
CacheEntry &entry = mySensorCache.getSensorMap()[sid];
// getView is called with live=false to drop strict staleness checks
output = entry.getView(startTs, endTs, buffer, rel, false);
output = entry.getView(startTs, endTs, buffer, rel);
if (output->size() > 0)
return output;
}
......
......@@ -56,9 +56,13 @@ public:
* A 0.05 learning rate is used to update the internal batch size value.
*
* @param newsize The new observed batch size
* @param enforce If true, the batch size is updated ignoring the learning rate
**/
void updateBatchSize(uint64_t newsize) {
_batchSize = _batchSize < 0.0 ? (float)newsize : _batchSize*0.95 + (float)newsize*0.05;
void updateBatchSize(uint64_t newsize, bool enforce=false) {
if(!enforce)
_batchSize = _batchSize < 0.0 ? (float)newsize : _batchSize*0.95 + (float)newsize*0.05;
else
_batchSize = (float)newsize;
}
/**
......@@ -113,30 +117,30 @@ public:
* @param endTs End timestamp of the desired view
* @param buffer Pointer to a vector to be used to store the view. If null, a new vector is allocated
* @param rel If true, startTs and endTs are interpreted as relative timestamps against "the most recent sensor reading"
* @param live If true, checks are performed such that the returned view does not correspond to obsolete data
* @return A vector of sensor readings containing the cache view, or an empty vector if not possible
**/
std::vector<reading_t>* getView(uint64_t startTs, uint64_t endTs, std::vector<reading_t>* buffer=nullptr, bool rel=false, bool live=false) const {
std::vector<reading_t>* getView(uint64_t startTs, uint64_t endTs, std::vector<reading_t>* buffer=nullptr, bool rel=false) const {
if(!buffer)
buffer = new std::vector<reading_t>();
buffer->clear();
// If "live" is set to false, we add the estimated batch size in the computation of the stale threshold
// We add the estimated batch size in the computation of the stale threshold (set to 1 if not used)
uint64_t cacheSize = _cache.size()>1 ? _cache.size()-1 : 1;
uint64_t staleThreshold = (_maxHistory / cacheSize) * (live ? 4 : (uint64_t)_batchSize * 4);
uint64_t staleThreshold = (_maxHistory / cacheSize) * (uint64_t)_batchSize * 4;
uint64_t now = getTimestamp();
//Converting absolute timestamps to relative offsets for cache access
uint64_t startTsInt = rel ? startTs : now - startTs;
uint64_t endTsInt = rel ? endTs : now - endTs;
//Converting relative offsets to absolute timestamp for staleness checking
uint64_t startTsInt = rel ? now - startTs : startTs;
uint64_t endTsInt = rel ? now - endTs : endTs;
//Getting the cache indexes to access sensor data
int64_t startIdx = getOffset(startTsInt);
int64_t endIdx = getOffset(endTsInt);
int64_t startIdx = rel ? getOffset(startTs) : searchTimestamp(startTs);
int64_t endIdx = rel ? getOffset(endTs) : searchTimestamp(endTs);
//Managing invalid time offsets
if( startIdx < 0 || endIdx < 0)
if(startIdx < 0 || endIdx < 0)
return buffer;
//Managing obsolete data
if(now - startTsInt > _cache[startIdx].timestamp + staleThreshold || now - endTsInt > _cache[endIdx].timestamp + staleThreshold)
//TODO: Consider making the staleness check cache-wide and not only for the returned view
if(startTsInt > _cache[startIdx].timestamp + staleThreshold || endTsInt > _cache[endIdx].timestamp + staleThreshold)
return buffer;
if(startIdx <= endIdx)
buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.begin() + endIdx + 1);
......@@ -215,23 +219,17 @@ public:
* index is returned if successful.
*
* @param t Timestamp to be searched, in nanoseconds.
* @param relative Boolean: if True, t is considered as a relative time offset against the most recent reading.
*
* @return The index of the closest sensor reading to t, or -1 if out of bounds.
**/
int64_t searchTimestamp(uint64_t t, bool relative=false) const {
int64_t searchTimestamp(uint64_t t) const {
// Cache is empty or has only one element
if(_cache.size()<2)
if(!_stable || _cache.empty())
return -1;
// Target timestamp (relative or absolute) is outside of the time frame contained in the cache
else if(!relative && (t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp))
return -1;
else if(relative && _cache[_cacheIndex].timestamp - t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
else if(t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
return -1;
if(relative)
t = _cache[_cacheIndex].timestamp - t;
int64_t pivot=0, pivotReal=0, aPoint=0, bPoint=_cache.size()-1;
// Attention! aPoint and bPoint are linearized indexes, and do not take into account the presence of cacheIndex
......@@ -260,12 +258,12 @@ public:
*
* @return Index of element in the sensor cache.
**/
int64_t getOffset(int64_t t) const {
if(!_stable || _cache.empty() || t < 0)
int64_t getOffset(uint64_t t) const {
if(!_stable || _cache.empty())
return -1;
else {
int64_t cacheSize = _cache.size();
int64_t offset = _maxHistory==0 ? 0 : (( cacheSize * t ) / ((int64_t)_maxHistory));
int64_t offset = _maxHistory==0 ? 0 : (( cacheSize * (int64_t)t ) / ((int64_t)_maxHistory));
if(offset > cacheSize)
return -1;
return (cacheSize + _cacheIndex - offset) % cacheSize;
......
......@@ -126,6 +126,7 @@ public:
if(!_cache) {
//TODO: have all time-related configuration parameters use the same unit (e.g. milliseconds)
_cache.reset(new CacheEntry( (uint64_t)_cacheInterval * 1000000, cacheSize));
_cache->updateBatchSize(1, true);
}
if(!_readingQueue) {
_readingQueue.reset(new boost::lockfree::spsc_queue<reading_t>(QUEUE_MAXLIMIT));
......
......@@ -86,7 +86,7 @@ std::vector<reading_t>* sensorQueryCallback(const string& name, const uint64_t s
if(!sensor->isInit())
return NULL;
else
return sensor->getCache()->getView(startTs, endTs, buffer, rel, true);
return sensor->getCache()->getView(startTs, endTs, buffer, rel);
}
return NULL;
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment