Commit 0b53c240 authored by Alessio Netti's avatar Alessio Netti
Browse files

Sensor cache overhaul

- Sensor cache is now based on circular arrays (actually vectors)
- CacheEntry class supplies a list of helper methods to ease access
to the cache (might be useful for dcdbpusher also)
- Thread safety for cache access is still to be verified (either way,
mutexes are not viable)
parent 31cd6276
......@@ -16,83 +16,24 @@
namespace DCDB {
SensorCache::SensorCache(uint64_t maxHistory) {
// TODO Auto-generated constructor stub
this->_maxHistory = maxHistory;
}
SensorCache::~SensorCache() {
// TODO Auto-generated destructor stub
sensorCache.clear();
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
sensorReading_t s = { val, ts };
const sensorCache_t& SensorCache::getSensorMap() {
return sensorCache;
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, uint64_t val) {
sensorReading_t s = { val, ts };
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
sensorCache_t::iterator it = sensorCache.find(sid);
if (it != sensorCache.end()) {
// If the sensor cache's size is reduced at runtime, the while loop makes sure to resize everything nicely
while (it->second.size() && (it->second.front().timestamp+_maxHistory <= ts)) {
it->second.pop_front();
}
it->second.push_back(s);
} else {
sensorCache[sid].push_back(s);
}
}
bool SensorCache::checkValid(cacheEntry_t &entry) {
if (entry.size() > 2) {
TimeStamp ts;
uint64_t avg = 0;
cacheEntry_t::iterator it = entry.begin();
uint64_t prev = it->timestamp;
for (it++; it != entry.end(); it++) {
avg+= it->timestamp - prev;
prev = it->timestamp;
}
avg/= (entry.size()-1);
/*
* A SID is outdated if it's older than 5x the average sampling period.
*/
if ((ts.getRaw() - entry.back().timestamp) > 5 * avg) {
return false;
}
}
return true;
}
int64_t SensorCache::getAverage(cacheEntry_t &entry, uint64_t avg) {
TimeStamp ts;
if (entry.size() > 0) {
if (ts.getRaw() - entry.front().timestamp < avg * NS_PER_S) {
throw std::out_of_range("Sid outdated");
return 0;
}
double sum = 0;
cacheEntry_t::reverse_iterator it, prev;
it = prev = entry.rbegin();
it++;
while ((it != entry.rend()) && ((ts.getRaw() - it->timestamp) <= avg * NS_PER_S)) {
uint64_t deltaT = (prev->timestamp - it->timestamp);
sum+= ((it->val + prev->val) / 2) * deltaT;
//std::cout << "SensorCache::getAverage sum=" << sum << " deltaT=" <<deltaT << " it=(" << it->timestamp << "," <<it->val <<") prev=(" << prev->timestamp << "," << prev->val <<") " << (ts.getRaw() - it->timestamp) << std::endl;
prev = it++;
}
//std::cout << "SensorCache::getAverage (" << prev->timestamp << "," <<prev->val <<") (" << entry.back().timestamp << "," << entry.back().val << ") sum=" << sum << " deltaT=" << entry.back().timestamp - prev->timestamp << std::endl;
if (prev == entry.rbegin()) {
return entry.back().val;
} else {
return sum/(entry.back().timestamp - prev->timestamp);
}
}
throw std::invalid_argument("Sid not found");
return 0;
if(sensorCache.find(sid) == sensorCache.end())
sensorCache[sid] = CacheEntry(_maxHistory);
sensorCache[sid].store(s);
}
uint64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
......@@ -105,16 +46,16 @@ uint64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
return 0;
}
if (!checkValid(it->second))
if (!it->second.checkValid())
{
throw std::out_of_range("Sid outdated");
return 0;
}
if (avg) {
return getAverage(it->second, avg);
return it->second.getAverage(avg);
} else {
return it->second.back().val;
return it->second.getLatest().val;
}
}
......@@ -140,6 +81,7 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
sensorCache_t::iterator mostRecentSidIt = sensorCache.end();
bool foundOne = false;
// TODO: check also this
/* Iterate over the cache until the current entry is > sidHi */
while ((it != sensorCache.end()) && (it->first <= sidHi)) {
if ((it->first & sidMask) == sidLow) {
......@@ -147,7 +89,7 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
/* We only return one value, even if multiple SensorIds would match.
* At least make sure it's the most recent value
*/
if (checkValid(it->second) && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.back().timestamp < it->second.back().timestamp)) {
if (it->second.checkValid() && ((mostRecentSidIt == sensorCache.end()) || mostRecentSidIt->second.getLatest().timestamp < it->second.getLatest().timestamp)) {
mostRecentSidIt = it;
}
}
......@@ -164,9 +106,9 @@ uint64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
}
if (avg) {
return getAverage(mostRecentSidIt->second, avg);
return mostRecentSidIt->second.getAverage(avg);
} else {
return mostRecentSidIt->second.back().val;
return mostRecentSidIt->second.getLatest().val;
}
}
......@@ -174,16 +116,151 @@ void SensorCache::dump() {
std::cout << "SensorCache Dump:" << std::endl;
for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
std::cout << " id=" << sit->first.toString() << " data=[";
for (cacheEntry_t::iterator eit = sit->second.begin(); eit != sit->second.end(); eit++) {
if (eit != sit->second.begin()) {
for (std::vector<sensorReading_t>::const_iterator eit = sit->second.getCache().begin(); eit != sit->second.getCache().end(); eit++) {
if (eit != sit->second.getCache().begin()) {
std::cout << ",";
}
std::cout << "(" << eit->val << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
}
std::cout << "]" << std::endl;
}
}
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE CACHEENTRY CLASS
CacheEntry::CacheEntry(uint64_t maxHistory) {
_maxHistory = maxHistory;
_stable = false;
_cacheIndex = 0;
}
CacheEntry::~CacheEntry() {
_cache.clear();
}
const std::vector<sensorReading_t>& CacheEntry::getCache() { return _cache; }
sensorReading_t CacheEntry::getLatest() { return _cache[_cacheIndex]; }
sensorReading_t CacheEntry::getOldest() { return _cache[(_cacheIndex + 1) % _cache.size()]; }
uint64_t CacheEntry::older(uint64_t ind) { return ind == 0 ? _cache.size() - 1 : ind - 1; }
uint64_t CacheEntry::newer(uint64_t ind) { return (ind + 1) % _cache.size(); }
void CacheEntry::store(sensorReading_t reading) {
// Sensor caches have two operating phases: first, the sensor cache vector expands until the maximum allowed time
// range is covered. After this "stable" size is reached, the sensor vector is used like a circular array, and its
// size does not change anymore
//std::cout << "Value: " << reading.val << " at time: " << reading.timestamp << " size: " << _cache.size() << std::endl;
_cacheIndex = _stable ? (_cacheIndex + 1) % _cache.size() : (_cacheIndex + 1);
if(!_stable) {
_cache.push_back(reading);
if(_cache.front().timestamp + _maxHistory <= reading.timestamp)
_stable = true;
} else
_cache[_cacheIndex] = reading;
}
bool CacheEntry::checkValid() {
if (_cache.size() > 2) {
TimeStamp ts;
// Cache element right after cacheIndex is the oldest entry (circular array)
uint64_t ctr = (_cacheIndex + 1) % _cache.size();
uint64_t prev = _cache[ctr].timestamp;
// We compute the average sampling period for this specific sensor
uint64_t avg = 0;
do {
ctr = newer(ctr);
avg+= _cache[ctr].timestamp - prev;
prev = _cache[ctr].timestamp;
} while( ctr != _cacheIndex);
avg/= (_cache.size()-1);
// A SID is outdated if it's older than 5x the average sampling period.
if ((ts.getRaw() - getLatest().timestamp) > 5 * avg) {
return false;
}
}
return true;
}
uint64_t CacheEntry::getAverage(uint64_t avg) {
TimeStamp ts;
if (_cache.size() > 0) {
// TODO: check this
if (ts.getRaw() - getOldest().timestamp < avg * NS_PER_S) {
throw std::out_of_range("Sid outdated");
return 0;
}
double sum = 0;
uint64_t it, prev;
prev = _cacheIndex;
it = older(prev);
// We compute the weighted average of elements in the cache that fall within the specified window
while ((it != _cacheIndex) && ((ts.getRaw() - _cache[it].timestamp) <= avg * NS_PER_S)) {
uint64_t deltaT = (_cache[prev].timestamp - _cache[it].timestamp);
sum += ((_cache[it].val + _cache[prev].val) / 2) * deltaT;
//std::cout << "SensorCache::getAverage sum=" << sum << " deltaT=" <<deltaT << " it=(" << it->timestamp << "," <<it->val <<") prev=(" << prev->timestamp << "," << prev->val <<") " << (ts.getRaw() - it->timestamp) << std::endl;
prev = it;
it = older(it);
}
//std::cout << "SensorCache::getAverage (" << prev->timestamp << "," <<prev->val <<") (" << entry.back().timestamp << "," << entry.back().val << ") sum=" << sum << " deltaT=" << entry.back().timestamp - prev->timestamp << std::endl;
// If prev points to the cache head, there was only one element in the aggregation window
if (prev == _cacheIndex) {
return getLatest().val;
} else {
return sum/(getLatest().timestamp - _cache[prev].timestamp);
}
}
throw std::invalid_argument("Sid not found");
return 0;
}
int64_t CacheEntry::searchTimestamp(uint64_t t, bool relative) {
//TODO: Check for thread-safety
// Cache is empty or has only one element
if(_cache.size()<2)
return -1;
// Target timestamp (relative or absolute) is outside of the time frame contained in the cache
else if(!relative && (t > _cache[_cacheIndex].timestamp || t < _cache[(_cacheIndex+1) % _cache.size()].timestamp))
return -1;
else if(relative && _cache[_cacheIndex].timestamp - t < _cache[(_cacheIndex+1) % _cache.size()].timestamp)
return -1;
if(relative)
t = _cache[_cacheIndex].timestamp - t;
int64_t pivot=0, pivotReal=0, aPoint=0, bPoint=_cache.size()-1;
// Attention! aPoint and bPoint are linearized indexes, and do not take into account the presence of cacheIndex
// When computing the position of the pivot, we map it to the actual index in the circular array
// Standard (leftmost) binary search algorithm below
while(aPoint < bPoint) {
pivot = (aPoint + bPoint)/2;
pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
if(t <= _cache[pivotReal].timestamp)
bPoint = pivot;
else
aPoint = pivot + 1;
}
return (_cacheIndex + 1 + aPoint) % _cache.size();
}
int64_t CacheEntry::getOffset(uint64_t t) {
if(!_stable)
return -1;
else {
int64_t offset = ( ( _cache.size() * t ) / ( getLatest().timestamp - getOldest().timestamp ) ) - 1;
return ( _cache.size() + _cacheIndex - offset ) % _cache.size();
}
}
} /* namespace DCDB */
......@@ -9,18 +9,123 @@
#define COLLECTAGENT_SENSORCACHE_H_
#include <map>
#include <list>
#include <vector>
#include <utility>
#include <dcdb/sensorid.h>
namespace DCDB {
typedef struct {
int64_t val;
uint64_t val;
uint64_t timestamp;
} sensorReading_t;
typedef std::list<sensorReading_t> cacheEntry_t;
typedef std::map<SensorId, cacheEntry_t> sensorCache_t;
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE CACHEENTRY CLASS
class CacheEntry {
public:
CacheEntry(uint64_t maxHistory=300000000000);
virtual ~CacheEntry();
/**
* @brief Returns a constant reference to the internal cache vector.
**/
const std::vector<sensorReading_t>& getCache();
/**
* @brief Stores a sensor reading in the cache.
*
* @param reading sensorReading_t struct containing the latest sensor reading.
**/
void store(sensorReading_t reading);
/**
* @brief Ensures that the cache is still valid.
*
* The cache is considered valid if it is not outdated, that is, the latest reading is not
* older than 5 times the average sampling rate.
*
* @return True if the cache is stil valid, False otherwise
**/
bool checkValid();
/**
* @brief Returns an average of recent sensor readings.
*
* Only the sensor readings pushed in the last "avg" nanoseconds are used in the average
* computation.
*
* @param avg length of the average aggregation window in nanoseconds.
* @return Average value of the last sensor readings.
**/
uint64_t getAverage(uint64_t avg);
/**
* @brief Searches for the input timestamp in the cache.
*
* Binary search is used the search for the "t" timestamp within the sensor cache, and its
* index is returned if successful.
*
* @param t Timestamp to be searched, in nanoseconds.
* @param relative Boolean: if True, t is considered as a relative time offset against the most recent reading.
*
* @return The index of the closest sensor reading to t, or -1 if out of bounds.
**/
int64_t searchTimestamp(uint64_t t, bool relative=false);
/**
* @brief Returns the index of the cache element that is older than the latest entry by "t".
*
* Unlike searchTimestamp, this method does not perform an actual search, but simply computes
* the number of elements that cover a time interval of "t" nanoseconds. This value is then
* used together with _cacheIndex to compute the starting index of the most recent cache
* portion that covers such time interval.
*
* @param t Length of the time frame in nanoseconds.
*
* @return Index of element in the sensor cache.
**/
int64_t getOffset(uint64_t t);
/**
* @brief Returns the latest sensor reading in the cache.
**/
sensorReading_t getLatest();
/**
* @brief Returns the oldest sensor reading in the cache.
**/
sensorReading_t getOldest();
private:
/**
* @brief Returns the index of the immediately newer element with respect to input index "ind".
**/
uint64_t newer(uint64_t ind);
/**
* @brief Returns the index of the immediately older element with respect to input index "ind".
**/
uint64_t older(uint64_t ind);
// Internal cache vector
std::vector<sensorReading_t> _cache;
// Flag to signal cache status
bool _stable;
// Head of the cache in the circular array
uint64_t _cacheIndex;
// Time frame in nanoseconds covered by the cache
uint64_t _maxHistory;
};
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE SENSORCACHE CLASS
typedef std::map<SensorId, CacheEntry> sensorCache_t;
class SensorCache {
public:
......@@ -28,56 +133,63 @@ public:
virtual ~SensorCache();
/**
* @brief Store a sensor reading in the SensorCache.
* @param sid The SensorId of the sensor to be cached.
* @param ts The timestamp of the sensor reading.
* @param val The actual sensor reading.
* @return Returns true if the topic string was valid and the data field of the object was populated.
*/
void storeSensor(SensorId sid, uint64_t ts, int64_t val);
/**
* @brief Return a sensor reading from the SensorCache.
* @param sid The SensorId of the sensor to be looked up in the cache.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache.
* @throws std::out_of_range if the sid was found in the cache entry but is outdated.
*/
* @brief Returns a constant reference to the internal sensor cache map.
**/
const sensorCache_t& getSensorMap();
/**
* @brief Store a sensor reading in the SensorCache.
*
* @param sid The SensorId of the sensor to be cached.
* @param ts The timestamp of the sensor reading.
* @param val The actual sensor reading.
* @return Returns true if the topic string was valid and the data field of the object was populated.
**/
void storeSensor(SensorId sid, uint64_t ts, uint64_t val);
/**
* @brief Return a sensor reading from the SensorCache.
*
* @param sid The SensorId of the sensor to be looked up in the cache.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the SensorId doesn't exist in the SensorCache.
* @throws std::out_of_range if the sid was found in the cache entry but is outdated.
**/
uint64_t getSensor(SensorId sid, uint64_t avg=0);
/**
* @brief Return a sensor reading from the SensorCache.
* @param topic The topic of the sensor to be looked up in the cache. May contain wildcards.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated.
*/
/**
* @brief Return a sensor reading from the SensorCache.
*
* @param topic The topic of the sensor to be looked up in the cache. May contain wildcards.
* @return The sensor reading of the corresponding cache entry.
* @throws std::invalid_argument if the topic couldn't be found in the SensorCache.
* @throws std::out_of_range if the topic was found in the cache entry but is outdated.
**/
uint64_t getSensor(std::string topic, uint64_t avg=0);
/**
* @brief Dump the contents of the SensorCache to stdout.
*/
/**
* @brief Dump the contents of the SensorCache to stdout.
**/
void dump();
/**
* @brief Set a new maximum cache length.
*
* @param maxHistory: new sensor cache length value.
*/
* @brief Set a new maximum cache length.
*
* @param maxHistory: new sensor cache length value.
**/
void setMaxHistory(uint64_t maxHistory) { this->_maxHistory = maxHistory; }
/**
* @brief Returns the current maximum sensor cache length
*
* @returns Current maximum sensor cache length
*/
* @brief Returns the current maximum sensor cache length
*
* @return Current maximum sensor cache length
*/
uint64_t getMaxHistory() { return this->_maxHistory; }
private:
bool checkValid(cacheEntry_t &entry);
int64_t getAverage(cacheEntry_t &entry, uint64_t avg);
// Map containing the single sensor caches
sensorCache_t sensorCache;
// Global maximum allowed time frame for the sensor caches
uint64_t _maxHistory;
};
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment