Commit d8458a05 authored by Micha Mueller's avatar Micha Mueller
Browse files

Move _cacheIndex SensorGroup -> SensorBase

parent 99caeb0c
......@@ -68,7 +68,7 @@ void ${PLUGIN_NAME}SensorGroup::read() {
* Read a value for every sensor affiliated with this group and store
* it with the appropriate sensor.
*/ 0;
s->storeReading(reading, _cacheIndex);
s->storeReading(reading, _cacheSize);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
......@@ -77,8 +77,6 @@ void ${PLUGIN_NAME}SensorGroup::read() {
LOG(error) << "Sensorgroup" << _groupName << " could not read value: " << e.what();
return;
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void ${PLUGIN_NAME}SensorGroup::readAsync() {
......
......@@ -24,6 +24,7 @@ public:
SensorBase(const std::string& name) :
_name(name),
_mqtt(""),
_cacheIndex(0),
_cache(nullptr),
_readingQueue(nullptr) {
......@@ -34,6 +35,7 @@ public:
SensorBase(const SensorBase& other) :
_name(other._name),
_mqtt(other._mqtt),
_cacheIndex(0),
_cache(nullptr),
_latestValue(other._latestValue),
_readingQueue(nullptr) {}
......@@ -43,6 +45,7 @@ public:
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
_mqtt = other._mqtt;
_cacheIndex = 0;
_cache.reset(nullptr);
_latestValue.timestamp = other._latestValue.timestamp;
_latestValue.value = other._latestValue.value;
......@@ -75,9 +78,10 @@ public:
}
}
virtual void storeReading(reading_t reading, unsigned cacheIndex) {
virtual void storeReading(reading_t reading, unsigned int cacheSize) {
_readingQueue->push(reading);
_cache[cacheIndex] = reading;
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % cacheSize;
_latestValue.value = reading.value;
_latestValue.timestamp = reading.timestamp;
}
......@@ -88,7 +92,8 @@ protected:
std::string _name;
std::string _mqtt;
std::unique_ptr<reading_t[]> _cache; //ToDo why not vector<reading_t> ?
unsigned int _cacheIndex;
std::unique_ptr<reading_t[]> _cache;
reading_t _latestValue;
std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
};
......
......@@ -26,7 +26,6 @@ public:
_interval(1000),
_cacheInterval(900000),
_cacheSize(1),
_cacheIndex(0),
_pendingTasks(0),
_timer(nullptr) {}
......@@ -39,7 +38,6 @@ public:
_interval(other._interval),
_cacheInterval(other._cacheInterval),
_cacheSize(other._cacheSize),
_cacheIndex(other._cacheIndex),
_timer(nullptr) {
_pendingTasks.store(other._pendingTasks.load());
}
......@@ -55,7 +53,6 @@ public:
_interval = other._interval;
_cacheInterval = other._cacheInterval;
_cacheSize = other._cacheSize;
_cacheIndex = other._cacheIndex;
_pendingTasks.store(other._pendingTasks.load());
_timer = nullptr;
......@@ -110,7 +107,6 @@ protected:
unsigned int _interval;
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
std::atomic_uint _pendingTasks;
std::unique_ptr<boost::asio::deadline_timer> _timer;
boost::log::sources::severity_logger<boost::log::trivial::severity_level> lg;
......
......@@ -67,17 +67,16 @@ void BACnetSensorGroup::read() {
for(auto s : _sensors) {
try {
reading.value = _bacClient->readProperty(getDeviceInstance(), s->getObjectInstance(), s->getObjectType(), s->getPropertyId()) * s->getFactor();
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void BACnetSensorGroup::readAsync() {
......
......@@ -75,17 +75,15 @@ void IPMISensorGroup::read() {
} else { /* use raw command */
reading.value = _host->sendRawCmd(s->getRawCmd(), s->getStart(), s->getStop()) * s->getFactor();
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
continue;
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void IPMISensorGroup::readAsync() {
......
......@@ -157,17 +157,14 @@ void OpaSensorGroup::read() {
break;
default:
LOG(error) << _groupName << "::" << s->getName() << " could not read value!";
//dummy value
reading.value = 0;
continue;
break;
}
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
s->storeReading(reading, _cacheSize);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void OpaSensorGroup::readAsync() {
......
......@@ -116,17 +116,15 @@ void PDUSensorGroup::read() {
throw std::runtime_error("Value not found!");
}
reading.value = stoul(readStr);
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void PDUSensorGroup::readAsync() {
......
......@@ -28,9 +28,10 @@ public:
void setType(unsigned type) { _type = type; }
void setConfig(unsigned config) { _config = config; }
void storeInReadingQueue(reading_t & reading, unsigned cacheIndex) {
void storeInReadingQueue(reading_t & reading, unsigned cacheSize) {
_readingQueue->push(reading);
_cache[cacheIndex] = reading;
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % cacheSize;
}
void setLatestValid(reading_t & reading, uint64_t valueNoCorrection){
......
......@@ -205,7 +205,7 @@ void PerfSensorGroup::read() {
LOG(debug) << _groupName << "::" << _sensors[j]->getName() << ": \"" << reading.value << "\"";
#endif
if(_lastValid){
_sensors[j]->storeInReadingQueue(reading, _cacheIndex);
_sensors[j]->storeInReadingQueue(reading, _cacheSize);
}
//store original count (without correction) so we can compute the overflow and future intervals.
_sensors[j]->setLatestValid(reading, val);
......@@ -213,9 +213,6 @@ void PerfSensorGroup::read() {
}
}
}
if(_lastValid){ //last and current read are valid so we can move the cache index
_cacheIndex = (_cacheIndex + 1) % _cacheSize; //ToDo check
}
if(validMeasurement){ //set valid for the next time to read
_lastValid = true;
}
......
......@@ -25,7 +25,7 @@ ProcfsParser::ProcfsParser(std::string path) {
this->_metricsFile = NULL;
this->_stringBuffer = NULL;
this->_chars_read = 0;
this->_cacheIndex = 0;
this->_cacheSize = 0;
}
/**
......@@ -201,7 +201,7 @@ bool VmstatParser::_readMetrics() {
try { reading.value = std::stoll(lineToken); }
catch (const std::invalid_argument &ia) { return false; }
catch (const std::out_of_range &oor) { return false; }
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheIndex);
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheSize);
}
}
// Error: the number of read metrics does not match the one at initialization, file must have changed
......@@ -360,7 +360,7 @@ bool ProcstatParser::_readMetrics() {
try { reading.value = std::stoll(lineToken); }
catch (const std::invalid_argument &ia) { return false; }
catch (const std::out_of_range &oor) { return false; }
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheIndex);
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheSize);
parsedCols++;
}
colCtr++;
......@@ -376,7 +376,7 @@ bool ProcstatParser::_readMetrics() {
try { reading.value = std::stoll(lineToken); }
catch (const std::invalid_argument &ia) { return false; }
catch (const std::out_of_range &oor) { return false; }
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheIndex);
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheSize);
}
}
}
......@@ -431,7 +431,7 @@ bool ProcstatPercParser::_readMetrics() {
// column is flagged with 2 (sample only at node level) then the metric can be sampled
if (this->_skipColumn[colCtr] == 1 || (this->_skipColumn[colCtr] == 2 && strlen(cpuToken) == this->_cpu_prefix_len)) {
reading.value = this->_columnReadings[colCtr] * 1000 / this->_accumulator;
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheIndex);
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheSize);
}
}
// Error: less CPU core-related metrics than those counted upon initialization were parsed
......@@ -445,7 +445,7 @@ bool ProcstatPercParser::_readMetrics() {
try { reading.value = std::stoll(lineToken); }
catch (const std::invalid_argument &ia) { return false; }
catch (const std::out_of_range &oor) { return false; }
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheIndex);
this->_sensors->at(ctr++)->storeReading(reading, this->_cacheSize);
}
}
}
......
......@@ -39,8 +39,8 @@ public:
void setPath(std::string new_path) { this->_path = new_path; this->close(); }
unsigned int getNumMetrics() { return this->_numMetrics; }
unsigned int getNumCPUs() { return this->_numCPUs; }
unsigned int getCacheIndex() { return this->_cacheIndex; }
void setCacheIndex(unsigned int c) { this->_cacheIndex = c; }
unsigned int getCacheSize() { return this->_cacheSize; }
void setCacheSize(unsigned int c) { this->_cacheSize = c; }
std::vector<ProcfsSensorBase*> *getSensors();
std::vector<ProcfsSensorBase*> *readSensors();
......@@ -62,7 +62,7 @@ protected:
// Internal variables
bool _initialized;
unsigned int _cacheIndex;
unsigned int _cacheSize;
unsigned int _numMetrics;
unsigned int _numCPUs;
std::string _path;
......
......@@ -105,12 +105,9 @@ void ProcfsSensorGroup::read() {
return;
}
#ifdef DEBUG
for(int i=0; i < this->_parser->getNumMetrics(); i++)
LOG(debug) << _groupName << "::" << _sensors[i]->getName() << ": \"" << reading.value << "\"";
for(unsigned int i=0; i < this->_parser->getNumMetrics(); i++)
LOG(debug) << _groupName << "::" << _sensors[i]->getName() << ": \"" << _sensors[i]->getLatestValue().value << "\"";
#endif
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
// Updating the cache index used to store readings within the parser
this->_parser->setCacheIndex(_cacheIndex);
}
/**
......
......@@ -29,7 +29,11 @@ public:
void stop() override;
// Setters and getters
void setParser(ProcfsParser *parser) { this->_parser = parser; }
void setParser(ProcfsParser *parser) {
this->_parser = parser;
//parser needs to know the cache size to store readings
this->_parser->setCacheSize(_cacheSize);
}
ProcfsParser *getParser() { return this->_parser; }
void setType(std::string t) { this->_type = t; }
std::string getType() { return this->_type; }
......
......@@ -53,18 +53,15 @@ void SNMPSensorGroup::read() {
for(auto s : _sensors) {
try {
reading.value = _connection->issueGet(s->getOID(), s->getOIDLen());
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
//dummy value
reading.value = 0;
continue;
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void SNMPSensorGroup::readAsync() {
......
......@@ -73,17 +73,15 @@ void SysfsSensorGroup::read() {
} else {
reading.value = stoll(buf);
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
s->storeReading(reading, _cacheSize);
} catch (const std::exception& e) {
LOG(error) << _groupName << "::" << s->getName() << " could not read value: " << e.what();
continue;
}
#ifdef DEBUG
LOG(debug) << _groupName << "::" << s->getName() << ": \"" << reading.value << "\"";
#endif
//to keep the _cacheIndex uniform for all sensors store value in every case
s->storeReading(reading, _cacheIndex);
}
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
} else {
LOG(error) << _groupName << " could not read file!";
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment