Commit f8705d95 authored by Micha Mueller's avatar Micha Mueller
Browse files

Rework Configuration-class. Minor fixes

parent 0a4faf34
......@@ -8,11 +8,7 @@
#include "Configuration.h"
#include <iostream>
#include <string>
#include <sstream>
#include <unistd.h>
#include <iomanip>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
......@@ -20,51 +16,35 @@
using namespace std;
extern volatile int keepRunning;
Configuration::Configuration(std::string cfgFile) {
//set up enum-maps to map string from cfgFile to an enum value defined in linux/perf_event.h
_enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
_enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
_enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
_enumType["PERF_TYPE_HW_CACHE"] = PERF_TYPE_HW_CACHE;
_enumType["PERF_TYPE_RAW"] = PERF_TYPE_RAW;
_enumType["PERF_TYPE_BREAKPOINT"] = PERF_TYPE_BREAKPOINT;
//if type==PERF_TYPE_HARDWARE
_enumConfig["PERF_COUNT_HW_CPU_CYCLES"] = PERF_COUNT_HW_CPU_CYCLES;
_enumConfig["PERF_COUNT_HW_INSTRUCTIONS"] = PERF_COUNT_HW_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_CACHE_REFERENCES"] = PERF_COUNT_HW_CACHE_REFERENCES;
_enumConfig["PERF_COUNT_HW_CACHE_MISSES"] = PERF_COUNT_HW_CACHE_MISSES;
_enumConfig["PERF_COUNT_HW_BRANCH_INSTRUCTIONS"] = PERF_COUNT_HW_BRANCH_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_BRANCH_MISSES"] = PERF_COUNT_HW_BRANCH_MISSES;
_enumConfig["PERF_COUNT_HW_BUS_CYCLES"] = PERF_COUNT_HW_BUS_CYCLES;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_FRONTEND"] = PERF_COUNT_HW_STALLED_CYCLES_FRONTEND;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_BACKEND"] = PERF_COUNT_HW_STALLED_CYCLES_BACKEND;
_enumConfig["PERF_COUNT_HW_REF_CPU_CYCLES"] = PERF_COUNT_HW_REF_CPU_CYCLES;
//if type==PERF_TYPE_SOFTWARE
_enumConfig["PERF_COUNT_SW_CPU_CLOCK"] = PERF_COUNT_SW_CPU_CLOCK;
_enumConfig["PERF_COUNT_SW_TASK_CLOCK"] = PERF_COUNT_SW_TASK_CLOCK;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS"] = PERF_COUNT_SW_PAGE_FAULTS;
_enumConfig["PERF_COUNT_SW_CONTEXT_SWITCHES"] = PERF_COUNT_SW_CONTEXT_SWITCHES;
_enumConfig["PERF_COUNT_SW_CPU_MIGRATIONS"] = PERF_COUNT_SW_CPU_MIGRATIONS;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MIN"] = PERF_COUNT_SW_PAGE_FAULTS_MIN;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MAJ"] = PERF_COUNT_SW_PAGE_FAULTS_MAJ;
_enumConfig["PERF_COUNT_SW_ALIGNMENT_FAULTS"] = PERF_COUNT_SW_ALIGNMENT_FAULTS;
_enumConfig["PERF_COUNT_SW_EMULATION_FAULTS"] = PERF_COUNT_SW_EMULATION_FAULTS;
_enumConfig["PERF_COUNT_SW_DUMMY"] = PERF_COUNT_SW_DUMMY;
//TODO set up map for rest of config enum
Configuration::Configuration(const std::string& cfgFilePath) :
_cfgFilePath(cfgFilePath) {
if (_cfgFilePath[_cfgFilePath.length()-1] != '/') {
_cfgFilePath.append("/");
}
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
_global.threads = 1;
}
Configuration::~Configuration() {
// TODO Auto-generated destructor stub
}
bool Configuration::read() {
std::string globalConfig = _cfgFilePath;
globalConfig.append("global.conf");
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
try {
boost::property_tree::read_info(globalConfig, cfg);
} catch (boost::property_tree::info_parser_error& e) {
cout << "global.conf not found! Please make sure the config-path is correct." << endl;
return false;
}
//read global variables
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
......@@ -88,129 +68,47 @@ Configuration::Configuration(std::string cfgFile) {
}
}
//read template counters
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("CounterTemplate")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Template Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
if(!readCounter(perfCounter, counter.second)) {
_templateCounters.insert(perfCounterMap_t::value_type(perfCounter.getName(), perfCounter));
} else {
cout << "Template Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
std::string sysfsConfig = _cfgFilePath;
sysfsConfig.append("sysfs.conf");
ifstream sysfs(sysfsConfig.c_str());
if (sysfs.good()) {
cout << "sysfs.conf found" << endl;
//sysfs.conf exists. Open sysfs.so
}
//read one counter at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("counters")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
//first check if default counter is given
boost::optional<boost::property_tree::iptree&> defaultC = counter.second.get_child_optional("default");
if(defaultC) {
cout << " Using \"" << defaultC.get().data() << "\" as default." << endl;
perfCounterMap_t::iterator it = _templateCounters.find(defaultC.get().data());
if(it != _templateCounters.end()) {
perfCounter = it->second;
perfCounter.setName(counter.second.data());
} else {
cout << " Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
if(!readCounter(perfCounter, counter.second)) {
//create distinct perfCounter and mqttSuffix per CPU
string startMqtt = perfCounter.getMqtt();
//customize perfCounter for every CPU
for (int i = 0; i < get_nprocs(); i++) {
string incMqtt = increaseMqtt(startMqtt, i);
//check if mqtt-suffix is used twice
auto returnIt = _mqttSuffixes.insert(incMqtt);
if (!returnIt.second) {
cout << "ERROR: Mqtt-Topic (Suffix " << incMqtt << ") is used twice! Aborting" << endl;
keepRunning = 0;
return;
}
std::cout << " CPU " << perfCounter.getCpuId() << " using MQTT-Suffix " << incMqtt << std::endl;
perfCounter.setCpuId(i);
perfCounter.setMqtt(incMqtt);
_perfCounters.push_back(perfCounter);
}
} else {
cout << " Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
std::string perfeventConfig = _cfgFilePath;
perfeventConfig.append("perfevent.conf");
ifstream perfevent(perfeventConfig.c_str());
if (perfevent.good()) {
cout << "perfevent.conf found" << endl;
//perfevent.conf exists. Open perfevent.so
}
}
Configuration::~Configuration() {
// TODO Auto-generated destructor stub
//TODO check for more config-files
return true;
}
int Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
bool Configuration::readSensorVals(Sensor& sensor, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
counter.setInterval(stoi(val.second.data()));
} else if (boost::iequals(val.first, "mqttsuffix")) {
counter.setMqtt(val.second.data());
sensor.setInterval(stoull(val.second.data()));
} else if (boost::iequals(val.first, "minValues")) {
counter.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "type")) {
enumMap_t::iterator it = _enumType.find(val.second.data());
if(it != _enumType.end()) {
counter.setType(it->second);
cout << " Type: " << val.second.data() << " (=" << counter.getType() << ")" << endl;
} else {
cout << " Type \"" << val.second.data() << "\" not known." << endl;
return 1;
}
} else if (boost::iequals(val.first, "config")) {
if (counter.getType() == PERF_TYPE_BREAKPOINT) {
//leave config zero
} else if (counter.getType() == PERF_TYPE_RAW) {
//read in custom hex-value
//TODO read in string-name and map to hex-value here?
// see info to perf_event_open and libpfm4
unsigned long config = stoul(val.second.data(), 0, 16);
counter.setConfig(config);
cout << " Config: Raw value: " << counter.getConfig() << endl;
} else {
enumMap_t::iterator it = _enumConfig.find(val.second.data());
if(it != _enumConfig.end()) {
counter.setConfig(it->second);
cout << " Config: " << val.second.data() << " (= " << counter.getConfig() << ")" << endl;
} else {
cout << " Config \"" << val.second.data() << "\" not known." << endl;
return 1;
}
}
} else if (boost::iequals(val.first, "default")) {
//avoid unnecessary "Value not recognized" message
sensor.setMinValues(stoull(val.second.data()));
} else {
cout << " Value \"" << val.first << "\" not recognized. Omitting..." << endl;
//
}
}
cout << " StartMQTT: " << counter.getMqtt() << endl;
cout << " Interval : " << counter.getInterval() << endl;
cout << " minValues: " << counter.getMinValues() << endl;
return 0;
cout << " Interval : " << sensor.getInterval() << endl;
cout << " minValues: " << sensor.getMinValues() << endl;
return true;
}
const std::string Configuration::increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::hex << mqttDigits;
return stream.str();
bool Configuration::checkMqtt(const std::string& mqtt) {
auto returnIt = _mqttSuffixes.insert(mqtt);
if (!returnIt.second) {
return false;
}
return true;
}
......@@ -9,19 +9,13 @@
#define CONFIGURATION_H_
#include <vector>
#include <map>
#include <set>
//#include "Sensor.h"
#include "PerfCounter.h"
#include "Sensor.h"
#include <boost/property_tree/ptree.hpp>
typedef std::vector<Sensor> sensorVector_t;
typedef std::vector<PerfCounter> perfCounterVector_t;
typedef std::map<std::string, PerfCounter> perfCounterMap_t;
typedef std::map<std::string, unsigned int> enumMap_t;
typedef std::vector<Sensor*> sensorVector_t;
typedef std::set<std::string> mqttSet_t;
typedef struct {
......@@ -35,40 +29,51 @@ class Configuration {
public:
/**
* Read the configuration for perfpusher.
* @param cfgFile Name of the configuration file
* Create new Configuration. Sets global config file to read from to cfgFile.
* @param cfgFilePath Path to where all config-files are located
*/
Configuration(std::string cfgFile);
Configuration(const std::string& cfgFilePath);
virtual ~Configuration();
perfCounterVector_t _perfCounters;
global_t _global;
/**
* Reads configuration from _cfgFile.
* Reads and sets global values as well as all sensors.
* Detects which sensor types are required and dynamically opens required libraries.
*
* @return true on success, false otherwise
*/
bool read();
private:
/**
* Set the variables of counter according to the values specified in config.
* @param counter The counter to be configured
* Read and set general sensor values (like interval, minvalues, ...).
* @param sensor The sensor to be configured
* @param config A property(sub)tree containing the values
* @return 0 on success, 1 if a required value could not be parsed
* @return True on success, false otherwise
*/
int readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
bool readSensorVals(Sensor& sensor, boost::property_tree::iptree& config);
/**
* Increase the mqtt by val means
* Example: increaseMqtt("003F", 1) returns "0040"
* increaseMqtt("003F", 15) returns "004D"
*
* @param mqtt The mqtt-topic to be increased
* @param val The value by which the mqtt-topic should be increased
* @return The increased mqtt-topic
* Check if the mqtt-suffix is already in use.
* @param mqtt The MQTT-suffix to check
* @return True if the suffix is still available, false if already used
*/
const std::string increaseMqtt(const std::string& mqtt, int val);
bool checkMqtt(const std::string& mqtt);
global_t getGlobal() const {
return _global;
}
perfCounterMap_t _templateCounters;
sensorVector_t& getSensors() {
return _sensors;
}
private:
enumMap_t _enumType;
enumMap_t _enumConfig;
mqttSet_t _mqttSuffixes;
std::string _cfgFilePath;
global_t _global;
sensorVector_t _sensors;
mqttSet_t _mqttSuffixes;
};
#endif /* CONFIGURATION_H_ */
......@@ -59,8 +59,8 @@ void MQTTPusher::push() {
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
for(auto& s : _sensors) {
if (s.getSizeOfReadingQueue() >= s.getMinValues()) {
for(auto s : _sensors) {
if (s->getSizeOfReadingQueue() >= s->getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
......@@ -71,7 +71,7 @@ void MQTTPusher::push() {
}
if (_connected) {
std::size_t count = s.popReadingQueue(reads, 1024);
std::size_t count = s->popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << s.getName() << " has read " << count << " values:" << std::endl;
......@@ -79,11 +79,11 @@ void MQTTPusher::push() {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_mqttPrefix+s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
if (mosquitto_publish(_mosq, NULL, (_mqttPrefix+s->getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
s.pushReadingQueue(reads, count);
s->pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
break;
......
TARGET = dcdbpusher
DCDBBASEPATH ?= $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/..)
DCDBCOREPATH ?= $(DCDBBASEPATH)/dcdb
include $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/../..)/dcdb/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system -lboost_thread
OBJS = dcdbpusher.o Configuration.o Sensor.o MQTTPusher.o
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
clean:
rm -f $(OBJS) $(TARGET)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
install -m 644 $(TARGET).conf $(DCDBDEPLOYPATH)/etc
......@@ -11,8 +11,6 @@
#include <unistd.h>
#include <functional>
extern volatile int keepRunning;
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _minValues(1), _interval(1000) {
......@@ -23,7 +21,7 @@ Sensor::Sensor(const std::string name) :
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
virtual Sensor::~Sensor() {
Sensor::~Sensor() {
delete _readingQueue;
}
......@@ -70,13 +68,3 @@ std::size_t Sensor::popReadingQueue(reading_t *reads, std::size_t max) const {
void Sensor::pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
virtual void Sensor::readAsync() {
uint64_t now = getTimestamp();
this->read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(std::bind(&(this->readAsync), this));
}
}
......@@ -47,8 +47,7 @@ public:
//have to be overwritten methods
virtual void read() = 0;
//TODO has readAsync to be overwritten??
virtual void readAsync();
virtual void readAsync() = 0;
virtual void startPolling(boost::asio::io_service& io) = 0;
protected:
......
......@@ -54,7 +54,7 @@ void printSyntax()
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout << "Usage:" << endl;
cout << " dcdbpusher [-d] [-b<host>] [-t<string>] <path/to/configfile>" << endl;
cout << " dcdbpusher [-d] [-b<host>] [-t<string>] <path/to/configfiles/>" << endl;
cout << " dcdbpusher -h" << endl;
cout << endl;
......@@ -69,7 +69,7 @@ void printSyntax()
int main(int argc, char** argv) {
if (argc <= 1) {
std::cout << "Please specify a config file" << std::endl;
std::cout << "Please specify a path to the config-directory" << std::endl;
return 1;
}
......@@ -80,14 +80,13 @@ int main(int argc, char** argv) {
boost::thread_group threads;
Configuration cfg(argv[argc-1]);
global_t globalSettings = cfg.getGlobal();
sensorVector_t sensors = cfg.getSensors();
//keepRunning set to 0 if construction of cfg failed
if (!keepRunning) {
if(!cfg.read()) {
return 1;
}
global_t globalSettings = cfg.getGlobal();
sensorVector_t sensors = cfg.getSensors();
//read in optional arguments
char c;
while ((c = getopt(argc, argv, "b:t:dh")) != -1) {
......@@ -112,22 +111,22 @@ int main(int argc, char** argv) {
}
}
for(auto& s : sensors) {
for(auto s : sensors) {
#ifdef DEBUG
cout << "Starting sensor " << s.getName() << endl;
#endif
s.startPolling(io);
s->startPolling(io);
}
if (daemonize) {
dcdbdaemon();
}
for(size_t i = 0; i < cfg._global.threads; i++) {
for(size_t i = 0; i < globalSettings.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
MQTTPusher mqttPusher(cfg);
MQTTPusher mqttPusher(globalSettings.brokerPort, globalSettings.brokerHost, globalSettings.mqttPrefix, sensors);
boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
mqttThread.join();
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment