Commit 0fda2236 authored by lu43jih's avatar lu43jih
Browse files

Working on Gpfsmon plugin

parent fadec908
......@@ -17,8 +17,7 @@ CXXFLAGS = -std=c++11 -DBOOST_DATE_TIME_POSIX_TIME_STD_CONFIG -DBOOST_NETWORK_EN
LIBS = -L../deps/mosquitto_build/lib -L$(DCDBDEPLOYPATH)/lib/ -ldl -lmosquitto -lboost_system -lboost_thread -lboost_log_setup -lboost_log -lpthread -lcrypto -lssl -lcppnetlib-server-parsers -lcppnetlib-uri -rdynamic
OBJS = src/dcdbpusher.o src/Configuration.o src/MQTTPusher.o src/HttpsServer.o
PLUGINS = procfs pdu sysfs ipmi bacnet snmp
#gpfsmon
PLUGINS = procfs pdu sysfs ipmi bacnet snmp gpfsmon
ifeq ($(OS),Darwin)
BACNET_PORT = bsd
......@@ -98,9 +97,10 @@ libdcdbplugin_snmp.$(LIBEXT): src/sensors/snmp/SNMPSensorGroup.o src/sensors/snm
libdcdbplugin_procfs.$(LIBEXT): src/sensors/procfs/ProcfsSensorGroup.o src/sensors/procfs/ProcfsParser.o src/sensors/procfs/ProcfsConfigurator.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lboost_regex
#libdcdbplugin_gpfsmon.$(LIBEXT): src/sensors/gpfsmon/gpfsmonSensorGroup.o src/sensors/gpfsmon/gpfsmonConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
libdcdbplugin_gpfsmon.$(LIBEXT): src/sensors/gpfsmon/GpfsmonSensorGroup.o
$(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system
#src/sensors/gpfsmon/gpfsmonConfigurator.o
#libdcdbplugin_opa.$(LIBEXT): src/sensors/opa/OpaSensorGroup.o src/sensors/opa/OpaConfigurator.o
# $(CXX) $(LIBFLAGS)$@ -o $@ $^ -L$(DCDBDEPLOYPATH)/lib/ -lboost_log -lboost_system -lopamgt -libverbs -libumad -lssl
......@@ -10,38 +10,55 @@
#include "../../includes/SensorBase.h"
/*
* TODO
* Add plugin specific includes
*/
#include <regex>
class gpfsmonSensorBase : public SensorBase {
enum GPFS_METRIC {
TIMESTAMP_GPFS=0,
IOBYTESREAD=1,
IOBYTESWRITE=2,
IOOPENS=3,
IOCLOSES=4,
IOREADS=5,
IOWRITES=6,
READDIR=7,
INODE_UPDATES=8,
SIZE = INODE_UPDATES +1
};
class GpfsmonSensorBase : public SensorBase {
public:
gpfsmonSensorBase(const std::string& name) :
SensorBase(name) {
/*
* TODO
* Initialize plugin specific attributes
*/
GpfsmonSensorBase(const std::string& name, GPFS_METRIC metric_type, const std::string& node_name) :
SensorBase(name), _metric_type(metric_type), _node_name(node_name) {
setDelta(true);
}
virtual ~gpfsmonSensorBase() {
virtual ~GpfsmonSensorBase() {
/*
* TODO
* If necessary, deconstruct plugin specific attributes
*/
}
/*
* TODO
* Getters and Setters for plugin specific attributes
*/
GPFS_METRIC getMetricType() const {
return _metric_type;
}
void setMetricType(GPFS_METRIC metricType) {
_metric_type = metricType;
}
const std::string& getNodeName() const {
return _node_name;
}
void setNodeName(const std::string& nodeName) {
_node_name = nodeName;
}
protected:
/*
* TODO
* Add plugin specific attributes here
*/
GPFS_METRIC _metric_type;
std::string _node_name;
};
......
/*
* gpfsmonSensorGroup.cpp
*
* Created on: 26.11.2018
* Author: Your name goes here!
*/
#include "GpfsmonSensorGroup.h"
#include "timestamp.h"
#include <algorithm>
#include <fstream>
/**
* Comparation functor for GpfsmonSensorBase.
* No lambda used since the same function is needed in more than one place...
*/
struct SensorCompare {
bool operator()(const Gpfs_SB& left, const Gpfs_SB & rhs) const;
};
bool SensorCompare::operator()(const Gpfs_SB& lhs, const Gpfs_SB & rhs) const {
return lhs->getNodeName() < rhs->getNodeName() && lhs->getMetricType() < rhs->getMetricType();
}
GpfsmonSensorGroup::GpfsmonSensorGroup(const std::string& name) :
SensorGroupTemplate(name) {
createTempFile();
searchDummy = std::make_shared<GpfsmonSensorBase>("", GPFS_METRIC::SIZE, "" );
}
GpfsmonSensorGroup::~GpfsmonSensorGroup() {
/*
* TODO
* Tear down attributes
*/
}
void GpfsmonSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
std::sort(_sensors.begin(), _sensors.end(),SensorCompare());
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&GpfsmonSensorGroup::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
}
void GpfsmonSensorGroup::stop() {
_keepRunning = 0;
/*
* TODO
* Stop plugin specific stuff
*/
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void GpfsmonSensorGroup::read() {
reading_t reading;
reading.timestamp = getTimestamp();
try {
std::string toparse;
FILE *pf = popen(cmd_io.c_str(),"r");
if (pf != nullptr) {
char buf[BUFFER_SIZE];
while (fgets(buf, BUFFER_SIZE, pf) != nullptr) {
toparse= std::string(buf);
//ToDo parse nodename here
std::string nodename = "blabla";
std::string::size_type bytereads_pos = toparse.find("_br_ ");
std::string::size_type bytewrite_pos = toparse.find(" _bw_ ");
if( bytereads_pos != std::string::npos && bytewrite_pos != std::string::npos){
//cout << "token_substr = " << toparse.substr(bytereads_pos + 5, bytewrite_pos - bytereads_pos) << endl;
data[IOBYTESREAD] = std::stoull(toparse.substr(bytereads_pos + 5, bytewrite_pos - bytereads_pos));
//cout << "iobytesread = " << iobytesread << endl;
} else {
break;
}
std::string::size_type opens_pos = toparse.find(" _oc_ ");
if( opens_pos != std::string::npos ){
//cout << "token_substr = " << toparse.substr(bytewrite_pos + 6, opens_pos - bytewrite_pos) << endl;
data[IOBYTESWRITE] = std::stoull(toparse.substr(bytewrite_pos + 6, opens_pos - bytewrite_pos));
//cout << "iobyteswrite = " << iobyteswrite << endl;
} else {
break;
}
std::string::size_type closes_pos = toparse.find(" _cc_ ");
if( closes_pos != std::string::npos){
//cout << "token_substr = " << toparse.substr(opens_pos + 6, closes_pos - opens_pos) << endl;
data[IOOPENS] = std::stoull(toparse.substr(opens_pos + 6, closes_pos - opens_pos));
//cout << "ioopens = " << ioopens << endl;
} else {
break;
}
std::string::size_type reads_pos = toparse.find(" _rdc_ ");
if( reads_pos != std::string::npos ){
//cout << "token_substr = " << toparse.substr(closes_pos + 6, reads_pos - closes_pos) << endl;
data[IOCLOSES] = std::stoull(toparse.substr(closes_pos + 6, reads_pos - closes_pos));
//cout << "iocloses = " << iocloses << endl;
} else {
break;
}
std::string::size_type writes_pos = toparse.find(" _wc_ ");
if( writes_pos != std::string::npos ){
//cout << "token_substr = " << toparse.substr(reads_pos + 7, writes_pos - reads_pos) << endl;
data[IOREADS] = std::stoull(toparse.substr(reads_pos + 7, writes_pos - reads_pos));
//cout << "ioreads = " << ioreads << endl;
} else {
break;
}
std::string::size_type dir_pos = toparse.find(" _dir_");
if( dir_pos != std::string::npos ){
//cout << "token_substr = " << toparse.substr(writes_pos + 6, dir_pos - writes_pos) << endl;
data[IOWRITES] = std::stoull(toparse.substr(writes_pos + 6, dir_pos - writes_pos));
//cout << "iowrites = " << iowrites << endl;
} else {
break;
}
for(unsigned int i=0; i< static_cast<unsigned int>(GPFS_METRIC::SIZE); ++i){
reading.value = data[i];
searchDummy->setNodeName(nodename);
searchDummy->setMetricType(static_cast<GPFS_METRIC>(i));
auto found = std::lower_bound(_sensors.begin(), _sensors.end(), searchDummy, SensorCompare());
if(found != _sensors.end()){
(*found)->storeReading(reading, _cacheSize);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
}
}
}
}
} catch (const std::exception& e) {
LOG(error) << "Sensorgroup" << _groupName << " could not read value: " << e.what();
}
}
void GpfsmonSensorGroup::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(std::bind(&GpfsmonSensorGroup::readAsync, this));
}
_pendingTasks--;
}
void GpfsmonSensorGroup::createTempFile(){
std::ofstream gpfsmonFile;
gpfsmonFile.open("/tmp/gpfsmon");
if(gpfsmonFile.is_open()){
gpfsmonFile << "io_s\n";
gpfsmonFile.close();
} else {
LOG(error) << "Gpfsmon: unable to create temporary file for mmpmon";
}
}
......@@ -10,30 +10,32 @@
#include "../../includes/SensorGroupTemplate.h"
#include "gpfsmonSensorBase.h"
#include "GpfsmonSensorBase.h"
class gpfsmonSensorGroup : public SensorGroupTemplate<gpfsmonSensorBase> {
using Gpfs_SB = std::shared_ptr<GpfsmonSensorBase>;
class GpfsmonSensorGroup : public SensorGroupTemplate<GpfsmonSensorBase> {
public:
gpfsmonSensorGroup(const std::string& name);
virtual ~gpfsmonSensorGroup();
GpfsmonSensorGroup(const std::string& name);
virtual ~GpfsmonSensorGroup();
void init(boost::asio::io_service& io) override;
void start() override;
void stop() override;
/*
* TODO
* Add getter and setters for group attributes if required
*/
const static int BUFFER_SIZE=255; //!< constant buffer that is parse by a line given from the popen command
private:
void read() override;
void readAsync() override;
/*
* TODO
* Add group internal attributes
*/
void createTempFile();
const std::string &cmd_io = "/usr/lpp/mmfs/bin/mmpmon -p -i /tmp/gpfsmon";
Gpfs_SB searchDummy;
std::vector<uint64_t> data; //todo change file
};
#endif /* GPFSMON_GPFSMONSENSORGROUP_H_ */
......@@ -18,7 +18,7 @@ gpfsmonConfigurator::gpfsmonConfigurator() {
gpfsmonConfigurator::~gpfsmonConfigurator() {}
void gpfsmonConfigurator::sensorBase(gpfsmonSensorBase& s, CFG_VAL config) {
void gpfsmonConfigurator::sensorBase(GpfsmonSensorBase& s, CFG_VAL config) {
ADD {
/*
* TODO
......@@ -28,7 +28,7 @@ void gpfsmonConfigurator::sensorBase(gpfsmonSensorBase& s, CFG_VAL config) {
}
}
void gpfsmonConfigurator::sensorGroup(gpfsmonSensorGroup& s, CFG_VAL config) {
void gpfsmonConfigurator::sensorGroup(GpfsmonSensorGroup& s, CFG_VAL config) {
ADD {
/*
* TODO
......
......@@ -9,9 +9,9 @@
#define GPFSMON_GPFSMONCONFIGURATOR_H_
#include "../../includes/ConfiguratorTemplate.h"
#include "gpfsmonSensorGroup.h"
#include "GpfsmonSensorGroup.h"
class gpfsmonConfigurator : public ConfiguratorTemplate<gpfsmonSensorBase, gpfsmonSensorGroup> {
class gpfsmonConfigurator : public ConfiguratorTemplate<GpfsmonSensorBase, GpfsmonSensorGroup> {
public:
gpfsmonConfigurator();
......@@ -19,8 +19,8 @@ public:
protected:
/* Overwritten from ConfiguratorTemplate */
void sensorBase(gpfsmonSensorBase& s, CFG_VAL config) override;
void sensorGroup(gpfsmonSensorGroup& s, CFG_VAL config) override;
void sensorBase(GpfsmonSensorBase& s, CFG_VAL config) override;
void sensorGroup(GpfsmonSensorGroup& s, CFG_VAL config) override;
};
extern "C" ConfiguratorInterface* create() {
......
/*
* gpfsmonSensorGroup.cpp
*
* Created on: 26.11.2018
* Author: Your name goes here!
*/
#include "gpfsmonSensorGroup.h"
#include "timestamp.h"
gpfsmonSensorGroup::gpfsmonSensorGroup(const std::string& name) :
SensorGroupTemplate(name) {
/*
* TODO
* Init attributes
*/
}
gpfsmonSensorGroup::~gpfsmonSensorGroup() {
/*
* TODO
* Tear down attributes
*/
}
void gpfsmonSensorGroup::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensorgroup " << _groupName << " already running.";
return;
}
/*
* TODO
* Start plugin specific stuff
*/
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&gpfsmonSensorGroup::readAsync, this));
LOG(info) << "Sensorgroup " << _groupName << " started.";
}
void gpfsmonSensorGroup::stop() {
_keepRunning = 0;
/*
* TODO
* Stop plugin specific stuff
*/
LOG(info) << "Sensorgroup " << _groupName << " stopped.";
}
void gpfsmonSensorGroup::read() {
reading_t reading;
reading.timestamp = getTimestamp();
try {
for(auto s : _sensors) {
reading.value = /*
* TODO
* Read a value for every sensor affiliated with this group and store
* it with the appropriate sensor.
*/ 0;
s->storeReading(reading, _cacheSize);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
}
} catch (const std::exception& e) {
LOG(error) << "Sensorgroup" << _groupName << " could not read value: " << e.what();
}
}
void gpfsmonSensorGroup::readAsync() {
uint64_t now = getTimestamp();
read();
if (_timer && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(std::bind(&gpfsmonSensorGroup::readAsync, this));
}
_pendingTasks--;
}
/*
* mqttutils.cpp
*
* Created on: 13.08.2018
* Author: lu43jih
*/
#include "mqttutils.h"
#include <algorithm>
#include <fstream>
#include <sstream>
#include <string>
#include <set>
std::string intTo2DigitHex(int myInt){
std::stringstream stream;
if(myInt < 16){
stream << "0";
}
stream << std::hex << myInt;
std::string out = stream.str();
std::transform(out.begin(), out.end(), out.begin(),
[](unsigned char c){ return std::toupper(c); }
);
return out;
}
std::string createIDFromMqttParts(std::string & _mqttPrefix, int cpuId, const std::string & mqttSuffix){
std::stringstream ss;
ss << _mqttPrefix << intTo2DigitHex(cpuId) << mqttSuffix << '/';
return ss.str();
}
void parseIdListFile(std::string filename, std::map<mqtt_id_t, metric_internalid>& mqttToIds, std::set<std::string> &metrics){
std::ifstream file;
file.exceptions(std::ifstream::badbit);
try {
file.open(filename);
std::string line;
const std::string whitespace = " \t\n\v\f\r";
while (std::getline(file, line)){
std::size_t found = line.find_first_not_of(whitespace);
if(found != std::string::npos && line[found] != ';'){
std::istringstream iss(line);
std::string metric, mqtt_suffix_s, toolid_s;
if (!(iss >> metric >> mqtt_suffix_s >> toolid_s)) { continue; } //error...
auto x_pos = mqtt_suffix_s.find('x'); //removing 0x part from hex string (example: 0x1D -> 1D)
if(x_pos != std::string::npos && x_pos + 1 != std::string::npos){
mqtt_suffix_s = mqtt_suffix_s.substr(x_pos + 1);
}
if(metrics.find(metric) != metrics.end()){
metric_internalid mtid;
mtid.metric_name =metric;
mtid.internalid=std::stoull(toolid_s, nullptr, 16);
mqttToIds[mqtt_suffix_s] = mtid;
}
}
}
} catch (const std::ifstream::failure& e){
throw e;
}
}
void parseMetricList(std::string line, std::set<std::string> & list, const char delimiter) {
std::string str(line);
// Skip delimiters at beginning.
std::string::size_type lastPos = str.find_first_not_of(delimiter, 0);
// Find first "non-delimiter".
std::string::size_type pos = str.find_first_of(delimiter, lastPos);
while (std::string::npos != pos || std::string::npos != lastPos) {
// Found a token, add it to the vector.
list.insert(str.substr(lastPos, pos - lastPos));
// Skip delimiters. Note the "not_of"
lastPos = str.find_first_not_of(delimiter, pos);
// Find next "non-delimiter"
pos = str.find_first_of(delimiter, lastPos);
}
}
void getMqttToIds(std::map<mqtt_id_t, metric_internalid> &mqttToIds, std::string &unparsedMetricList, std::string &idlistfile){
std::set<std::string> list;
parseMetricList(unparsedMetricList, list);
try {
parseIdListFile(idlistfile, mqttToIds, list);
} catch(const std::ifstream::failure& e) { //rethrow
throw e;
}
}
/*
* mqttutils.h
*
* Created on: 13.08.2018
* Author: lu43jih
*/
#ifndef SRC_SENSORS_SENSORUTILS_MQTTUTILS_H_
#define SRC_SENSORS_SENSORUTILS_MQTTUTILS_H_
#include <string>
#include <sstream>
#include <map>
#include <set>
std::string intTo2DigitHex(int myInt);
std::string createIDFromMqttParts(std::string &_mqttPrefix, int cpuId, const std::string & mqttSuffix);
using mqtt_id_t = std::string;
struct metric_internalid {
std::string metric_name;
uint64_t internalid;
};
void parseIdListFile(std::string filename, std::map<mqtt_id_t, metric_internalid>& mqttToIds, std::set<std::string> &metrics);
void parseMetricList(std::string line, std::set<std::string> & list, const char delimiter = ',');
void getMqttToIds(std::map<mqtt_id_t, metric_internalid> &mqttToIds, std::string &unparsedMetricList, std::string &idlistfile);
#endif /* SRC_SENSORS_SENSORUTILS_MQTTUTILS_H_ */
/*
* timeutils.cpp
*
* Created on: 17.09.2018
* Author: lu43jih
*/
#include "timeutils.h"
#include "timestamp.h"
#include <cmath>
#include <chrono>
using namespace std::chrono;
uint64_t starting_timestamp(unsigned int intervalInMS){
uint64_t interval = static_cast<uint64_t>(intervalInMS);
uint64_t now_ms = duration_cast< milliseconds >(system_clock::now().time_since_epoch()).count();
uint64_t waitToStart = interval - (now_ms%interval); //synchronize all measurements with other sensors
if(!waitToStart ){ // less than 1 ms seconds is too small, so we wait the entire interval for the next measurement
return (now_ms + interval)*1000*1000;
}
return (now_ms + waitToStart)*1000*1000;
}