Commit 59925921 authored by Micha Mueller's avatar Micha Mueller
Browse files

Using separate instance of PerfCounter per counter and CPU

(former only one PerfCounter for all CPUs)
Add check if MQTT-Topic is assigned twice
parent 6e40c6fe
......@@ -8,6 +8,10 @@
#include "Configuration.h"
#include <iostream>
#include <string>
#include <sstream>
#include <unistd.h>
#include <iomanip>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
#include <boost/foreach.hpp>
......@@ -16,9 +20,10 @@
using namespace std;
extern volatile int keepRunning;
Configuration::Configuration(std::string cfgFile) {
//set up enum-maps to map string from cfgFile to an enum value defined in linux/perf_event.h
//TODO all events used?
_enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
_enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
_enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
......@@ -117,9 +122,29 @@ Configuration::Configuration(std::string cfgFile) {
cout << " Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
if(!readCounter(perfCounter, counter.second)) {
_perfCounters.push_back(perfCounter);
//now create distinct perfCounter and mqttSuffix per CPU
string startMqtt = perfCounter.getMqtt();
//customize perfCounter for every CPU
for (int i = 0; i < get_nprocs(); i++) {
string incMqtt = increaseMqtt(startMqtt, i);
//check if mqtt-suffix is used twice
auto returnIt = _mqttSuffixes.insert(incMqtt);
if (!returnIt.second) {
cout << "ERROR: Mqtt-Topic (Suffix " << incMqtt << ") is used twice! Aborting" << endl;
keepRunning = 0;
return;
}
std::cout << " CPU " << perfCounter.getCpuId() << " using MQTT-Suffix " << incMqtt << std::endl;
perfCounter.setCpuId(i);
perfCounter.setMqtt(incMqtt);
_perfCounters.push_back(perfCounter);
}
} else {
cout << " Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
......@@ -132,20 +157,19 @@ Configuration::~Configuration() {
// TODO Auto-generated destructor stub
}
//TODO extend for more required values
int Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
counter.setInterval(stoi(val.second.data()));
} else if (boost::iequals(val.first, "mqttsuffix")) {
counter.setStartMqtt(val.second.data());
counter.setMqtt(val.second.data());
} else if (boost::iequals(val.first, "minValues")) {
counter.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "type")) {
enumMap_t::iterator it = _enumType.find(val.second.data());
if(it != _enumType.end()) {
counter.setType(it->second);
cout << " Type: " << val.second.data() << " (= " << counter.getType() << ")" << endl;
cout << " Type: " << val.second.data() << " (=" << counter.getType() << ")" << endl;
} else {
cout << " Type \"" << val.second.data() << "\" not known." << endl;
return 1;
......@@ -166,8 +190,16 @@ int Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptre
}
}
cout << " StartMQTT: " << counter.getStartMqtt() << endl;
cout << " StartMQTT: " << counter.getMqtt() << endl;
cout << " Interval : " << counter.getInterval() << endl;
cout << " minValues: " << counter.getMinValues() << endl;
return 0;
}
const std::string Configuration::increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::hex << mqttDigits;
return stream.str();
}
......@@ -10,6 +10,7 @@
#include <vector>
#include <map>
#include <set>
#include "PerfCounter.h"
......@@ -18,8 +19,10 @@
class Configuration {
typedef std::vector<PerfCounter> perfCounterVector_t;
typedef std::map<std::string, PerfCounter> perfCounterMap_t; //TODO templates useful?
typedef std::map<std::string, PerfCounter> perfCounterMap_t;
typedef std::map<std::string, unsigned int> enumMap_t;
typedef std::set<std::string> mqttSet_t;
typedef struct {
int brokerPort;
std::string brokerHost;
......@@ -47,10 +50,22 @@ private:
*/
int readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
/**
* Increase the mqtt by val means
* Example: increaseMqtt("003F", 1) returns "0040"
* increaseMqtt("003F", 15) returns "004D"
*
* @param mqtt The mqtt-topic to be increased
* @param val The value by which the mqtt-topic should be increased
* @return The increased mqtt-topic
*/
const std::string increaseMqtt(const std::string& mqtt, int val);
perfCounterMap_t _templateCounters;
enumMap_t _enumType;
enumMap_t _enumConfig;
mqttSet_t _mqttSuffixes;
};
#endif /* CONFIGURATION_H_ */
......@@ -54,39 +54,37 @@ void MQTTPusher::push() {
}
}
reading_t* reads = new reading_t[512];
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(PerfCounter& counter, _cfg._perfCounters) {
for(const cpu_t& cpu : counter.getCPUs()) {
if (cpu.readingQueue->read_available() >= counter.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
if (counter.getSizeOfReadingQueue() >= counter.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = cpu.readingQueue->pop(reads, 512);
totalCount+= count;
if (_connected) {
std::size_t count = counter.popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << counter.getName() << "/CPU" << cpu.id << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
std::cout << counter.getName() << "/CPU" << counter.getCpuId() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+cpu.mqtt).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
cpu.readingQueue->push(reads, count);
sleep(5);
break;
}
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+counter.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
counter.pushReadingQueue(reads, count);
sleep(5);
break;
}
}
}
......
......@@ -8,10 +8,8 @@
#include "PerfCounter.h"
#include "timestamp.h"
#include <string>
#include <sstream>
#include <unistd.h>
#include <sys/ioctl.h>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
#include <linux/hw_breakpoint.h>
#include <asm/unistd.h>
......@@ -21,36 +19,26 @@ extern volatile int keepRunning;
PerfCounter::PerfCounter(const std::string name) {
_name = name;
_startMqtt = "";
_cpuId = 0;
_mqtt = "";
_minValues = 1;
_interval = 1000;
_type = 0;
_config = 0;
_fd = -1;
_latestValue.timestamp = 0;
_latestValue.value = 0;
_timer = NULL;
cpu_t cpu;
cpu.latestValue.timestamp = 0;
cpu.latestValue.value = 0;
//TODO get_nprocs() best possibility? what about hyper threading?
//with hyper-threading present, perf_event_open counts per logical cpu
for (int i = 0; i < get_nprocs(); i++) {
cpu.id = i;
cpu.fd = -1;
cpu.mqtt = "";
cpu.readingQueue = NULL;
_cpus.push_back(cpu);
}
_readingQueue = NULL;
}
PerfCounter::~PerfCounter() {
for(cpu_t& cpu : _cpus) {
if(cpu.fd != -1) {
close(cpu.fd);
}
if (cpu.readingQueue != NULL) {
delete cpu.readingQueue;
}
if(_fd != -1) {
close(_fd);
}
if (_readingQueue != NULL) {
delete _readingQueue;
}
}
......@@ -58,22 +46,21 @@ void PerfCounter::read() {
reading_t reading;
reading.timestamp = getTimestamp();
for (cpu_t& cpu : _cpus) {
long long count;
if (cpu.fd == -1) {continue;}
::read(cpu.fd, &count, sizeof(long long));
if (count >= cpu.latestValue.value) {
reading.value = count - cpu.latestValue.value;
} else {
//the counter overflow since last read
//but what is the max-value of a counter? (=long long??)
//TODO how to proper handle overflow?
reading.value = count; //+ (maxVal - cpu.latestValue.value)
}
cpu.readingQueue->push(reading);
cpu.latestValue.value = count;
cpu.latestValue.timestamp = reading.timestamp;
long long count;
if (_fd == -1) {return;}
::read(_fd, &count, sizeof(long long));
if (count >= _latestValue.value) {
reading.value = count - _latestValue.value;
} else {
//the counter overflow since last read
//but what is the max-value of a counter? (=long long??)
//TODO how to proper handle overflow?
reading.value = count; //+ (maxVal - cpu.latestValue.value)
}
_readingQueue->push(reading);
_latestValue.value = count;
_latestValue.timestamp = reading.timestamp;
}
void PerfCounter::readAsync() {
......@@ -87,13 +74,11 @@ void PerfCounter::readAsync() {
}
void PerfCounter::startPolling(boost::asio::io_service& io) {
for(cpu_t& cpu : _cpus) {
if (cpu.readingQueue == NULL) {
cpu.readingQueue = new boost::lockfree::spsc_queue<reading_t>(512);
}
if (_readingQueue == NULL) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
//open perf-counter for every CPU
//open perf-counter
struct perf_event_attr pe;
memset(&pe, 0, sizeof(struct perf_event_attr));
......@@ -104,30 +89,17 @@ void PerfCounter::startPolling(boost::asio::io_service& io) {
pe.exclude_kernel = 1; //TODO want to count kernel + hypervisor?
pe.exclude_hv = 1;
for (cpu_t& cpu : _cpus) {
cpu.mqtt = increase(_startMqtt, cpu.id);
#ifdef DEBUG
std::cout << " Counter " << _name << "|CPU" << cpu.id << " using MQTT-Suffix " << cpu.mqtt << std::endl;
#endif
//perf_event_open()
cpu.fd = syscall(__NR_perf_event_open, &pe, -1, cpu.id, -1, 0);
if (cpu.fd == -1) {
std::cerr << "Error opening performance-counter \"" << _name << "\" on CPU " << cpu.id << std::endl;
continue;
}
ioctl(cpu.fd, PERF_EVENT_IOC_RESET, 0);
ioctl(cpu.fd, PERF_EVENT_IOC_ENABLE, 0);
//perf_event_open()
_fd = syscall(__NR_perf_event_open, &pe, -1, _cpuId, -1, 0);
if (_fd == -1) {
std::cerr << "Error opening performance-counter \"" << _name << "\" on CPU " << _cpuId << std::endl;
return;
}
ioctl(_fd, PERF_EVENT_IOC_RESET, 0);
ioctl(_fd, PERF_EVENT_IOC_ENABLE, 0);
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
}
const std::string PerfCounter::increase(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::hex << mqttDigits;
return stream.str();
}
......@@ -20,15 +20,6 @@ typedef struct {
uint64_t timestamp;
} reading_t;
typedef struct {
int id;
int fd;
std::string mqtt;
reading_t latestValue;
//TODO better way of storing values for different CPUs?
boost::lockfree::spsc_queue<reading_t>* readingQueue;
} cpu_t;
class PerfCounter {
public:
......@@ -43,12 +34,20 @@ public:
_name = name;
}
const std::string& getStartMqtt() const {
return _startMqtt;
int getCpuId() const {
return _cpuId;
}
void setCpuId(int cpuId) {
_cpuId = cpuId;
}
const std::string& getMqtt() const {
return _mqtt;
}
void setStartMqtt(const std::string& mqtt) {
_startMqtt = mqtt;
void setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
unsigned getMinValues() const {
......@@ -83,10 +82,6 @@ public:
_config = config;
}
const std::vector<cpu_t>& getCPUs() const {
return _cpus;
}
/*
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
......@@ -97,7 +92,7 @@ public:
void pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}*/
}
void read();
......@@ -105,28 +100,19 @@ public:
void startPolling(boost::asio::io_service& io);
/**
* Increase the mqtt by val means
* Example: increase("003F", 1) returns "0040"
* increase("003F", 15) returns "004D"
*
* @param mqtt The mqtt-topic to be increased
* @param val The value by which the mqtt-topic should be increased
* @return The increased mqtt-topic
*/
const std::string increase(const std::string& mqtt, int val);
private:
std::string _name;
std::string _startMqtt;
int _cpuId;
std::string _mqtt;
unsigned int _minValues;
unsigned int _interval;
unsigned int _type;
unsigned int _config;
std::vector<cpu_t> _cpus;
int _fd;
reading_t _latestValue;
boost::asio::deadline_timer* _timer;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* PERFCOUNTER_H_ */
......@@ -79,6 +79,11 @@ int main(int argc, char** argv) {
boost::thread_group threads;
Configuration cfg(argv[argc-1]);
//keepRunning set to 0 if construction of cfg failed
if (!keepRunning) {
return 1;
}
//read in optional arguments
char c;
while ((c = getopt(argc, argv, "b:t:dh")) != -1) {
......@@ -105,7 +110,7 @@ int main(int argc, char** argv) {
BOOST_FOREACH(PerfCounter& counter, cfg._perfCounters) {
#ifdef DEBUG
cout << "Starting sensor " << counter.getName() << endl;
cout << "Starting sensor " << counter.getName() << " on CPU " << counter.getCpuId() << endl;
#endif
counter.startPolling(io);
}
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment