Commit 06155b53 authored by Alessio Netti

Sensor cache changes

- Sensor caches across DCDB are now managed by the CacheEntry
implementation under "common"
- Provides utility methods to perform averages, checks, get views etc.
- SensorCache class in collectagent kept due to its libdcdb dependencies
parent 7b39ee1d
include ../config.mk
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../common/include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
CXXFLAGS = -O2 -g --std=c++11 -Wall -Wno-unused-function -Wno-unused-local-typedefs -Wno-deprecated-declarations -Wno-unknown-warning-option -fmessage-length=0 -I../common/include/ -I../lib/include -I$(DCDBDEPLOYPATH)/include -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_LOG_DYN_LINK -I$(DCDBDEPSPATH)/cpp-netlib-0.12.0-final/deps/asio/asio/include -DVERSION=\"$(VERSION)\"
OBJS = ../common/src/logging.o \
../common/src/sensorcache.o \
sensorcache.o \
collectagent.o \
configuration.o \
simplemqttserver.o \
......
......@@ -46,7 +46,6 @@
#include "messaging.h"
#include "abrt.h"
#include "dcdbdaemon.h"
#include "sensorcache.h"
#define __STDC_FORMAT_MACROS
......@@ -59,10 +58,10 @@ bool statistics;
uint64_t msgCtr;
uint64_t pmsgCtr;
uint64_t readingCtr;
SensorCache mySensorCache;
DCDB::Connection* dcdbConn;
DCDB::SensorDataStore *mySensorDataStore;
DCDB::SensorConfig *mySensorConfig;
DCDB::SensorCache mySensorCache;
DCDB::SCError err;
logger_t lg;
......@@ -98,7 +97,7 @@ struct httpHandler_t {
boost::network::uri::query_map(uri, queries);
int avg = atoi(queries.find("avg")->second.c_str());
int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t) avg);
int64_t val = mySensorCache.getSensor(uri.path(), (uint64_t)avg * 1000000000);
data << val << "\n";
//data << "Sid : " << sid.toString() << ", Value: " << val << "." << std::endl;
......
......@@ -6,14 +6,6 @@
*/
#include "sensorcache.h"
#include <dcdb/timestamp.h>
#include <exception>
#include <iostream>
#include <iomanip>
#include <algorithm>
namespace DCDB {
SensorCache::SensorCache(uint64_t maxHistory) {
this->_maxHistory = maxHistory;
......@@ -28,7 +20,7 @@ const sensorCache_t& SensorCache::getSensorMap() {
}
void SensorCache::storeSensor(SensorId sid, uint64_t ts, int64_t val) {
sensorReading_t s = { val, ts };
reading_t s = { val, ts };
/* Remove the reserved bytes to leverage the standard find function */
sid.setRsvd(0);
//TODO: Check for thread-safety
......@@ -54,7 +46,7 @@ int64_t SensorCache::getSensor(SensorId sid, uint64_t avg) {
if (avg) {
return it->second.getAverage(avg);
} else {
return it->second.getLatest().val;
return it->second.getLatest().value;
}
}
......@@ -106,7 +98,7 @@ int64_t SensorCache::getSensor(std::string topic, uint64_t avg) {
if (avg) {
return mostRecentSidIt->second.getAverage(avg);
} else {
return mostRecentSidIt->second.getLatest().val;
return mostRecentSidIt->second.getLatest().value;
}
}
......@@ -114,19 +106,18 @@ void SensorCache::dump() {
std::cout << "SensorCache Dump:" << std::endl;
for (sensorCache_t::iterator sit = sensorCache.begin(); sit != sensorCache.end(); sit++) {
std::cout << " id=" << sit->first.toString() << " data=[";
for (std::vector<sensorReading_t>::const_iterator eit = sit->second.getCache().begin(); eit != sit->second.getCache().end(); eit++) {
if (eit != sit->second.getCache().begin()) {
for (std::vector<reading_t>::const_iterator eit = sit->second.getRaw()->begin(); eit != sit->second.getRaw()->end(); eit++) {
if (eit != sit->second.getRaw()->begin()) {
std::cout << ",";
}
std::cout << "(" << eit->val << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
std::cout << "(" << eit->value << "," << eit->timestamp/NS_PER_S << "." << std::setfill ('0') << std::setw (9) << eit->timestamp%NS_PER_S << ")";
}
std::cout << "]" << std::endl;
}
}
uint64_t SensorCache::clean(uint64_t t) {
TimeStamp ts;
uint64_t thresh = ts.getRaw() - t;
uint64_t thresh = getTimestamp() - t;
uint64_t ctr = 0;
for (auto it = sensorCache.cbegin(); it != sensorCache.cend();) {
uint64_t latestTs = it->second.getLatest().timestamp;
......@@ -138,159 +129,3 @@ uint64_t SensorCache::clean(uint64_t t) {
}
return ctr;
}
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE CACHEENTRY CLASS
// Builds an empty cache entry that is still in its growing (non-stable) phase.
// maxHistory is the time frame the cache is meant to cover.
CacheEntry::CacheEntry(uint64_t maxHistory)
    : _stable(false),
      _cacheIndex(-1),
      _maxHistory(maxHistory) {
    // Initial capacity guess: 600 readings, i.e. 10 minutes at a 1s sampling period
    _cache.reserve(600);
}
// Drops all stored readings when the entry goes away.
CacheEntry::~CacheEntry()
{
    this->_cache.clear();
}
std::vector<sensorReading_t>& CacheEntry::getCache() { return _cache; }
// Returns the most recently stored reading, or a zeroed reading {0,0}
// when nothing has been stored yet (_cacheIndex is still -1).
sensorReading_t CacheEntry::getLatest() const {
    if (_cacheIndex != -1)
        return _cache[_cacheIndex];
    sensorReading_t empty = {0,0};
    return empty;
}
// Returns the oldest stored reading, or a zeroed reading {0,0}
// when nothing has been stored yet (_cacheIndex is still -1).
sensorReading_t CacheEntry::getOldest() const {
    if (_cacheIndex != -1) {
        // The slot right after the head holds the oldest entry (circular array)
        const uint64_t tail = (_cacheIndex + 1) % _cache.size();
        return _cache[tail];
    }
    sensorReading_t empty = {0,0};
    return empty;
}
// Index of the element stored just before "ind"; index 0 wraps to the last slot.
uint64_t CacheEntry::older(uint64_t ind) {
    if (ind == 0)
        return _cache.size() - 1;
    return ind - 1;
}
// Index of the element stored just after "ind"; the last slot wraps to index 0.
uint64_t CacheEntry::newer(uint64_t ind) {
    const uint64_t next = ind + 1;
    return next % _cache.size();
}
// Appends a reading to the cache.
//
// The cache works in two phases. While it is not yet "stable" the vector grows
// with every reading until the span between the first and the newest reading
// reaches _maxHistory. From then on the vector keeps its size and is used as a
// circular array, each new reading overwriting the oldest one.
void CacheEntry::store(sensorReading_t reading) {
    if (_stable) {
        // Circular phase: advance the head and overwrite that slot
        _cacheIndex = (_cacheIndex + 1) % _cache.size();
        _cache[_cacheIndex] = reading;
        return;
    }

    // Growing phase: the head is always the last element of the vector
    _cacheIndex = _cacheIndex + 1;
    _cache.push_back(reading);

    const bool historyCovered = _cache.front().timestamp + _maxHistory <= reading.timestamp;
    if (historyCovered) {
        _stable = true;
        // Give back any capacity beyond the final size
        _cache.shrink_to_fit();
    }
}
bool CacheEntry::checkValid() {
if (_cache.size() > 2) {
TimeStamp ts;
// Cache element right after cacheIndex is the oldest entry (circular array)
int64_t ctr = (_cacheIndex + 1) % _cache.size();
uint64_t prev = _cache[ctr].timestamp;
// We compute the average sampling period for this specific sensor
uint64_t avg = 0;
do {
ctr = newer(ctr);
avg+= _cache[ctr].timestamp - prev;
prev = _cache[ctr].timestamp;
} while( ctr != _cacheIndex);
avg/= (_cache.size()-1);
// A SID is outdated if it's older than 5x the average sampling period.
if ((ts.getRaw() - getLatest().timestamp) > 5 * avg) {
return false;
}
}
return true;
}
// Returns the time-weighted average of the readings in the last "avg" seconds.
//
// avg is multiplied by NS_PER_S before being compared with the cached
// timestamps, so this implementation takes the window length in seconds.
//
// Throws std::out_of_range if the cache does not yet span the whole window
// ("Not enough data") or if the latest reading is older than the window
// ("SID outdated"), and std::invalid_argument if the cache is empty.
int64_t CacheEntry::getAverage(uint64_t avg) {
    TimeStamp ts;
    if (_cache.size() > 0) {
        const uint64_t window = avg * NS_PER_S;
        if (ts.getRaw() - getOldest().timestamp < window) {
            throw std::out_of_range("Not enough data");
        }
        else if (ts.getRaw() - getLatest().timestamp > window) {
            throw std::out_of_range("SID outdated");
        }

        double sum = 0;
        int64_t it, prev;
        prev = _cacheIndex;
        it = older(prev);
        // Trapezoidal integration, newest to oldest, over the elements that fall within the window
        while ((it != _cacheIndex) && ((ts.getRaw() - _cache[it].timestamp) <= window)) {
            // All arithmetic is done in double: multiplying the signed value by the
            // unsigned time delta would convert negative readings to huge unsigned
            // numbers, and an integer "/ 2" would drop the half unit.
            const double deltaT = static_cast<double>(_cache[prev].timestamp - _cache[it].timestamp);
            const double mean = (static_cast<double>(_cache[it].val) + static_cast<double>(_cache[prev].val)) / 2.0;
            sum += mean * deltaT;
            prev = it;
            it = older(it);
        }

        const uint64_t span = getLatest().timestamp - _cache[prev].timestamp;
        // If prev still points to the cache head there was only one element in the window.
        // A zero span (identical timestamps) would divide 0 by 0, so the latest value is used too.
        if (prev == _cacheIndex || span == 0) {
            return getLatest().val;
        } else {
            return static_cast<int64_t>(sum / static_cast<double>(span));
        }
    }
    throw std::invalid_argument("Sid not found");
}
// Binary-searches the cache for timestamp "t".
//
// t        - timestamp to look for; if "relative" is true it is an offset that
//            is subtracted from the timestamp of the most recent reading.
// relative - selects relative (true) or absolute (false) interpretation of t.
//
// Returns the index (in the circular array) of the oldest reading whose
// timestamp is not smaller than the target, or -1 if the cache has fewer than
// two readings or the target lies outside the cached time frame.
int64_t CacheEntry::searchTimestamp(uint64_t t, bool relative) {
    // Cache is empty or has only one element
    if(_cache.size()<2)
        return -1;

    const uint64_t latest = _cache[_cacheIndex].timestamp;
    const uint64_t oldest = _cache[(_cacheIndex+1) % _cache.size()].timestamp;

    if(relative) {
        // t > latest must be rejected explicitly: latest - t would wrap around
        // to a huge value and slip past the lower-bound check
        if(t > latest || latest - t < oldest)
            return -1;
        t = latest - t;
    } else if(t > latest || t < oldest) {
        // Absolute target outside of the time frame contained in the cache
        return -1;
    }

    int64_t pivot=0, pivotReal=0, aPoint=0, bPoint=_cache.size()-1;
    // Attention! aPoint and bPoint are linearized indexes (0 = oldest reading) and do not
    // take _cacheIndex into account; pivotReal maps the pivot to the circular array.
    // Standard (leftmost) binary search algorithm below
    while(aPoint < bPoint) {
        pivot = (aPoint + bPoint)/2;
        pivotReal = (_cacheIndex + 1 + pivot) % _cache.size();
        if(t <= _cache[pivotReal].timestamp)
            bPoint = pivot;
        else
            aPoint = pivot + 1;
    }
    return (_cacheIndex + 1 + aPoint) % _cache.size();
}
// Estimates the index of the reading that is "t" older than the latest one.
//
// No search is performed: the offset is interpolated from the number of cached
// elements and the time span between the oldest and the latest reading.
//
// Returns -1 if the cache is not stable yet, if t is negative, or if t is
// larger than the cached time span. Otherwise returns an index into the
// circular array; t equal to the whole span yields the oldest reading.
int64_t CacheEntry::getOffset(int64_t t) {
    if(!_stable || t < 0 || _cache.empty())
        return -1;

    const uint64_t size = _cache.size();
    const uint64_t span = getLatest().timestamp - getOldest().timestamp;
    const uint64_t back = static_cast<uint64_t>(t);

    // Nothing in the cache is that old. Checking before multiplying also keeps
    // size * back bounded by size * span.
    if(back > span)
        return -1;
    // All readings share one timestamp: only t == 0 gets here; avoid dividing by zero
    if(span == 0)
        return _cacheIndex;

    uint64_t offset = (size * back) / span;
    // offset == size would wrap around to the latest reading; the farthest
    // element reachable from the head is the oldest one, size - 1 steps back
    if(offset >= size)
        offset = size - 1;
    return ( size + _cacheIndex - offset ) % size;
}
} /* namespace DCDB */
......@@ -9,121 +9,11 @@
#define COLLECTAGENT_SENSORCACHE_H_
#include <map>
#include <vector>
#include <utility>
#include <dcdb/sensorid.h>
#include <dcdb/timestamp.h>
#include "cacheentry.h"
namespace DCDB {
// One cached sample of a sensor.
struct sensorReading_t {
    int64_t  val;        // sensor value
    uint64_t timestamp;  // time of the sample; printed as seconds via NS_PER_S in SensorCache::dump()
};
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE CACHEENTRY CLASS
// Cache of the most recent readings of a single sensor.
// Readings are kept in a vector that first grows and is then reused as a
// circular array (see store() in sensorcache.cpp).
class CacheEntry {
public:
// maxHistory: time frame in nanoseconds to be covered by the cache (default 300000000000 ns = 300 s)
CacheEntry(uint64_t maxHistory=300000000000);
virtual ~CacheEntry();
/**
* @brief Returns a mutable (non-const) reference to the internal cache vector.
**/
std::vector<sensorReading_t>& getCache();
/**
* @brief Stores a sensor reading in the cache.
*
* @param reading sensorReading_t struct containing the latest sensor reading.
**/
void store(sensorReading_t reading);
/**
* @brief Ensures that the cache is still valid.
*
* The cache is considered valid if it is not outdated, that is, the latest reading is not
* older than 5 times the average sampling period.
*
* @return True if the cache is still valid, False otherwise
**/
bool checkValid();
/**
* @brief Returns an average of recent sensor readings.
*
* Only the sensor readings that fall within the last "avg" time units are used in the
* average computation.
*
* NOTE(review): the implementation in sensorcache.cpp multiplies "avg" by NS_PER_S before
* comparing it with timestamps, i.e. it treats "avg" as seconds, not nanoseconds.
*
* @param avg length of the average aggregation window (seconds, per the implementation).
* @return Average value of the last sensor readings.
**/
int64_t getAverage(uint64_t avg);
/**
* @brief Searches for the input timestamp in the cache.
*
* Binary search is used to search for the "t" timestamp within the sensor cache, and the
* index of the matching position is returned if successful.
*
* @param t Timestamp to be searched, in nanoseconds.
* @param relative Boolean: if True, t is considered as a relative time offset against the most recent reading.
*
* @return The index of the oldest sensor reading whose timestamp is not smaller than t
*         (leftmost binary search), or -1 if out of bounds.
**/
int64_t searchTimestamp(uint64_t t, bool relative=false);
/**
* @brief Returns the index of the cache element that is older than the latest entry by "t".
*
* Unlike searchTimestamp, this method does not perform an actual search, but simply computes
* the number of elements that cover a time interval of "t" nanoseconds. This value is then
* used together with _cacheIndex to compute the starting index of the most recent cache
* portion that covers such time interval.
*
* @param t Length of the time frame in nanoseconds.
*
* @return Index of element in the sensor cache, or -1 if no index can be computed
*         (e.g. the cache is not stable yet or t is negative).
**/
int64_t getOffset(int64_t t);
/**
* @brief Returns the latest sensor reading in the cache ({0,0} if the cache is empty).
**/
sensorReading_t getLatest() const;
/**
* @brief Returns the oldest sensor reading in the cache ({0,0} if the cache is empty).
**/
sensorReading_t getOldest() const;
private:
/**
* @brief Returns the index of the immediately newer element with respect to input index "ind".
*
* Wraps around to 0 after the last element of the vector.
**/
uint64_t newer(uint64_t ind);
/**
* @brief Returns the index of the immediately older element with respect to input index "ind".
*
* Wraps around to the last element of the vector when "ind" is 0.
**/
uint64_t older(uint64_t ind);
// Internal cache vector
std::vector<sensorReading_t> _cache;
// Flag to signal cache status: false while the vector is still growing,
// true once it is used as a fixed-size circular array
bool _stable;
// Head of the cache in the circular array (index of the latest reading, -1 if empty)
int64_t _cacheIndex;
// Time frame in nanoseconds covered by the cache
uint64_t _maxHistory;
};
// -------------------------------------------------------------------------------------------
// DEFINITIONS FOR THE SENSORCACHE CLASS
using namespace DCDB;
typedef std::map<SensorId, CacheEntry> sensorCache_t;
......@@ -211,6 +101,4 @@ private:
};
} /* namespace DCDB */
#endif /* COLLECTAGENT_SENSORCACHE_H_ */
/*
* cacheentry.h
*
* Created on: 8 Mar 2019
* Author: Alessio Netti
*/
#ifndef CACHEENTRY_H_
#define CACHEENTRY_H_
#include <vector>
#include <utility>
#include <exception>
#include <iostream>
#include <iomanip>
#include <algorithm>
#include "timestamp.h"
// One cached sensor sample with a signed value.
struct reading_t {
    int64_t  value;      // sensor value
    uint64_t timestamp;  // time at which the value was sampled
};
// One cached sensor sample with an unsigned value.
struct ureading_t {
    uint64_t value;      // sensor value
    uint64_t timestamp;  // time at which the value was sampled
};
class CacheEntry {
public:
/**
* @brief Creates an empty cache that grows until it covers "maxHistory".
*
* @param maxHistory Time frame (in nanoseconds) to be covered by the cache.
**/
CacheEntry(uint64_t maxHistory=300000000000) {
    // Initial capacity guess: 600 readings, i.e. 10 minutes at a 1s sampling period
    _cache.reserve(600);
    // -1 marks "no reading stored yet"
    _cacheIndex = -1;
    _stable = false;
    _maxHistory = maxHistory;
}
/**
* @brief Creates a cache with a fixed number of pre-allocated slots.
*
* The cache is marked stable right away, so it is used as a circular array of
* "size" elements from the first stored reading on.
*
* @param maxHistory Time frame (in nanoseconds) to be covered by the cache.
* @param size       Number of slots in the cache.
**/
CacheEntry(uint64_t maxHistory, uint64_t size) {
    _cache.resize(size);
    // -1 marks "no reading stored yet"
    _cacheIndex = -1;
    _stable = true;
    _maxHistory = maxHistory;
}
/**
* @brief Drops all stored readings when the entry is destroyed.
**/
~CacheEntry()
{
    this->_cache.clear();
}
/**
* @brief Returns the time frame (in nanoseconds) covered by the cache.
**/
uint64_t getMaxHistory() const { return _maxHistory; }
/**
* @brief Returns a read-only pointer to the internal cache vector.
*
* The vector is still owned by this object.
**/
const std::vector<reading_t>* getRaw() const {
    const std::vector<reading_t>* raw = &_cache;
    return raw;
}
/**
* @brief Returns a copy of the cache element stored at index i.
*
* The index is not range-checked.
**/
reading_t get(unsigned i) const {
    const reading_t& slot = _cache[i];
    return slot;
}
/**
* @brief Stores a sensor reading in the cache.
*
* The cache works in two phases. While it is not yet "stable" the vector grows with
* every reading, until the span between the first and the newest reading reaches
* _maxHistory. From then on the vector keeps its size and is used as a circular
* array, each new reading overwriting the oldest one.
*
* @param reading reading_t struct containing the latest sensor reading.
**/
void store(reading_t reading) {
    if (_stable) {
        // Circular phase: advance the head and overwrite that slot
        _cacheIndex = (_cacheIndex + 1) % _cache.size();
        _cache[_cacheIndex] = reading;
        return;
    }

    // Growing phase: the head is always the last element of the vector
    _cacheIndex = _cacheIndex + 1;
    _cache.push_back(reading);

    const bool historyCovered = _cache.front().timestamp + _maxHistory <= reading.timestamp;
    if (historyCovered) {
        _stable = true;
        // Give back any capacity beyond the final size
        _cache.shrink_to_fit();
    }
}
/**
* @brief Returns a view of the sensor cache corresponding to a certain time frame.
*
* Starting from the sensor cache, a new vector is built using readings contained in the time
* frame specified by the startTs and endTs input arguments.
*
* @param startTs Starting timestamp of the desired view
* @param endTs End timestamp of the desired view
* @param buffer Pointer to a vector to be used to store the view; it is cleared first. If null, a new
*               vector is allocated with "new" and the caller is responsible for deleting it
* @param rel If true, startTs and endTs are interpreted as relative timestamps against "the most recent sensor reading"
* @param live If true, checks are performed such that the returned view does not correspond to obsolete data
* @return A vector of sensor readings containing the cache view, or an empty vector if not possible
**/
std::vector<reading_t>* getView(uint64_t startTs, uint64_t endTs, std::vector<reading_t>* buffer=nullptr, bool rel=false, bool live=false) const {
    if(!buffer)
        buffer = new std::vector<reading_t>();
    buffer->clear();

    // With fewer than two readings there is no time frame to take a view of.
    // This also keeps the division below from dividing by zero (size() - 1 == 0).
    if(_cache.size() < 2)
        return buffer;

    // Tolerance used by the "live" check: twice the expected spacing between readings
    uint64_t interval = (_maxHistory / (_cache.size() - 1)) * 2;
    uint64_t now = getTimestamp();

    //Converting absolute timestamps to relative offsets for cache access
    uint64_t startTsInt = rel ? startTs : now - startTs;
    uint64_t endTsInt = rel ? endTs : now - endTs;
    //Getting the cache indexes to access sensor data
    int64_t startIdx = getOffset(startTsInt);
    int64_t endIdx = getOffset(endTsInt);

    //Managing invalid time offsets
    if( startIdx < 0 || endIdx < 0)
        return buffer;
    //Managing obsolete data
    if(live && (now - startTsInt > _cache[startIdx].timestamp + interval || now - endTsInt > _cache[endIdx].timestamp + interval))
        return buffer;

    if(startIdx <= endIdx)
        buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.begin() + endIdx + 1);
    else {
        // The requested range wraps around the end of the circular array: copy it in two pieces
        buffer->insert(buffer->end(), _cache.begin() + startIdx, _cache.end());
        buffer->insert(buffer->end(), _cache.begin(), _cache.begin() + endIdx + 1);
    }
    return buffer;
}
/**
* @brief Ensures that the cache is still valid.
*
* The cache is considered valid if it is not outdated, that is, the latest reading is not
* older than 5 times the average sampling rate.
*
* @return True if the cache is still valid, False otherwise
**/
bool checkValid() const {
if (_cache.size() > 2) {
// Cache element right after cacheIndex is the oldest entry (circular array)
int64_t ctr = (_cacheIndex + 1) % _cache.size();
uint64_t prev = _cache[ctr].timestamp;
// We compute the average sampling period for this specific sensor
uint64_t avg = 0;
do {
ctr = newer(ctr);
avg+= _cache[ctr].timestamp - prev;
prev = _cache[ctr].timestamp;
} while( ctr != _cacheIndex);
avg/= (_cache.size()-1);
// A SID is outdated if it's older than 5x the average sampling period.
if ((getTimestamp() - getLatest().timestamp) > 5 * avg) {
return false;
}
}
return true;
}
/**
* @brief Returns an average of recent sensor readings.
*
* Only the sensor readings pushed in the last "avg" nanoseconds are used in the average
* computation, which is weighted by the time elapsed between consecutive readings.
* If "avg" is 0, the latest reading is returned.
*
* @param avg length of the average aggregation window in nanoseconds.
* @return Average value of the last sensor readings.
* @throws std::out_of_range if the cache does not span the whole window yet ("Not enough data")
*         or if the latest reading is older than the window ("Sensor outdated").
* @throws std::invalid_argument if the cache is empty.
**/
int64_t getAverage(uint64_t avg) const {
    uint64_t ts = getTimestamp();
    if (_cache.size() > 0) {
        if (ts - getOldest().timestamp < avg) {
            throw std::out_of_range("Not enough data");
        }
        else if (ts - getLatest().timestamp > avg && avg>0) {
            throw std::out_of_range("Sensor outdated");
        }

        // No window requested, or nothing stored yet in a pre-sized cache: walking the
        // array from index -1 would read out of bounds, so answer with the latest reading
        if (avg == 0 || _cacheIndex < 0) {
            return getLatest().value;
        }

        double sum = 0;
        int64_t it, prev;
        prev = _cacheIndex;
        it = older(prev);
        // Trapezoidal integration, newest to oldest, over the elements that fall within the window
        while ((it != _cacheIndex) && ((ts - _cache[it].timestamp) <= avg)) {
            // All arithmetic is done in double: multiplying the signed value by the
            // unsigned time delta would convert negative readings to huge unsigned
            // numbers, and an integer "/ 2" would drop the half unit.
            const double deltaT = static_cast<double>(_cache[prev].timestamp - _cache[it].timestamp);
            const double mean = (static_cast<double>(_cache[it].value) + static_cast<double>(_cache[prev].value)) / 2.0;
            sum += mean * deltaT;
            prev = it;
            it = older(it);
        }

        const uint64_t span = getLatest().timestamp - _cache[prev].timestamp;
        // If prev still points to the cache head there was only one element in the window.
        // A zero span (identical timestamps) would divide 0 by 0, so the latest value is used too.
        if (prev == _cacheIndex || span == 0) {
            return getLatest().value;
        } else {
            return static_cast<int64_t>(sum / static_cast<double>(span));
        }
    }
    throw std::invalid_argument("Sensor not found");
}
/**
* @brief Searches for the input timestamp in the cache.
*
* Binary search is used the search for the "t" timestamp within the sensor cache, and its
* index is returned if successful.
*
* @param t Timestamp to be searched, in nanoseconds.
* @param relative Boolean: if True, t is considered as a relative time offset against the most recent reading.
*
* @return The index of the closest sensor reading to t, or -1 if out of bounds.
**/
int64_t searchTimestamp(uint64_t t, bool relative=false) const {
// Cache is empty or has only one element