Commit c976d362 authored by Micha Mueller's avatar Micha Mueller
Browse files

Adapt Omnipath plugin to sensorgroupsV2 architecture

parent 30f00bf6
......@@ -2,51 +2,49 @@ global {
mqttPrefix /FF112233445566778899AABBFFFF
}
sensorTemplates {
sensor temp1 {
interval 1000
minValues 3
template_group temp1 {
interval 1000
minValues 3
sensor XmitDat {
cntData portXmitData
mqttsuffix 00
}
}
sensors {
sensor XmitDat1 {
default temp1
hfiNum 1
portNum 1
mqttsuffix 0001
}
group XmitDat1 {
default temp1
hfiNum 1
portNum 1
mqttpart 01
}
sensor XmitDat2 {
default temp1
mqttsuffix 0002
hfiNum 1
portNum 2
}
group XmitDat2 {
default temp1
mqttpart 02
hfiNum 1
portNum 2
}
groups {
group p1 {
interval 1000
mqttprefix 01
hfiNum 1
portNum 1
sensor rcvDat {
default temp1
mqttsuffix 00
cntData portRcvData
}
sensor rcvPkt {
mqttsuffix 00
cntData portRcvPkts
}
sensor linkDown {
mqttsuffix 00
cntData linkDowned
}
group p1 {
interval 1000
mqttpart 01
hfiNum 1
portNum 1
sensor rcvDat {
default temp1
mqttsuffix 00
cntData portRcvData
}
sensor rcvPkt {
mqttsuffix 00
cntData portRcvPkts
}
sensor linkDown {
mqttsuffix 00
cntData linkDowned
}
}
/*
* OpaAttributes.h
*
* Created on: 10.09.2018
* Author: Micha Mueller
*/
#ifndef OPA_OPAATTRIBUTES_H_
#define OPA_OPAATTRIBUTES_H_
#include <opamgt/opamgt.h>
#include <opamgt/opamgt_pa.h>
#include <inttypes.h>
class OpaAttributes {
public:
OpaAttributes() :
_hfiNum(0),
_portNum(0) {
_port = nullptr;
_imageID = {0};
}
virtual ~OpaAttributes() {
if(_port) {
omgt_close_port(port);
}
}
int32_t getHfiNum() const { return _hfiNum; }
uint8_t getPortNum() const { return _portNum; }
void setHfiNum(int32_t hfiNum) { _hfiNum = hfiNum; }
void setPortNum(uint8_t portNum) { _portNum = portNum; }
protected:
int32_t _hfiNum;
uint8_t _portNum;
struct omgt_port * _port;
STL_PA_IMAGE_ID_DATA _imageID;
STL_PA_IMAGE_INFO_DATA _imageInfo;
};
#endif /* OPA_OPAATTRIBUTES_H_ */
......@@ -40,124 +40,25 @@ OpaConfigurator::OpaConfigurator() {
OpaConfigurator::~OpaConfigurator() {}
bool OpaConfigurator::derivedReadConfig(boost::property_tree::iptree& cfg) {
//read one sensor at a time
BOOST_FOREACH(boost::property_tree::iptree::value_type &sensor, cfg.get_child("sensors")) {
if (STRCMP(sensor, "sensor")) {
LOG(debug) << "Sensor \"" << sensor.second.data() << "\"";
if (!sensor.second.empty()) {
OpaSingleSensor* opaSensor = new OpaSingleSensor(sensor.second.data());
//first check if default sensor is given
boost::optional<boost::property_tree::iptree&> defaultS = sensor.second.get_child_optional("default");
if(defaultS) {
LOG(debug) << " Using \"" << defaultS.get().data() << "\" as default.";
sensorMap_t::iterator it = _templateSensors.find(defaultS.get().data());
if(it != _templateSensors.end()) {
*opaSensor = it->second;
opaSensor->setName(sensor.second.data());
} else {
LOG(warning) << "Template sensor \"" << defaultS.get().data() << "\" not found! Using standard values.";
}
}
//check if hfiNum is given
boost::optional<boost::property_tree::iptree&> hfiNum = sensor.second.get_child_optional("hfiNum");
if (hfiNum) { //hfiNum given
opaSensor->setHfiNum(stoi(hfiNum.get().data()));
} else {
LOG(warning) << " Sensor \"" << sensor.second.data() << "\" has no hfiNum specified! Ignoring...";
continue;
}
//check if portNum is given
boost::optional<boost::property_tree::iptree&> portNum = sensor.second.get_child_optional("portNum");
if (portNum) { //portNum given
opaSensor->setPortNum(stoull(portNum.get().data()));
} else {
LOG(warning) << " Sensor \"" << sensor.second.data() << "\" has no portNum specified! Ignoring...";
continue;
}
//read remaining values
if(readSingleSensor(*opaSensor, sensor.second)) {
_sensors.push_back(opaSensor);
} else {
LOG(warning) << " Sensor \"" << sensor.second.data() << "\" has bad values! Ignoring...";
}
}
}
}
//read groups
BOOST_FOREACH(boost::property_tree::iptree::value_type &group, cfg.get_child("groups")) {
if (STRCMP(group, "group")) {
LOG(debug) << "Group \"" << group.second.data() << "\"";
if (!group.second.empty()) {
OpaSensorGroup* opaGroup = new OpaSensorGroup(group.second.data());
//check if hfiNum is given
boost::optional<boost::property_tree::iptree&> hfiNum = group.second.get_child_optional("hfiNum");
if (hfiNum) { //hfiNum given
opaGroup->setHfiNum(stoi(hfiNum.get().data()));
} else {
LOG(warning) << " SensorGroup \"" << group.second.data() << "\" has no hfiNum specified! Ignoring...";
continue;
}
//check if portNum is given
boost::optional<boost::property_tree::iptree&> portNum = group.second.get_child_optional("portNum");
if (portNum) { //portNum given
opaGroup->setPortNum(stoull(portNum.get().data()));
} else {
LOG(warning) << " SensorGroup \"" << group.second.data() << "\" has no portNum specified! Ignoring...";
continue;
}
//read mqttPart if present
std::string mqttPart = "";
boost::optional<boost::property_tree::iptree&> mqtt = group.second.get_child_optional("mqttprefix");
if (mqtt) {
mqttPart = mqtt.get().data();
}
//read remaining values
if (!readSensorInterface(*opaGroup, group.second)) {
LOG(warning) << " SensorGroup \"" << group.second.data() << "\" has bad values!";
}
//read sensors for this group
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, group.second) {
if (STRCMP(val, "sensor")) {
LOG(debug) << "Sensor \"" << val.second.data() << "\"";
OpaSensorBase* opaSB = new OpaSensorBase(val.second.data());
if(readSensorBase(*opaSB, val.second)) {
opaSB->setMqtt(_mqttPrefix + mqttPart + opaSB->getMqtt());
LOG(debug) << " Sensor " << opaSB->getName() << " using MQTT-Topic " << opaSB->getMqtt();
opaGroup->pushBackSensor(opaSB);
} else {
LOG(warning) << " Sensor \"" << val.second.data() << "\" has bad values! Ignoring...";
}
}
}
_sensorGroups.push_back(opaGroup);
}
}
}
return true;
}
bool OpaConfigurator::derivedReadSensorBase(OpaSensorBase& sensor, boost::property_tree::iptree& config) {
void OpaConfigurator::sensorBase(OpaSensorBase& s, CFG_VAL config) {
/**
* Custom code because we need to use _enumCntData
*/
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (STRCMP(val, "mqttsuffix")) {
sensor.setMqtt(val.second.data());
} else if (STRCMP(val, "cntData")) {
if (boost::iequals(val.first, "cntData")) {
enumMap_t::iterator it = _enumCntData.find(val.second.data());
if(it != _enumCntData.end()) {
sensor.setCounterData(it->second);
LOG(debug) << " cntData: " << val.second.data() << " (= " << sensor.getType() << ")";
s.setCounterData(it->second);
} else {
LOG(warning) << " cntData \"" << val.second.data() << "\" not known.";
return false;
}
}
}
return true;
}
void OpaConfigurator::sensorGroup(OpaSensorGroup& s, CFG_VAL config) {
ADD {
ATTRIBUTE("hfiNum", setHfiNum);
ATTRIBUTE("portNum", setPortNum);
}
}
......@@ -10,9 +10,8 @@
#include "../../includes/ConfiguratorTemplate.h"
#include "OpaSensorGroup.h"
#include "OpaSingleSensor.h"
class OpaConfigurator : public ConfiguratorTemplate<OpaSensorBase, OpaSingleSensor> {
class OpaConfigurator : public ConfiguratorTemplate<OpaSensorBase, OpaSensorGroup> {
typedef std::map<std::string, unsigned int> enumMap_t;
......@@ -21,10 +20,9 @@ public:
virtual ~OpaConfigurator();
protected:
bool derivedReadConfig(boost::property_tree::iptree& cfg) override;
void derivedReReadConfig() override { /* nothing to overwrite */ }
void derivedSetGlobalSettings(const pluginSettings_t& pluginSettings) override { /* nothing to overwrite */ }
bool derivedReadSensorBase(OpaSensorBase& sensor, boost::property_tree::iptree& config) override;
/* Overwritten from ConfiguratorTemplate */
void sensorBase(OpaSensorBase& s, CFG_VAL config) override;
void sensorGroup(OpaSensorGroup& s, CFG_VAL config) override;
private:
enumMap_t _enumCntData;
......
......@@ -10,16 +10,16 @@
#include "timestamp.h"
OpaSensorGroup::OpaSensorGroup(const std::string name) :
SensorGroupTemplate(name) {}
OpaSensorGroup::~OpaSensorGroup() {}
void OpaSensorGroup::init(boost::asio::io_service& io) {
_cacheSize = _cacheInterval / _interval + 1;
_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
SensorGroupTemplate(name),
_hfiNum(0),
_portNum(0) {
_port = nullptr;
_imageID = {0};
}
for (auto s : _sensors) {
s->initSensor(_cacheSize);
OpaSensorGroup::~OpaSensorGroup() {
if(_port) {
omgt_close_port(port);
}
}
......@@ -64,109 +64,111 @@ void OpaSensorGroup::read() {
reading_t reading;
reading.timestamp = getTimestamp();
STL_PORT_COUNTERS_DATA portCounters;
try {
STL_PORT_COUNTERS_DATA portCounters;
if (omgt_pa_get_port_stats(_port, _imageID, 1, _portNum, &_imageID, &portCounters, NULL, 0, 1)) {
throw std::runtime_error("Failed to get port counters");
}
for(auto s : _sensors) {
switch(s->getCounterData()) {
case(portXmitData) :
reading.value = portCounters.portXmitData;
break;
case(portRcvData) :
reading.value = portCounters.portRcvData;
break;
case(portXmitPkts) :
reading.value = portCounters.portXmitPkts;
break;
case(portRcvPkts) :
reading.value = portCounters.portRcvPkts;
break;
case(portMulticastXmitPkts) :
reading.value = portCounters.portMulticastXmitPkts;
break;
case(portMulticastRcvPkts) :
reading.value = portCounters.portMulticastRcvPkts;
break;
case(localLinkIntegrityErrors) :
reading.value = portCounters.localLinkIntegrityErrors;
break;
case(fmConfigErrors) :
reading.value = portCounters.fmConfigErrors;
break;
case(portRcvErrors) :
reading.value = portCounters.portRcvErrors;
break;
case(excessiveBufferOverruns) :
reading.value = portCounters.excessiveBufferOverruns;
break;
case(portRcvConstraintErrors) :
reading.value = portCounters.portRcvConstraintErrors;
break;
case(portRcvSwitchRelayErrors) :
reading.value = portCounters.portRcvSwitchRelayErrors;
break;
case(portXmitDiscards) :
reading.value = portCounters.portXmitDiscards;
break;
case(portXmitConstraintErrors) :
reading.value = portCounters.portXmitConstraintErrors;
break;
case(portRcvRemotePhysicalErrors) :
reading.value = portCounters.portRcvRemotePhysicalErrors;
break;
case(swPortCongestion) :
reading.value = portCounters.swPortCongestion;
break;
case(portXmitWait) :
reading.value = portCounters.portXmitWait;
break;
case(portRcvFECN) :
reading.value = portCounters.portRcvFECN;
break;
case(portRcvBECN) :
reading.value = portCounters.portRcvBECN;
break;
case(portXmitTimeCong) :
reading.value = portCounters.portXmitTimeCong;
break;
case(portXmitWastedBW) :
reading.value = portCounters.portXmitWastedBW;
break;
case(portXmitWaitData) :
reading.value = portCounters.portXmitWaitData;
break;
case(portRcvBubble) :
reading.value = portCounters.portRcvBubble;
break;
case(portMarkFECN) :
reading.value = portCounters.portMarkFECN;
break;
case(linkErrorRecovery) :
reading.value = portCounters.linkErrorRecovery;
break;
case(linkDowned) :
reading.value = portCounters.linkDowned;
break;
case(uncorrectableErrors) :
reading.value = portCounters.uncorrectableErrors;
break;
default:
throw std::runtime_error("Unknown counter data");
break;
}
s->storeReading(reading, _cacheIndex);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
}
} catch (const std::exception& e) {
LOG(error) << "Sensorgroup" << _groupName << " could not read value: " << e.what();
return;
}
for(auto s : _sensors) {
switch(s->getCounterData()) {
case(portXmitData) :
reading.value = portCounters.portXmitData;
break;
case(portRcvData) :
reading.value = portCounters.portRcvData;
break;
case(portXmitPkts) :
reading.value = portCounters.portXmitPkts;
break;
case(portRcvPkts) :
reading.value = portCounters.portRcvPkts;
break;
case(portMulticastXmitPkts) :
reading.value = portCounters.portMulticastXmitPkts;
break;
case(portMulticastRcvPkts) :
reading.value = portCounters.portMulticastRcvPkts;
break;
case(localLinkIntegrityErrors) :
reading.value = portCounters.localLinkIntegrityErrors;
break;
case(fmConfigErrors) :
reading.value = portCounters.fmConfigErrors;
break;
case(portRcvErrors) :
reading.value = portCounters.portRcvErrors;
break;
case(excessiveBufferOverruns) :
reading.value = portCounters.excessiveBufferOverruns;
break;
case(portRcvConstraintErrors) :
reading.value = portCounters.portRcvConstraintErrors;
break;
case(portRcvSwitchRelayErrors) :
reading.value = portCounters.portRcvSwitchRelayErrors;
break;
case(portXmitDiscards) :
reading.value = portCounters.portXmitDiscards;
break;
case(portXmitConstraintErrors) :
reading.value = portCounters.portXmitConstraintErrors;
break;
case(portRcvRemotePhysicalErrors) :
reading.value = portCounters.portRcvRemotePhysicalErrors;
break;
case(swPortCongestion) :
reading.value = portCounters.swPortCongestion;
break;
case(portXmitWait) :
reading.value = portCounters.portXmitWait;
break;
case(portRcvFECN) :
reading.value = portCounters.portRcvFECN;
break;
case(portRcvBECN) :
reading.value = portCounters.portRcvBECN;
break;
case(portXmitTimeCong) :
reading.value = portCounters.portXmitTimeCong;
break;
case(portXmitWastedBW) :
reading.value = portCounters.portXmitWastedBW;
break;
case(portXmitWaitData) :
reading.value = portCounters.portXmitWaitData;
break;
case(portRcvBubble) :
reading.value = portCounters.portRcvBubble;
break;
case(portMarkFECN) :
reading.value = portCounters.portMarkFECN;
break;
case(linkErrorRecovery) :
reading.value = portCounters.linkErrorRecovery;
break;
case(linkDowned) :
reading.value = portCounters.linkDowned;
break;
case(uncorrectableErrors) :
reading.value = portCounters.uncorrectableErrors;
break;
default:
LOG(error) << _groupName << "::" << s->getName() << " could not read value!";
//dummy value
reading.value = 0;
break;
}
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
......
......@@ -9,21 +9,36 @@
#define OPA_OPASENSORGROUP_H_
#include "../../includes/SensorGroupTemplate.h"
#include "OpaAttributes.h"
#include "OpaSensorBase.h"
class OpaSensorGroup : public SensorGroupTemplate<OpaSensorBase>, public OpaAttributes {
#include <opamgt/opamgt.h>
#include <opamgt/opamgt_pa.h>
#include <inttypes.h>
class OpaSensorGroup : public SensorGroupTemplate<OpaSensorBase> {
public:
OpaSensorGroup(const std::string name);
virtual ~OpaSensorGroup();
void init(boost::asio::io_service& io) override;
void start() override;
void stop() override;
int32_t getHfiNum() const { return _hfiNum; }
uint8_t getPortNum() const { return _portNum; }
void setHfiNum(const std::string& hfiNum) { _hfiNum = stoi(hfiNum); }
void setPortNum(const std::string& portNum) { _portNum = stoull(portNum); }
private:
void read() override;
void readAsync() override;
int32_t _hfiNum;
uint8_t _portNum;
struct omgt_port * _port;
STL_PA_IMAGE_ID_DATA _imageID;
STL_PA_IMAGE_INFO_DATA _imageInfo;
};
#endif /* OPA_OPASENSORGROUP_H_ */
/*
* OpaSingleSensor.cpp
*
* Created on: 10.09.2018
* Author: Micha Mueller
*/
#include "OpaSingleSensor.h"
#include "timestamp.h"
OpaSingleSensor::OpaSingleSensor(const std::string& name) :
SensorBase(name), OpaSensorBase(name), SingleSensor(name) {}
OpaSingleSensor::~OpaSingleSensor() {}
void OpaSingleSensor::start() {
if (_keepRunning) {
//we have been started already
LOG(info) << "Sensor " << _name << " already running.";
return;
}
if (omgt_open_port_by_num(&_port, _hfiNum, _portNum, NULL) != OMGT_STATUS_SUCCESS) {
LOG(error) << "Sensor " << _name << " failed to open port or initialize PA connection";
_port = nullptr;
return;
}
if (omgt_pa_get_image_info(_port, _imageID, &_imageInfo)) {
LOG(error) << "Sensor " << _name << " failed to get PA image";
omgt_close_port(_port);
_port = nullptr;
return;
}