Commit 8f952bb9 authored by Micha Mueller's avatar Micha Mueller
Browse files

Move storeReading method for unsigned reading values from Perf plugin to SensorBase

parent c9e3d846
......@@ -19,6 +19,11 @@ typedef struct {
uint64_t timestamp;
} reading_t;
typedef struct {
uint64_t value;
uint64_t timestamp;
} ureading_t;
class SensorBase {
public:
static const size_t QUEUE_MAXLIMIT=1024;
......@@ -113,6 +118,8 @@ public:
void setSinkPath(const std::string& path) { _sinkPath = path; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setSubsampling(unsigned factor) { _subsamplingFactor = factor; }
void setLastRaw(int64_t raw) { _lastRawValue.value = raw; }
void setLastURaw(uint64_t raw) { _lastRawUValue.value = raw; }
const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); }
......@@ -136,7 +143,19 @@ public:
}
}
void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
/**
* Store a reading, in order to get it pushed to the data base eventually.
* Also this methods takes care of other optional reading post-processing,
* e.g. delta computation, subsampling, caching, scaling, etc.
*
* This is the primary storeReading() and should be used whenever possible.
*
* @param rawReading Reading struct with value and timestamp to be stored.
* @param factor Scaling factor, which is applied to the reading value (optional)
* @param maxValue Maximum possible value of the reading; required for the
* delta computation to detect an overflow.
*/
void storeReading(reading_t rawReading, double factor=1.0, unsigned long long maxValue=LLONG_MAX) {
reading_t reading = rawReading;
if( _delta ) {
if (!_firstReading) {
......@@ -182,6 +201,70 @@ public:
_latestValue = reading;
}
/**
* Store an unsigned reading, in order to get it pushed to the data base eventually.
* Also this methods takes care of other optional reading post-processing,
* e.g. delta computation, subsampling, caching, scaling, etc.
*
* This is a variant of the primary storeReading() for monotonically increasing
* sensors reading unsigned 64bit values which may require more than the 63bit
* offered by a signed reading_t. The readings are still stored as signed int64
* in the database, therefore all such sensors should enable storage of deltas!
*
* This variant only adapts the delta computation for ureading_t actually.
* FIXME: Avoid code duplication
*
* @param rawReading Reading struct with (usigned) value and timestamp to be stored.
* @param factor Scaling factor, which is applied to the reading value (optional)
* @param maxValue Maximum possible value of the reading; required for the
* delta computation to detect an overflow.
*/
void storeReading(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
reading_t reading = rawReading;
if( _delta ) {
if (!_firstReading) {
if (rawReading.value < _lastRawUValue.value)
reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
else
reading.value = (rawReading.value - _lastRawUValue.value) * factor;
} else {
_firstReading = false;
_lastRawUValue = rawReading;
return;
}
_lastRawUValue = rawReading;
}
else
reading.value = rawReading.value * factor;
if( _delta )
// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
_accumulator.value += reading.value;
else
_accumulator.value = reading.value;
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = reading.timestamp;
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
// We reset the accumulator's value for the correct accumulation of deltas
_accumulator.value = 0;
}
if (_sinkFile) {
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
_latestValue = reading;
}
static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}
protected:
......@@ -198,8 +281,9 @@ protected:
std::unique_ptr<reading_t[]> _cache;
bool _delta;
bool _firstReading;
reading_t _latestValue;
ureading_t _lastRawUValue;
reading_t _lastRawValue;
reading_t _latestValue;
reading_t _lastSentValue;
reading_t _accumulator;
std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
......
......@@ -18,8 +18,7 @@ public:
PerfSensorBase(const std::string& name) :
SensorBase(name),
_type(0),
_config(0),
_lastRawPerf(0) {
_config(0) {
//default delta to true, as perfevent has only monotonic sensors usually
_delta = true;
}
......@@ -27,8 +26,7 @@ public:
PerfSensorBase(const PerfSensorBase& other) :
SensorBase(other),
_type(other._type),
_config(other._config),
_lastRawPerf(other._lastRawPerf) {}
_config(other._config) {}
virtual ~PerfSensorBase() {}
......@@ -36,7 +34,6 @@ public:
SensorBase::operator=(other);
_type = other._type;
_config = other._config;
_lastRawPerf = other._lastRawPerf;
return *this;
}
......@@ -46,48 +43,10 @@ public:
void setType(unsigned type) { _type = type; }
void setConfig(unsigned config) { _config = config; }
void setLastRaw(uint64_t raw) { _lastRawPerf = raw; }
void storeReading(uint64_t value, uint64_t timestamp, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
_latestValue.timestamp = timestamp;
if( _delta ) {
if (value < _lastRawPerf)
_latestValue.value = (value + (maxValue - _lastRawPerf)) * factor;
else
_latestValue.value = (value - _lastRawPerf) * factor;
_lastRawPerf = value;
}
else
_latestValue.value = value * factor;
if( _delta )
_accumulator.value += _latestValue.value;
else
_accumulator.value = _latestValue.value;
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = _latestValue.timestamp;
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
_accumulator.value = 0;
}
if (_sinkFile) {
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << _latestValue.value;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
_cache[_cacheIndex] = _latestValue;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
protected:
unsigned int _type;
unsigned int _config;
uint64_t _lastRawPerf;
};
using PerfSBPtr = std::shared_ptr<PerfSensorBase>;
......
......@@ -189,7 +189,8 @@ void PerfSensorGroup::stop() {
}
void PerfSensorGroup::read() {
uint64_t timestamp = getTimestamp();
ureading_t reading;
reading.timestamp = getTimestamp();
struct read_format* rf = (struct read_format*) _buf;
unsigned long long count;
......@@ -225,7 +226,7 @@ void PerfSensorGroup::read() {
}
//iterate over all values returned by ::read()
for (unsigned i = 0; i < rf->nr; i++) {
uint64_t val = rf->values[i].value;
reading.value = rf->values[i].value;
//iterate over all counters and find the one with matching id
for (unsigned j = 0; j < _sensors.size(); j++) {
......@@ -235,10 +236,10 @@ void PerfSensorGroup::read() {
#endif
if(_lastValid){
//storeReading takes care of delta computation and applies correction value on the result
_sensors[j]->storeReading(val, timestamp, correction, PerfSensorBase::MAXCOUNTERVALUE);
_sensors[j]->storeReading(reading, correction, PerfSensorBase::MAXCOUNTERVALUE);
} else {
//Before we can compute correct values again after an invalid reading we have to update the lastRawValue first
_sensors[j]->setLastRaw(val);
_sensors[j]->setLastURaw(reading.value);
}
break;
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment