Commit 18f03ef4 authored by Alessio Netti's avatar Alessio Netti
Browse files

Minor fixes and changes

- Subsampling behavior with delta fixed: the deltas that
are sent over MQTT are coherent with the subsampling frequency, and are
computed separately from the local high-frequency deltas
- Added exception handling when writing to sensor file sinks
- Fixed odd ProcFS behavior that led to reading constant values from
certain files by closing and re-opening every time. Overhead seems to
be negligible
parent a1fc4e26
......@@ -45,12 +45,13 @@ public:
_lastSentValue.value = 0;
_lastRawValue.timestamp = 0;
_lastRawValue.value = 0;
_accumulator.timestamp = 0;
_accumulator.value = 0;
}
SensorBase(const SensorBase& other) :
_name(other._name),
_mqtt(other._mqtt),
_sinkPath(other._sinkPath),
_skipConstVal(other._skipConstVal),
_cacheInterval(other._cacheInterval),
_subsamplingFactor(other._subsamplingFactor),
......@@ -63,6 +64,7 @@ public:
_latestValue(other._latestValue),
_lastRawValue(other._lastRawValue),
_lastSentValue(other._lastSentValue),
_accumulator(other._accumulator),
_readingQueue(nullptr),
_sinkFile(nullptr) {}
......@@ -71,7 +73,6 @@ public:
SensorBase& operator=(const SensorBase& other) {
_name = other._name;
_mqtt = other._mqtt;
_sinkPath = other._sinkPath;
_skipConstVal = other._skipConstVal;
_cacheInterval = other._cacheInterval;
_subsamplingFactor = other._subsamplingFactor;
......@@ -87,6 +88,8 @@ public:
_lastRawValue.value = other._lastRawValue.value;
_lastSentValue.timestamp= other._lastSentValue.timestamp;
_lastSentValue.value = other._lastSentValue.value;
_accumulator.timestamp = other._accumulator.timestamp;
_accumulator.value = other._accumulator.value;
_readingQueue.reset(nullptr);
_sinkFile.reset(nullptr);
......@@ -151,14 +154,27 @@ public:
else
reading.value = rawReading.value * factor;
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
if (!(_skipConstVal && (reading.value == _lastSentValue.value)) && _subsamplingIndex++ % _subsamplingFactor == 0) {
_readingQueue->push(reading);
_lastSentValue = reading;
if( _delta )
// If in delta mode, _accumulator acts as a buffer, summing all deltas for the subsampling period
_accumulator.value += reading.value;
else
_accumulator.value = reading.value;
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = reading.timestamp;
//TODO: if sensor starts with values of 0, these won't be pushed. This should be fixed
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
// We reset the accumulator's value for the correct accumulation of deltas
_accumulator.value = 0;
}
if (_sinkFile) {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << reading.value << std::endl;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
_cache[_cacheIndex] = reading;
......@@ -185,6 +201,7 @@ protected:
reading_t _latestValue;
reading_t _lastRawValue;
reading_t _lastSentValue;
reading_t _accumulator;
std::unique_ptr<boost::lockfree::spsc_queue<reading_t>> _readingQueue;
std::unique_ptr<std::ofstream> _sinkFile;
};
......
......@@ -60,16 +60,26 @@ public:
else
_latestValue.value = value * factor;
if (!(_skipConstVal && (_latestValue.value == _lastSentValue.value)) && _subsamplingIndex++ % _subsamplingFactor == 0) {
_readingQueue->push(_latestValue);
_lastSentValue = _latestValue;
if( _delta )
_accumulator.value += _latestValue.value;
else
_accumulator.value = _latestValue.value;
if (_subsamplingIndex++ % _subsamplingFactor == 0) {
_accumulator.timestamp = _latestValue.timestamp;
if( !(_skipConstVal && (_accumulator.value == _lastSentValue.value)) ) {
_readingQueue->push(_accumulator);
_lastSentValue = _accumulator;
}
_accumulator.value = 0;
}
if (_sinkFile) {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << _latestValue.value;
try {
_sinkFile->seekp(0, std::ios::beg);
*_sinkFile << _latestValue.value;
} catch(const std::exception &e) { _sinkFile->close(); _sinkFile.reset(nullptr); }
}
_readingQueue->push(_latestValue);
_cache[_cacheIndex] = _latestValue;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
......
......@@ -50,8 +50,6 @@ ProcfsParser::~ProcfsParser() { this->close(); }
bool ProcfsParser::init(std::vector<ProcfsSBPtr> *sensorVec, std::set<int> *cpuSet) {
if( this->_initialized )
return true;
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
// Building the auxiliary sensor map
std::map<std::string, ProcfsSBPtr> *sensorMap = sensorVec != NULL ? new std::map<std::string, ProcfsSBPtr>() : NULL;
if(sensorVec != NULL )
......@@ -149,7 +147,17 @@ bool MeminfoParser::_readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, st
this->_sensors = new std::vector<ProcfsSBPtr>();
this->_skipLine.clear();
char *lineToken, *savePtr;
fseek(this->_metricsFile, 0, SEEK_SET);
// Closing and re-opening a ProcFS file at each sampling time incurs overhead, but solves issues with certain
// files not showing updated values
//fseek(this->_metricsFile, 0, SEEK_SET);
if(this->_metricsFile) {
fclose(this->_metricsFile);
this->_metricsFile = NULL;
}
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
// Reading the target file line by line
while( getline(&_stringBuffer, &_chars_read, this->_metricsFile) > 0 ) {
savePtr = NULL;
......@@ -202,7 +210,15 @@ bool MeminfoParser::_readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, st
bool MeminfoParser::_readMetrics() {
if ( this->_readings == NULL )
this->_readings = new std::vector<reading_t>(this->_numMetrics);
fseek(this->_metricsFile, 0, SEEK_SET);
//fseek(this->_metricsFile, 0, SEEK_SET);
if(this->_metricsFile) {
fclose(this->_metricsFile);
this->_metricsFile = NULL;
}
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
int lineCtr=0;
unsigned int ctr = 0;
char *lineToken, *savePtr;
......@@ -291,7 +307,14 @@ bool ProcstatParser::_readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, s
this->_numColumns++;
}
fseek(this->_metricsFile, 0, SEEK_SET);
//fseek(this->_metricsFile, 0, SEEK_SET);
if(this->_metricsFile) {
fclose(this->_metricsFile);
this->_metricsFile = NULL;
}
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
// Reading the target file line by line
while( getline(&_stringBuffer, &_chars_read, this->_metricsFile) > 0 ) {
savePtr = NULL;
......@@ -374,7 +397,15 @@ bool ProcstatParser::_readNames(std::map<std::string, ProcfsSBPtr> *sensorMap, s
bool ProcstatParser::_readMetrics() {
if ( this->_readings == NULL )
this->_readings = new std::vector<reading_t>(this->_numMetrics);
fseek(this->_metricsFile, 0, SEEK_SET);
//fseek(this->_metricsFile, 0, SEEK_SET);
if(this->_metricsFile) {
fclose(this->_metricsFile);
this->_metricsFile = NULL;
}
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
unsigned int ctr = 0, lineCtr = 0, colCtr = 0, parsedCols = 0;
char *lineToken, *cpuToken, *savePtr;
bool nodeCPU = false;
......@@ -454,7 +485,15 @@ bool SARParser::_readMetrics() {
this->_columnBuffer = new unsigned long long[DEFAULTMETRICS]();
this->_columnRawReadings = new unsigned long long[DEFAULTMETRICS * this->_skipLine.size()]();
}
fseek(this->_metricsFile, 0, SEEK_SET);
//fseek(this->_metricsFile, 0, SEEK_SET);
if(this->_metricsFile) {
fclose(this->_metricsFile);
this->_metricsFile = NULL;
}
if ((this->_metricsFile = fopen(this->_path.c_str(), "r")) == NULL)
return false;
unsigned int ctr=0, lineCtr=0, colCtr=0, parsedCols=0;
char *lineToken, *cpuToken, *savePtr;
bool nodeCPU = false;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment