Commit 476dec13 authored by Alessio Netti's avatar Alessio Netti
Browse files

Hyperthreading aggregation in ProcFS

- Virtual threads can now be automatically aggregated for procstat and
sar groups
- the htVal parameter is the number of physical threads; corresponding
virtual threads are determined via modulo arithmetic
- Minor changes to AnalyticsController in collectagent
parent 03527a09
......@@ -14,15 +14,18 @@ void AnalyticsController::stop() {
_keepRunning = false;
LOG(info) << "Stopping sensors...";
_manager->stop();
_manager->clear();
LOG(info) << "Stopping data analytics management thread...";
_mainThread.join();
LOG(info) << "Stopping worker threads...";
_keepAliveWork.reset();
_threads.join_all();
_initialized = false;
}
//TODO: error checking on _io
restResponse_t AnalyticsController::REST(const vector<string>& pathStrs, const vector<pair<string,string>>& queries, const string& method) {
if(_initialized)
throw runtime_error("Cannot forward REST command, AnalyticsController is not initialized!");
return _manager->REST(pathStrs, queries, method, _io);
}
......
......@@ -118,6 +118,8 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
sGroup.setMqttStart(val.second.data());
} else if (boost::iequals(val.first, "cpus")) {
sGroup.setCpuSet(parseCpuString(val.second.data()));
} else if (boost::iequals(val.first, "htVal")) {
sGroup.setHtVal(stoi(val.second.data()));
}
}
......@@ -145,11 +147,11 @@ void ProcfsConfigurator::sensorGroup(ProcfsSensorGroup& sGroup, CFG_VAL config)
// After getting the vector of available metrics (subset of the sensor map, if any) from the parsed file, we delete
// all sensors not matching any parsed metrics, and arrange their order to respect the one in the file
derivedSensors = sGroup.getDerivedSensors();
parser->init(&derivedSensors, sGroup.getCpuSet());
parser->setHtVal(sGroup.getHtVal());
// If no metrics were found in the file (or the file is unreadable) the configuration aborts
int numMetrics = parser->getNumMetrics();
if(numMetrics == 0) {
int numMetrics = 0;
if(!parser->init(&derivedSensors, sGroup.getCpuSet()) || (numMetrics = parser->getNumMetrics()) == 0) {
LOG(warning) << _groupName << " " << sGroup.getGroupName() << "::" << "Unable to parse file " << filePath << ", please check your configuration!";
sGroup.getSensors().clear();
sGroup.getDerivedSensors().clear();
......
This diff is collapsed.
......@@ -26,6 +26,14 @@
class ProcfsParser {
public:
struct ProcLine {
bool skip;
char columns;
int dest;
int cpuID;
};
// Constructor and destructor
ProcfsParser(std::string path);
virtual ~ProcfsParser();
......@@ -35,12 +43,14 @@ public:
virtual void close();
// Setters and getters
std::string getPath() { return this->_path; }
void setPath(std::string new_path) { this->_path = new_path; this->close(); }
unsigned int getNumMetrics() { return this->_numMetrics; }
unsigned int getNumCPUs() { return this->_numCPUs; }
unsigned int getCacheIndex() { return this->_cacheIndex; }
void setCacheIndex(unsigned int c) { this->_cacheIndex = c; }
std::string getPath() { return _path; }
int getHtVal() { return _htVal; }
unsigned int getNumMetrics() { return _numMetrics; }
unsigned int getNumCPUs() { return _numCPUs; }
unsigned int getCacheIndex() { return _cacheIndex; }
void setCacheIndex(unsigned int c) { _cacheIndex = c; }
void setHtVal(int v) { _htVal = v; _htAggr = v>0; }
void setPath(std::string new_path) { _path = new_path; close(); }
std::vector<ProcfsSBPtr> *getSensors();
std::vector<reading_t> *readSensors();
......@@ -54,10 +64,12 @@ protected:
// Vector containing the names of parsed metrics, associated CPU cores (if any), and values.
// After initialization, _sensors is always non-NULL.
std::vector<ProcfsSBPtr> *_sensors;
// Vector of performed readings
std::vector<reading_t> *_readings;
// Keeps track of which lines and columns in the parsed file must be skipped
std::vector<bool> _skipLine;
std::vector<ProcLine> _lines;
// Pointer to the current line being parsed
ProcLine* _l;
// Every element in skipColumn can be either 0 (skip column), 1 (parse for all CPUs) or 2 (parse only at node level)
std::vector<char> _skipColumn;
......@@ -65,7 +77,10 @@ protected:
bool _initialized;
unsigned int _cacheIndex;
unsigned int _numMetrics;
unsigned int _numInternalMetrics;
unsigned int _numCPUs;
int _htVal;
bool _htAggr;
std::string _path;
FILE *_metricsFile;
char *_stringBuffer;
......@@ -82,7 +97,10 @@ protected:
class MeminfoParser : public ProcfsParser {
public:
MeminfoParser(const std::string path = "") : ProcfsParser(path) { this->LINE_SEP = " :\t"; if(path == "") this->_path = "/proc/meminfo"; else this->_path = path; }
MeminfoParser(const std::string path="") : ProcfsParser(path) {
LINE_SEP = " :\t";
_path = (path=="" ? "/proc/meminfo" : path);
}
protected:
bool _readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, std::set<int> *cpuSet) override;
......@@ -107,7 +125,10 @@ protected:
class VmstatParser : public MeminfoParser {
public:
VmstatParser(const std::string path = "") : MeminfoParser(path) { this->LINE_SEP = " \t"; if(path == "") this->_path = "/proc/vmstat"; else this->_path = path; }
VmstatParser(const std::string path="") : MeminfoParser(path) {
LINE_SEP = " \t";
_path = (path=="" ? "/proc/vmstat" : path);
}
};
// **************************************************************
......@@ -120,8 +141,11 @@ public:
class ProcstatParser : public ProcfsParser {
public:
ProcstatParser(const std::string path = "") : ProcfsParser(path) { this->LINE_SEP = " \t"; if(path == "") this->_path = "/proc/stat"; else this->_path = path; }
ProcstatParser(const std::string path = "") : ProcfsParser(path) {
LINE_SEP = " \t";
_path = (path=="" ? "/proc/stat" : path);
}
protected:
bool _readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, std::set<int> *cpuSet) override;
bool _readMetrics() override;
......@@ -132,11 +156,10 @@ protected:
// The number of known metrics in each "cpu" line together with their names, as of Oct. 2018
enum { DEFAULTMETRICS = 10 };
const std::string DEFAULT_NAMES[DEFAULTMETRICS] = {"col_user", "col_nice", "col_system", "col_idle", "col_iowait", "col_irq", "col_softirq", "col_steal", "col_guest", "col_guest_nice"};
// Regex to match strings beginning with the "cpu" keyword
// Lookup vector that keeps track of where all the sensors for a specific CPU are mapped in the reading vector
// C strings that encode the prefixes of cpu-related lines
const char *_cpu_prefix = "cpu";
const unsigned short _cpu_prefix_len = strlen(this->_cpu_prefix);
const unsigned short _cpu_prefix_len = strlen(_cpu_prefix);
// Regex to match strings ending with integer numbers (in this case, CPU core IDs)
const boost::regex reg_exp_num = boost::regex("[0-9]+$");
};
......@@ -159,12 +182,13 @@ public:
protected:
bool _readMetrics() override;
// Stores the readings for all columns of a given CPU line; used to later compute percentages
unsigned long long *_columnBuffer;
// Accumulates the readings for all columns of a given CPU line; used to later compute percentages
unsigned long long *_accumulators;
// Matrix storing the readings of columns from all CPU lines; used to compute differences between readings
unsigned long long *_columnRawReadings;
unsigned long long _accumulator;
unsigned long long _latestValue;
unsigned long long _latestBuffer;
};
#endif /* PROCFSPARSER_H_ */
......@@ -17,35 +17,34 @@
class ProcfsSensorBase : public SensorBase {
public:
// Constructor and destructor
ProcfsSensorBase(const std::string& name) : SensorBase(name) {
this->_metric = "";
this->_perCPU = false;
this->_cpuId = -1;
_metric = "";
_perCPU = false;
_cpuId = -1;
}
ProcfsSensorBase(const std::string& name, const std::string& metric, bool percpu = false, int cpuid = -1) : SensorBase(name) {
this->_metric = metric;
this->_perCPU = percpu;
this->_cpuId = cpuid;
ProcfsSensorBase(const std::string& name, const std::string& metric, bool percpu=false, int cpuid=-1) : SensorBase(name) {
_metric = metric;
_perCPU = percpu;
_cpuId = cpuid;
}
// Copy constructor
ProcfsSensorBase(ProcfsSensorBase& other) : SensorBase(other) {
this->_metric = other.getMetric();
this->_perCPU = other.isPerCPU();
this->_cpuId = other.getCPUId();
_metric = other.getMetric();
_perCPU = other.isPerCPU();
_cpuId = other.getCPUId();
}
virtual ~ProcfsSensorBase() {}
void setMetric(std::string m, int cpuID=-1) { this->_metric = SensorBase::formatName(m, cpuID); }
std::string getMetric() { return this->_metric; }
void setPerCPU(bool p) { this->_perCPU = p; }
bool isPerCPU() { return this->_perCPU; }
void setCPUId(int i) { this->_cpuId = i; }
int getCPUId() { return this->_cpuId; }
void setMetric(std::string m, int cpuID=-1) { _metric = SensorBase::formatName(m, cpuID); }
void setPerCPU(bool p) { _perCPU = p; }
void setCPUId(int i) { _cpuId = i; }
std::string getMetric() { return _metric; }
int getCPUId() { return _cpuId; }
bool isPerCPU() { return _perCPU; }
void printConfig(LOG_LEVEL ll, LOGGER& lg, unsigned leadingSpaces=16) {
std::string leading(leadingSpaces, ' ');
......@@ -55,13 +54,10 @@ public:
}
protected:
// The metric field is used to decouple the sensor's name from the corresponding metric contained within the proc file
// The metric field is used to decouple the sensor's name from its metric within the proc file
std::string _metric;
bool _perCPU;
int _cpuId;
};
using ProcfsSBPtr = std::shared_ptr<ProcfsSensorBase>;
......
......@@ -28,8 +28,8 @@ ProcfsSensorGroup& ProcfsSensorGroup::operator=(const ProcfsSensorGroup& other)
*
*/
ProcfsSensorGroup::~ProcfsSensorGroup() {
if( this->_parser != NULL)
delete this->_parser;
if(_parser != NULL)
delete _parser;
}
/**
......@@ -43,16 +43,24 @@ ProcfsSensorGroup::~ProcfsSensorGroup() {
*/
void ProcfsSensorGroup::replaceSensors(std::vector<ProcfsSBPtr> *newSensors) {
std::set<std::string> sensorSet = std::set<std::string>();
for( const auto& s_new : *newSensors ) sensorSet.insert( s_new->getMetric() );
for( const auto& s : this->_sensors ) {
if( sensorSet.find(s->getMetric()) == sensorSet.end() )
LOG(warning) << _groupName << "::Sensor " << s->getName() << " could not be matched to any metric!";
std::set<int> effCpuSet;
for(const auto& s_new : *newSensors) {
sensorSet.insert(s_new->getMetric());
effCpuSet.insert(s_new->getCPUId());
}
for(const auto& s : _sensors)
if(sensorSet.find(s->getMetric()) == sensorSet.end())
LOG(warning) << _groupName << "::Sensor " << s->getName() << " could not be matched to any metric!";
for(const auto& c : _cpuSet)
if(c!=-1 && effCpuSet.find(c) == effCpuSet.end())
LOG(warning) << _groupName << "::CPU ID " << c << " could not be found!";
sensorSet.clear();
this->_sensors.clear();
this->_baseSensors.clear();
for( auto s : *newSensors ) this->pushBackSensor(std::make_shared<ProcfsSensorBase>(*s));
effCpuSet.clear();
_sensors.clear();
_baseSensors.clear();
for( auto s : *newSensors ) pushBackSensor(std::make_shared<ProcfsSensorBase>(*s));
}
/**
......@@ -67,7 +75,7 @@ void ProcfsSensorGroup::start() {
return;
}
// If a parser object has not been set, the sensor group cannot be initialized
if( this->_parser != NULL ) {
if(this->_parser != NULL) {
//this->_parser->init();
// Crude debugging stuff
......@@ -112,20 +120,20 @@ void ProcfsSensorGroup::read() {
// Read values from the target file using the parser's _readMetrics() method
// If an error is encountered (NULL return value) we abort here
// Sensors are automatically updated from within the parser
this->_readingVector = this->_parser->readSensors();
this->_readingBuffer.timestamp = getTimestamp();
if( this->_readingVector == NULL ) {
LOG(error) << _groupName << "::" << "Could not read values from " << this->_type << "!";
_readingVector = _parser->readSensors();
_readingBuffer.timestamp = getTimestamp();
if(_readingVector == NULL) {
LOG(error) << _groupName << "::" << "Could not read values from " << _type << "!";
return;
}
else {
for(unsigned int i=0; i < this->_sensors.size(); i++) {
this->_readingBuffer.value = this->_readingVector->at(i).value;
this->_sensors.at(i)->storeReading(this->_readingBuffer);
for(unsigned int i=0; i<_sensors.size(); i++) {
_readingBuffer.value = _readingVector->at(i).value;
_sensors.at(i)->storeReading(_readingBuffer);
}
}
#ifdef DEBUG
for(unsigned int i=0; i < this->_parser->getNumMetrics(); i++)
for(unsigned int i=0; i<_parser->getNumMetrics(); i++)
LOG(debug) << _groupName << "::" << _sensors[i]->getName() << ": \"" << _sensors[i]->getLatestValue().value << "\"";
#endif
}
......@@ -147,12 +155,9 @@ void ProcfsSensorGroup::readAsync() {
}
void ProcfsSensorGroup::printConfig(LOG_LEVEL ll) {
if (_parser) {
LOG_VAR(ll) << " Parser set";
} else {
LOG_VAR(ll) << " No Parser set!";
}
LOG_VAR(ll) << " Type: " << _type;
LOG_VAR(ll) << " Path: " << _path;
LOG_VAR(ll) << " HTVal: " << _htVal;
LOG_VAR(ll) << " MQTT start: " << _mqttStart;
LOG_VAR(ll) << (_parser ? " Parser set" : " No Parser set!");
}
......@@ -21,7 +21,7 @@ class ProcfsSensorGroup : public SensorGroupTemplate<ProcfsSensorBase> {
public:
// Constructor and destructor
ProcfsSensorGroup(const std::string& name) : SensorGroupTemplate(name) { this->_parser = NULL; }
ProcfsSensorGroup(const std::string& name) : SensorGroupTemplate(name) { this->_parser = NULL; this->_htVal=0; }
ProcfsSensorGroup& operator=(const ProcfsSensorGroup& other);
virtual ~ProcfsSensorGroup();
......@@ -30,12 +30,14 @@ public:
void stop() override;
// Setters and getters
void setHtVal(int htVal) { this->_htVal = htVal; }
void setParser(ProcfsParser *parser) { this->_parser = parser; }
void setType(std::string t) { this->_type = t; }
void setPath(std::string p) { this->_path = p; }
void setMqttStart(std::string m) { this->_mqttStart = m; }
void setCpuSet(std::set<int> s) { this->_cpuSet = s; }
int getHtVal() { return this->_htVal; }
ProcfsParser *getParser() { return this->_parser; }
std::string getType() { return this->_type; }
std::string getPath() { return this->_path; }
......@@ -61,6 +63,8 @@ private:
std::string _path;
// Start MQTT part to use when building sensors automatically
std::string _mqttStart;
// Aggregation value for hyperthreading cores
int _htVal;
// Set of cpu ids read during configuration
std::set<int> _cpuSet;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment