Currently job artifacts in CI/CD pipelines on LRZ GitLab never expire. Starting from Wed 26.1.2022 the default expiration time will be 30 days (GitLab default). Currently existing artifacts in already completed jobs will not be affected by the change. The latest artifacts for all jobs in the latest successful pipelines will be kept. More information: https://gitlab.lrz.de/help/user/admin_area/settings/continuous_integration.html#default-artifacts-expiration

Commit 961d5987 authored by Micha Mueller's avatar Micha Mueller
Browse files

Start work on unique dcdbpusher

Add generic Sensor-base class
Rework dcdbpusher + MQTTPusher to use generic base class
parent 35899b70
/*
* Configuration.cpp
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#include "Configuration.h"
#include <iostream>
#include <string>
#include <sstream>
#include <unistd.h>
#include <iomanip>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
#include <boost/foreach.hpp>
#include <boost/property_tree/info_parser.hpp>
#include <boost/algorithm/string.hpp>
using namespace std;
extern volatile int keepRunning;
Configuration::Configuration(std::string cfgFile) {
//set up enum-maps to map string from cfgFile to an enum value defined in linux/perf_event.h
_enumType["PERF_TYPE_HARDWARE"] = PERF_TYPE_HARDWARE;
_enumType["PERF_TYPE_SOFTWARE"] = PERF_TYPE_SOFTWARE;
_enumType["PERF_TYPE_TRACEPOINT"] = PERF_TYPE_TRACEPOINT;
_enumType["PERF_TYPE_HW_CACHE"] = PERF_TYPE_HW_CACHE;
_enumType["PERF_TYPE_RAW"] = PERF_TYPE_RAW;
_enumType["PERF_TYPE_BREAKPOINT"] = PERF_TYPE_BREAKPOINT;
//if type==PERF_TYPE_HARDWARE
_enumConfig["PERF_COUNT_HW_CPU_CYCLES"] = PERF_COUNT_HW_CPU_CYCLES;
_enumConfig["PERF_COUNT_HW_INSTRUCTIONS"] = PERF_COUNT_HW_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_CACHE_REFERENCES"] = PERF_COUNT_HW_CACHE_REFERENCES;
_enumConfig["PERF_COUNT_HW_CACHE_MISSES"] = PERF_COUNT_HW_CACHE_MISSES;
_enumConfig["PERF_COUNT_HW_BRANCH_INSTRUCTIONS"] = PERF_COUNT_HW_BRANCH_INSTRUCTIONS;
_enumConfig["PERF_COUNT_HW_BRANCH_MISSES"] = PERF_COUNT_HW_BRANCH_MISSES;
_enumConfig["PERF_COUNT_HW_BUS_CYCLES"] = PERF_COUNT_HW_BUS_CYCLES;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_FRONTEND"] = PERF_COUNT_HW_STALLED_CYCLES_FRONTEND;
_enumConfig["PERF_COUNT_HW_STALLED_CYCLES_BACKEND"] = PERF_COUNT_HW_STALLED_CYCLES_BACKEND;
_enumConfig["PERF_COUNT_HW_REF_CPU_CYCLES"] = PERF_COUNT_HW_REF_CPU_CYCLES;
//if type==PERF_TYPE_SOFTWARE
_enumConfig["PERF_COUNT_SW_CPU_CLOCK"] = PERF_COUNT_SW_CPU_CLOCK;
_enumConfig["PERF_COUNT_SW_TASK_CLOCK"] = PERF_COUNT_SW_TASK_CLOCK;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS"] = PERF_COUNT_SW_PAGE_FAULTS;
_enumConfig["PERF_COUNT_SW_CONTEXT_SWITCHES"] = PERF_COUNT_SW_CONTEXT_SWITCHES;
_enumConfig["PERF_COUNT_SW_CPU_MIGRATIONS"] = PERF_COUNT_SW_CPU_MIGRATIONS;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MIN"] = PERF_COUNT_SW_PAGE_FAULTS_MIN;
_enumConfig["PERF_COUNT_SW_PAGE_FAULTS_MAJ"] = PERF_COUNT_SW_PAGE_FAULTS_MAJ;
_enumConfig["PERF_COUNT_SW_ALIGNMENT_FAULTS"] = PERF_COUNT_SW_ALIGNMENT_FAULTS;
_enumConfig["PERF_COUNT_SW_EMULATION_FAULTS"] = PERF_COUNT_SW_EMULATION_FAULTS;
_enumConfig["PERF_COUNT_SW_DUMMY"] = PERF_COUNT_SW_DUMMY;
//TODO set up map for rest of config enum
//set default values for global variables
_global.brokerHost = "";
_global.brokerPort = 1883;
_global.mqttPrefix = "";
_global.threads = 1;
boost::property_tree::iptree cfg;
boost::property_tree::read_info(cfgFile, cfg);
//read global variables
BOOST_FOREACH(boost::property_tree::iptree::value_type &global, cfg.get_child("global")) {
cout << global.first << " " << global.second.data() << endl;
if(boost::iequals(global.first, "mqttBroker")) {
_global.brokerHost = global.second.data();
size_t pos = _global.brokerHost.find(":");
if (pos != string::npos) {
_global.brokerPort = stoi(_global.brokerHost.substr(pos+1));
_global.brokerHost.erase(pos);
}
} else if(boost::iequals(global.first, "mqttprefix")) {
_global.mqttPrefix = global.second.data();
if (_global.mqttPrefix[_global.mqttPrefix.length()-1] != '/') {
_global.mqttPrefix.append("/");
}
} else if (boost::iequals(global.first, "threads")) {
_global.threads = stoi(global.second.data());
} else {
cout << " Value \"" << global.first << "\" not recognized. Omitting..." << endl;
}
}
//read template counters
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("CounterTemplate")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Template Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
if(!readCounter(perfCounter, counter.second)) {
_templateCounters.insert(perfCounterMap_t::value_type(perfCounter.getName(), perfCounter));
} else {
cout << "Template Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
}
//read one counter at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &counter, cfg.get_child("counters")) {
if (boost::iequals(counter.first, "counter")) {
cout << "Counter \"" << counter.second.data() << "\"" << endl;
if (!counter.second.empty()) {
PerfCounter perfCounter(counter.second.data());
//first check if default counter is given
boost::optional<boost::property_tree::iptree&> defaultC = counter.second.get_child_optional("default");
if(defaultC) {
cout << " Using \"" << defaultC.get().data() << "\" as default." << endl;
perfCounterMap_t::iterator it = _templateCounters.find(defaultC.get().data());
if(it != _templateCounters.end()) {
perfCounter = it->second;
perfCounter.setName(counter.second.data());
} else {
cout << " Template counter \"" << defaultC.get().data() << "\" not found! Using standard values." << endl;
}
}
//read remaining values
if(!readCounter(perfCounter, counter.second)) {
//create distinct perfCounter and mqttSuffix per CPU
string startMqtt = perfCounter.getMqtt();
//customize perfCounter for every CPU
for (int i = 0; i < get_nprocs(); i++) {
string incMqtt = increaseMqtt(startMqtt, i);
//check if mqtt-suffix is used twice
auto returnIt = _mqttSuffixes.insert(incMqtt);
if (!returnIt.second) {
cout << "ERROR: Mqtt-Topic (Suffix " << incMqtt << ") is used twice! Aborting" << endl;
keepRunning = 0;
return;
}
std::cout << " CPU " << perfCounter.getCpuId() << " using MQTT-Suffix " << incMqtt << std::endl;
perfCounter.setCpuId(i);
perfCounter.setMqtt(incMqtt);
_perfCounters.push_back(perfCounter);
}
} else {
cout << " Counter \"" << counter.second.data() << "\" has bad values! Ignoring..." << endl;
}
}
}
}
}
Configuration::~Configuration() {
// TODO Auto-generated destructor stub
}
int Configuration::readCounter(PerfCounter& counter, boost::property_tree::iptree& config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "interval")) {
counter.setInterval(stoi(val.second.data()));
} else if (boost::iequals(val.first, "mqttsuffix")) {
counter.setMqtt(val.second.data());
} else if (boost::iequals(val.first, "minValues")) {
counter.setMinValues(stoull(val.second.data()));
} else if (boost::iequals(val.first, "type")) {
enumMap_t::iterator it = _enumType.find(val.second.data());
if(it != _enumType.end()) {
counter.setType(it->second);
cout << " Type: " << val.second.data() << " (=" << counter.getType() << ")" << endl;
} else {
cout << " Type \"" << val.second.data() << "\" not known." << endl;
return 1;
}
} else if (boost::iequals(val.first, "config")) {
if (counter.getType() == PERF_TYPE_BREAKPOINT) {
//leave config zero
} else if (counter.getType() == PERF_TYPE_RAW) {
//read in custom hex-value
//TODO read in string-name and map to hex-value here?
// see info to perf_event_open and libpfm4
unsigned long config = stoul(val.second.data(), 0, 16);
counter.setConfig(config);
cout << " Config: Raw value: " << counter.getConfig() << endl;
} else {
enumMap_t::iterator it = _enumConfig.find(val.second.data());
if(it != _enumConfig.end()) {
counter.setConfig(it->second);
cout << " Config: " << val.second.data() << " (= " << counter.getConfig() << ")" << endl;
} else {
cout << " Config \"" << val.second.data() << "\" not known." << endl;
return 1;
}
}
} else if (boost::iequals(val.first, "default")) {
//avoid unnecessary "Value not recognized" message
} else {
cout << " Value \"" << val.first << "\" not recognized. Omitting..." << endl;
}
}
cout << " StartMQTT: " << counter.getMqtt() << endl;
cout << " Interval : " << counter.getInterval() << endl;
cout << " minValues: " << counter.getMinValues() << endl;
return 0;
}
const std::string Configuration::increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::hex << mqttDigits;
return stream.str();
}
/*
* Configuration.h
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#ifndef CONFIGURATION_H_
#define CONFIGURATION_H_
#include <vector>
#include <map>
#include <set>
//#include "Sensor.h"
#include "PerfCounter.h"
#include <boost/property_tree/ptree.hpp>
typedef std::vector<Sensor> sensorVector_t;
typedef std::vector<PerfCounter> perfCounterVector_t;
typedef std::map<std::string, PerfCounter> perfCounterMap_t;
typedef std::map<std::string, unsigned int> enumMap_t;
typedef std::set<std::string> mqttSet_t;
typedef struct {
int brokerPort;
std::string brokerHost;
std::string mqttPrefix;
uint32_t threads;
} global_t;
class Configuration {
public:
/**
* Read the configuration for perfpusher.
* @param cfgFile Name of the configuration file
*/
Configuration(std::string cfgFile);
virtual ~Configuration();
perfCounterVector_t _perfCounters;
global_t _global;
private:
/**
* Set the variables of counter according to the values specified in config.
* @param counter The counter to be configured
* @param config A property(sub)tree containing the values
* @return 0 on success, 1 if a required value could not be parsed
*/
int readCounter(PerfCounter& counter, boost::property_tree::iptree& config);
/**
* Increase the mqtt by val means
* Example: increaseMqtt("003F", 1) returns "0040"
* increaseMqtt("003F", 15) returns "004D"
*
* @param mqtt The mqtt-topic to be increased
* @param val The value by which the mqtt-topic should be increased
* @return The increased mqtt-topic
*/
const std::string increaseMqtt(const std::string& mqtt, int val);
perfCounterMap_t _templateCounters;
enumMap_t _enumType;
enumMap_t _enumConfig;
mqttSet_t _mqttSuffixes;
};
#endif /* CONFIGURATION_H_ */
/*
* MQTTPusher.cpp
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#include "MQTTPusher.h"
#include <iostream>
#include <string>
#include <unistd.h>
extern volatile int keepRunning;
MQTTPusher::MQTTPusher(int brokerPort, const std::string& brokerHost,
const std::string& mqttPrefix, sensorVector_t& sensors) :
_brokerPort(brokerPort), _brokerHost(brokerHost),
_mqttPrefix(mqttPrefix),_sensors(sensors),_connected(false) {
//print some info
int mosqMajor, mosqMinor, mosqRevision;
mosquitto_lib_version(&mosqMajor, &mosqMinor, &mosqRevision);
std::cout << "mosquitto " << mosqMajor << "." << mosqMinor << "." << mosqRevision << std::endl;
char hostname[256];
if (gethostname(hostname, 255) != 0) {
fprintf(stderr, "Cannot get hostname.\n");
exit(EXIT_FAILURE);
}
hostname[255] = '\0';
std::cout << "Hostname: " << hostname << std::endl;
mosquitto_lib_init();
std::string clientID(_mqttPrefix);
_mosq = mosquitto_new(_mqttPrefix.c_str(), false, NULL);
if (!_mosq) {
perror(NULL);
exit(EXIT_FAILURE);
}
}
MQTTPusher::~MQTTPusher() {
if(_connected) {
mosquitto_disconnect(_mosq);
}
}
void MQTTPusher::push() {
while (keepRunning && !_connected) {
if (mosquitto_connect(_mosq, _brokerHost.c_str(), _brokerPort, 1000) != MOSQ_ERR_SUCCESS) {
std::cout << "Mosquitto: could not connect to MQTT broker "
<< _brokerHost << ":"
<< _brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
reading_t* reads = new reading_t[1024];
std::size_t totalCount = 0;
while (keepRunning || totalCount) {
totalCount = 0;
for(auto& s : _sensors) {
if (s.getSizeOfReadingQueue() >= s.getMinValues()) {
if (!_connected) {
if (mosquitto_reconnect(_mosq) != MOSQ_ERR_SUCCESS) {
std::cout << "mosquitto: could not reconnect to MQTT broker " << _brokerHost << ":" << _brokerPort << std::endl;
sleep(1);
} else {
_connected = true;
}
}
if (_connected) {
std::size_t count = s.popReadingQueue(reads, 1024);
totalCount+= count;
#ifdef DEBUG
std::cout << s.getName() << " has read " << count << " values:" << std::endl;
for (std::size_t i=0; i<count; i++) {
std::cout << " " << reads[i].timestamp << " " << reads[i].value << std::endl;
}
#endif
if (mosquitto_publish(_mosq, NULL, (_mqttPrefix+s.getMqtt()).c_str(), sizeof(reading_t)*count, reads, 0, false) != MOSQ_ERR_SUCCESS) {
std::cerr << "mosquitto: could not send message! Trying again later" << std::endl;
mosquitto_disconnect(_mosq);
_connected = false;
s.pushReadingQueue(reads, count);
totalCount -= count;
sleep(5);
break;
}
}
}
}
sleep(1);
}
}
/*
* MQTTPusher.h
*
* Created on: 13.12.2017
* Author: Micha Mueller
*/
#ifndef MQTTPUSHER_H_
#define MQTTPUSHER_H_
#include "Configuration.h"
#include <mosquitto.h>
class MQTTPusher {
public:
MQTTPusher(int brokerPort, const std::string& _brokerHost,
const std::string& mqttPrefix, sensorVector_t& sensors);
virtual ~MQTTPusher();
void push();
private:
int _brokerPort;
std::string _brokerHost;
std::string _mqttPrefix;
sensorVector_t& _sensors;
struct mosquitto* _mosq;
bool _connected;
};
#endif /* MQTTPUSHER_H_ */
/*
* Sensor.cpp
*
* Created on: 12.01.2018
* Author: Micha Mueller
*/
#include "Sensor.h"
#include "timestamp.h"
#include <unistd.h>
#include <functional>
extern volatile int keepRunning;
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _minValues(1), _interval(1000) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
_timer = NULL;
_readingQueue = new boost::lockfree::spsc_queue<reading_t>(1024);
}
virtual Sensor::~Sensor() {
delete _readingQueue;
}
const std::string& Sensor::getName() const {
return _name;
}
void Sensor::setName(const std::string& name) {
_name = name;
}
const std::string& Sensor::getMqtt() const {
return _mqtt;
}
void Sensor::setMqtt(const std::string& mqtt) {
_mqtt = mqtt;
}
unsigned Sensor::getMinValues() const {
return _minValues;
}
void Sensor::setMinValues(unsigned minValues) {
_minValues = minValues;
}
unsigned Sensor::getInterval() const {
return _interval;
}
void Sensor::setInterval(unsigned interval) {
_interval = interval;
}
const std::size_t Sensor::getSizeOfReadingQueue() const {
return _readingQueue->read_available();
}
std::size_t Sensor::popReadingQueue(reading_t *reads, std::size_t max) const {
return _readingQueue->pop(reads, max);
}
void Sensor::pushReadingQueue(reading_t *reads, std::size_t count) const {
_readingQueue->push(reads, count);
}
virtual void Sensor::readAsync() {
uint64_t now = getTimestamp();
this->read();
if (_timer != NULL && keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_timer->async_wait(std::bind(&(this->readAsync), this));
}
}
/*
* Sensor.h
*
* Created on: 12.01.2018
* Author: Micha Mueller
*
* Base class which defines the interface for concrete sensors.
* A concrete (derived) sensor must at least implement the read()-,
* readAsync()- and startPolling()-methods.
*/
#ifndef SENSOR_H_
#define SENSOR_H_
#include <string>
#include <boost/asio.hpp>
#include <boost/lockfree/spsc_queue.hpp>
typedef struct {
uint64_t value;
uint64_t timestamp;
} reading_t;
class Sensor {
public:
Sensor(const std::string name);
virtual ~Sensor();
//non-overwriteable methods
const std::string& getName() const;
void setName(const std::string& name);
const std::string& getMqtt() const;
void setMqtt(const std::string& mqtt);
unsigned getMinValues() const;
void setMinValues(unsigned minValues);
unsigned getInterval() const;
void setInterval(unsigned interval);
const std::size_t getSizeOfReadingQueue() const;
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const;
void pushReadingQueue(reading_t *reads, std::size_t count) const;
//have to be overwritten methods
virtual void read() = 0;
//TODO has readAsync to be overwritten??
virtual void readAsync();
virtual void startPolling(boost::asio::io_service& io) = 0;
protected:
std::string _name;
std::string _mqtt;
unsigned int _minValues;
unsigned int _interval;
reading_t _latestValue;
boost::asio::deadline_timer* _timer;
boost::lockfree::spsc_queue<reading_t>* _readingQueue;
};
#endif /* SENSOR_H_ */
//================================================================================
// Name : dcdbpusher.cpp
// Author : Micha Mueller
// Copyright : Leibniz Supercomputing Centre
// Description : Main functions for the DCDB MQTT Pusher
//================================================================================
//================================================================================
// This file is part of DCDB (DataCenter DataBase)
// Copyright (C) 2011-2016 Leibniz Supercomputing Centre
//
// This program is free software; you can redistribute it and/or
// modify it under the terms of the GNU General Public License
// as published by the Free Software Foundation; either version 2
// of the License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program; if not, write to the Free Software
// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
//================================================================================
//define DEBUG
#include <dcdbdaemon.h>
#include <functional>
#include "Sensor.h"
#include "Configuration.h"
#include "MQTTPusher.h"
#include <boost/foreach.hpp>
#include <boost/asio.hpp>
#include <boost/thread/thread.hpp>
using namespace std;
volatile int keepRunning;
int daemonize = 0;
void sigHandler(int sig) {
cout << "Received SIGINT. Stopping sensor threads and flushing MQTT queues." << endl;
keepRunning = 0;
}