Commit 7b8cced8 authored by Micha Mueller's avatar Micha Mueller
Browse files

Add generic class-structure for a dcdb pusher

Most of the code is copied from sysfspusher and adapted to match perfpusher
TODO: think of a suitable config-file-layout
TODO: insert new code to read perf-counters
parent 1ae4f292
/*
* Configuration.cpp
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#include "Configuration.h"
#include <iostream>
#include <string>
#include <linux/perf_event.h>
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
using namespace std;
Configuration::Configuration(std::string cfgFile) {
//set up enum-maps to map string from cfgFile to an enum value defined in linux/perf_event.h
enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
enumType["PERF_TYPE_HW_CACHE"] = PERF_TYPE_HW_CACHE;
enumType["PERF_TYPE_RAW"] = PERF_TYPE_RAW;
enumType["PERF_TYPE_BREAKPOINT"] = PERF_TYPE_BREAKPOINT;
//TODO set up map for config enum
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
//read global variables
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
cout << global.first << " " << global.second.data() << endl;
if(boost::iequals(global.first, "mqttBroker")) {
_global.brokerHost = global.second.data();
size_t pos = _global.brokerHost.find(":");
if (pos != string::npos) {
_global.brokerPort = stoi(_global.brokerHost.substr(pos+1));
_global.brokerHost.erase(pos);
}
} else if(boost::iequals(global.first, "mqttprefix")) {
_global.mqttPrefix = global.second.data();
if (_global.mqttPrefix[_global.mqttPrefix.length()-1] != '/') {
_global.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
//read template counters
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("CounterTemplate")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Template Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
readCounter(perfCounter, counter.second);
_templateCounters.insert(perfCounterMap_t::value_type(perfCounter.getName(), perfCounter));
}
}
}
//read one counter at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("counters")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
//first check if default counter is given
boost::optional<boost::property_tree::iptree&> defaultC = counter.second.get_child_optional("default");
if(defaultC) {
cout << " Using \"" << defaultC.get().data() << "\" as default." << endl;
perfCounterMap_t::iterator it = _templateCounters.find(defaultC.get().data());
if(it != _templateCounters.end()) {
perfCounter = it->second;
perfCounter.setName(counter.second.data());
} else {
cout << "Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
readCounter(perfCounter, counter.second);
_perfCounters.push_back(perfCounter);
}
}
}
}
Configuration::~Configuration() {
// TODO Auto-generated destructor stub
}
//TODO extend for more required values
void Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &s, config) {
if (boost::iequals(s.first, "interval")) {
counter.setInterval(stoi(s.second.data()));
} else if (boost::iequals(s.first, "mqttsuffix")) {
counter.setMqtt(s.second.data());
} else if (boost::iequals(s.first, "minValues")) {
counter.setMinValues(stoull(s.second.data()));
} else if (boost::iequals(s.first, "default")) {
//avoid unnecessary "Value not recognized" message
} else {
cout << " Value \"" << s.first << "\" not recognized. Omitting..." << endl;
}//TODO read in type+config values (see enum-maps)
}
cout << " MQTT : " << counter.getMqtt() << endl;
cout << " Interval: " << counter.getInterval() << endl;
cout << " minValues:" << counter.getMinValues() << endl;
return;
}
/*
* Configuration.h
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#ifndef CONFIGURATION_H_
#define CONFIGURATION_H_
#include <vector>
#include <map>
#include "PerfCounter.h"
#include <boost/property_tree/ptree.hpp>
class Configuration {
typedef std::vector<PerfCounter> perfCounterVector_t;
typedef std::map<std::string, PerfCounter> perfCounterMap_t; //TODO templates useful?
typedef struct {
int brokerPort;
std::string brokerHost;
std::string mqttPrefix;
uint32_t threads;
} global_t;
public:
/**
* Read the configuration for perfpusher.
* @param cfgFile Name of the configuration file
*/
Configuration(std::string cfgFile);
virtual ~Configuration();
perfCounterVector_t _perfCounters;
global_t _global;
private:
/**
* Set the variables of counter according to the values specified in config.
* @param counter The counter to be configured
* @param config A property(sub)tree containing the values
*/
void readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
perfCounterVector_t _templateCounters;
std::map<std::string, unsigned int> enumType;
std::map<std::string, unsigned int> enumConfig;
};
#endif /* CONFIGURATION_H_ */
/*
* MQTTPusher.cpp
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#include "MQTTPusher.h"
#include <boost/foreach.hpp>
MQTTPusher::MQTTPusher(Configuration& cfg) :
_cfg(cfg), _connected(false) {
//print some info
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
std::cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << std::endl;
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
std::cout << "Hostname: " << hostname << std::endl;
mosquitto_lib_init();
std::string clientID(_cfg._global.mqttPrefix);
_mosq = mosquitto_new(_cfg._global.mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
}
MQTTPusher::~MQTTPusher() {
if(_connected) {
mosquitto_disconnect(_mosq);
}
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _cfg._global.brokerHost.c_str(), _cfg._global.brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "Mosquitto: could not connect to MQTT broker "
<< _cfg._global.brokerHost << ":"
<< _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
BOOST_FOREACH(PerfCounter& counter, _cfg._perfCounters) {
if (counter.getSizeOfReadingQueue() >= counter.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _cfg._global.brokerHost << ":" << _cfg._global.brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = counter.popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << counter.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_cfg._global.mqttPrefix+counter.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
counter.pushReadingQueue(reads, count);
sleep(5);
break;
}
}
}
}
sleep(1);
}
}
/*
* MQTTPusher.h
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#include "Configuration.h"
#include <mosquitto.h>
class MQTTPusher {
public:
MQTTPusher(Configuration& cfg);
virtual ~MQTTPusher();
void push();
private:
Configuration& _cfg;
struct mosquitto* _mosq;
bool _connected;
};
#endif /* MQTTPUSHER_H_ */
TARGET = sysfspusher
DCDBBASEPATH ?= $(realpath $(dir $(lastword $(MAKEFILE_LIST)))/..)
DCDBCOREPATH ?= $(DCDBBASEPATH)/dcdb
include $(DCDBCOREPATH)/common.mk
CXXFLAGS = -std=c++0x -O2 -g -Wall -Werror -Wno-unused-local-typedefs -Wno-unknown-warning-option -Wno-deprecated-declarations -I$(DCDBDEPLOYPATH)/include -I$(DCDBBASEPATH)/include
CXXFLAGS = -O2 -g -Wall -Wno-unused-function $(CXX11FLAGS) -I$(DCDBBASEPATH)/dcdb/include -I$(DCDBDEPLOYPATH)/include
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -lmosquitto -lboost_system -lboost_thread
OBJS = perfpusher.o Configuration.o PerfCounter.o MQTTPusher.o
$(TARGET): $(OBJS)
$(CXX) -o $(TARGET) $(OBJS) $(LIBS)
all: $(TARGET)
clean:
rm -f $(OBJS) $(TARGET)
install: $(TARGET)
install $(TARGET) $(DCDBDEPLOYPATH)/bin/
install -m 644 $(TARGET).conf $(DCDBDEPLOYPATH)/etc
/*
* PerfCounter.cpp
*
* Created on: 11.12.2017
* Author: Micha Mueller
*/
#include "PerfCounter.h"
#include "timestamp.h"
#include <functional>
extern volatile int keepRunning;
PerfCounter::PerfCounter(const std::string name) {
_name = name;
_mqtt = "";
_minValues = 1;
_fileDescriptor = -1;
_interval = 0;
_lastValue.value = 0;
_lastValue.timestamp = 0;
_timer = NULL;
_latestReading = 0;
_readingQueue = NULL;
}
PerfCounter::~PerfCounter() {
if(_readingQueue != NULL) {
delete _readingQueue;
_readingQueue = NULL;
}
close(fd); //TODO keep an eye here
}
void PerfCounter::read() {
reading_t reading;
reading.timestamp = getTimestamp();
//TODO
_readingQueue->push(reading);
}
void PerfCounter::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
}
}
void PerfCounter::startPolling(boost::asio::io_service& io) {
if(_readingQueue == NULL) {
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
//TODO open file descriptor?
_timer = new boost::asio::deadline_timer(io, boost::posix_time::seconds(0));
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
}
/*
* PerfCounter.h
*
* Created on: 11.12.2017
* Author: Micha Mueller
*/
#ifndef PERFCOUNTER_H_
#define PERFCOUNTER_H_
#include <string>
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
class PerfCounter {
public:
PerfCounter(const std::string name);
virtual ~PerfCounter();
const std::string& getName() const {
return _name;
}
void setName(const std::string& name) {
_name = name;
}
const std::string& getMqtt() const {
return _mqtt;
}
void setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
unsigned getMinValues() const {
return _minValues;
}
void setMinValues(unsigned minValues) {
_minValues = minValues;
}
unsigned getInterval() const {
return _interval;
}
void setInterval(unsigned interval) {
_interval = interval;
}
const std::size_t getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
void read();
void readAsync();
void startPolling(boost::asio::io_service& io);
private:
std::string _name;
std::string _mqtt;
unsigned int _minValues;
unsigned int _interval;
unsigned int _type;
unsigned int _config;
int _fileDescriptor;
reading_t _latestValue;
boost::asio::deadline_timer* _timer;
uint64_t _latestReading;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* PERFCOUNTER_H_ */
//================================================================================
// Name : perfpusher.cpp
// Author : Micha Mueller
// Copyright : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB perf-event Pusher
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
#include <dcdbdaemon.h>
#include <functional>
#include "PerfCounter.h"
#include "Configuration.h"
#include "MQTTPusher.h"
#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
using namespace std;
volatile int keepRunning;
int daemonize = 0;
void sigHandler(int sig) {
cout << "Received SIGINT. Stopping perf-counter threads and flushing MQTT queues." << endl;
keepRunning = 0;
}
void printSyntax()
{
/*
1 2 3 4 5 6 7 8
012345678901234567890123456789012345678901234567890123456789012345678901234567890
*/
cout << "Usage:" << endl;
cout << " perfpusher [-d] [-b<host>] [-t<string>] <configfile>" << endl;
cout << " perfpusher -h" << endl;
cout << endl;
cout << "Options:" << endl;
cout << " -b<host>[:port] MQTT broker"<< endl;
cout << " -t<string> MQTT topic prefix" << endl;
cout << endl;
cout << " -d Daemonize" << endl;
cout << " -h This help page" << endl;
cout << endl;
}
int main(int argc, char** argv) {
if (argc <= 1) {
std::cout << "Please specify a config file" << std::endl;
return 1;
}
keepRunning = 1;
signal(SIGINT, sigHandler);
boost::asio::io_service io;
boost::thread_group threads;
Configuration cfg(argv[argc-1]);
//read in optional arguments
char c;
while ((c = getopt(argc, argv, "b:t:dh")) != -1) {
switch (c)
{
case 'b':
cfg._global.brokerHost = optarg;
break;
case 't':
cfg._global.mqttPrefix = optarg;
break;
case 'd':
daemonize = 1;
break;
case 'h':
printSyntax();
return 1;
break;
default:
if (c != '?') cerr << "Unknown parameter: " << c << endl;
return 1;
}
}
BOOST_FOREACH(PerfCounter& counter, cfg._perfCounters) {
#ifdef DEBUG
cout << "Starting sensor " << sensor.getName() << endl;
#endif
counter.startPolling(io);
}
if (daemonize) {
dcdbdaemon();
}
for(size_t i = 0; i < cfg._global.threads; i++) {
threads.create_thread(bind(static_cast< size_t (boost::asio::io_service::*) () >(&boost::asio::io_service::run), &io));
}
MQTTPusher mqttPusher(cfg);
boost::thread mqttThread(bind(&MQTTPusher::push, &mqttPusher));
mqttThread.join();
threads.join_all();
return 0;
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment