Commit 0796f92d authored by lu43jih's avatar lu43jih
Browse files

Merging msr plugin with cpu aggregation

parent 1594c617
......@@ -32,7 +32,7 @@ OBJS = dcdbpusher.o \
../common/src/RESTHttpsServer.o
TARGET = dcdbpusher
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon tester
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon msr tester
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -108,5 +108,8 @@ libdcdbplugin_gpfsmon.$(LIBEXT): sensors/gpfsmon/GpfsmonSensorGroup.o sensors/gp
#libdcdbplugin_opa.$(LIBEXT): sensors/opa/OpaSensorGroup.o sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
libdcdbplugin_msr.$(LIBEXT): sensors/msr/MSRSensorGroup.o sensors/msr/MSRConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_caliper.$(LIBEXT): sensors/caliper/CaliperSensorGroup.o sensors/caliper/CaliperConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
......@@ -52,6 +52,9 @@ void MSRConfigurator::sensorGroup(MSRSensorGroup& s, CFG_VAL config) {
for(int cpu: cpus){
s.addCpu(static_cast<unsigned int>(cpu));
}
} else if (boost::iequals(val.first, "htVal")){
unsigned int htVal = std::stoul(val.second.data());
s.setHtAggregation(htVal);
}
}
}
......@@ -208,7 +211,7 @@ void MSRConfigurator::customizeAndStore(SG_Ptr g) {
g->pushBackSensor(s_otherCPUs);
}
}
g->groupInBins();
storeSensorGroup(g);
}
......@@ -45,32 +45,62 @@
#include "Types.h"
#include <sstream>
#include <iomanip>
#include <thread>
MSRSensorGroup::MSRSensorGroup(const std::string& name) :
SensorGroupTemplate(name) {
const uint64_t MSR_MAXIMUM_SIZE = 281474976710656; //2^48
MSRSensorGroup::MSRSensorGroup(const std::string& name) : SensorGroupTemplate(name), _htAggregation(0) {
_total_number_cpus = std::thread::hardware_concurrency();
}
/** Destructor; performs no work of its own beyond destroying the members. */
MSRSensorGroup::~MSRSensorGroup() = default;
void MSRSensorGroup::groupInBins(){
for(auto s : _sensors) {
_sensorBins[s->getCpu()].addMsrSensorBin(s);
}
//sanity check: all bins should have the same number of sensors
if (_sensorBins.size() == 0) {
LOG(error) << "Sensorgroup " << _groupName << " failed to sort sensors!";
return;
}
size_t binSensorSize = _sensorBins.front().sensors.size();
for (auto& b : _sensorBins) {
if (b.sensors.size() != binSensorSize) {
LOG(error) << "Sensorgroup " << _groupName << " sensor number missmatch!";
return;
}
}
//sort bins, so that the sensor ordering is equal in every bin (useful in case of hyper-threading aggregation
for (auto& b : _sensorBins) {
std::sort(b.sensors.begin(), b.sensors.end(), [](const S_Ptr& lhs, const S_Ptr& rhs)
{
return lhs->getMetric() < rhs->getMetric();
});
}
_sensorBins.shrink_to_fit();
}
bool MSRSensorGroup::execOnStart() {
for (auto &kv : cpuToFd) {
int cpu = kv.first;
char * path = new char[200];
snprintf(path, 200, "/dev/cpu/%d/msr", cpu);
for (unsigned int cpu = 0; cpu < _sensorBins.size(); ++cpu) {
if(!_sensorBins[cpu].isActive()){
continue;
}
const std::size_t BUF_LEN=200;
char path[BUF_LEN];
snprintf(path, BUF_LEN, "/dev/cpu/%d/msr", cpu);
int handle = open(path, O_RDWR);
if (handle < 0) { // try msr_safe
snprintf(path, 200, "/dev/cpu/%d/msr_safe", cpu);
snprintf(path, BUF_LEN, "/dev/cpu/%d/msr_safe", cpu);
handle = open(path, O_RDWR);
}
if (handle < 0){
LOG(error) << "Can't open msr device " << path;
delete [] path; // TODO do this with RAII
continue;
}
delete [] path; //TODO do this with RAII
cpuToFd[cpu] = handle;
_sensorBins[cpu].setFd(handle);
}
program_fixed();
......@@ -80,9 +110,11 @@ bool MSRSensorGroup::execOnStart() {
void MSRSensorGroup::execOnStop() {
//close file descriptors and leave counters running freely
for (auto &kv: cpuToFd) {
close(kv.second);
kv.second = -1;
for (unsigned int cpu=0; cpu < _sensorBins.size(); ++cpu) {
if(_sensorBins[cpu].isActive()){
close(_sensorBins[cpu].getFd());
_sensorBins[cpu].setFd(-1);
}
}
}
......@@ -94,23 +126,38 @@ void MSRSensorGroup::read() {
for(auto s : _sensors) {
auto ret_val = msr_read(s->getMetric(), &reading.value, s->getCpu());
if(ret_val != -1){
s->storeReading(reading);
s->storeReading(reading, 1, MSR_MAXIMUM_SIZE, !_htAggregation); //1 is no correction...
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << " raw reading: \"" << reading.value << "\"";
#endif
}
}
if(_htAggregation){
for(unsigned int cpu = 0; cpu < _htAggregation; ++cpu) {
for(unsigned int m = 0; m < _sensorBins[cpu].sensors.size(); ++m){
reading_t aggregation;
aggregation.value = 0;
aggregation.timestamp = reading.timestamp;
for(unsigned int agg = cpu; agg < _total_number_cpus; agg+=_htAggregation){
aggregation.value += _sensorBins[agg].isActive() ? _sensorBins[agg].sensors[m]->getLatestValue().value : 0;
}
_sensorBins[cpu].sensors[m]->storeReadingGlobal(aggregation);//, 1, MSR_MAXIMUM_SIZE, _htAggregation);
}
}
}
} catch (const std::exception& e) {
LOG(error) << "Sensorgroup" << _groupName << " could not read value: " << e.what();
LOG(error) << "Sensorgroup " << _groupName << " could not read value: " << e.what();
}
}
/**
 * Reads one 64-bit value with pread() from the file descriptor associated
 * with the given CPU, using msr_number as the file offset.
 *
 * @param msr_number  Offset passed to pread().
 * @param value       Destination for the sizeof(uint64_t) bytes that are read.
 * @param cpu         Index used to look up the file descriptor.
 * @return            The result of pread(), converted to int32_t.
 */
int32_t MSRSensorGroup::msr_read(uint64_t msr_number, uint64_t * value, unsigned int cpu){
// NOTE(review): two return statements are shown here, presumably the removed
// (cpuToFd) and the added (_sensorBins) line of the diff. As written only the
// first one is reached - confirm against the real file.
return pread(cpuToFd[cpu], (void *) value, sizeof(uint64_t), msr_number);
return pread(_sensorBins[cpu].getFd(), (void *) value, sizeof(uint64_t), msr_number);
}
/**
 * Writes one 64-bit value with pwrite() to the file descriptor associated
 * with the given CPU, using msr_number as the file offset.
 *
 * @param msr_number  Offset passed to pwrite().
 * @param value       Value whose sizeof(uint64_t) bytes are written.
 * @param cpu         Index used to look up the file descriptor.
 * @return            The result of pwrite(), converted to int32_t.
 */
int32_t MSRSensorGroup::msr_write(uint64_t msr_number, uint64_t value, unsigned int cpu){
// NOTE(review): two return statements are shown here, presumably the removed
// (cpuToFd) and the added (_sensorBins) line of the diff. As written only the
// first one is reached - confirm against the real file.
return pwrite(cpuToFd[cpu], (const void *) &value, sizeof(uint64_t), msr_number);
return pwrite(_sensorBins[cpu].getFd(), (const void *) &value, sizeof(uint64_t), msr_number);
}
/**
......@@ -121,26 +168,29 @@ int32_t MSRSensorGroup::msr_write(uint64_t msr_number, uint64_t value, unsigned
*/
void MSRSensorGroup::program_fixed(){
for (auto &kv : cpuToFd) {
for (unsigned int cpu=0; cpu < _sensorBins.size(); ++cpu) {
if(!_sensorBins[cpu].isActive()){ // CPU is not active, so it won't be programmed
continue;
}
// program core counters
//we do not want to interrupt other services already doing measurements with MSRs
//therefore check if any fixed counter is currently enabled
struct FixedEventControlRegister ctrl_reg;
msr_read(IA32_CR_FIXED_CTR_CTRL, &ctrl_reg.value, kv.first);
msr_read(IA32_CR_FIXED_CTR_CTRL, &ctrl_reg.value, cpu);
//are they all enabled?
if (ctrl_reg.fields.os0 && ctrl_reg.fields.usr0 && ctrl_reg.fields.os1
&& ctrl_reg.fields.usr1 && ctrl_reg.fields.os2
&& ctrl_reg.fields.usr2) {
//yes! Free running counters were set by someone else => we don't need to program them, just read them.
LOG(debug) << "CPU" << kv.first << " has free running counter, so there will be no fixed counter programming";
LOG(debug) << "CPU" << cpu << " has free running counter, so there will be no fixed counter programming";
continue;
}
//not all of them (or none) are enabled => we program them again
// disable counters while programming
msr_write(IA32_CR_PERF_GLOBAL_CTRL, 0, kv.first);
msr_write(IA32_CR_PERF_GLOBAL_CTRL, 0, cpu);
ctrl_reg.fields.os0 = 1;
ctrl_reg.fields.usr0 = 1;
......@@ -160,23 +210,28 @@ void MSRSensorGroup::program_fixed(){
ctrl_reg.fields.reserved1 = 0;
// program them
msr_write(IA32_CR_FIXED_CTR_CTRL, ctrl_reg.value, kv.first);
msr_write(IA32_CR_FIXED_CTR_CTRL, ctrl_reg.value, cpu);
// start counting, enable 3 fixed counters (enable also the programmables counters)
uint64_t value = (1ULL << 0) + (1ULL << 1) + (1ULL << 2) + (1ULL << 3) + (1ULL << 32) + (1ULL << 33) + (1ULL << 34);
//uint64_t value = (1ULL << 32) + (1ULL << 33) + (1ULL << 34);
msr_write(IA32_CR_PERF_GLOBAL_CTRL, value, kv.first);
msr_write(IA32_CR_PERF_GLOBAL_CTRL, value, cpu);
}
}
/**
 * Registers a CPU with this sensor group.
 *
 * Grows _sensorBins if necessary so that an entry with index cpu exists and
 * marks that entry as active.
 *
 * @param cpu  Index of the CPU to register.
 */
void MSRSensorGroup::addCpu(unsigned int cpu){
// NOTE(review): cpuToFd looks like the pre-change bookkeeping that this diff
// replaces with _sensorBins - confirm whether this line still exists in the
// real file.
cpuToFd[cpu] = -1; /* -1 because no file descriptor has been assigned yet. */
// Make sure a bin exists at index cpu before it is marked active.
if(cpu + 1 > _sensorBins.size()){
_sensorBins.resize(cpu + 1);
}
_sensorBins[cpu].setActive();
}
std::vector<unsigned> MSRSensorGroup::getCpus() {
std::vector<unsigned> cpus;
for(auto kv : cpuToFd) {
cpus.push_back(kv.first);
for(unsigned int cpu=0; cpu < _sensorBins.size(); ++cpu) {
if(_sensorBins[cpu].isActive()){
cpus.push_back(cpu);
}
}
return cpus;
}
......@@ -184,9 +239,11 @@ std::vector<unsigned> MSRSensorGroup::getCpus() {
void MSRSensorGroup::printGroupConfig(LOG_LEVEL ll) {
std::stringstream ss;
const char* separator = "";
for (auto &kv : cpuToFd) {
ss << separator << kv.first;
separator = ", ";
for (unsigned int cpu=0; cpu < _sensorBins.size(); ++cpu) {
if(_sensorBins[cpu].isActive()){
ss << separator << cpu;
separator = ", ";
}
}
LOG_VAR(ll) << " CPUs: " << ss.str();
......
......@@ -56,11 +56,55 @@ public:
void printGroupConfig(LOG_LEVEL ll) final override;
/**
 * Stores the hyper-threading aggregation value of this group.
 *
 * @param htAggregation  New aggregation value; the member's declaration
 *                       documents zero as "aggregation disabled".
 */
void setHtAggregation(unsigned int htAggregation) { _htAggregation = htAggregation; }
void groupInBins();
private:
void read() final override;
/**
 * A bin holds all sensors with the same cpu. Therefore all sensors of a bin
 * belong to the same msr group.
 *
 * The file descriptor doubles as state marker: FD_INACTIVE means the cpu of
 * this bin was never activated, any other value means the bin is active
 * (FD_UNASSIGNED while no device is open).
 */
struct msrSensorBin {
    static constexpr int FD_INACTIVE = -2;   /**< Bin is a placeholder, its cpu is not in use */
    static constexpr int FD_UNASSIGNED = -1; /**< Bin is active but has no open file descriptor */

    int fd;                     /**< File descriptor to read all events in this cpu */
    std::vector<S_Ptr> sensors; /**< Sensors in this bin */

    /** Creates an inactive bin without sensors. */
    msrSensorBin() : fd(FD_INACTIVE) {
    }

    /** Appends a sensor to this bin. */
    void addMsrSensorBin(const S_Ptr& s) {
        sensors.push_back(s);
    }

    /** Marks the bin as active; an already assigned file descriptor is kept. */
    void setActive() {
        if (fd == FD_INACTIVE) {
            fd = FD_UNASSIGNED;
        }
    }

    /** @return true unless the bin is still in its initial inactive state. */
    bool isActive() const {
        return fd != FD_INACTIVE;
    }

    /** @return The stored file descriptor (or one of the marker values). */
    int getFd() const {
        return fd;
    }

    /** Stores the file descriptor to use for this bin. */
    void setFd(int filedescriptor) {
        fd = filedescriptor;
    }
};
void program_fixed();
std::map<unsigned int,int> cpuToFd;
unsigned int _htAggregation; /**< Value for hyper-threading aggregation. Zero indicates disabled HT agg. */
int _number_metrics_per_cpu;
unsigned int _total_number_cpus;
std::vector<msrSensorBin> _sensorBins; /**< Bins to sort sensors according to their _cpu. */
int32_t msr_read(uint64_t msr_number, uint64_t *value, unsigned int cpu);
int32_t msr_write(uint64_t msr_number, uint64_t value, unsigned int cpu);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment