Commit 80590ac9 authored by Alessio Netti's avatar Alessio Netti
Browse files

ProcFS plugin overhaul

- Still work in progress
parent aa50c30f
......@@ -475,7 +475,9 @@ The ProcFS plugin enables dcdbpusher to sample resource usage metrics from a var
* /proc/meminfo: contains RAM memory-related usage metrics (note that some of the metrics overlap with /proc/vmstat);
* /proc/stat: contains CPU usage-related metrics, both at system and core levels.
Note that in the ProcFS plugin, MQTT topics are automatically generated at runtime depending on the number of metrics found in each parsed file. For this reason, please be careful when configuring the plugin so that its MQTT topics do not overlap with those of other plugins.
Note that the ProcFS plugin can operate in two distinct modes, with respect to MQTT topics:
* Automatic: if no sensors are specified, all metrics discovered in the underlying parsed file are acquired; sensors and MQTT topics are generated for them. Please be careful when configuring the plugin so that its MQTT topics do not overlap with those of other plugins.
* Manual: If at least one sensor is specified, only the corresponding metrics are acquired, and all other metrics in the parsed file are discarded. MQTT topics are assigned accordingly, using the mqttPrefix, mqttPart and mqttSuffix fields.
Explanation of the values specific for the ProcFS plugin:
......@@ -483,8 +485,30 @@ Explanation of the values specific for the ProcFS plugin:
|:----- |:----------- |
| type | The type of the file parsed by the sensor group. Can be either "vmstat", "meminfo", or "procstat"
| path | Path of the file, if different from the default path in the /proc filesystem
| mqttPart | The mqttPart works similarly as in the Perf-event plugin. For sensors associated to metrics that are core-specific (i.e. some of those in /proc/stat) the mqttPart is replaced with the CPU id. For all other metrics that are system-wide, the mqttPart is used as it is.
| mqttStart | Base MQTT suffix that is automatically incremented to generate topics for sensors associated to metrics in the same file. Note that mqttPart and mqttStart, when combined, must form a string of 4 hex characters, or 16 bits.
| mqttPart | The mqttPart works similarly as in the Perf-event plugin. For sensors associated to metrics that are core-specific (e.g. some of those in /proc/stat) the mqttPart is replaced with the CPU id. For all other metrics that are system-wide, the mqttPart is used as it is.
| mqttStart | Base MQTT suffix that is automatically incremented to generate topics for sensors associated to metrics in the same file. Note that this parameter is used only if automatic MQTT topic generation is enabled, when no sensors are explicitly defined.
| cpus | Defines the set of CPU cores for which metrics must be collected. Only affects extraction of core-specific metrics (e.g. those in /proc/stat), whereas system-level metrics are acquired regardless of this setting. If no CPU cores set is defined, metrics for all available CPU cores will be collected. This parameter follows the same syntax as in the Perf-event plugin.
Additionally, sensors in the ProcFS plugin (defined with the "metric" keyword) support the following additional values:
| Value | Explanation |
|:----- |:----------- |
| type | The type of the specific metric associated to the sensor. This field must match the exact name of a metric in the underlying parsed file. If such a match does not exist, the sensor is discarded.
| perCpu | Boolean. If set to "on", the metric will be collected for each CPU core specified with the "cpus" sensor group parameter, or for all CPU cores if none is specified. Otherwise, the metric will be collected only at system level. This parameter has no effect on metrics that are not acquired at CPU core level (e.g. those in /proc/vmstat and /proc/meminfo).
The "type" field can be inferred for each sensor by simply checking the underlying file parsed by the sensor group. For /proc/stat files, on the other hand, CPU core-related metrics are collected in separate columns, which adopt the following naming scheme that can be used to define sensors:
* user
* nice
* system
* idle
* iowait
* irq
* softirq
* steal
* guest
* guest_nice
Additional CPU-related metrics (that may be introduced in future versions of the Linux kernel) are not supported by the DCDB ProcFS plugin.
## Writing own plugins <a name="writingOwnPlugins"></a>
First make sure you read the [plugins](#plugins) section.
......
global {
mqttPrefix /FF112233445566778899AABB
mqttPrefix /FF112233445566778899AAB
}
template_file def1 {
......@@ -9,27 +9,94 @@ mqttPart 00
mqttStart E0
}
file meminfo {
default def1
path /Users/di52bul/dcdb/meminfo
type meminfo
mqttPart E00
mqttStart 00
metric MemFree {
type MemFree
mqttsuffix 36
skipConstVal true
}
metric AnonPages {
type AnonPages
mqttsuffix AA
skipConstVal true
}
metric Hugepagesize {
type Hugepagesize
mqttsuffix BB
skipConstVal true
}
}
file vmstat {
default def1
path /Users/di52bul/dcdb/vmstat
type vmstat
mqttPart B0
mqttPart D00
mqttStart 00
}
file meminfo {
default def1
path /Users/di52bul/dcdb/meminfo
type meminfo
mqttPart C0
mqttStart 00
metric nr_dirty_threshold {
type nr_dirty_threshold
mqttsuffix F0
skipConstVal true
}
metric nr_file_pages {
type nr_file_pages
mqttsuffix F1
skipConstVal true
}
metric nr_alloc_batch {
type nr_alloc_batch
mqttsuffix F2
skipConstVal true
}
}
file procstat {
default def1
path /Users/di52bul/dcdb/stat
type procstat
mqttPart D0
mqttPart C00
mqttStart 00
cpus 250,36
metric idle {
type idle
mqttsuffix 36
skipConstVal true
perCpu on
}
metric user {
type user
mqttsuffix 37
skipConstVal true
perCpu on
}
metric nice {
type nice
mqttsuffix 38
skipConstVal true
perCpu off
}
metric ctxt {
type ctxt
mqttsuffix 40
skipConstVal true
perCpu off
}
}
......@@ -13,7 +13,7 @@
*/
ProcfsConfigurator::ProcfsConfigurator() {
_groupName = "file";
_baseName = "INVALID";
_baseName = "metric";
_entityName = "INVALID";
}
......@@ -110,6 +110,22 @@ bool ProcfsConfigurator::readConfig(std::string cfgPath) {
return true;
}
/**
* Configures a ProcfsSensorBase object instantiated by the readSensorBase method.
*
* @param s: the ProcfsSensorBase object to be configured
* @param config: the BOOST property (sub-)tree containing configuration options for the sensor
*
*/
void ProcfsConfigurator::sensorBase(ProcfsSensorBase& s, CFG_VAL config) {
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "type"))
s.setMetric(val.second.data());
else if (boost::iequals(val.first, "perCPU"))
s.setPerCPU( val.second.data() == "on" );
}
}
/**
* Configures a ProcfsSensorGroup object instantiated by the readSensorGroup method.
*
......@@ -123,6 +139,8 @@ bool ProcfsConfigurator::readConfig(std::string cfgPath) {
void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config) {
// Read additional configuration parameters supported by the Procfs plugin
std::string filePath="", fileType="", mqttStart="00";
// Set of cpu ids read during configuration
std::set<short> cpuSet;
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "type")) {
fileType = val.second.data();
......@@ -130,6 +148,8 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
filePath = val.second.data();
} else if (boost::iequals(val.first, "mqttStart")) {
mqttStart = val.second.data();
} else if (boost::iequals(val.first, "cpus")) {
cpuSet = parseCpuString(val.second.data());
}
}
......@@ -148,49 +168,127 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
LOG(warning) << _groupName << " " << sGroup.getGroupName() << "::" << "Unspecified or invalid type! Available types are vmstat, meminfo, procstat";
return; }
sGroup.setType(fileType);
// The sensor map contains all sensor objects specified by users in the configuration file; if any are present,
// automatic MQTT topic assignment is disabled and the topics supplied in the config are used
std::map<std::string, ProcfsSensorBase*> *sensorMap = sGroup.buildMap();
parser->init(sensorMap, &cpuSet);
// Probing the target file to be monitored and counting the available metrics
parser->init();
std::vector<std::string> *availableMetrics = parser->readNames();
// Reading the number of CPU cores present in the system (if needed by the sensor group)
std::vector<short> *metricsCores = parser->readCPUs();
int numMetrics = parser->getNumMetrics();
// Automatic MQTT topic assignment is enabled only if no sensors were specified in the config file
bool autoMQTT = sensorMap->empty();
sGroup.setType(fileType);
// If no metrics were found in the file (or the file is unreadable) the configuration aborts
if(numMetrics == 0) {
LOG(warning) << _groupName << " " << sGroup.getGroupName() << "::" << "Unable to parse file " << filePath << "!";
return;
}
LOG(warning) << _groupName << " " << sGroup.getGroupName() << "::" << "Unable to parse file " << filePath << ", please check your configuration!";
sGroup.getSensors().clear();
return; }
LOG(debug) << " Number of metrics found: " << numMetrics;
// After getting the vector of available metrics (subset of the sensor map, if any) from the parsed file, we delete
// all sensors not matching any parsed metrics, and arrange their order to respect the one in the file
if(cpuSet.empty() && metricsCores != NULL && !metricsCores->empty()) {
cpuSet.insert(metricsCores->begin(), metricsCores->end());
cpuSet.erase(-1);
}
// Sensors specified in the config are duplicated, so that each CPU core has its own set of sensors
sGroup.prepareCpuSensors(&cpuSet, sensorMap);
unsigned int orphanSensors = sGroup.setupSensors(availableMetrics, sensorMap);
if( orphanSensors > 0 )
LOG(warning) << _groupName << " " << sGroup.getGroupName() << ":: " << orphanSensors << " sensors could not be matched to metrics";
// Assigning the parser to the sensor group
sGroup.setParser(parser);
// Vector that keeps track of the current number of metrics for each CPU core for which a MQTT topic has been assigned
// All core-independent metrics refer to position 0 in the vector
// All core-independent metrics refer to position 0 in the vector. Used only on automatic MQTT assignment
std::vector<unsigned int> *metricsCounter = new std::vector<unsigned int>(parser->getNumCPUs() + 1);
// Reading the number of CPU cores present in the system (if needed by the sensor group)
std::vector<short> *metricsCores = parser->readCPUs();
for(int i=0;i < metricsCounter->size(); i++) metricsCounter->at(i) = 0;
// Adding the sensors corresponding to availablMetrics
// Adding the sensors corresponding to availableMetrics
for(int i=0;i < numMetrics; i++) {
ProcfsSensorBase* sensor = new ProcfsSensorBase(availableMetrics->at(i));
ProcfsSensorBase *sensor = (ProcfsSensorBase*)sGroup.getSensors().at(i);
// If the metric does not refer to a specific CPU core, the topic is prefix + default mqttPart + suffix
// The suffix is increased automatically through the metricsCounter vector
if( metricsCores == NULL || ( metricsCores != NULL && metricsCores->at(i) == -1 )) {
sensor->setMqtt(_mqttPrefix + sGroup.getMqttPart() + this->increaseMqtt(mqttStart, metricsCounter->at(0)));
if ( autoMQTT ) sensor->setMqtt(this->increaseMqtt(mqttStart, metricsCounter->at(0)));
sensor->setMqtt(_mqttPrefix + sGroup.getMqttPart() + sensor->getMqtt());
metricsCounter->at(0)++;
}
// If the metrics refers to a specific CPU core, the topic is prefix + CPU core ID + suffix
// The suffix is automatically increased for each core through the metricsCounter vector
else {
// setMqtt is used twice for readability
sensor->setMqtt(this->increaseMqtt(mqttStart, metricsCounter->at(metricsCores->at(i) + 1)));
if ( autoMQTT ) sensor->setMqtt(this->increaseMqtt(mqttStart, metricsCounter->at(metricsCores->at(i) + 1)));
sensor->setMqtt(_mqttPrefix + this->formatMqttCPU(sGroup.getMqttPart(), metricsCores->at(i)) + sensor->getMqtt());
metricsCounter->at(metricsCores->at(i) + 1)++;
}
sGroup.pushBackSensor(sensor);
//LOG(debug) << sensor->getName() << " : " << sensor->getMqtt() << " -> " << metricsValues->at(i).value;
}
// De-allocating memory and restoring the parser to its default state
// De-allocating memory
delete metricsCounter;
parser->close();
cpuSet.clear();
//parser->close();
}
//TODO: move this up in ConfiguratorTemplate
std::set<short> ProcfsConfigurator::parseCpuString(const std::string& cpuString) {
std::set<short> cpus;
int maxCpu = 512;
std::vector<std::string> subStrings;
std::stringstream ssComma(cpuString);
std::string item;
while (std::getline(ssComma, item, ',')) {
subStrings.push_back(item);
}
for (auto s : subStrings) {
if (s.find('-') != std::string::npos) { //range of values (e.g. 1-5) specified
std::stringstream ssHyphen(s);
std::string min, max;
std::getline(ssHyphen, min, '-');
std::getline(ssHyphen, max);
try {
int minVal = stoi(min);
int maxVal = stoi(max);
for (int i = minVal; i <= maxVal; i++) {
if (i >= 0 && i < maxCpu) {
cpus.insert((short)i);
}
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse values \"" << min << "-" << max << "\"";
}
} else { //single value
try {
int val = stoi(s);
if (val >= 0 && val < maxCpu) {
cpus.insert((short)val);
}
} catch (const std::exception& e) {
LOG(debug) << "Could not parse value \"" << s << "\"";
}
}
}
if (cpus.empty()) {
LOG(warning) << " CPUs could not be parsed!";
} else {
std::stringstream sstream;
sstream << " CPUS: ";
for (auto i : cpus) {
sstream << i << ", ";
}
std::string msg = sstream.str();
msg.pop_back();
msg.pop_back();
LOG(debug) << msg;
}
return cpus;
}
......@@ -27,10 +27,10 @@ public:
virtual ~ProcfsConfigurator();
protected:
// There are no sensor or sensor entities to be read from the config file, only groups
// Therefore, we only deal with the sensorGroup method from ConfiguratorTemplate
// There are no sensor entities to be read from the config file, only groups
// Therefore, we only deal with the sensorGroup and sensorBase methods from ConfiguratorTemplate
void sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config) override;
void sensorBase(ProcfsSensorBase& s, CFG_VAL config) override {}
void sensorBase(ProcfsSensorBase& s, CFG_VAL config) override;
// The readConfig method has to be overridden as the MQTT topic logic is custom
bool readConfig(std::string cfgPath) override;
......@@ -38,6 +38,8 @@ protected:
const std::string increaseMqtt(const std::string& mqtt, int val);
// Similar to the above, formats a numerical CPU core ID into an hex string
const std::string formatMqttCPU(const std::string& mqttPart, unsigned int val);
// Copy-paste of the method in the perfevent plugin to parse CPU id strings
std::set<short> parseCpuString(const std::string& cpuString);
};
// Functions required to correctly generate Configurator objects from linked libraries
......
This diff is collapsed.
......@@ -9,9 +9,11 @@
#ifndef PROCFSPARSER_H_
#define PROCFSPARSER_H_
#include "../../includes/SensorBase.h"
#include "ProcfsSensorBase.h"
#include <boost/regex.hpp>
#include <vector>
#include <map>
#include <set>
#include <string>
#include <string.h>
#include <exception>
......@@ -28,7 +30,7 @@ public:
~ProcfsParser();
// Init/close methods
bool init();
bool init(std::map<std::string, ProcfsSensorBase*> *sensorMap = NULL, std::set<short> *cpuSet = NULL);
virtual void close();
// Setters and getters
......@@ -45,7 +47,7 @@ public:
protected:
// Private parsing methods that implement all necessary logic to parse metrics' names (and associated CPU cores, if any)
// and values. MUST be overridden by children classes
virtual std::vector<std::string> *_readNames() { return NULL; };
virtual std::vector<std::string> *_readNames(std::map<std::string, ProcfsSensorBase*> *sensorMap, std::set<short> *cpuSet) { return NULL; };
virtual std::vector<reading_t> *_readMetrics() { return NULL; };
// Vectors containing the names of parsed metrics, associated CPU cores (if any), and values.
......@@ -55,6 +57,11 @@ protected:
std::vector<short> *_metricsCores;
std::vector<reading_t> *_metricsReadings;
// Keeps track of which lines and columns in the parsed file must be skipped
std::vector<bool> _skipLine;
// Every element in skipColumn can be either 0 (skip column), 1 (parse for all CPUs) or 2 (parse only at node level)
std::vector<char> _skipColumn;
// Internal variables
bool _initialized;
unsigned int _numMetrics;
......@@ -78,7 +85,7 @@ public:
VmstatParser(const std::string path = "") : ProcfsParser(path) { this->LINE_SEP = " \t"; if(path == "") this->_path = "/proc/vmstat"; else this->_path = path; }
protected:
std::vector<std::string> *_readNames() override;
std::vector<std::string> *_readNames(std::map<std::string, ProcfsSensorBase*> *sensorMap, std::set<short> *cpuSet) override;
std::vector<reading_t> *_readMetrics() override;
};
......@@ -108,11 +115,11 @@ public:
ProcstatParser(const std::string path = "") : ProcfsParser(path) { this->LINE_SEP = " \t"; if(path == "") this->_path = "/proc/stat"; else this->_path = path; }
protected:
std::vector<std::string> *_readNames() override;
std::vector<std::string> *_readNames(std::map<std::string, ProcfsSensorBase*> *sensorMap, std::set<short> *cpuSet) override;
std::vector<reading_t> *_readMetrics() override;
// Internal variables
unsigned short _actualMetrics;
unsigned short _numColumns;
boost::cmatch _match;
// The number of known metrics in each "cpu" line together with their names, as of Oct. 2018
enum { DEFAULTMETRICS = 10 };
......
......@@ -11,15 +11,43 @@
#include "../../includes/SensorBase.h"
/**
* The ProcfsSensorBase class does nothing right now, compared to SensorBase.
* The ProcfsSensorBase class adds two private members, cpuID and metric, to the SensorBase class.
*
*/
class ProcfsSensorBase : public SensorBase {
public:
// Constructor and destructor
ProcfsSensorBase(const std::string& name) : SensorBase(name) {}
ProcfsSensorBase(const std::string& name) : SensorBase(name) {
this->_metric = "";
this->_perCPU = false;
this->_cpuId = -1;
}
// Copy constructor
ProcfsSensorBase(ProcfsSensorBase& other) : SensorBase(other) {
this->_metric = other.getMetric();
this->_perCPU = other.isPerCPU();
}
virtual ~ProcfsSensorBase() {}
void setMetric(std::string m) { this->_metric = m; }
std::string getMetric() { return this->_metric; }
void setPerCPU(bool p) { this->_perCPU = p; }
bool isPerCPU() { return this->_perCPU; }
void setCPUId(int i) { this->_cpuId = i; }
int getCPUId() { return this->_cpuId; }
protected:
// The metric field is used to decouple the sensor's name from the corresponding metric contained within the proc file
std::string _metric;
bool _perCPU;
int _cpuId;
};
#endif /* PROCFSSENSOR_H_ */
......@@ -18,6 +18,101 @@ ProcfsSensorGroup::~ProcfsSensorGroup() {
delete this->_parser;
}
/**
* Builds a STL map data structure that has sensor names of the group as keys, and sensor pointers as values.
*
* The map is dynamically allocated. Remember to delete it after use.
*
* @return a map associating sensor names to sensor objects
*
*/
std::map<std::string, ProcfsSensorBase*> *ProcfsSensorGroup::buildMap() {
std::map<std::string, ProcfsSensorBase*> *sensorMap = new std::map<std::string, ProcfsSensorBase*>();
for( auto s : this->_sensors )
sensorMap->insert(std::make_pair(s->getMetric(), s));
return sensorMap;
}
/**
* Duplicates sensor objects related to per-CPU metrics according to the input cpu set.
*
* Input must be a map built by the buildMap() method, and a set of CPU ids. Then, each sensor object in the map that
* has the perCPU flag set to true, is duplicated N times and renamed accordingly, with N being the number of CPU ids
* contained in the CPU set. No action is performed for sensors that have perCPU set to false.
*
* @param sensorMap: A map of sensor names -> sensor objects built by the buildMap() method
* @param cpuSet: a set of CPU ids
*
*/
void ProcfsSensorGroup::prepareCpuSensors(std::set<short> *cpuSet, std::map<std::string, ProcfsSensorBase*> *sensorMap) {
// The map does not need to be modified if it is empty (obviously), or if no CPU ids are specified in cpuSet
if( sensorMap == NULL || sensorMap->empty() || cpuSet == NULL || cpuSet->empty() )
return;
// The duplicated sensors are stored in a temporary map
std::map<std::string, ProcfsSensorBase*> *tempMap = new std::map<std::string, ProcfsSensorBase*>();
for( auto s : *sensorMap ) {
// If the sensor must be sampled per-cpu core we duplicate it, otherwise we keep it as it is
tempMap->insert(std::make_pair(s.first, s.second));
if (s.second->isPerCPU()) {
for (auto cpu : *cpuSet) {
ProcfsSensorBase *tempSensor = new ProcfsSensorBase(*s.second);
tempSensor->setName("cpu" + std::to_string(cpu) + "_" + tempSensor->getName());
tempSensor->setMetric("cpu" + std::to_string(cpu) + "_" + tempSensor->getMetric());
tempMap->insert(std::make_pair(tempSensor->getMetric(), tempSensor));
}
}
}
// Transferring the new sensors to the original sensorMap and deleting the tempMap
sensorMap->clear();
for( auto s : *tempMap ) sensorMap->insert(std::make_pair(s.first, s.second));
tempMap->clear();
delete tempMap;
}
/**
* Builds the internal vector of sensor objects so that it respects the order given by the "names" input vector.
*
* Input must be a vector of strings indicating the final order of sensors based on their name, and a map built
* by the buildMap() method. Be aware that in the process the original sensor vector is cleared, and only
* the information in the map is used. At the end of the procedure the map is cleared and deleted.
*
* @param names: a vector of strings indicating the final order of sensors based on their names
* @param sensorMap: A map of sensor names -> sensor objects built by the buildMap() method
*
* @return 0 on success, or the number of orphaned sensors (not matched in the names vector) if any
*/
unsigned int ProcfsSensorGroup::setupSensors(std::vector<std::string> *names, std::map<std::string, ProcfsSensorBase*> *sensorMap) {
// We clean up the original sensors (delete will be performed only for the sensors that must be discarded, later)
this->_sensors.clear();
this->_baseSensors.clear();
unsigned int orphans = 0;
// If a sensor map was specified, only these sensors are added to the group
if( sensorMap != NULL && !sensorMap->empty() ) {
for(int i = 0; i < names->size(); i++) {
// We assume that names is a subset of sensorMap, and that all entries of the first are in the second
this->pushBackSensor(sensorMap->at(names->at(i)));
sensorMap->erase(names->at(i));
}
// We delete all sensors that were not present in the "names" vector and were therefore not found in the file
if ( ( orphans = sensorMap->size() ) > 0 ) {
for (auto s : *sensorMap)
delete s.second;
sensorMap->clear();
}
}
// If no map was specified, we build all-new sensors depending on the "names" vector, as parsed from the file
else {
for(int i=0; i < names->size(); i++)
this->pushBackSensor(new ProcfsSensorBase(names->at(i)));
}
if ( sensorMap != NULL ) delete sensorMap;
return orphans;
}
/**
* Starts up the Procfs sensor group.
*
......@@ -31,7 +126,7 @@ void ProcfsSensorGroup::start() {
}
// If a parser object has not been set, the sensor group cannot be initialized
if( this->_parser != NULL ) {
this->_parser->init();
//this->_parser->init();
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&ProcfsSensorGroup::readAsync, this));
......@@ -55,7 +150,7 @@ void ProcfsSensorGroup::stop() {
_keepRunning = 0;
wait();
// Closing the internal parser object
if( this->_parser != NULL ) this->_parser->close();
//if( this->_parser != NULL ) this->_parser->close();
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
......