Commit 07149b15 authored by Micha Mueller's avatar Micha Mueller
Browse files

Merge branch 'supermucng' of gitlab.lrz.de:dcdb/dcdbpusher into supermucng

parents 73a85f92 c1e93745
......@@ -9,6 +9,7 @@
#define SRC_CONFIGURATORTEMPLATE_H_
#include <map>
#include <set>
#include <boost/foreach.hpp>
#include <boost/algorithm/string.hpp>
......@@ -18,6 +19,10 @@
#include "SensorBase.h"
#include "SensorGroupTemplate.h"
#include <iostream>
#include <sstream>
#include <iomanip>
//#define STRCMP(node,str) boost::iequals(node.first,str) //DEPRECATED
#define CFG_VAL boost::property_tree::iptree&
......@@ -635,6 +640,109 @@ protected:
*/
virtual void global(CFG_VAL config) {}
/**
* Increases by a certain value the input MQTT hex topic.
*
* Example: a mqtt="AAB7" and val=5 produce "AAC2" as output.
*
* @param mqtt: the MQTT hex string whose value has to be increased
* @param val: the value by which mqtt has to be increased
*
* @return the increased MQTT string
*
*/
const std::string increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::uppercase << std::hex << mqttDigits;
return stream.str();
}
/**
* Formats a numerical CPU core ID into a hex string of specified length.
*
* Example: a mqttPart="xx" and val=11 produce "0B" as output.
*
* @param mqttPart: a template MQTT string, defines the length of the final string
* @param val: the value of the CPU core ID
*
* @return the hex string representation of the input CPU core ID
*
*/
const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val) {
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqttPart.length()) << std::uppercase << std::hex << val;
return stream.str();
}
/**
* Tries to parse the given cpuString as integer numbers. On success, the specified numbers will be inserted
* into a set, which will be returned. On failure, an empty set is returned. A set is used to maintain uniqueness
* and an ascending order among the numbers although this is not strictly required.
*
* @param cpuString String which specifies a range and/or set of numbers (e.g. "1,2,3-5,7-9,10")
* @return A set of integers as specified in the cpuString. If the string could not be parsed the set will be empty.
*/
std::set<int> parseCpuString(const std::string& cpuString) {
std::set<int> cpus;
int maxCpu = 512;
std::vector<std::string> subStrings;
std::stringstream ssComma(cpuString);
std::string item;
while (std::getline(ssComma, item, ',')) {
subStrings.push_back(item);
}
for (auto s : subStrings) {
if (s.find('-') != std::string::npos) { //range of values (e.g. 1-5) specified
std::stringstream ssHyphen(s);
std::string min, max;
std::getline(ssHyphen, min, '-');
std::getline(ssHyphen, max);
try {
int minVal = stoi(min);
int maxVal = stoi(max);
for (int i = minVal; i <= maxVal; i++) {
if (i >= 0 && i < maxCpu) {
cpus.insert((int)i);
}
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse values \"" << min << "-" << max << "\"";
}
} else { //single value
try {
int val = stoi(s);
if (val >= 0 && val < maxCpu) {
cpus.insert((int)val);
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse value \"" << s << "\"";
}
}
}
if (cpus.empty()) {
LOG(warning) << " CPUs could not be parsed!";
} else {
std::stringstream sstream;
sstream << " CPUS: ";
for (auto i : cpus) {
sstream << i << ", ";
}
std::string msg = sstream.str();
msg.pop_back();
msg.pop_back();
LOG(debug) << msg;
}
return cpus;
}
std::string _entityName;
std::string _groupName;
std::string _baseName;
......
......@@ -59,7 +59,7 @@ public:
const reading_t * const getCache() const { return _cache.get(); }
const reading_t getLatestValue() const { return _latestValue; } /*TODO return reference*/
void setName(const std::string& name) { _name = name; }
void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
......@@ -86,6 +86,8 @@ public:
_latestValue.timestamp = reading.timestamp;
}
static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}
protected:
std::string _name;
......
......@@ -7,10 +7,7 @@
#include "PerfeventConfigurator.h"
#include <iostream>
#include <sstream>
#include <unistd.h>
#include <iomanip>
#include <sys/sysinfo.h>
#include <linux/perf_event.h>
......@@ -171,24 +168,26 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
std::set<int>::iterator it = cpuSet.begin();
PerfSGPtr leaderSG = std::make_shared<PerfSensorGroup>(group);
leaderSG->setSensorGroupLeader(true);
std::stringstream mqttPart;
mqttPart << std::setfill ('0') << std::setw(group.getMqttPart().size()) << std::hex << *it << "/";
leaderSG->setGroupName(leaderSG->getGroupName() + std::to_string(*it));
leaderSG->setGroupName(SensorBase::formatName(leaderSG->getGroupName(), *it));
leaderSG->setCpuId(*it);
leaderSG->setMqttPart(mqttPart.str());
leaderSG->setMqttPart(formatMqttCPU(group.getMqttPart(), *it) + "/");
for(const auto& s : leaderSG->getSensors()) s->setName(s->getName(), *it);
storeSensorGroup(leaderSG);
it++;
//create fellow groups
for (; it != cpuSet.end(); ++it) {
PerfSGPtr perfSG = std::make_shared<PerfSensorGroup>(group);
std::stringstream mqttPart;
mqttPart << std::setfill ('0') << std::setw(group.getMqttPart().size()) << std::hex << *it << "/";
perfSG->setGroupName(perfSG->getGroupName() + std::to_string(*it));
perfSG->setGroupName(SensorBase::formatName(perfSG->getGroupName(), *it));
perfSG->setCpuId(*it);
perfSG->setMqttPart(mqttPart.str());
perfSG->setMqttPart(formatMqttCPU(group.getMqttPart(), *it) + "/");
for(auto s : perfSG->getSensors()) s->setName(s->getName(), *it);
storeSensorGroup(perfSG);
leaderSG->pushBackGroup(perfSG);
}
......@@ -203,68 +202,10 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
for(const auto& g : _sensorGroups) {
for(const auto& s : g->getSensors()) {
s->setMqtt(_mqttPrefix + g->getMqttPart() + s->getMqtt());
LOG(debug) << g->getGroupName() << "::" << s->getName() << " using MQTT-topic \"" << s->getMqtt() << "\"";
}
}
//we do not need them anymore
_templateCpus.clear();
return true;
}
std::set<int> PerfeventConfigurator::parseCpuString(const std::string& cpuString) {
std::set<int> cpus;
int maxCpu = get_nprocs();
std::vector<std::string> subStrings;
std::stringstream ssComma(cpuString);
std::string item;
while (std::getline(ssComma, item, ',')) {
subStrings.push_back(item);
}
for (auto s : subStrings) {
if (s.find('-') != std::string::npos) { //range of values (e.g. 1-5) specified
std::stringstream ssHyphen(s);
std::string min, max;
std::getline(ssHyphen, min, '-');
std::getline(ssHyphen, max);
try {
int minVal = stoi(min);
int maxVal = stoi(max);
for (int i = minVal; i <= maxVal; i++) {
if (i >= 0 && i < maxCpu) {
cpus.insert(i);
}
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse values \"" << min << "-" << max << "\"";
}
} else { //single value
try {
int val = stoi(s);
if (val >= 0 && val < maxCpu) {
cpus.insert(val);
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse value \"" << s << "\"";
}
}
}
if (cpus.empty()) {
LOG(warning) << " CPUs could not be parsed!";
} else {
std::stringstream sstream;
sstream << " CPUS: ";
for (auto i : cpus) {
sstream << i << ", ";
}
std::string msg = sstream.str();
msg.pop_back();
msg.pop_back();
LOG(debug) << msg;
}
return cpus;
}
......@@ -29,16 +29,6 @@ protected:
bool readConfig(std::string cfgPath) override;
private:
/**
* Tries to parse the given cpuString as integer numbers. On success, the specified numbers will be inserted
* into a set, which will be returned. On failure, an empty set is returned. A set is used to maintain uniqueness
* and an ascending order among the numbers although this is not strictly required.
*
* @param cpuString String which specifies a range and/or set of numbers (e.g. "1,2,3-5,7-9,10")
* @return A set of integers as specified in the cpuString. If the string could not be parsed the set will be empty.
*/
std::set<int> parseCpuString(const std::string& cpuString);
templateCpuMap_t _templateCpus;
enumMap_t _enumType;
......
......@@ -23,42 +23,6 @@ ProcfsConfigurator::ProcfsConfigurator() {
*/
ProcfsConfigurator::~ProcfsConfigurator() {}
/**
* Increases by a certain value the input MQTT hex topic.
*
* Example: a mqtt="AAB7" and val=5 produce "AAC2" as output.
*
* @param mqtt: the MQTT hex string whose value has to be increased
* @param val: the value by which mqtt has to be increased
*
* @return the increased MQTT string
*
*/
const std::string ProcfsConfigurator::increaseMqtt(const std::string& mqtt, int val) {
unsigned long mqttDigits = stoul(mqtt, 0, 16);
mqttDigits += val;
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqtt.length()) << std::uppercase << std::hex << mqttDigits;
return stream.str();
}
/**
* Formats a numerical CPU core ID into a hex string of specified length.
*
* Example: a mqttPart="xx" and val=11 produce "0B" as output.
*
* @param mqttPart: a template MQTT string, defines the length of the final string
* @param val: the value of the CPU core ID
*
* @return the hex string representation of the input CPU core ID
*
*/
const std::string ProcfsConfigurator::formatMqttCPU(const std::string& mqttPart, unsigned int val) {
std::stringstream stream;
stream << std::setfill ('0') << std::setw(mqttPart.length()) << std::uppercase << std::hex << val;
return stream.str();
}
/**
* Reads a configuration file and instantiates Procfs Sensor Groups accordingly.
*
......@@ -200,16 +164,16 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
// Adding the sensors corresponding to availableMetrics
for(int i=0;i < numMetrics; i++) {
const ProcfsSBPtr& sensor = sGroup.getDerivedSensors().at(i);
if ( autoMQTT ) sensor->setMqtt(this->increaseMqtt(mqttStart, metricsCounter->at(sensor->getCPUId() + 1)++ ));
ProcfsSBPtr sensor = static_pointer_cast<ProcfsSBPtr>(sGroup.getSensors().at(i));
if ( autoMQTT ) sensor->setMqtt(increaseMqtt(mqttStart, metricsCounter->at(sensor->getCPUId() + 1)++ ));
// If the metric does not refer to a specific CPU core, the topic is prefix + default mqttPart + suffix
// The suffix is increased automatically through the metricsCounter vector
if( sensor->getCPUId() == -1 )
sensor->setMqtt(_mqttPrefix + sGroup.getMqttPart() + sensor->getMqtt());
sensor->setMqtt(_mqttPrefix + sGroup.getMqttPart() + "/" + sensor->getMqtt());
// If the metrics refers to a specific CPU core, the topic is prefix + CPU core ID + suffix
// The suffix is automatically increased for each core through the metricsCounter vector
else
sensor->setMqtt(_mqttPrefix + this->formatMqttCPU(sGroup.getMqttPart(), sensor->getCPUId()) + sensor->getMqtt());
sensor->setMqtt(_mqttPrefix + formatMqttCPU(sGroup.getMqttPart(), sensor->getCPUId()) + "/" + sensor->getMqtt());
LOG(debug) << sGroup.getGroupName() << "::" << sensor->getName() << " using MQTT-topic \"" << sensor->getMqtt() << "\"";
}
......@@ -218,63 +182,3 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
cpuSet.clear();
//parser->close();
}
//TODO: move this up in ConfiguratorTemplate
std::set<int> ProcfsConfigurator::parseCpuString(const std::string& cpuString) {
std::set<int> cpus;
int maxCpu = 512;
std::vector<std::string> subStrings;
std::stringstream ssComma(cpuString);
std::string item;
while (std::getline(ssComma, item, ',')) {
subStrings.push_back(item);
}
for (auto s : subStrings) {
if (s.find('-') != std::string::npos) { //range of values (e.g. 1-5) specified
std::stringstream ssHyphen(s);
std::string min, max;
std::getline(ssHyphen, min, '-');
std::getline(ssHyphen, max);
try {
int minVal = stoi(min);
int maxVal = stoi(max);
for (int i = minVal; i <= maxVal; i++) {
if (i >= 0 && i < maxCpu) {
cpus.insert((int)i);
}
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse values \"" << min << "-" << max << "\"";
}
} else { //single value
try {
int val = stoi(s);
if (val >= 0 && val < maxCpu) {
cpus.insert((int)val);
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse value \"" << s << "\"";
}
}
}
if (cpus.empty()) {
LOG(warning) << " CPUs could not be parsed!";
} else {
std::stringstream sstream;
sstream << " CPUS: ";
for (auto i : cpus) {
sstream << i << ", ";
}
std::string msg = sstream.str();
msg.pop_back();
msg.pop_back();
LOG(debug) << msg;
}
return cpus;
}
......@@ -8,9 +8,6 @@
#ifndef PROCFSCONFIGURATOR_H_
#define PROCFSCONFIGURATOR_H_
#include <iostream>
#include <sstream>
#include <iomanip>
#include <vector>
#include "ProcfsSensorGroup.h"
#include "../../includes/ConfiguratorTemplate.h"
......@@ -33,13 +30,6 @@ protected:
void sensorBase(ProcfsSensorBase& s, CFG_VAL config) override;
// The readConfig method has to be overridden as the MQTT topic logic is custom
bool readConfig(std::string cfgPath) override;
// Copy-paste of the method in the perfevent plugin, used to increase MQTT topics
const std::string increaseMqtt(const std::string& mqtt, int val);
// Similar to the above, formats a numerical CPU core ID into an hex string
const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val);
// Copy-paste of the method in the perfevent plugin to parse CPU id strings
std::set<int> parseCpuString(const std::string& cpuString);
};
// Functions required to correctly generate Configurator objects from linked libraries
......
......@@ -316,14 +316,12 @@ bool ProcstatParser::_readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, s
std::string cpuName(lineToken);
while ((lineToken = strtok_r(this->_stringBuffer, this->LINE_SEP, &savePtr)) != NULL && colCtr < this->DEFAULTMETRICS) {
if( this->_skipColumn[colCtr] == 1 || ( this->_skipColumn[colCtr] == 2 && currCPU == -1 ) ) {
if( sensorMap != NULL && !sensorMap->empty() ) {
if( sensorMap != NULL && !sensorMap->empty() )
s = std::make_shared<ProcfsSensorBase>(*sensorMap->find(this->DEFAULT_NAMES[colCtr])->second);
s->setName(currCPU == -1 ? s->getName() : cpuName + "_" + s->getName());
}
else
s = std::make_shared<ProcfsSensorBase>(currCPU == -1 ? this->DEFAULT_NAMES[colCtr] : cpuName + "_" + this->DEFAULT_NAMES[colCtr]);
s->setMetric( currCPU == -1 ? this->DEFAULT_NAMES[colCtr] : cpuName + "_" + this->DEFAULT_NAMES[colCtr] );
s = std::make_shared<ProcfsSensorBase>(this->DEFAULT_NAMES[colCtr]);
s->setName(s->getName(), currCPU);
s->setMetric(this->DEFAULT_NAMES[colCtr], currCPU);
s->setCPUId(currCPU);
this->_sensors->push_back(s);
// This variable counts the number of metrics for each CPU that are effectively parsed from
......
......@@ -84,8 +84,8 @@ protected:
bool _readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, std::set<int> *cpuSet) override;
bool _readMetrics() override;
unsigned int _memTotalLine;
unsigned int _memFreeLine;
int _memTotalLine;
int _memFreeLine;
reading_t _memTotalValue;
reading_t _memFreeValue;
reading_t _valueBuffer;
......
......@@ -40,7 +40,7 @@ public:
virtual ~ProcfsSensorBase() {}
void setMetric(std::string m) { this->_metric = m; }
void setMetric(std::string m, int cpuID=-1) { this->_metric = SensorBase::formatName(m, cpuID); }
std::string getMetric() { return this->_metric; }
void setPerCPU(bool p) { this->_perCPU = p; }
bool isPerCPU() { return this->_perCPU; }
......
......@@ -105,7 +105,7 @@ void ProcfsSensorGroup::read() {
return;
}
else {
for(int i=0; i < this->_sensors.size(); i++) {
for(unsigned int i=0; i < this->_sensors.size(); i++) {
this->_readingBuffer.value = this->_readingVector->at(i).value;
this->_sensors.at(i)->storeReading(this->_readingBuffer, _cacheSize);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment