Commit 5c1d54c2 authored by lu43jih's avatar lu43jih
Browse files

Deleted plugins which were meant to be deleted. msr plugin may be needed later...

Deleted plugins which were meant to be deleted. msr plugin may be needed later on, but in previous commit it is anyway not usable
parent 82e0f303
/*
* FixedConfigurator.cpp
*
* Created on: 13.08.2018
* Author: lu43jih
*/
#include "FixedConfigurator.h"
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
#include "FixedMultiSensor.h"
#include "../sensorutils/utilities.h"
#include "../sensorutils/mqttutils.h"
FixedConfigurator::FixedConfigurator() {
}
FixedConfigurator::~FixedConfigurator() {
}
bool FixedConfigurator::readConfig(std::string cfgPath) {
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgPath, cfg);
//read global variables (if present overwrite those from global.conf)
//currently only overwriting of mqttPrefix is supported
boost::optional<boost::property_tree::iptree&> globalVals = cfg.get_child_optional("global");
if (globalVals) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
if(boost::iequals(global.first, "mqttprefix")) {
_mqttPrefix = global.second.data();
// if (_mqttPrefix[_mqttPrefix.length()-1] != '/') {
// _mqttPrefix.append("/");
// }
LOG(debug) << " Using own MQTT-Prefix " << _mqttPrefix;
} else {
LOG(warning) << " Value \"" << global.first << "\" not recognized. Omitting...";
}
}
}
//read one sensor at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("sensors")) {
if (boost::iequals(sensor.first, "sensor")) {
LOG(debug) << "Sensor \"" << sensor.second.data() << "\"";
if (!sensor.second.empty()) {
auto fixedms = make_unique<FixedMultiSensor>(sensor.second.data(), _mqttPrefix);
//read remaining values
if(readSensor(fixedms, sensor.second)){
_multiSensors.push_back(std::move(fixedms));
}
}
}
}
return true;
}
bool FixedConfigurator::readSensor(std::unique_ptr<FixedMultiSensor>& sensor,
boost::property_tree::iptree& config) {
std::string unparsedMetricList;
std::string idlistfile;
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, config) {
if (boost::iequals(s.first, "interval")) {
sensor->setInterval(std::stoull(s.second.data()));
} else if (boost::iequals(s.first, "minValues")) {
sensor->setMinValues(std::stoull(s.second.data()));
} else if (boost::iequals(s.first, "metricList")) {
unparsedMetricList = s.second.data();
} else if (boost::iequals(s.first, "idListFile")) {
idlistfile = s.second.data();
} else if (boost::iequals(s.first, "sleepTime")) {
sensor->setSleepTime(std::stoull(s.second.data()));
} else if (boost::iequals(s.first, "publishSize")){
sensor->setPublishSize(std::stoul(s.second.data()));
} else {
LOG(warning) << " Value \"" << s.first << "\" not recognized. Omitting...";
}
}
std::map<mqtt_id_t, metric_internalid> mqttToIds;
try {
getMqttToIds(mqttToIds,unparsedMetricList, idlistfile);
} catch(const std::ifstream::failure& e) {
LOG(error) << "FixedConfigurator:Exception in getMqttToIds=" << e.what();
}
for(auto &kv : mqttToIds){
sensor->setMqttSuffix(kv.first, kv.second.internalid);
}
LOG(debug) << " Name : " << sensor->getName();
LOG(debug) << " Interval: " << sensor->getInterval();
LOG(debug) << " minValues:" << sensor->getMinValues();
return true;
}
/*
* FixedConfigurator.h
*
* Created on: 13.08.2018
* Author: lu43jih
*/
#ifndef SRC_SENSORS_MSR_FIXEDCONFIGURATOR_H_
#define SRC_SENSORS_MSR_FIXEDCONFIGURATOR_H_
#include <boost/property_tree/ptree_fwd.hpp>
#include <string>
#include "FixedMultiSensor.h"
#include "../../Configurator.h"
class FixedConfigurator: public Configurator{
public:
FixedConfigurator();
virtual ~FixedConfigurator();
/**
* Read the configuration for perfpusher.
* @param cfgPath Path + name of the configuration file
* @return true on success, false otherwise
*/
bool readConfig(std::string cfgPath) override;
bool readSensor(std::unique_ptr<FixedMultiSensor>& fixedms, boost::property_tree::iptree& config);
};
extern "C" Configurator* create() {
return new FixedConfigurator;
}
extern "C" void destroy(Configurator* c) {
delete c;
}
#endif /* SRC_SENSORS_MSR_FIXEDCONFIGURATOR_H_ */
/*
* PCM.cpp
*
* Created on: Jun 28, 2018
* Author: carla
*/
#include "utils.h"
#include <string.h>
#include <queue>
#include <sys/time.h>
#include <utility>
#include "timestamp.h"
#include "utils.h"
#include <stdio.h>
#include "../sensorutils/utilities.h"
#include "../sensorutils/mqttutils.h"
#include "../sensorutils/grouppublishing.h"
#include "FixedMultiSensor.h"
/************************* PCM class **********************************/
// a secure (but partial) alternative for sscanf
// see example usage in pcm-core.cpp
using pcm_sscanf = std::istringstream;
FixedMultiSensor::FixedMultiSensor(const std::string& name, std::string& mqttPrefix) :
MultiSensor(name), _cpu_stepping(-1), _threads_per_core(0), _num_cores(0), _num_sockets(0), _num_phys_cores_per_socket(0),
_num_online_cores(0), _num_online_sockets(0), _core_fixed_counter_num_max(0),
_core_fixed_counter_width(0), _perfmon_version(0), _mqttPrefix(mqttPrefix), ableToMeasure(true), _sleep_time(0), _state(MEASURE_STATE::MEASUREMENT),
_publishSize(10), _begin_initialized(false){
detectModel();
if(!discoverSystemTopology()){
ableToMeasure = false;
return;
}
if(!initMSR()){
ableToMeasure = false;
return;
}
}
FixedMultiSensor::~FixedMultiSensor() {
MSR.clear();
}
void FixedMultiSensor::detectModel() {
char buffer[1024];
union {
char cbuf[16];
int ibuf[16/sizeof(int)];
}buf;
PCM_CPUID_INFO cpuinfo;
int max_cpuid;
pcm_cpuid(0, cpuinfo);
memset(buffer, 0, 1024);
memset(buf.cbuf, 0, 16);
buf.ibuf[0] = cpuinfo.array[1];
buf.ibuf[1] = cpuinfo.array[3];
buf.ibuf[2] = cpuinfo.array[2];
max_cpuid = cpuinfo.array[0];
pcm_cpuid(1, cpuinfo);
_cpu_stepping = cpuinfo.array[0] & 0x0f;
if (max_cpuid >= 0xa) {
// get counter related info
pcm_cpuid(0xa, cpuinfo);
_perfmon_version = extract_bits_ui(cpuinfo.array[0], 0, 7);
if (_perfmon_version > 1){
_core_fixed_counter_num_max = extract_bits_ui(cpuinfo.array[3], 0, 4);
_core_fixed_counter_width = extract_bits_ui(cpuinfo.array[3], 5, 12);
}
}
}
bool FixedMultiSensor::discoverSystemTopology() {
typedef std::map<uint32, uint32> socketIdMap_type;
socketIdMap_type socketIdMap;
PCM_CPUID_INFO cpuid_args;
pcm_cpuid(1, cpuid_args);
int apic_ids_per_package = (cpuid_args.array[1] & 0x00FF0000) >> 16, apic_ids_per_core;
if (apic_ids_per_package == 0) {
LOG(error)<< "FixedMultiSensor: apic_ids_per_package == 0";
return false;
}
pcm_cpuid(0xb, 0x0, cpuid_args);
if ((cpuid_args.array[2] & 0xFF00) == 0x100)
apic_ids_per_core = cpuid_args.array[1] & 0xFFFF;
else
apic_ids_per_core = 1;
if (apic_ids_per_core == 0) {
LOG(error)<< "FixedMultiSensor: apic_ids_per_core == 0";
return false;
}
// init constants for CPU topology leaf 0xB
// adapted from Topology Enumeration Reference code for Intel 64 Architecture
// https://software.intel.com/en-us/articles/intel-64-architecture-processor-topology-enumeration
int wasCoreReported = 0, wasThreadReported = 0;
int subleaf = 0, levelType, levelShift;
unsigned long coreplusSMT_Mask = 0L;
uint32 coreSelectMask = 0, smtSelectMask = 0, smtMaskWidth = 0;
uint32 l2CacheMaskShift = 0, l2CacheMaskWidth;
uint32 pkgSelectMask = (-1), pkgSelectMaskShift = 0;
unsigned long mask;
do {
pcm_cpuid(0xb, subleaf, cpuid_args);
if (cpuid_args.array[1] == 0) { // if EBX ==0 then this subleaf is not valid, we can exit the loop
break;
}
mask = (1 << (16)) - 1;
levelType = (cpuid_args.array[2] & mask) >> 8;
mask = (1 << (5)) - 1;
levelShift = (cpuid_args.array[0] & mask);
switch (levelType) {
case 1: //level type is SMT, so levelShift is the SMT_Mask_Width
smtSelectMask = ~((-1) << levelShift);
smtMaskWidth = levelShift;
wasThreadReported = 1;
break;
case 2: //level type is Core, so levelShift is the CorePlsuSMT_Mask_Width
coreplusSMT_Mask = ~((-1) << levelShift);
pkgSelectMaskShift = levelShift;
pkgSelectMask = (-1) ^ coreplusSMT_Mask;
wasCoreReported = 1;
break;
default:
break;
}
subleaf++;
} while (1);
if(wasThreadReported && wasCoreReported){
coreSelectMask = coreplusSMT_Mask ^ smtSelectMask;
}else if (!wasCoreReported && wasThreadReported){
pkgSelectMaskShift = smtMaskWidth;
pkgSelectMask = (-1) ^ smtSelectMask;
} else{
LOG(error)<< "FixedMultiSensor: ERROR: this should not happen if hardware function normally";
return false;
}
pcm_cpuid(0x4, 2, cpuid_args); // get ID for L2 cache
mask = ((1<<(12)) - 1) << (14);// mask with bits 25:14 set to 1
l2CacheMaskWidth = 1 + ((cpuid_args.array[0] & mask) >> 14);// number of APIC IDs sharing L2 cache
for(; l2CacheMaskWidth > 1; l2CacheMaskWidth >>= 1){
l2CacheMaskShift++;
}
TopologyEntry entry;
_num_cores = readMaxFromSysFS("/sys/devices/system/cpu/present");
if (_num_cores == -1) {
LOG(error)<< "FixedMultiSensor: Can not read number of present cores";
return false;
}
++_num_cores;
// open /proc/cpuinfo
FILE * f_cpuinfo = fopen("/proc/cpuinfo", "r");
if (!f_cpuinfo) {
LOG(error)<< "FixedMultiSensor: Can not open /proc/cpuinfo file.";
return false;
}
// map with key=pkg_apic_id (not necessarily zero based or sequential) and
// associated value=socket_id that should be 0 based and sequential
std::map<int, int> found_pkg_ids;
_topology.resize(_num_cores);
char buffer[1024];
while (0 != fgets(buffer, 1024, f_cpuinfo)) {
if (strncmp(buffer, "processor", sizeof("processor") - 1) == 0) {
pcm_sscanf(buffer) >> s_expect("processor\t: ") >> entry.os_id;
//std::cout << "os_core_id: "<<entry.os_id<< std::endl;
TemporalAffinity ta(entry.os_id);
pcm_cpuid(0xb, 0x0, cpuid_args);
int apic_id = cpuid_args.array[3];
entry.thread_id = (apic_id & smtSelectMask);
entry.core_id = (apic_id & coreSelectMask) >> smtMaskWidth;
entry.socket = (apic_id & pkgSelectMask) >> pkgSelectMaskShift;
entry.tile_id = (apic_id >> l2CacheMaskShift);
_topology[entry.os_id] = entry;
socketIdMap[entry.socket] = 0;
++_num_online_cores;
}
}
fclose(f_cpuinfo);
if (_num_cores == 0) {
_num_cores = (int32) _topology.size();
}
if (_num_sockets == 0) {
_num_sockets = (int32) (std::max)(socketIdMap.size(), (size_t) 1);
}
socketIdMap_type::iterator s = socketIdMap.begin();
for (uint32 sid = 0; s != socketIdMap.end(); ++s) {
s->second = sid++;
}
for (int i = 0; (i < (int) _num_cores) && (!socketIdMap.empty()); ++i) {
if (isCoreOnline((int32) i))
_topology[i].socket = socketIdMap[_topology[i].socket];
}
if (_threads_per_core == 0) {
for (int i = 0; i < (int) _num_cores; ++i) {
if (_topology[i].socket == _topology[0].socket
&& _topology[i].core_id == _topology[0].core_id)
++_threads_per_core;
}
}
if (_num_phys_cores_per_socket == 0)
_num_phys_cores_per_socket = _num_cores / _num_sockets / _threads_per_core;
if (_num_online_cores == 0)
_num_online_cores = _num_cores;
int32 i = 0;
_socketRefCore.resize(_num_sockets, -1);
for (i = 0; i < _num_cores; ++i) {
if (isCoreOnline(i)) {
_socketRefCore[_topology[i].socket] = i;
}
}
_num_online_sockets = 0;
for (i = 0; i < _num_sockets; ++i) {
if (isSocketOnline(i)) {
++_num_online_sockets;
}
}
return true;
}
bool FixedMultiSensor::initMSR() {
try {
for (int i = 0; i < (int)_num_cores; ++i) {
if (isCoreOnline((int32)i))
MSR.push_back(std::shared_ptr<SafeMsrHandle>(new SafeMsrHandle(i)));
else // the core is offlined, assign an invalid MSR handle
MSR.push_back(std::shared_ptr<SafeMsrHandle>(new SafeMsrHandle()));
}
} catch (...) { // failed
MSR.clear();
ableToMeasure = false;
LOG(error)<< "FixedMultiSensor: Can not access CPUs Model Specific Registers (MSRs).";
return false;
}
return true;
}
bool FixedMultiSensor::isCoreOnline(int32 os_core_id) const {
return (_topology[os_core_id].os_id != -1) && (_topology[os_core_id].core_id != -1) && (_topology[os_core_id].socket != -1);
}
bool FixedMultiSensor::isSocketOnline(int32 socket_id) const {
return _socketRefCore[socket_id] != -1;
}
void FixedMultiSensor::programFixed() {
if (!ableToMeasure)
return;
for (int i = 0; i < (int) _num_cores; ++i) {
// program core counters
TemporalAffinity tempThreadAffinity(i); // speedup trick for Linux
FixedEventControlRegister ctrl_reg;
// disable counters while programming
MSR[i]->write(IA32_CR_PERF_GLOBAL_CTRL, 0);
MSR[i]->read(IA32_CR_FIXED_CTR_CTRL, &ctrl_reg.value);
ctrl_reg.fields.os0 = 1;
ctrl_reg.fields.usr0 = 1;
ctrl_reg.fields.any_thread0 = 0;
ctrl_reg.fields.enable_pmi0 = 0;
ctrl_reg.fields.os1 = 1;
ctrl_reg.fields.usr1 = 1;
ctrl_reg.fields.any_thread1 = 0;
ctrl_reg.fields.enable_pmi1 = 0;
ctrl_reg.fields.os2 = 1;
ctrl_reg.fields.usr2 = 1;
ctrl_reg.fields.any_thread2 = 0;
ctrl_reg.fields.enable_pmi2 = 0;
ctrl_reg.fields.reserved1 = 0;
MSR[i]->write(IA32_CR_FIXED_CTR_CTRL, ctrl_reg.value);
// start counting, enable 3 fixed counters
//uint64 value = (1ULL << 0) + (1ULL << 1) + (1ULL << 2) + (1ULL << 3) + (1ULL << 32) + (1ULL << 33) + (1ULL << 34);
uint64 value = (1ULL << 32) + (1ULL << 33) + (1ULL << 34);
MSR[i]->write(IA32_CR_PERF_GLOBAL_CTRL, value);
}
}
void FixedMultiSensor::getAllCounterStates(std::vector<BasicCounterState> & coreStates) {
// zero-initialize all inputs
coreStates.clear();
coreStates.resize(_num_cores);
for (int32 core = 0; core < _num_cores; ++core) {
coreStates[core].readAndAggregate(MSR[core], _core_fixed_counter_width);
}
}
void FixedMultiSensor::read(){
getAllCounterStates(coreAfterState);
reading_t reading;
reading.timestamp = getTimestamp();
if(coreBeforeState.size() > 0) //nothing to compute first time...
for(int32 cpu = 0; cpu<_num_cores ; ++cpu) {
for(auto & kv: internalIdToMqttSuffix){
std::string mqttID=createIDFromMqttParts(_mqttPrefix, cpu, kv.second);
auto found = _readingQueuesMap.find(mqttID);
if(found != _readingQueuesMap.end()){
if(coreBeforeState[cpu].internalIdToData[kv.first] > coreAfterState[cpu].internalIdToData[kv.first]){ //overflow
reading.value = coreAfterState[cpu].internalIdToData[kv.first] + ((2 << _core_fixed_counter_width) - coreBeforeState[cpu].internalIdToData[kv.first]);
} else {
reading.value = coreAfterState[cpu].internalIdToData[kv.first] - coreBeforeState[cpu].internalIdToData[kv.first];
}
found->second->push(reading);
}
}
}
coreBeforeState = std::move(coreAfterState);
}
void FixedMultiSensor::readAsync(){
uint64_t now = getTimestamp();
uint64_t expires_at = 0;
if (_timer != nullptr && _keepRunning) {
read(); //populate coreBeforeState
if(_sleep_time && _state == MEASURE_STATE::SLEEP){
//LOG(debug) << "FixedMultiSensor Entering measurement time for " << _interval << "ms";
_state = MEASURE_STATE::MEASUREMENT;
expires_at = MS_TO_NS(now + _interval);
} else if(_sleep_time){ //_state == MEASURE_STATE::MEASUREMENT
coreBeforeState.clear(); //reset before
//LOG(debug) << "FixedMultiSensor Entering sleep time for " << _sleep_time << "ms";
_state = MEASURE_STATE::SLEEP;
expires_at = MS_TO_NS(now + _sleep_time);
} else { //no sleep time
//LOG(debug) << "FixedMultiSensor No sleep time, so expires at " << _interval << "ms";
expires_at = MS_TO_NS(now + _interval);
}
_timer->expires_at(timestamp2ptime(expires_at));
_timer->async_wait(std::bind(&FixedMultiSensor::readAsync, this));
}
}
void FixedMultiSensor::startPolling(){
programFixed();
getAllCounterStates(coreBeforeState);
_keepRunning = 1;
uint64_t waitToStart = 0;
if(_sleep_time){
waitToStart = starting_timestamp(_interval + _sleep_time);
_state = MEASURE_STATE::SLEEP;
} else {
waitToStart = starting_timestamp(_interval);
}
LOG(info) << "Sensor " << _name << " will be started in " << (waitToStart - getTimestamp())/(1000*1000) << "ms.";
_timer->expires_at(timestamp2ptime(waitToStart));
_timer->async_wait(std::bind(&FixedMultiSensor::readAsync, this));
}
void FixedMultiSensor::addMqttAndLog(int cpu, const std::string & mqttSuffix){
std::string mqttID = createIDFromMqttParts(_mqttPrefix, cpu, mqttSuffix);
bool added = MultiSensor::addMqtt(mqttID);
if(!added){
LOG(error) << "FixedMultiSensor: unable to add MqttID=" << mqttID << "(built from cpu=" << cpu << " mqttMetricSuffix=" << mqttSuffix << ")";
}
}
void FixedMultiSensor::stopPolling(){
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
LOG(info) << "Sensor " << _name << " stopped.";
}
void FixedMultiSensor::setMqttSuffix(const std::string& mqttSuffix, uint64_t metricId){
if (metricId == INST_RETIRED_ANY
|| metricId == CPU_CLK_UNHALTED_THREAD
|| metricId == CPU_CLK_UNHALTED_REF) {
internalIdToMqttSuffix[static_cast<INTERNAL_METRICID>(metricId)] = mqttSuffix;
//create mqtts
for (int cpu = 0; cpu < _num_cores; ++cpu) {
addMqttAndLog(cpu, mqttSuffix);
}
}
}
iterMap_t FixedMultiSensor::cbegin_publish() {
return _begin_publishing;
}
iterMap_t FixedMultiSensor::cend_publish() {
return _end_publishing;
}
bool FixedMultiSensor::canPublish() {
return roundRobinPublishing(*this);
}
/*****************************CounterState classes***********************/
void BasicCounterState::readAndAggregate(std::shared_ptr<SafeMsrHandle> msr, uint32 core_fixed_counter_width) {
uint64 cInstRetiredAny = 0, cCpuClkUnhaltedThread = 0, cCpuClkUnhaltedRef = 0;
const int32 core_id = msr->getCoreId();
TemporalAffinity tA(core_id);
// reading core fixed counters
msr->read(FixedMultiSensor::INTERNAL_METRICID::INST_RETIRED_ANY, &cInstRetiredAny);
msr->read(FixedMultiSensor::INTERNAL_METRICID::CPU_CLK_UNHALTED_THREAD, &cCpuClkUnhaltedThread);
msr->read(FixedMultiSensor::INTERNAL_METRICID::CPU_CLK_UNHALTED_REF, &cCpuClkUnhaltedRef);
internalIdToData[FixedMultiSensor::INTERNAL_METRICID::INST_RETIRED_ANY] +=extractCoreFixedCounterValue(cInstRetiredAny, core_fixed_counter_width);
internalIdToData[FixedMultiSensor::INTERNAL_METRICID::CPU_CLK_UNHALTED_REF] +=extractCoreFixedCounterValue(cCpuClkUnhaltedThread, core_fixed_counter_width);
internalIdToData[FixedMultiSensor::INTERNAL_METRICID::CPU_CLK_UNHALTED_THREAD] = extractCoreFixedCounterValue(cCpuClkUnhaltedRef, core_fixed_counter_width);
}
// a secure (but partial) alternative for sscanf
// see example usage in pcm-core.cpp