Commit 05e98971 authored by Alessio Netti's avatar Alessio Netti

Automatic cache cleanup

- The CollectAgent sensor cache is automatically purged each X seconds
(X is configurable through the cleaningInterval parameter), deleting
all entries than have not been updated in the last X seconds
- Fixed some minor bugs related to index management in sensor caches
parent cc37b99a
......@@ -465,9 +465,19 @@ int main(int argc, char* const argv[]) {
pmsgCtr = 0;
readingCtr = 0;
gettimeofday(&start, NULL);
uint64_t lastCleanup = start.tv_sec;
LOG(info) << "Collect Agent running...";
while(keepRunning) {
gettimeofday(&start, NULL);
if(start.tv_sec - lastCleanup > settings.cleaningInterval) {
uint64_t purged = mySensorCache.clean(settings.cleaningInterval * 1000000000);
lastCleanup = start.tv_sec;
if(purged > 0)
LOG(info) << "Cache: purged " << purged << " obsolete entries";
}
sleep(60);
/* not really thread safe but will do the job */
gettimeofday(&end, NULL);
......
global {
mqttListenAddress 127.0.0.1:1883
cleaningInterval 86400
messageThreads 128
messageSlots 16
cacheInterval 120
......
......@@ -21,6 +21,7 @@ Configuration::Configuration(const std::string& cfgFilePath) :
_global.messageThreads = 128;
_global.messageSlots = 16;
_global.cacheInterval = 900;
_global.cleaningInterval = 86400;
_global.logLevelFile = boost::log::trivial::trace;
_global.logLevelCmd = boost::log::trivial::info;
......@@ -50,6 +51,8 @@ bool Configuration::readGlobal() {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if (boost::iequals(global.first, "mqttListenAddress")) {
_global.mqttListenAddress = global.second.data();
} else if (boost::iequals(global.first, "cleaningInterval")) {
_global.cleaningInterval = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageThreads")) {
_global.messageThreads = stoul(global.second.data());
} else if (boost::iequals(global.first, "messageSlots")) {
......
......@@ -35,6 +35,7 @@ typedef struct {
std::string mqttListenAddress;
std::string restListenAddress;
std::string tempDir;
uint64_t cleaningInterval;
uint64_t messageThreads;
uint64_t messageSlots;
uint64_t cacheInterval;
......
......@@ -124,6 +124,21 @@ void SensorCache::dump() {
}
}
uint64_t SensorCache::clean(uint64_t t) {
TimeStamp ts;
uint64_t thresh = ts.getRaw() - t;
uint64_t ctr = 0;
for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
uint64_t latestTs = it->second.getLatest().timestamp;
if (latestTs!=0 && latestTs < thresh) {
it = sensorCache.erase(it);
ctr++;
} else
++it;
}
return ctr;
}
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE CACHEENTRY CLASS
......@@ -131,7 +146,7 @@ void SensorCache::dump() {
CacheEntry::CacheEntry(uint64_t maxHistory) {
_maxHistory = maxHistory;
_stable = false;
_cacheIndex = 0;
_cacheIndex = -1;
//We pre-allocate the cache to a initial guess of 600 elements - 10 minutes at 1s sampling period
_cache.reserve(600);
}
......@@ -142,9 +157,21 @@ CacheEntry::~CacheEntry() {
std::vector<sensorReading_t>& CacheEntry::getCache() { return _cache; }
sensorReading_t CacheEntry::getLatest() { return _cache[_cacheIndex]; }
sensorReading_t CacheEntry::getLatest() const {
if(_cacheIndex==-1) {
sensorReading_t s = {0,0};
return s;
} else
return _cache[_cacheIndex];
}
sensorReading_t CacheEntry::getOldest() { return _cache[(_cacheIndex + 1) % _cache.size()]; }
sensorReading_t CacheEntry::getOldest() const {
if(_cacheIndex==-1) {
sensorReading_t s = {0,0};
return s;
} else
return _cache[(_cacheIndex + 1) % _cache.size()];
}
uint64_t CacheEntry::older(uint64_t ind) { return ind == 0 ? _cache.size() - 1 : ind - 1; }
......@@ -167,7 +194,6 @@ void CacheEntry::store(sensorReading_t reading) {
_cache[_cacheIndex] = reading;
}
bool CacheEntry::checkValid() {
if (_cache.size() > 2) {
TimeStamp ts;
......
......@@ -92,12 +92,12 @@ typedef struct {
/**
* @brief Returns the latest sensor reading in the cache.
**/
sensorReading_t getLatest();
sensorReading_t getLatest() const;
/**
* @brief Returns the oldest sensor reading in the cache.
**/
sensorReading_t getOldest();
sensorReading_t getOldest() const;
private:
......@@ -116,7 +116,7 @@ typedef struct {
// Flag to signal cache status
bool _stable;
// Head of the cache in the circular array
uint64_t _cacheIndex;
int64_t _cacheIndex;
// Time frame in nanoseconds covered by the cache
uint64_t _maxHistory;
......@@ -178,6 +178,17 @@ public:
**/
void dump();
/**
* @brief Removes all obsolete entries from the cache
*
* All entries in the cache whose latest sensor reading is older than "now" - t nanoseconds are
* removed.
*
* @param t The threshold in nanoseconds for entries that must be removed
* @return The number of purged cache entries
**/
uint64_t clean(uint64_t t);
/**
* @brief Set a new maximum cache length.
*
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment