Commit dc8ccb69 authored by Micha Mueller's avatar Micha Mueller
Browse files

Finish first working version of perfpusher

Deleted test.c as it was just a reminder-file.
Add readme.txt as kind of (incomplete) documentation
Various TODOs remain. Also some polishing could be required.
parent 74004fdb
......@@ -18,19 +18,45 @@ using namespace std;
Configuration::Configuration(std::string cfgFile) {
//set up enum-maps to map string from cfgFile to an enum value defined in linux/perf_event.h
enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
enumType["PERF_TYPE_HW_CACHE"] = PERF_TYPE_HW_CACHE;
enumType["PERF_TYPE_RAW"] = PERF_TYPE_RAW;
enumType["PERF_TYPE_BREAKPOINT"] = PERF_TYPE_BREAKPOINT;
//TODO all events used?
_enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
_enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
_enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
_enumType["PERF_TYPE_HW_CACHE"] = PERF_TYPE_HW_CACHE;
_enumType["PERF_TYPE_RAW"] = PERF_TYPE_RAW;
_enumType["PERF_TYPE_BREAKPOINT"] = PERF_TYPE_BREAKPOINT;
//TODO set up map for config enum
//if type==PERF_TYPE_HARDWARE
_enumConfig["PERF_COUNT_HW_CPU_CYCLES"] = PERF_COUNT_HW_CPU_CYCLES;
_enumConfig["PERF_COUNT_HW_INSTRUCTIONS"] = PERF_COUNT_HW_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_CACHE_REFERENCES"] = PERF_COUNT_HW_CACHE_REFERENCES;
_enumConfig["PERF_COUNT_HW_CACHE_MISSES"] = PERF_COUNT_HW_CACHE_MISSES;
_enumConfig["PERF_COUNT_HW_BRANCH_INSTRUCTIONS"] = PERF_COUNT_HW_BRANCH_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_BRANCH_MISSES"] = PERF_COUNT_HW_BRANCH_MISSES;
_enumConfig["PERF_COUNT_HW_BUS_CYCLES"] = PERF_COUNT_HW_BUS_CYCLES;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_FRONTEND"] = PERF_COUNT_HW_STALLED_CYCLES_FRONTEND;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_BACKEND"] = PERF_COUNT_HW_STALLED_CYCLES_BACKEND;
_enumConfig["PERF_COUNT_HW_REF_CPU_CYCLES"] = PERF_COUNT_HW_REF_CPU_CYCLES;
//if type==PERF_TYPE_SOFTWARE
_enumConfig["PERF_COUNT_SW_CPU_CLOCK"] = PERF_COUNT_SW_CPU_CLOCK;
_enumConfig["PERF_COUNT_SW_TASK_CLOCK"] = PERF_COUNT_SW_TASK_CLOCK;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS"] = PERF_COUNT_SW_PAGE_FAULTS;
_enumConfig["PERF_COUNT_SW_CONTEXT_SWITCHES"] = PERF_COUNT_SW_CONTEXT_SWITCHES;
_enumConfig["PERF_COUNT_SW_CPU_MIGRATIONS"] = PERF_COUNT_SW_CPU_MIGRATIONS;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MIN"] = PERF_COUNT_SW_PAGE_FAULTS_MIN;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MAJ"] = PERF_COUNT_SW_PAGE_FAULTS_MAJ;
_enumConfig["PERF_COUNT_SW_ALIGNMENT_FAULTS"] = PERF_COUNT_SW_ALIGNMENT_FAULTS;
_enumConfig["PERF_COUNT_SW_EMULATION_FAULTS"] = PERF_COUNT_SW_EMULATION_FAULTS;
_enumConfig["PERF_COUNT_SW_DUMMY"] = PERF_COUNT_SW_DUMMY;
//TODO set up map for rest of config enum
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
_global.threads = 1;
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
......@@ -63,8 +89,11 @@ Configuration::Configuration(std::string cfgFile) {
cout << "Template Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
readCounter(perfCounter, counter.second);
_templateCounters.insert(perfCounterMap_t::value_type(perfCounter.getName(), perfCounter));
if(!readCounter(perfCounter, counter.second)) {
_templateCounters.insert(perfCounterMap_t::value_type(perfCounter.getName(), perfCounter));
} else {
cout << "Template Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
}
......@@ -85,12 +114,15 @@ Configuration::Configuration(std::string cfgFile) {
perfCounter = it->second;
perfCounter.setName(counter.second.data());
} else {
cout << "Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
cout << " Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
readCounter(perfCounter, counter.second);
_perfCounters.push_back(perfCounter);
if(!readCounter(perfCounter, counter.second)) {
_perfCounters.push_back(perfCounter);
} else {
cout << " Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
}
......@@ -101,23 +133,41 @@ Configuration::~Configuration() {
}
//TODO extend for more required values
void Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, config) {
if (boost::iequals(s.first, "interval")) {
counter.setInterval(stoi(s.second.data()));
} else if (boost::iequals(s.first, "mqttsuffix")) {
counter.setMqtt(s.second.data());
} else if (boost::iequals(s.first, "minValues")) {
counter.setMinValues(stoull(s.second.data()));
} else if (boost::iequals(s.first, "default")) {
int Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
counter.setInterval(stoi(val.second.data()));
} else if (boost::iequals(val.first, "mqttsuffix")) {
counter.setStartMqtt(val.second.data());
} else if (boost::iequals(val.first, "minValues")) {
counter.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "type")) {
enumMap_t::iterator it = _enumType.find(val.second.data());
if(it != _enumType.end()) {
counter.setType(it->second);
cout << " Type: " << val.second.data() << " (= " << counter.getType() << ")" << endl;
} else {
cout << " Type \"" << val.second.data() << "\" not known." << endl;
return 1;
}
} else if (boost::iequals(val.first, "config")) {
enumMap_t::iterator it = _enumConfig.find(val.second.data());
if(it != _enumConfig.end()) {
counter.setConfig(it->second);
cout << " Config: " << val.second.data() << " (= " << counter.getConfig() << ")" << endl;
} else {
cout << " Config \"" << val.second.data() << "\" not known." << endl;
return 1;
}
} else if (boost::iequals(val.first, "default")) {
//avoid unnecessary "Value not recognized" message
} else {
cout << " Value \"" << s.first << "\" not recognized. Omitting..." << endl;
}//TODO read in type+config values (see enum-maps)
cout << " Value \"" << val.first << "\" not recognized. Omitting..." << endl;
}
}
cout << " MQTT : " << counter.getMqtt() << endl;
cout << " Interval: " << counter.getInterval() << endl;
cout << " minValues:" << counter.getMinValues() << endl;
return;
cout << " StartMQTT: " << counter.getStartMqtt() << endl;
cout << " Interval : " << counter.getInterval() << endl;
cout << " minValues: " << counter.getMinValues() << endl;
return 0;
}
......@@ -19,6 +19,7 @@ class Configuration {
typedef std::vector<PerfCounter> perfCounterVector_t;
typedef std::map<std::string, PerfCounter> perfCounterMap_t; //TODO templates useful?
typedef std::map<std::string, unsigned int> enumMap_t;
typedef struct {
int brokerPort;
std::string brokerHost;
......@@ -42,13 +43,14 @@ private:
* Set the variables of counter according to the values specified in config.
* @param counter The counter to be configured
* @param config A property(sub)tree containing the values
* @return 0 on success, 1 if a required value could not be parsed
*/
void readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
int readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
perfCounterMap_t _templateCounters;
std::map<std::string, unsigned int> enumType;
std::map<std::string, unsigned int> enumConfig;
enumMap_t _enumType;
enumMap_t _enumConfig;
};
#endif /* CONFIGURATION_H_ */
......@@ -54,37 +54,39 @@ void MQTTPusher::push() {
}
}
reading_t* reads = new reading_t[1024];
reading_t* reads = new reading_t[512];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(PerfCounter& counter, _cfg._perfCounters) {
if (counter.getSizeOfReadingQueue() >= counter.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
for(cpu_t& cpu : counter._cpus) {
if (cpu.readingQueue->read_available() >= counter.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
}
if (_connected) {
std::size_t count = counter.popReadingQueue(reads, 1024);
totalCount+= count;
if (_connected) {
std::size_t count = cpu.readingQueue->pop(reads, 512);
totalCount+= count;
#ifdef DEBUG
std::cout << counter.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
std::cout << counter.getName() << "/CPU" << cpu.id << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+counter.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
counter.pushReadingQueue(reads, count);
sleep(5);
break;
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+cpu.mqtt).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
cpu.readingQueue->push(reads, count);
sleep(5);
break;
}
}
}
}
......
......@@ -7,44 +7,73 @@
#include "PerfCounter.h"
#include "timestamp.h"
#include <functional>
#include <string>
#include <sstream>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
#include <linux/hw_breakpoint.h>
#include <asm/unistd.h>
#include <functional>
extern volatile int keepRunning;
PerfCounter::PerfCounter(const std::string name) {
_name = name;
_mqtt = "";
_startMqtt = "";
_minValues = 1;
_interval = 0;
_interval = 1000;
_type = 0;
_config = 0;
_latestValue.value = 0;
_latestValue.timestamp = 0;
_timer = NULL;
_latestReading = 0;
_readingQueue = NULL;
cpu_t cpu;
cpu.latestValue.timestamp = 0;
cpu.latestValue.value = 0;
//TODO get_nprocs() best possibility? what about hyper threading?
//with hyper-threading present, perf_event_open counts per logical cpu
for (int i = 0; i < get_nprocs(); i++) {
cpu.id = i;
cpu.fd = -1;
cpu.mqtt = "";
cpu.readingQueue = NULL;
_cpus.push_back(cpu);
}
}
PerfCounter::~PerfCounter() {
if(_readingQueue != NULL) {
delete _readingQueue;
_readingQueue = NULL;
}
for(int fd : _fileDescriptors) {
close(fd); //TODO keep an eye here
for(cpu_t& cpu : _cpus) {
if(cpu.fd != -1) {
close(cpu.fd);
}
if (cpu.readingQueue != NULL) {
delete cpu.readingQueue;
}
}
}
void PerfCounter::read() {
reading_t reading;
reading.timestamp = getTimestamp();
//TODO
_readingQueue->push(reading);
for (cpu_t& cpu : _cpus) {
long long count;
if (cpu.fd == -1) {continue;}
::read(cpu.fd, &count, sizeof(long long));
if (count >= cpu.latestValue.value) {
reading.value = count - cpu.latestValue.value;
} else {
//the counter overflow since last read
//but what is the max-value of a counter? (=long long??)
//TODO how to proper handle overflow?
reading.value = count; //+ (maxVal - cpu.latestValue.value)
}
cpu.readingQueue->push(reading);
cpu.latestValue.value = count;
cpu.latestValue.timestamp = reading.timestamp;
}
}
void PerfCounter::readAsync() {
......@@ -58,39 +87,49 @@ void PerfCounter::readAsync() {
}
void PerfCounter::startPolling(boost::asio::io_service& io) {
if (_readingQueue == NULL) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
for(cpu_t& cpu : _cpus) {
if (cpu.readingQueue == NULL) {
cpu.readingQueue = new boost::lockfree::spsc_queue<reading_t>(512);
}
}
//open perf-counter for every CPU
//TODO get_nprocs best possibility?
for (int i = 0; i < get_nprocs(); i++) {
int fd;
struct perf_event_attr pe;
memset(&pe, 0, sizeof(struct perf_event_attr));
pe.type = _type;
pe.size = sizeof(struct perf_event_attr);
pe.config = _config;
pe.disabled = 1;
pe.exclude_kernel = 1;
pe.exclude_hv = 1;
//TODO set other parameters?
fd = syscall(__NR_perf_event_open, &pe, -1, i, -1, 0);
if (fd == -1) {
//TODO more meaningfull error message
std::cerr << "Error opening performance-counter" << pe.config << std::endl;
struct perf_event_attr pe;
memset(&pe, 0, sizeof(struct perf_event_attr));
pe.type = _type;
pe.size = sizeof(struct perf_event_attr);
pe.config = _config;
pe.disabled = 1;
pe.exclude_kernel = 1;
pe.exclude_hv = 1;
//TODO set other parameters?
for (cpu_t& cpu : _cpus) {
cpu.mqtt = increase(_startMqtt, cpu.id);
#ifdef DEBUG
std::cout << " Counter " << _name << "|CPU" << cpu.id << " using MQTT-Suffix " << cpu.mqtt << std::endl;
#endif
//perf_event_open()
cpu.fd = syscall(__NR_perf_event_open, &pe, -1, cpu.id, -1, 0);
if (cpu.fd == -1) {
std::cerr << "Error opening performance-counter \"" << _name << "\" on CPU " << cpu.id << std::endl;
continue;
}
//TODO necessary? or just start with perf-counter enabled ( pe.disabled = 0)?
ioctl(fd, PERF_EVENT_IOC_RESET, 0);
ioctl(fd, PERF_EVENT_IOC_ENABLE, 0);
_fileDescriptors.push_back(fd);
ioctl(cpu.fd, PERF_EVENT_IOC_RESET, 0);
ioctl(cpu.fd, PERF_EVENT_IOC_ENABLE, 0);
}
//TODO open file descriptor?
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
}
/**
 * Interpret mqtt as a hexadecimal number, add val to it and return the sum
 * formatted as hexadecimal string again. The result is left-padded with '0'
 * to at least the length of the input; hex digits a-f are emitted in lower case.
 *
 * Note: std::stoul throws std::invalid_argument if mqtt does not start with
 * a hexadecimal number (e.g. if it is empty) and std::out_of_range if the
 * number does not fit into an unsigned long. Neither is caught here.
 *
 * @param mqtt	The mqtt-topic (hexadecimal digits) to be increased
 * @param val	The value by which the mqtt-topic should be increased
 * @return	The increased mqtt-topic
 */
const std::string PerfCounter::increase(const std::string& mqtt, int val) {
	//parse the topic as base-16 number and add the offset
	const unsigned long increased = std::stoul(mqtt, nullptr, 16) + val;

	//write it back as hex, zero-padded to the width of the original topic
	std::stringstream formatted;
	formatted << std::hex << std::setw(mqtt.length()) << std::setfill('0') << increased;
	return formatted.str();
}
......@@ -8,6 +8,8 @@
#ifndef PERFCOUNTER_H_
#define PERFCOUNTER_H_
//#define DEBUG
#include <string>
#include <vector>
#include <boost/asio.hpp>
......@@ -18,6 +20,14 @@ typedef struct {
uint64_t timestamp;
} reading_t;
/**
 * State a PerfCounter keeps for one single CPU.
 *
 * All members carry a defined default so that a freshly constructed cpu_t is
 * always in the "not opened, no queue" state which PerfCounter's destructor
 * checks for (fd != -1, readingQueue != NULL) before releasing anything.
 *
 * NOTE: readingQueue is a raw pointer. Copying a cpu_t copies only the
 * pointer, not the queue; the queue is deleted by PerfCounter's destructor.
 */
struct cpu_t {
	//number of the CPU; handed to perf_event_open as cpu argument. -1 = not assigned yet
	int id = -1;
	//file descriptor returned by perf_event_open; -1 = counter not opened
	int fd = -1;
	//MQTT-suffix under which the readings of this CPU are published
	std::string mqtt;
	//raw counter value and timestamp of the previous read; used to compute the delta
	reading_t latestValue{};
	//queue of readings waiting to be published; allocated in startPolling()
	boost::lockfree::spsc_queue<reading_t>* readingQueue = nullptr;
};
class PerfCounter {
public:
......@@ -32,12 +42,12 @@ public:
_name = name;
}
const std::string& getMqtt() const {
return _mqtt;
const std::string& getStartMqtt() const {
return _startMqtt;
}
void setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
void setStartMqtt(const std::string& mqtt) {
_startMqtt = mqtt;
}
unsigned getMinValues() const {
......@@ -56,6 +66,22 @@ public:
_interval = interval;
}
/**
 * @return The perf event type of this counter (one of the PERF_TYPE_* values
 *         when set from the config file; 0 if never set)
 */
unsigned getType() const {
return _type;
}
/**
 * Set the perf event type. The value is later copied into
 * perf_event_attr.type when the counter is opened.
 * @param type	The perf event type (PERF_TYPE_* value)
 */
void setType(unsigned type) {
_type = type;
}
/**
 * @return The perf event config of this counter (one of the PERF_COUNT_*
 *         values when set from the config file; 0 if never set)
 */
unsigned getConfig() const {
return _config;
}
/**
 * Set the perf event config. The value is later copied into
 * perf_event_attr.config when the counter is opened.
 * @param config	The perf event config (PERF_COUNT_* value)
 */
void setConfig(unsigned config) {
_config = config;
}
/*
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
......@@ -66,7 +92,7 @@ public:
void pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
}*/
void read();
......@@ -74,20 +100,30 @@ public:
void startPolling(boost::asio::io_service& io);
/**
* Increase the hexadecimal mqtt-topic by val.
* Example: increase("003F", 1) returns "0040"
* increase("003F", 15) returns "004e" (hex digits a-f are emitted in lower case)
*
* @param mqtt The mqtt-topic to be increased
* @param val The value by which the mqtt-topic should be increased
* @return The increased mqtt-topic
*/
const std::string increase(const std::string& mqtt, int val);
//TODO better way than making it public?
//MQTTPusher doesnt want an complicated access to readingQueues
std::vector<cpu_t> _cpus;
private:
std::string _name;
std::string _mqtt;
std::string _startMqtt;
unsigned int _minValues;
unsigned int _interval;
unsigned int _type;
unsigned int _config;
reading_t _latestValue;
std::vector<int> _fileDescriptors;
boost::asio::deadline_timer* _timer;
uint64_t _latestReading;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* PERFCOUNTER_H_ */
global {
mqttBroker localhost:1883
mqttprefix /00112233445566778899AABB0000
threads 4
}
CounterTemplate {
counter def1 {
interval 5000
mqttsuffix 0222
minValues 5
type PERF_TYPE_HARDWARE
config PERF_COUNT_HW_INSTRUCTIONS
}
counter def2 {
interval 2000
type PERF_TYPE_HARDWARE
}
}
counters {
counter hw_instructions {
default def1
mqttsuffix 0020
}
counter hw_branch_instructions {
default def2
mqttsuffix 0024
config PERF_COUNT_HW_BRANCH_INSTRUCTIONS
}
counter hw_branch_misses {
default def2
mqttsuffix 002C
config PERF_COUNT_HW_BRANCH_MISSES
}
counter sw_pagefaults {
interval 5000
mqttsuffix 0030
type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_PAGE_FAULTS
}
counter sw_context_switches {
interval 5000
mqttsuffix 0034
type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_CONTEXT_SWITCHES
}
counter sw_cpu_migrations {
interval 5000
mqttsuffix 0038
type PERF_TYPE_SOFTWARE
config PERF_COUNT_SW_CPU_MIGRATIONS
}
}
......@@ -105,7 +105,7 @@ int main(int argc, char** argv) {
BOOST_FOREACH(PerfCounter& counter, cfg._perfCounters) {
#ifdef DEBUG
cout << "Starting sensor " << sensor.getName() << endl;
cout << "Starting sensor " << counter.getName() << endl;
#endif
counter.startPolling(io);
}
......
Welcome to the perfpusher for DCDB!
This is a scratchpad used to collect useful information and notes on the perfpusher.
This could be considered (incomplete) documentation.
Intro
DCDB is a database to collect various (sensor-)values of a datacenter for further analysis.
For harvesting the data various pushers are available.
Perfpusher is tasked with collecting data from the CPU's various performance counters (PMUs) as configured in the config file.
Running perfpusher
A working installation of DCDB is required. Simply compile Perfpusher by running the given makefile. Run Perfpusher by executing the binary and specifying a config file.
Config-file
The config-file consists of a number of mandatory and optional values. The structure a config-file should follow can be seen in the example config-file "perfpusher.conf" which is provided in the repository.
The various parameters shall be explained in the following ([m] = mandatory, [o] = optional):
global { [m]
mqttBroker HOST:PORT [m] // define address of the mqtt-broker which collects the messages sent by
perfpusher
mqttprefix /XXXXXXX [m] // to not rewrite a full mqtt-topic for every counter one can specify here the
consistent prefix
threads N [o] // specify how many threads should be created to collect values. If one configures
more counters than one thread can handle in time, values are not guaranteed to be
consistent anymore. Default value of threads is 1. Note that the MQTTPusher
always starts an extra thread. So the actual number of started threads is always
N+1.
}
CounterTemplate { [m] // Here one can define template counters which then can be referenced further below
counter A {...} [o] by a real sensor. This field is mainly for convenience reasons. One is not
counter B {...} [o] obligated to define any template sensors. However it is required to at least
... [o] define an empty CounterTemplate {} structure.
}
counters { [m] // Wrapper structure where the "real" counters are specified.
counter c_name { // Every counter has to be announced with the word "counter" followed by a freely
selectable name.