Commit b583a11f authored by Micha Mueller's avatar Micha Mueller
Browse files

Perf plugin: add functionality to aggregate cpu readings

parent 19417130
......@@ -324,6 +324,7 @@ Explanation of the values specific for the perfevent plugin:
| type | Type of which the counter should be. Each type determines different possible values for the config-field. Possible type-values are described below.
| config | Together with the type-field config determines which performance counter should be read. Possible values and what they measure are listed below.
| cpus | One can define a comma-separated list of cpu numbers (also value ranges can be specified, e.g. 2-4 equals 2,3,4). The hardware counter will then be only opened on the specified cpus.
| htVal | Specify multiplier for CPU aggregation. All CPUs where (CPU-number % htVal) has the same result are aggregated together. Only CPUs which are included in the "cpus" field (or all CPUs if the "cpus" field is not present) are aggregated. Background: To reduce the amount of pushed sensor data, it is possible to aggregate cpu readings. This feature is specifically aimed at processors which are hyper-threading enabled but can also come in handy for other use cases. Only the values pushed via the MQTT-Pusher are aggregated. There still exist sensors for each CPU and they store unaggregated readings in their local caches.
> NOTE     As perfevent counters are usually always monotonic, the delta attribute is by default set to true for all sensors. One has to explicitly set delta to "off" for a sensor to overwrite this behaviour.
......
......@@ -21,12 +21,14 @@ template_single_counter def3 {
mqttpart xx
minValues 3
cpus 1-3
htVal 2
type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_CACHE_REFERENCES
}
group hw_i {
default def1
htVal 2
mqttpart xx
}
......@@ -72,7 +74,7 @@ group sw {
interval 1000
mqttpart xx
minValues 3
cpus 1-4
cpus 1-3
counter sw_pagefaults {
mqttsuffix 30
......
......@@ -50,6 +50,70 @@ public:
LOG_VAR(ll) << leading << " Config: " << _config;
}
/**
 * Required for hyper-threading aggregation feature.
 *
 * FIXME: logic should be kept in sync with storeReading of common sensorbase
 *
 * Store reading within the sensor, but do not put it in the readingQueue
 * so the reading does not get pushed but the caches are still updated.
 *
 * @param rawReading Raw counter value (with timestamp) as read from perf.
 * @param factor     Correction factor applied to the computed value.
 * @param maxValue   Counter wrap-around bound, used for overflow correction
 *                   when running in delta mode.
 */
void storeReadingLocal(ureading_t rawReading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
reading_t reading;
reading.timestamp = rawReading.timestamp;
if( _delta ) {
if (!_firstReading) {
//raw value decreased: counter wrapped around, correct via maxValue
if (rawReading.value < _lastRawUValue.value)
reading.value = (rawReading.value + (maxValue - _lastRawUValue.value)) * factor;
else
reading.value = (rawReading.value - _lastRawUValue.value) * factor;
} else {
//no previous raw value to build a delta from; just remember this one
_firstReading = false;
_lastRawUValue = rawReading;
return;
}
_lastRawUValue = rawReading;
}
else
reading.value = rawReading.value * factor;
//mirror the value into the sink file if attached; drop the sink on any failure
if (_sinkFile) {
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
//update cache and latest value, but deliberately skip the readingQueue
_cache->store(reading);
_latestValue = reading;
}
/**
 * Required for hyper-threading aggregation feature.
 *
 * FIXME: logic should be kept in sync with storeReading of common sensorbase
 *
 * Store the reading in the readingQueue so it can get pushed.
 *
 * @param reading Already aggregated (and, in delta mode, already
 *                delta-computed) reading to be queued for pushing.
 */
void storeReadingGlobal(reading_t reading) {
if( _delta )
// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
_accumulator.value += reading.value;
else
_accumulator.value = reading.value;
//only every _subsamplingFactor-th reading is actually queued for pushing
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = reading.timestamp;
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
//skipConstVal: suppress pushing a value identical to the last one sent
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
// We reset the accumulator's value for the correct accumulation of deltas
_accumulator.value = 0;
}
}
protected:
unsigned int _type;
unsigned int _config;
......
......@@ -29,6 +29,8 @@ struct read_format {
PerfSensorGroup::PerfSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_sensorGroupLeader(false),
_htAggregation(false),
_htAggregator(true),
_cpuId(0),
_group_fd(-1),
_buf(nullptr),
......@@ -36,11 +38,14 @@ PerfSensorGroup::PerfSensorGroup(const std::string& name) :
_latest_time_enabled (0),
_latest_time_running(0),
_lastValid(true),
_latestValueValid(false),
_maxCorrection(20) {}
PerfSensorGroup::PerfSensorGroup(const PerfSensorGroup& other) :
SensorGroupTemplate(other),
_sensorGroupLeader(false),
_htAggregation(other._htAggregation),
_htAggregator(true),
_cpuId(other._cpuId),
_group_fd(-1),
_buf(nullptr),
......@@ -48,6 +53,7 @@ PerfSensorGroup::PerfSensorGroup(const PerfSensorGroup& other) :
_latest_time_enabled(0),
_latest_time_running(0),
_lastValid(true),
_latestValueValid(false),
_maxCorrection(other._maxCorrection) {
}
......@@ -60,6 +66,8 @@ PerfSensorGroup::~PerfSensorGroup() {
PerfSensorGroup& PerfSensorGroup::operator=(const PerfSensorGroup& other) {
SensorGroupTemplate::operator=(other);
_sensorGroupLeader = false;
_htAggregation = other._htAggregation;
_htAggregator = true;
_cpuId = other._cpuId;
_group_fd = -1;
_buf = nullptr;
......@@ -67,6 +75,7 @@ PerfSensorGroup& PerfSensorGroup::operator=(const PerfSensorGroup& other) {
_latest_time_enabled = 0;
_latest_time_running = 0;
_lastValid = true;
_latestValueValid = false;
_maxCorrection = other._maxCorrection;
return *this;
......@@ -156,10 +165,10 @@ void PerfSensorGroup::start() {
_fds.push_back(fd);
LOG(debug) << " " << _groupName << "::" << pc->getName() << " opened with ID " << std::to_string(id);
} else {
LOG(debug) << " " << _groupName << "::" << pc->getName() << " errro obtaining ID: " << strerror(rc);
LOG(debug) << " " << _groupName << "::" << pc->getName() << " error obtaining ID: " << strerror(rc);
}
} else {
LOG(debug) << " " << _groupName << "::" << pc->getName() << " errro opening perf file descriptor: " << strerror(errno);
LOG(debug) << " " << _groupName << "::" << pc->getName() << " error opening perf file descriptor: " << strerror(errno);
}
}
......@@ -222,6 +231,7 @@ void PerfSensorGroup::read() {
if(!validMeasurement){
_lastValid=false;
_latestValueValid=false;
return;
}
//iterate over all values returned by ::read()
......@@ -235,8 +245,13 @@ void PerfSensorGroup::read() {
LOG(debug) << _groupName << "::" << _sensors[j]->getName() << " raw reading: \"" << reading.value << "\"";
#endif
if(_lastValid){
if (_htAggregation) {
_sensors[j]->storeReadingLocal(reading, correction, PerfSensorBase::MAXCOUNTERVALUE);
} else {
//storeReading takes care of delta computation and applies correction value on the result
_sensors[j]->storeReading(reading, correction, PerfSensorBase::MAXCOUNTERVALUE);
}
_latestValueValid = true;
} else {
//Before we can compute correct values again after an invalid reading we have to update the lastRawValue first
_sensors[j]->setLastURaw(reading.value);
......@@ -257,6 +272,12 @@ void PerfSensorGroup::readAsync() {
g->read();
}
}
if(_htAggregation) {
accumulateHT();
for(const auto& g : _fellowSensorGroups) {
g->accumulateHT();
}
}
if (_timer && _keepRunning) {
_timer->expires_at(timestamp2ptime(nextReadingTime()));
......@@ -279,5 +300,39 @@ void PerfSensorGroup::printConfig(LOG_LEVEL ll) {
} else {
LOG_VAR(ll) << " This is a Non-Leader Group";
}
if (_htAggregation) {
if (_htAggregator) {
LOG_VAR(ll) << " This is an Aggregator Group";
} else {
LOG_VAR(ll) << " This is not an Aggregator Group";
}
}
LOG_VAR(ll) << " CPU Id: " << _cpuId;
}
void PerfSensorGroup::accumulateHT() {
if (!_htAggregator) {
//this group gets aggregated
return;
}
for(size_t i = 0; i < _sensors.size(); i++) {
reading_t reading;
if (_latestValueValid) {
reading = _sensors[i]->getLatestValue();
} else {
reading.timestamp = 0;
reading.value = 0;
}
for(const auto& g : _htAggregationGroups) {
if(g->getLatestValueValid()) {
reading.value += g->_sensors[i]->getLatestValue().value;
if (!_latestValueValid) {
reading.timestamp = g->_sensors[i]->getLatestValue().timestamp;
}
}
}
if (reading.timestamp != 0) {
_sensors[i]->storeReadingGlobal(reading);
}
}
}
......@@ -30,9 +30,14 @@ public:
void setCpuId(int cpuId) { _cpuId = cpuId; }
void setSensorGroupLeader(bool sensorGroupLeader) { _sensorGroupLeader = sensorGroupLeader; }
void setHtAggregation(bool htAggregation) { _htAggregation = htAggregation; }
void setHtAggregator(bool htAggregator) { _htAggregator = htAggregator; }
void setMaxCorrection(double maxCorrection){ _maxCorrection = maxCorrection; }
bool getLatestValueValid() { return _latestValueValid; }
void pushBackGroup(PerfSGPtr perfGroup) { _fellowSensorGroups.push_back(perfGroup); }
void aggregateGroup(PerfSGPtr perfGroup) { _htAggregationGroups.push_back(perfGroup); }
void printConfig(LOG_LEVEL ll) override;
......@@ -40,8 +45,14 @@ private:
void read() override;
void readAsync() override;
void accumulateHT();
//are we the sensorGroupLeader?
bool _sensorGroupLeader;
//is hyper-threading aggregation enabled?
bool _htAggregation;
//are we base group that aggregates or do we get aggregated?
bool _htAggregator;
int _cpuId;
int _group_fd;
......@@ -53,10 +64,14 @@ private:
//the sensorGroupLeader stores attached groups in here
std::vector<PerfSGPtr> _fellowSensorGroups;
//if hyper-threading aggregation is enabled, other groups which should be
//accumulated by this group are stored here
std::vector<PerfSGPtr> _htAggregationGroups;
uint64_t _latest_time_enabled;
uint64_t _latest_time_running;
bool _lastValid;
bool _latestValueValid;
double _maxCorrection;
......
......@@ -135,6 +135,14 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
_templateCpus.insert(templateCpuMap_t::value_type(val.second.data(), cpuVec));
}
//check if hyper-thread aggregation value is given for this template group
boost::optional<boost::property_tree::iptree&> ht = val.second.get_child_optional("htVal");
if(ht) {
unsigned htVal = std::stoul(ht.get().data());
_templateHTs.insert(templateHTMap_t::value_type(val.second.data(), htVal));
LOG(debug) << "HT value of " << htVal << " given for \"" << val.second.data() << "\"";
}
auto ret = _templateSensorGroups.insert(std::pair<std::string, PerfSensorGroup*>(val.second.data(), group));
if(!ret.second) {
LOG(warning) << "Template " << _groupName << " " << val.second.data() << " already exists! Omitting...";
......@@ -159,6 +167,14 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
_templateCpus.insert(templateCpuMap_t::value_type(val.second.data(), cpuVec));
}
//check if hyper-thread aggregation value is given for this template group
boost::optional<boost::property_tree::iptree&> ht = val.second.get_child_optional("htVal");
if(ht) {
unsigned htVal = std::stoul(ht.get().data());
LOG(debug) << "HT value of " << htVal << " given for \"" << val.second.data() << "\"";
_templateHTs.insert(templateHTMap_t::value_type(val.second.data(), htVal));
}
//group which consists of only one sensor
std::shared_ptr<PerfSensorBase> sensor = std::make_shared<PerfSensorBase>(val.second.data());
if (readSensorBase(*sensor, val.second)) {
......@@ -262,12 +278,41 @@ void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cf
}
}
//check if hyper-threading aggregation is enabled and if so, what the htVal is
unsigned htVal = 0;
//check if htVal is given; if so, overwrite default htVal of zero (no ht-aggregation)
boost::optional<boost::property_tree::iptree&> ht = cfg.get_child_optional("htVal");
if (ht) { //htVal given
htVal = std::stoul(ht.get().data());
} else { //htVal not given, but perhaps template counter has one
boost::optional<boost::property_tree::iptree&> def = cfg.get_child_optional("default");
if (def) {
templateHTMap_t::iterator itH = _templateHTs.find(def.get().data());
if(itH != _templateHTs.end()) {
htVal = itH->second;
}
}
}
//enable htAggregation if htVal is found
if (htVal != 0) {
group.setHtAggregation(true);
}
if (group.getMqttPart().size() == 0) {
LOG(warning) << _groupName << " \"" << cfg.data() << "\" has no mqttPart entry set. This is required as a place holder for the CPU id!";
}
//remember group to cpu mapping to set up hyper-threading aggregation properly later
std::vector<PerfSGPtr> groups;
for (int i = 0; i < get_nprocs(); i++) {
groups.push_back(nullptr);
}
if(cpuSet.empty()) {
return;
}
//customize perfCounterGroup for every CPU
if(!cpuSet.empty()) {
//first create groupLeader
std::set<int>::iterator it = cpuSet.begin();
PerfSGPtr leaderSG = std::make_shared<PerfSensorGroup>(group);
......@@ -279,6 +324,7 @@ void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cf
for(const auto& s : leaderSG->getSensors()) s->setName(s->getName(), *it);
groups[*it] = leaderSG;
storeSensorGroup(leaderSG);
it++;
......@@ -292,8 +338,28 @@ void PerfeventConfigurator::customizeAndStore(PerfSensorGroup& group, CFG_VAL cf
for(const auto& s : perfSG->getSensors()) s->setName(s->getName(), *it);
groups[*it] = perfSG;
storeSensorGroup(perfSG);
leaderSG->pushBackGroup(perfSG);
}
if (htVal == 0) {
return;
}
//set up hyper-threading aggregation
for (it = cpuSet.begin(); it != cpuSet.end(); ++it) {
int mod = *it % htVal;
for (int aggregator = mod; aggregator < get_nprocs(); aggregator+=htVal) {
if (aggregator == *it) {
//we reached ourself. Seems like we must be an aggregator.
break;
}
if (groups[aggregator] != nullptr) {
groups[aggregator]->aggregateGroup(groups[*it]);
groups[*it]->setHtAggregator(false);
break;
}
}
}
}
......@@ -16,6 +16,7 @@
class PerfeventConfigurator : public ConfiguratorTemplate<PerfSensorBase, PerfSensorGroup> {
typedef std::map<std::string, std::set<int>> templateCpuMap_t;
typedef std::map<std::string, unsigned> templateHTMap_t;
typedef std::map<std::string, unsigned int> enumMap_t;
public:
......@@ -32,7 +33,8 @@ private:
/**
* Takes a PerfSensorGroup and duplicates it for every CPU specified.
* Assigns one CPU value to every newly constructed group and stores them
* afterwards.
* afterwards. Also sets everything for hyper-threading aggregation if
* enabled.
*
* @param group PerfSensorGroup which is to be customized for every CPU
* @param cfg Config tree for the group. Required to read in possibly
......@@ -41,6 +43,7 @@ private:
void customizeAndStore(PerfSensorGroup& group, CFG_VAL cfg);
templateCpuMap_t _templateCpus;
templateHTMap_t _templateHTs;
enumMap_t _enumType;
enumMap_t _enumConfig;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment