Commit 4d684af8 authored by lu43jih's avatar lu43jih
Browse files

Merge branch 'supermucng' of https://gitlab.lrz.de/dcdb/dcdbpusher into supermucng

parents a5a9d20c 4d3650d6
......@@ -290,6 +290,9 @@ Explanation of the values specific for the perfevent plugin:
| cpus | One can define a comma-separated list of cpu numbers (also value ranges can be specified, e.g. 2-4 equals 2,3,4). The hardware counter will then be only opened on the specified cpus.
> NOTE     As perfevent counters are usually always monotonic, the delta attribute is by default set to true for all sensors. One has to explicitly set delta to "off" for a sensor to overwrite this behaviour.
### type and config <a name="perfTypeConfig"></a>
(see the [perf_event_open man-page](http://man7.org/linux/man-pages/man2/perf_event_open.2.html) for more detailed explanations)
......
......@@ -68,7 +68,7 @@ void ${PLUGIN_NAME}SensorGroup::read() {
* Read a value for every sensor affiliated with this group and store
* it with the appropriate sensor.
*/ 0;
s->storeReading(reading, _cacheSize);
s->storeReading(reading);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
......
......@@ -4,7 +4,7 @@ cat << EOF > ${PLUGIN_NAME_LWC}.conf
;comments in config files are indicated by a semicolon
global {
mqttPrefix /FF112233445566778899AABBFFFF
mqttPrefix /FF112233445566778899AABB
;add here other global attributes for your plugin
}
......
......@@ -196,7 +196,7 @@ void HttpsServer::requestHandler::operator()(server::request const &request, ser
for(const auto& g : p.configurator->getSensorGroups()) {
for(const auto& s : g->getSensors()) {
if (s->getName() == sensor) {
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, g->getCacheSize(), time);
response = pathStrs[0] + "::" + sensor + _httpsServer.calcAvg(*s, time);
connection->set_status(server::connection::ok);
break;
}
......@@ -321,12 +321,12 @@ HttpsServer::~HttpsServer() {
delete _server;
}
std::string HttpsServer::calcAvg(SensorBase& s, unsigned cacheSize, uint64_t time) {
std::string HttpsServer::calcAvg(SensorBase& s, uint64_t time) {
uint64_t avg = 0;
const reading_t * const cache = s.getCache();
unsigned count = 0;
for(unsigned i = 0; i < cacheSize; i++) {
for(unsigned i = 0; i < s.getCacheSize(); i++) {
if (cache[i].timestamp > time) {
avg += cache[i].value;
count++;
......
......@@ -94,7 +94,7 @@ private:
*
* @return Response message of the form " Average of last *count* values is *avg*"
*/
std::string calcAvg(SensorBase& s, unsigned cacheSize, uint64_t time);
std::string calcAvg(SensorBase& s, uint64_t time);
/*
* Check if the authkey is valid and has the permission requiredPerm associated
......
......@@ -330,6 +330,7 @@ protected:
* @return True on success, false otherwise
*/
bool readSensorBase(SBase& sBase, CFG_VAL config) {
sBase.setCacheInterval(_cacheInterval);
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
if(def) {
//we copy all values from default (including copy constructing its sensors)
......@@ -367,7 +368,6 @@ protected:
* @return True on success, false otherwise
*/
bool readSensorGroup(SGroup& sGroup, CFG_VAL config) {
sGroup.setCacheInterval(_cacheInterval);
//first check if default group is given
boost::optional<boost::property_tree::iptree&> def = config.get_child_optional("default");
if(def) {
......
......@@ -25,6 +25,8 @@ public:
SensorBase(const std::string& name) :
_name(name),
_mqtt(""),
_cacheInterval(900000),
_cacheSize(1),
_cacheIndex(0),
_cache(nullptr),
_delta(false),
......@@ -39,6 +41,8 @@ public:
SensorBase(const SensorBase& other) :
_name(other._name),
_mqtt(other._mqtt),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_cacheIndex(0),
_cache(nullptr),
_delta(other._delta),
......@@ -51,6 +55,8 @@ public:
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
_mqtt = other._mqtt;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_cacheIndex = 0;
_cache.reset(nullptr);
_delta = other._delta;
......@@ -66,21 +72,25 @@ public:
const bool isDelta() const { return _delta;}
const std::string& getName() const { return _name; }
const std::string& getMqtt() const { return _mqtt; }
unsigned getCacheSize() const { return _cacheSize; }
const reading_t * const getCache() const { return _cache.get(); }
const reading_t getLatestValue() const { return _latestValue; } /*TODO return reference*/
const reading_t& getLatestValue() const { return _latestValue; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
void setDelta(const bool delta) { _delta = delta; }
void setName(const std::string& name, int cpuID=-1) { _name = formatName(name, cpuID); }
void setMqtt(const std::string& mqtt) { _mqtt = mqtt; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
void setLastRawValue(uint64_t value) { _lastRawValue.value = value; }
const std::size_t getSizeOfReadingQueue() const { return _readingQueue->read_available(); }
std::size_t popReadingQueue(reading_t *reads, std::size_t max) const { return _readingQueue->pop(reads, max); }
void pushReadingQueue(reading_t *reads, std::size_t count) const { _readingQueue->push(reads, count); }
void initSensor(unsigned cacheSize) {
void initSensor(unsigned interval) {
_cacheSize = _cacheInterval / interval + 1;
if(!_cache) {
_cache.reset(new reading_t[cacheSize]);
for(unsigned i = 0; i < cacheSize; i++) {
_cache.reset(new reading_t[_cacheSize]);
for(unsigned i = 0; i < _cacheSize; i++) {
_cache[i] = _latestValue; //_latestValue should equal (0,0) at this point
}
}
......@@ -89,21 +99,21 @@ public:
}
}
virtual void storeReading(reading_t reading, unsigned int cacheSize, unsigned long long maxValue=ULLONG_MAX) {
virtual void storeReading(reading_t reading, double factor=1.0, unsigned long long maxValue=ULLONG_MAX) {
_latestValue.timestamp = reading.timestamp;
if( _delta ) {
if (reading.value < _lastRawValue.value)
_latestValue.value = reading.value + (maxValue - _lastRawValue.value);
_latestValue.value = (reading.value + (maxValue - _lastRawValue.value)) * factor;
else
_latestValue.value = reading.value - _lastRawValue.value;
_latestValue.value = (reading.value - _lastRawValue.value) * factor;
_lastRawValue.value = reading.value;
}
else
_latestValue.value = reading.value;
_latestValue.value = reading.value * factor;
_readingQueue->push(_latestValue);
_cache[_cacheIndex] = _latestValue;
_cacheIndex = (_cacheIndex + 1) % cacheSize;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
static std::string formatName(const std::string& name, int cpuID=-1) {return cpuID<0 ? name : "cpu" + std::to_string(cpuID) + "." + name;}
......@@ -112,6 +122,8 @@ protected:
std::string _name;
std::string _mqtt;
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
std::unique_ptr<reading_t[]> _cache;
bool _delta;
......
......@@ -25,8 +25,6 @@ public:
_keepRunning(0),
_minValues(1),
_interval(1000),
_cacheInterval(900000),
_cacheSize(1),
_pendingTasks(0),
_timer(nullptr) {}
......@@ -37,8 +35,6 @@ public:
_keepRunning(other._keepRunning),
_minValues(other._minValues),
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_timer(nullptr) {
_pendingTasks.store(other._pendingTasks.load());
}
......@@ -52,8 +48,6 @@ public:
_keepRunning = other._keepRunning;
_minValues = other._minValues;
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_pendingTasks.store(other._pendingTasks.load());
_timer = nullptr;
......@@ -65,14 +59,12 @@ public:
bool getSync() const { return _sync; }
unsigned getMinValues() const { return _minValues; }
unsigned getInterval() const { return _interval; }
unsigned getCacheSize() const { return _cacheSize; }
void setGroupName(const std::string& groupName) { _groupName = groupName; }
void setMqttPart(const std::string& mqttPart) { _mqttPart = mqttPart; }
void setSync(bool sync) { _sync = sync; }
void setMinValues(unsigned minValues) { _minValues = minValues; }
void setInterval(unsigned interval) { _interval = interval; }
void setCacheInterval(unsigned cacheInterval) { _cacheInterval = cacheInterval; }
/**
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0)
......@@ -85,7 +77,6 @@ public:
//can be overwritten
virtual void init(boost::asio::io_service& io) {
_cacheSize = _cacheInterval / _interval + 1;
_timer.reset(new boost::asio::deadline_timer(io, boost::posix_time::seconds(0)));
}
......@@ -106,8 +97,6 @@ protected:
int _keepRunning;
unsigned int _minValues;
unsigned int _interval;
unsigned int _cacheInterval;
unsigned int _cacheSize;
std::atomic_uint _pendingTasks;
std::unique_ptr<boost::asio::deadline_timer> _timer;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -71,7 +71,7 @@ public:
SensorGroupInterface::init(io);
for(auto s : _sensors) {
s->initSensor(_cacheSize);
s->initSensor(_interval);
}
}
......
......@@ -66,11 +66,11 @@ void BACnetSensorGroup::read() {
for(const auto& s : _sensors) {
try {
reading.value = _bacClient->readProperty(getDeviceInstance(), s->getObjectInstance(), s->getObjectType(), s->getPropertyId()) * s->getFactor();
reading.value = _bacClient->readProperty(getDeviceInstance(), s->getObjectInstance(), s->getObjectType(), s->getPropertyId());
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
s->storeReading(reading, s->getFactor());
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
......
......@@ -87,7 +87,7 @@ void GpfsmonSensorGroup::read() {
auto upper = std::upper_bound(lower, searchEnd, nodename, GPFSSensorCompare());
for(;lower!=upper; ++lower){
reading.value = _data[(*lower)->getMetricType()];
(*lower)->storeReading(reading, _cacheSize);
(*lower)->storeReading(reading);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
......
......@@ -71,14 +71,14 @@ void IPMISensorGroup::read() {
_host->getSdrRecord(s->getRecordId(), sdrRecord);
s->setSdrRecord(sdrRecord);
}
reading.value = _host->readSensorRecord(sdrRecord) * s->getFactor();
reading.value = _host->readSensorRecord(sdrRecord);
} else { /* use raw command */
reading.value = _host->sendRawCmd(s->getRawCmd(), s->getStart(), s->getStop()) * s->getFactor();
reading.value = _host->sendRawCmd(s->getRawCmd(), s->getStart(), s->getStop());
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
s->storeReading(reading, s->getFactor());
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
......
......@@ -160,7 +160,7 @@ void OpaSensorGroup::read() {
continue;
break;
}
s->storeReading(reading, _cacheSize);
s->storeReading(reading);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
......
......@@ -119,7 +119,7 @@ void PDUSensorGroup::read() {
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
s->storeReading(reading);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
......
......@@ -28,18 +28,6 @@ public:
void setType(unsigned type) { _type = type; }
void setConfig(unsigned config) { _config = config; }
void storeInReadingQueue(reading_t & reading, unsigned cacheSize) {
_readingQueue->push(reading);
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % cacheSize;
}
void setLatestValid(reading_t & reading, uint64_t valueNoCorrection){
_latestValue.value = valueNoCorrection;
_latestValue.timestamp = reading.timestamp;
}
protected:
unsigned int _type;
unsigned int _config;
......
......@@ -197,19 +197,17 @@ void PerfSensorGroup::read() {
for (unsigned j = 0; j < _sensors.size(); j++) {
if (rf->values[i].id == _ids[j]) {
//has no correction (might only overflow on uncorrected value)
reading_t lV = _sensors[j]->getLatestValue();
//first calculate interval and then apply correction to interval
reading.value = static_cast<uint64_t>(calculateIntervalValue(lV.value, val, PerfSensorBase::MAXCOUNTERVALUE)*correction);
reading.value = val;
#ifdef DEBUG
LOG(debug) << _groupName << "::" << _sensors[j]->getName() << ": \"" << reading.value << "\"";
#endif
if(_lastValid){
_sensors[j]->storeInReadingQueue(reading, _cacheSize);
//storeReading takes care of delta computation and applies correction value on the result
_sensors[j]->storeReading(reading, correction, PerfSensorBase::MAXCOUNTERVALUE);
} else {
//Before we can compute correct values again after an invalid reading we have to update the lastRawValue first
_sensors[j]->setLastRawValue(val);
}
//store original count (without correction) so we can compute the overflow and future intervals.
_sensors[j]->setLatestValid(reading, val);
break;
}
}
......@@ -235,7 +233,7 @@ void PerfSensorGroup::readAsync() {
_pendingTasks--;
}
uint64_t calculateIntervalValue(uint64_t previous, uint64_t current, uint64_t maxValue){
uint64_t PerfSensorGroup::calculateIntervalValue(uint64_t previous, uint64_t current, uint64_t maxValue){
if(previous > current) { //overflow
return current + (maxValue - previous);
}
......
......@@ -55,8 +55,10 @@ private:
bool _lastValid;
double _maxCorrection;
uint64_t calculateIntervalValue(uint64_t previous, uint64_t current, uint64_t maxValue);
};
uint64_t calculateIntervalValue(uint64_t previous, uint64_t current, uint64_t maxValue);
#endif /* PERFSENSORGROUP_H_ */
......@@ -58,6 +58,8 @@ void PerfeventConfigurator::sensorBase(PerfSensorBase& s, CFG_VAL config) {
/*
* Custom code, as perf-event is an extra special plugin
*/
//default delta to true, as perfevent has only monotonic sensors usually
s.setDelta(true);
BOOST_FOREACH(boost::property_tree::iptree::value_type &val, config) {
if (boost::iequals(val.first, "type")) {
try {
......@@ -92,6 +94,9 @@ void PerfeventConfigurator::sensorBase(PerfSensorBase& s, CFG_VAL config) {
LOG(warning) << " Config \"" << val.second.data() << "\" not known.";
}
}
} else if (boost::iequals(val.first, "delta")) {
//it is explicitly stated to be off --> set it to false
s.setDelta( !(val.second.data() == "off") );
}
}
}
......@@ -194,7 +199,7 @@ bool PerfeventConfigurator::readConfig(std::string cfgPath) {
perfSG->setCpuId(*it);
perfSG->setMqttPart(formatMqttCPU(group.getMqttPart(), *it) + "/");
for(auto s : perfSG->getSensors()) s->setName(s->getName(), *it);
for(const auto& s : perfSG->getSensors()) s->setName(s->getName(), *it);
storeSensorGroup(perfSG);
leaderSG->pushBackGroup(perfSG);
......
......@@ -107,7 +107,7 @@ void ProcfsSensorGroup::read() {
else {
for(unsigned int i=0; i < this->_sensors.size(); i++) {
this->_readingBuffer.value = this->_readingVector->at(i).value;
this->_sensors.at(i)->storeReading(this->_readingBuffer, _cacheSize);
this->_sensors.at(i)->storeReading(this->_readingBuffer);
}
}
#ifdef DEBUG
......
......@@ -56,7 +56,7 @@ void SNMPSensorGroup::read() {
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
s->storeReading(reading);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment