Commit c7f0cdbf authored by Micha Mueller's avatar Micha Mueller
Browse files

Change perf-plugin implementation: Instead of duplicating sensorGroups per CPU...

Change perf-plugin implementation: Instead of duplicating sensorGroups per CPU only one group is used with duplicated sensors
parent 2116bfbb
...@@ -320,11 +320,11 @@ Explanation of the values specific for the perfevent plugin: ...@@ -320,11 +320,11 @@ Explanation of the values specific for the perfevent plugin:
| Value | Explanation | | Value | Explanation |
|:----- |:----------- | |:----- |:----------- |
| mqttpart | In the context of the perfevent plugin the mqttpart of a group is only used as a place holder for the CPU id. It will be replaced by a string of the same width that holds the CPU id for the event.
| type | Type of which the counter should be. Each type determines different possible values for the config-field. Possible type-values are described below. | type | Type of which the counter should be. Each type determines different possible values for the config-field. Possible type-values are described below.
| config | Together with the type-field config determines which performance counter should be read. Possible values and what they measure are listed below. | config | Together with the type-field config determines which performance counter should be read. Possible values and what they measure are listed below.
| cpus | One can define a comma-separated list of cpu numbers (also value ranges can be specified, e.g. 2-4 equals 2,3,4). The hardware counter will then be only opened on the specified cpus. | cpus | One can define a comma-separated list of cpu numbers (also value ranges can be specified, e.g. 2-4 equals 2,3,4). The hardware counter will then be only opened on the specified cpus.
| htVal | Specify multiplier for CPU aggregation. All CPUs where (CPU-number % htVal) has the same result are aggregated together. Only CPUs which are included in the "cpus" field (or all CPUs if the "cpus" field is not present) are aggregated. Background: To reduce the amount of pushed sensor data, it is possible to aggregate cpu readings. This feature is specifically aimed at processors which are hyper-threading enabled but can also come in handy for other use cases. Only the values pushed via the MQTT-Pusher are aggregated. There still exist sensors for each CPU and they store unaggregated readings in their local caches. | htVal | Specify multiplier for CPU aggregation. All CPUs where (CPU-number % htVal) has the same result are aggregated together. Only CPUs which are included in the "cpus" field (or all CPUs if the "cpus" field is not present) are aggregated. Background: To reduce the amount of pushed sensor data, it is possible to aggregate cpu readings. This feature is specifically aimed at processors which are hyper-threading enabled but can also come in handy for other use cases. Only the values pushed via the MQTT-Pusher are aggregated. There still exist sensors for each CPU and they store unaggregated readings in their local caches.
| mqttsufffix | In the context of the perfevent plugin the mqttpart requires a place holder ('x') for the CPU id. Sensors will be duplicated in order to open hardware counter for each CPU. Therefore the mqttsuffix should contain a placeholder consisting of 'x' to be replaced by the CPU id and make the suffix unique.
> NOTE     As perfevent counters are usually always monotonic, the delta attribute is by default set to true for all sensors. One has to explicitly set delta to "off" for a sensor to overwrite this behaviour. > NOTE     As perfevent counters are usually always monotonic, the delta attribute is by default set to true for all sensors. One has to explicitly set delta to "off" for a sensor to overwrite this behaviour.
...@@ -532,7 +532,7 @@ Explanation of the values specific for the ProcFS plugin: ...@@ -532,7 +532,7 @@ Explanation of the values specific for the ProcFS plugin:
|:----- |:----------- | |:----- |:----------- |
| type | The type of the file parsed by the sensor group. Can be either "vmstat", "meminfo", "procstat" or "sar" | type | The type of the file parsed by the sensor group. Can be either "vmstat", "meminfo", "procstat" or "sar"
| path | Path of the file, if different from the default path in the /proc filesystem | path | Path of the file, if different from the default path in the /proc filesystem
| mqttPart | The mqttPart works similarly as in the Perf-event plugin. For sensors associated to metrics that are core-specific (e.g. some of those in /proc/stat) the mqttPart is replaced with the CPU id. For all other metrics that are system-wide, the mqttPart is used as it is. | mqttPart | The mqttPart can be used a placeholder. For sensors associated to metrics that are core-specific (e.g. some of those in /proc/stat) the mqttPart is replaced with the CPU id. For all other metrics that are system-wide, the mqttPart is used as it is.
| mqttStart | Base MQTT suffix that is automatically incremented to generate topics for sensors associated to metrics in the same file. Note that this parameter is used only if automatic MQTT topic generation is enabled, when no sensors are explicitly defined. | mqttStart | Base MQTT suffix that is automatically incremented to generate topics for sensors associated to metrics in the same file. Note that this parameter is used only if automatic MQTT topic generation is enabled, when no sensors are explicitly defined.
| cpus | Defines the set of CPU cores for which metrics must be collected. Only affects extraction of core-specific metrics (e.g. those in /proc/stat), whereas system-level metrics are acquired regardless of this setting. If no CPU cores set is defined, metrics for all available CPU cores will be collected. This parameter follows the same syntax as in the Perf-event plugin. | cpus | Defines the set of CPU cores for which metrics must be collected. Only affects extraction of core-specific metrics (e.g. those in /proc/stat), whereas system-level metrics are acquired regardless of this setting. If no CPU cores set is defined, metrics for all available CPU cores will be collected. This parameter follows the same syntax as in the Perf-event plugin.
......
template_group def1 { template_group def1 {
interval 5000 interval 5000
mqttpart xx mqttpart 00
minValues 5 minValues 5
counter hw_instructions { counter hw_instructions {
type PERF_TYPE_HARDWARE type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_INSTRUCTIONS config PERF_COUNT_HW_INSTRUCTIONS
mqttsuffix 22 mqttsuffix 0x
} }
} }
template_group def2 { template_group def2 {
interval 2000 interval 2000
cpus 1,2 cpus 1,2
mqttpart xx mqttpart 01
} }
template_single_counter def3 { template_single_counter def3 {
interval 2000 interval 2000
mqttpart xx mqttpart 10
minValues 3 minValues 3
cpus 1-3 cpus 1-3
htVal 2 htVal 2
type PERF_TYPE_HARDWARE type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_CACHE_REFERENCES config PERF_COUNT_HW_CACHE_REFERENCES
} }
group hw_i { group hw_i {
default def1 default def1
htVal 2 htVal 2
mqttpart xx mqttpart 02
} }
group hw_bi { group hw_bi {
default def2 default def2
mqttpart xx mqttpart 03
counter hw_branch_instructions { counter hw_branch_instructions {
mqttsuffix 24 mqttsuffix 1x
type PERF_TYPE_HARDWARE type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_BRANCH_INSTRUCTIONS config PERF_COUNT_HW_BRANCH_INSTRUCTIONS
} }
...@@ -46,50 +46,50 @@ group hw_bi { ...@@ -46,50 +46,50 @@ group hw_bi {
group hw_bm { group hw_bm {
default def2 default def2
cpus 0,2-3 cpus 0,2-3
mqttpart xx mqttpart 04
counter hw_branch_misses { counter hw_branch_misses {
mqttsuffix 2c mqttsuffix 2x
type PERF_TYPE_HARDWARE type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_BRANCH_MISSES config PERF_COUNT_HW_BRANCH_MISSES
} }
} }
single_counter cacheReferences { single_counter cacheReferences {
default def3 default def3
cpus 2-3 cpus 2-3
mqttsuffix 10 mqttsuffix 3x
} }
single_counter cacheMisses { single_counter cacheMisses {
interval 2000 interval 2000
mqttpart xxxx mqttpart 0A
minValues 3 minValues 3
cpus 2-3 cpus 2-3
type PERF_TYPE_HARDWARE type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_CACHE_MISSES config PERF_COUNT_HW_CACHE_MISSES
mqttsuffix xx
} }
group sw { group sw {
interval 1000 interval 1000
mqttpart xx
minValues 3 minValues 3
cpus 1-3 cpus 1-3
counter sw_pagefaults { counter sw_pagefaults {
mqttsuffix 30 mqttsuffix 06xx
type PERF_TYPE_SOFTWARE type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_PAGE_FAULTS config PERF_COUNT_SW_PAGE_FAULTS
} }
counter sw_context_switches { counter sw_context_switches {
mqttsuffix 34 mqttsuffix 07xx
type PERF_TYPE_SOFTWARE type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_CONTEXT_SWITCHES config PERF_COUNT_SW_CONTEXT_SWITCHES
} }
counter sw_cpu_migrations { counter sw_cpu_migrations {
mqttsuffix 38 mqttsuffix 08xx
type PERF_TYPE_SOFTWARE type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_CPU_MIGRATIONS config PERF_COUNT_SW_CPU_MIGRATIONS
} }
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "SensorGroupTemplate.h" #include "SensorGroupTemplate.h"
#include "version.h" #include "version.h"
#include <algorithm>
#include <iostream> #include <iostream>
#include <sstream> #include <sstream>
#include <iomanip> #include <iomanip>
...@@ -753,9 +754,13 @@ protected: ...@@ -753,9 +754,13 @@ protected:
} }
/** /**
* Formats a numerical CPU core ID into a hex string of specified length. * Replaces occurences of 'x' characters by a hex representation of a
* numerical CPU core ID. If no 'x' characters are found only the CPU hex
* string with specified width is returned.
* *
* Example: a mqttPart="xx" and val=11 produce "0B" as output. * Examples: mqttPart= "xx", val=11 --> return "0B"
* mqttPart="A3xx", val=11 --> return "A30B"
* mqttPart="A3YY", val=11 --> return "000B"
* *
* @param mqttPart: a template MQTT string, defines the length of the final string * @param mqttPart: a template MQTT string, defines the length of the final string
* @param val: the value of the CPU core ID * @param val: the value of the CPU core ID
...@@ -765,8 +770,23 @@ protected: ...@@ -765,8 +770,23 @@ protected:
*/ */
const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val) { const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val) {
std::stringstream stream; std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqttPart.length()) << std::uppercase << std::hex << val;
return stream.str(); size_t n = std::count(mqttPart.begin(), mqttPart.end(), 'x');
if (n==0) {
stream << std::setfill ('0') << std::setw(mqttPart.length()) << std::uppercase << std::hex << val;
return stream.str();
} else {
std::string result(mqttPart);
stream << std::setfill ('0') << std::setw(n) << std::uppercase << std::hex << val;
std::string replacement = stream.str();
std::string pattern(n, 'x');
size_t index = result.find(pattern, index);
result.replace(index, n, replacement);
return result;
}
} }
/** /**
......
...@@ -18,7 +18,10 @@ public: ...@@ -18,7 +18,10 @@ public:
PerfSensorBase(const std::string& name) : PerfSensorBase(const std::string& name) :
SensorBase(name), SensorBase(name),
_type(0), _type(0),
_config(0) { _config(0),
_cpu(-1),
_fd(-1),
_id(0) {
//default delta to true, as perfevent has only monotonic sensors usually //default delta to true, as perfevent has only monotonic sensors usually
_delta = true; _delta = true;
} }
...@@ -26,7 +29,10 @@ public: ...@@ -26,7 +29,10 @@ public:
PerfSensorBase(const PerfSensorBase& other) : PerfSensorBase(const PerfSensorBase& other) :
SensorBase(other), SensorBase(other),
_type(other._type), _type(other._type),
_config(other._config) {} _config(other._config),
_cpu(other._cpu),
_fd(-1),
_id(0) {}
virtual ~PerfSensorBase() {} virtual ~PerfSensorBase() {}
...@@ -34,91 +40,38 @@ public: ...@@ -34,91 +40,38 @@ public:
SensorBase::operator=(other); SensorBase::operator=(other);
_type = other._type; _type = other._type;
_config = other._config; _config = other._config;
_cpu = other._cpu;
_fd = -1;
_id = 0;
return *this; return *this;
} }
unsigned getType() const { return _type; } unsigned getType() const { return _type; }
unsigned getConfig() const { return _config; } unsigned getConfig() const { return _config; }
int getCpu() const { return _cpu; }
int getFd() const { return _fd; }
uint64_t getId() const { return _id; }
void setType(unsigned type) { _type = type; } void setType(unsigned type) { _type = type; }
void setConfig(unsigned config) { _config = config; } void setConfig(unsigned config) { _config = config; }
void setCpu(int cpu) { _cpu = cpu; }
void setFd(int fd) { _fd = fd; }
void setId(uint64_t id) { _id = id; }
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) { void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' '); std::string leading(leadingSpaces, ' ');
LOG_VAR(ll) << leading << " Type: " << _type; LOG_VAR(ll) << leading << " Type: " << _type;
LOG_VAR(ll) << leading << " Config: " << _config; LOG_VAR(ll) << leading << " Config: " << _config;
} LOG_VAR(ll) << leading << " CPU: " << _cpu;
/**
* Required for hyper-threading aggregation feature.
*
* FIXME: logic should be kept in sync with storeReading of common sensorbase
*
* Store reading within the sensor, but do not put it in the readingQueue
* so the reading does not get pushed but the caches are still updated.
*/
void storeReadingLocal(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
reading_t reading;
reading.timestamp = rawReading.timestamp;
if( _delta ) {
if (!_firstReading) {
if (rawReading.value < _lastRawUValue.value)
reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
else
reading.value = (rawReading.value - _lastRawUValue.value) * factor;
} else {
_firstReading = false;
_lastRawUValue = rawReading;
return;
}
_lastRawUValue = rawReading;
}
else
reading.value = rawReading.value * factor;
if (_sinkFile) {
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
_cache->store(reading);
_latestValue = reading;
}
/**
* Required for hyper-threading aggregation feature.
*
* FIXME: logic should be kept in sync with storeReading of common sensorbase
*
* Store the reading in the readingQueue so it can get pushed.
*/
void storeReadingGlobal(reading_t reading) {
if( _delta )
// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
_accumulator.value += reading.value;
else
_accumulator.value = reading.value;
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = reading.timestamp;
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
// We reset the accumulator's value for the correct accumulation of deltas
_accumulator.value = 0;
}
} }
protected: protected:
unsigned int _type; unsigned int _type;
unsigned int _config; unsigned int _config;
int _cpu;
int _fd;
uint64_t _id;
}; };
using PerfSBPtr = std::shared_ptr<PerfSensorBase>;
#endif /* PERFEVENT_PERFSENSORBASE_H_ */ #endif /* PERFEVENT_PERFSENSORBASE_H_ */
...@@ -7,13 +7,18 @@ ...@@ -7,13 +7,18 @@
#include "PerfSensorGroup.h" #include "PerfSensorGroup.h"
#include <algorithm>
#include <functional>
#include <limits.h>
#include <unistd.h> #include <unistd.h>
#include <sys/ioctl.h>
#include <asm/unistd.h>
#include <linux/perf_event.h> #include <linux/perf_event.h>
#include <linux/hw_breakpoint.h> #include <linux/hw_breakpoint.h>
#include <asm/unistd.h>
#include <functional> #include <sys/ioctl.h>
#include <limits.h> #include <sys/sysinfo.h>
//the read group data will have this format //the read group data will have this format
struct read_format { struct read_format {
...@@ -28,33 +33,18 @@ struct read_format { ...@@ -28,33 +33,18 @@ struct read_format {
PerfSensorGroup::PerfSensorGroup(const std::string& name) : PerfSensorGroup::PerfSensorGroup(const std::string& name) :
SensorGroupTemplate(name), SensorGroupTemplate(name),
_sensorGroupLeader(false), _htAggregation(0),
_htAggregation(false), _maxCorrection(20),
_htAggregator(true),
_cpuId(0),
_group_fd(-1),
_buf(nullptr), _buf(nullptr),
_bufSize(0), _bufSize(0) {
_latest_time_enabled (0), }
_latest_time_running(0),
_lastValid(true),
_latestValueValid(false),
_maxCorrection(20) {}
PerfSensorGroup::PerfSensorGroup(const PerfSensorGroup& other) : PerfSensorGroup::PerfSensorGroup(const PerfSensorGroup& other) :
SensorGroupTemplate(other), SensorGroupTemplate(other),
_sensorGroupLeader(false),
_htAggregation(other._htAggregation), _htAggregation(other._htAggregation),
_htAggregator(true), _maxCorrection(other._maxCorrection),
_cpuId(other._cpuId),
_group_fd(-1),
_buf(nullptr), _buf(nullptr),
_bufSize(0), _bufSize(0) {
_latest_time_enabled(0),
_latest_time_running(0),
_lastValid(true),
_latestValueValid(false),
_maxCorrection(other._maxCorrection) {
} }
PerfSensorGroup::~PerfSensorGroup() { PerfSensorGroup::~PerfSensorGroup() {
...@@ -64,26 +54,68 @@ PerfSensorGroup::~PerfSensorGroup() { ...@@ -64,26 +54,68 @@ PerfSensorGroup::~PerfSensorGroup() {
} }
PerfSensorGroup& PerfSensorGroup::operator=(const PerfSensorGroup& other) { PerfSensorGroup& PerfSensorGroup::operator=(const PerfSensorGroup& other) {
SensorGroupTemplate::operator=(other); SensorGroupTemplate::operator=(other);
_sensorGroupLeader = false; _htAggregation = other._htAggregation;
_htAggregation = other._htAggregation; _maxCorrection = other._maxCorrection;
_htAggregator = true; _buf = nullptr;
_cpuId = other._cpuId; _bufSize = 0;
_group_fd = -1;
_buf = nullptr; return *this;
_bufSize = 0;
_latest_time_enabled = 0;
_latest_time_running = 0;
_lastValid = true;
_latestValueValid = false;
_maxCorrection = other._maxCorrection;
return *this;
} }
void PerfSensorGroup::init(boost::asio::io_service& io) { void PerfSensorGroup::init(boost::asio::io_service& io) {
SensorGroupTemplate::init(io); SensorGroupTemplate::init(io);
//clear vectors in case this method gets called multiple times, although it shouldn't
_sensorBins.clear();
_cpuBinMapping.clear();
for (int i = 0; i < get_nprocs(); i++) {
_cpuBinMapping.push_back(-1);
}
//Sort sensors into bins. Every bin equals an perf-event group
for (auto s : _sensors) {
int cpu = s->getCpu();
int bin = _cpuBinMapping[cpu];
if (bin != -1) {
_sensorBins[bin].sensors.push_back(s);
} else {
sensorBin bin(s, cpu);
_sensorBins.push_back(bin);
_cpuBinMapping[cpu] = _sensorBins.size() - 1;
}
}
//sanity check: all bins should have the same number of sensors
if (_sensorBins.size() == 0) {
LOG(error) << "Sensorgroup " << _groupName << " failed to sort sensors!";
return;
}
size_t binSensorSize = _sensorBins.front().sensors.size();
for (auto& b : _sensorBins) {
if (b.sensors.size() != binSensorSize) {
LOG(error) << "Sensorgroup " << _groupName << " sensor number missmatch!";
return;
}
}
//sort bins, so that the sensor ordering is equal in every bin (useful in case of hyper-threading aggregation
for (auto& b : _sensorBins) {
std::sort(b.sensors.begin(), b.sensors.end(), [](const S_Ptr& lhs, const S_Ptr& rhs)
{
if (lhs->getType() == rhs->getType()) {
return lhs->getConfig() < rhs->getConfig();
} else {
return lhs->getType() < rhs->getType();
}
});
}
_sensorBins.shrink_to_fit();
_cpuBinMapping.shrink_to_fit();
/* Allocate buffer to read in later. Reading struct has the following format: /* Allocate buffer to read in later. Reading struct has the following format:
* *
* struct read_format { * struct read_format {
...@@ -98,7 +130,7 @@ void PerfSensorGroup::init(boost::asio::io_service& io) { ...@@ -98,7 +130,7 @@ void PerfSensorGroup::init(boost::asio::io_service& io) {
* *
* Therefore we require 16 byte per sensor plus an additional 8*3 byte * Therefore we require 16 byte per sensor plus an additional 8*3 byte
*/ */
std::size_t bufSize = _sensors.size() * 16 + 24; std::size_t bufSize = binSensorSize * 16 + 24;
if (!_buf) { if (!_buf) {
_buf = new char[bufSize]; _buf = new char[bufSize];
_bufSize = bufSize; _bufSize = bufSize;
...@@ -108,9 +140,26 @@ void PerfSensorGroup::init(boost::asio::io_service& io) { ...@@ -108,9 +140,26 @@ void PerfSensorGroup::init(boost::asio::io_service& io) {
_bufSize = bufSize; _bufSize = bufSize;
} }
if(!_sensorGroupLeader) {