Commit e19fd249 authored by Alessio Netti's avatar Alessio Netti
Browse files

DA: data staleness checks

- Batch size in the collectagent now taken into account to determine if
a specific sensor cache is stale or not
parent 35cfab1b
......@@ -269,11 +269,12 @@ int mqttCallback(SimpleMQTTMessage *msg)
cout << endl;
#endif
std::list<DCDB::SensorDataStoreReading> readings;
for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
readings.push_back(r);
mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
}
for (uint64_t i = 0; i < len / sizeof(mqttPayload); i++) {
DCDB::SensorDataStoreReading r(sid, payload[i].timestamp, payload[i].value);
readings.push_back(r);
mySensorCache.storeSensor(sid, payload[i].timestamp, payload[i].value);
}
mySensorCache.getSensorMap()[sid].updateBatchSize(uint64_t(len / sizeof(mqttPayload)));
mySensorDataStore->insertBatch(readings);
readingCtr+= readings.size();
......
......@@ -32,6 +32,7 @@ public:
_maxHistory = maxHistory;
_stable = false;
_cacheIndex = -1;
_batchSize = -1.0;
//We pre-allocate the cache to a initial guess of 600 elements - 10 minutes at 1s sampling period
_cache.reserve(600);
}
......@@ -41,13 +42,30 @@ public:
_cache.resize(size);
_stable = true;
_cacheIndex = -1;
_batchSize = -1.0;
}
~CacheEntry() {
_cache.clear();
}
/**
* @brief Updates the internal batch size value
*
* A 0.05 learning rate is used to update the internal batch size value.
*
* @param newsize The new observed batch size
**/
void updateBatchSize(uint64_t newsize) {
_batchSize = _batchSize < 0.0 ? (float)newsize : _batchSize*0.95 + (float)newsize*0.05;
}
/**
* @brief Returns the current batch size value.
**/
uint64_t getBatchSize() const { return (uint64_t)_batchSize; }
/**
* @brief Returns the time frame (in nanoseconds) covered by the cache.
**/
......@@ -102,8 +120,10 @@ public:
if(!buffer)
buffer = new std::vector<reading_t>();
buffer->clear();
uint64_t staleThreshold = (_maxHistory / (_cache.size() - 1)) * 4;
// If "live" is set to false, we add the estimated batch size in the computation of the stale threshold
uint64_t cacheSize = _cache.size()>1 ? _cache.size()-1 : 1;
uint64_t staleThreshold = (_maxHistory / cacheSize) * (live ? 4 : (uint64_t)_batchSize * 4);
uint64_t now = getTimestamp();
//Converting absolute timestamps to relative offsets for cache access
uint64_t startTsInt = rel ? startTs : now - startTs;
......@@ -116,12 +136,8 @@ public:
if( startIdx < 0 || endIdx < 0)
return buffer;
//Managing obsolete data
if(live && (now - startTsInt > _cache[startIdx].timestamp + staleThreshold || now - endTsInt > _cache[endIdx].timestamp + staleThreshold))
if(now - startTsInt > _cache[startIdx].timestamp + staleThreshold || now - endTsInt > _cache[endIdx].timestamp + staleThreshold)
return buffer;
//If no liveness check is performed, we still make sure that the latest entry in the cache
// is not older than its maximum length
else if(!live && now - getLatest().timestamp > _maxHistory)
return buffer;
if(startIdx <= endIdx)
buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.begin() + endIdx + 1);
else {
......@@ -137,31 +153,15 @@ public:
*
* The cache is considered valid if it is not outdated, that is, the latest reading is not
* older than 5 times the average sampling rate.
*
* @param live If true, more strict staleness checks are enforced
* @return True if the cache is still valid, False otherwise
**/
//TODO: update this method to make it independent from the sampling rate
bool checkValid() const {
if (_cache.size() > 2) {
// Cache element right after cacheIndex is the oldest entry (circular array)
int64_t ctr = (_cacheIndex + 1) % _cache.size();
uint64_t prev = _cache[ctr].timestamp;
// We compute the average sampling period for this specific sensor
uint64_t avg = 0;
do {
ctr = newer(ctr);
avg+= _cache[ctr].timestamp - prev;
prev = _cache[ctr].timestamp;
} while( ctr != _cacheIndex);
avg/= (_cache.size()-1);
// A SID is outdated if it's older than 5x the average sampling period.
if ((getTimestamp() - getLatest().timestamp) > 5 * avg) {
return false;
}
}
return true;
bool checkValid(bool live=false) const {
if(!_stable || _cache.empty())
return false;
uint64_t cacheSize = _cache.size()>1 ? _cache.size()-1 : 1;
uint64_t staleThreshold = (_maxHistory / cacheSize) * (live ? 4 : (uint64_t)_batchSize * 4);
return ((getTimestamp() - getLatest().timestamp) <= staleThreshold);
}
/**
......@@ -261,11 +261,11 @@ public:
* @return Index of element in the sensor cache.
**/
int64_t getOffset(int64_t t) const {
if(!_stable || t < 0)
if(!_stable || _cache.empty() || t < 0)
return -1;
else {
int64_t cacheSize = _cache.size();
int64_t offset = (( cacheSize * t ) / ((int64_t)_maxHistory));
int64_t offset = _maxHistory==0 ? 0 : (( cacheSize * t ) / ((int64_t)_maxHistory));
if(offset > cacheSize)
return -1;
return (cacheSize + _cacheIndex - offset) % cacheSize;
......@@ -314,6 +314,8 @@ private:
int64_t _cacheIndex;
// Time frame in nanoseconds covered by the cache
uint64_t _maxHistory;
// Batch size for the sensor represented by this cache entry
float _batchSize;
};
#endif /* CACHEENTRY_H_ */
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment