Commit f271b192 authored by Micha Mueller's avatar Micha Mueller
Browse files

Add counter for pending tasks to sensors. Wait until all tasks are finished...

Add counter for pending tasks to sensors. Wait until all tasks are finished before deconstructing anything
parent 8887cb79
......@@ -389,7 +389,7 @@ The existence of the perf_event_paranoid file is the official method for determi
## snmp
The SNMP plugin enables dcdbpusher to talk with devices which have an SNMP agent running and query requests from them. A SNMP sensor corresponds to a single value as identified by the unique OID. Sensors are aggregated by connections. See the exemplary snmp.conf file in the `config/` directory.
> NOTE     In the SNMP context the word privacy is used synonymously for encription.
> NOTE     In the SNMP context the word privacy is used synonymously for encryption.
| Value | Explanation |
|:----- |:----------- |
......
......@@ -50,11 +50,14 @@ public:
for (auto s : _sensors) {
s->stopPolling();
}
for (auto s : _sensors) {
s->wait();
}
for (auto s : _sensors) {
delete s;
}
_sensors.clear();
return this->readConfig(_cfgPath);
return true;
}
virtual void setGlobalSettings(const pluginSettings_t& pluginSettings) {
......
......@@ -13,7 +13,7 @@
Sensor::Sensor(const std::string name) :
_name(name), _mqtt(""), _keepRunning(0), _minValues(1), _interval(1000),
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0) {
_cacheInterval(900000), _cacheSize(0), _cacheIndex(0), _pendingTasks(0) {
_latestValue.timestamp = 0;
_latestValue.value = 0;
......@@ -112,3 +112,9 @@ void Sensor::storeReading(reading_t reading) {
_cache[_cacheIndex] = reading;
_cacheIndex = (_cacheIndex + 1) % _cacheSize;
}
void Sensor::wait() {
while(_pendingTasks) {
sleep(1);
}
}
......@@ -62,6 +62,11 @@ public:
virtual void startPolling() = 0;
virtual void stopPolling() = 0;
/**
* Does a busy wait until all dispatched handlers are finished (_pendingTasks == 0)
*/
void wait();
protected:
void storeReading(reading_t reading);
......@@ -73,6 +78,7 @@ protected:
unsigned int _cacheInterval;
unsigned int _cacheSize;
unsigned int _cacheIndex;
unsigned int _pendingTasks;
reading_t * _cache;
reading_t _latestValue;
boost::asio::deadline_timer* _timer;
......
......@@ -36,14 +36,13 @@ public:
//Overwrite from Configurator
bool reReadConfig() {
for (auto s : _sensors) {
s->stopPolling();
}
sleep(10);
Configurator::reReadConfig();
delete _bacClient;
_bacClient = NULL;
_templateSensors.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -47,8 +47,10 @@ void BACnetSensor::readAsync() {
if (_timer != NULL && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(_bacClient->getStrand()->wrap(std::bind(&BACnetSensor::readAsync, this)));
}
_pendingTasks--;
}
void BACnetSensor::init(boost::asio::io_service& io) {
......@@ -69,6 +71,7 @@ void BACnetSensor::startPolling() {
if (_bacClient) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_bacClient->getStrand()->wrap(std::bind(&BACnetSensor::readAsync, this)));
LOG(info) << "Sensor " << _name << " started.";
} else {
......
......@@ -43,13 +43,12 @@ namespace DCDB {
//Overwrite from Configurator
bool reReadConfig() {
for (auto s : _sensors) {
s->stopPolling();
}
sleep(10);
Configurator::reReadConfig();
_hosts.clear();
_templateSensors.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -65,8 +65,10 @@ namespace DCDB {
next+= MS_TO_NS(_interval);
}
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(_host->getStrand()->wrap(boost::bind(&IPMISensor::readAsync, this)));
}
_pendingTasks--;
}
void IPMISensor::init(boost::asio::io_service& io) {
......@@ -87,6 +89,7 @@ namespace DCDB {
if (_host) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_host->getStrand()->wrap(boost::bind(&IPMISensor::readAsync, this)));
LOG(info) << "Sensor " << _name << " started.";
} else {
......
......@@ -36,13 +36,12 @@ public:
//Overwrite from Configurator
bool reReadConfig() {
for (auto s : _sensors) {
s->stopPolling();
}
sleep(10);
Configurator::reReadConfig();
_pdus.clear();
_templateSensors.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -41,8 +41,10 @@ void PDUSensor::readAsync() {
if (_timer != NULL && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(_pdu->getStrand()->wrap(std::bind(&PDUSensor::readAsync, this)));
}
_pendingTasks--;
}
void PDUSensor::init(boost::asio::io_service& io) {
......@@ -63,6 +65,7 @@ void PDUSensor::startPolling() {
if (_pdu) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_pdu->getStrand()->wrap(std::bind(&PDUSensor::readAsync, this)));
LOG(info) << "Sensor " << _name << " started.";
} else {
......
......@@ -59,8 +59,10 @@ void PerfCounter::readAsync() {
if (_timer != NULL && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
}
_pendingTasks--;
}
void PerfCounter::startPolling() {
......@@ -93,14 +95,13 @@ void PerfCounter::startPolling() {
ioctl(_fd, PERF_EVENT_IOC_ENABLE, 0);
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&PerfCounter::readAsync, this));
LOG(info) << "Sensor " << _name << " started.";
}
void PerfCounter::stopPolling() {
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
if(_fd != -1) {
close(_fd);
_fd = -1;
......
......@@ -36,9 +36,10 @@ public:
//Overwrite from Configurator
bool reReadConfig() {
Configurator::reReadConfig();
_templateCounters.clear();
_templateCpus.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -37,13 +37,12 @@ public:
//Overwrite from Configurator
bool reReadConfig() {
for (auto s : _sensors) {
s->stopPolling();
}
sleep(10);
Configurator::reReadConfig();
_connections.clear();
_templateSensors.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -42,8 +42,10 @@ void SNMPSensor::readAsync() {
if (_timer != NULL && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(_connection->getStrand()->wrap(std::bind(&SNMPSensor::readAsync, this)));
}
_pendingTasks--;
}
void SNMPSensor::init(boost::asio::io_service& io) {
......@@ -64,6 +66,7 @@ void SNMPSensor::startPolling() {
if (_connection) {
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(_connection->getStrand()->wrap(std::bind(&SNMPSensor::readAsync, this)));
LOG(info) << "Sensor " << _name << " started.";
} else {
......@@ -74,7 +77,5 @@ void SNMPSensor::startPolling() {
void SNMPSensor::stopPolling() {
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
LOG(info) << "Sensor " << _name << " stopped.";
}
......@@ -33,8 +33,9 @@ public:
//Overwrite from Configurator
bool reReadConfig() {
Configurator::reReadConfig();
_templateSensors.clear();
return Configurator::reReadConfig();
return readConfig(_cfgPath);
}
private:
......
......@@ -65,8 +65,10 @@ void SysfsSensor::readAsync() {
if (_timer != NULL && _keepRunning) {
uint64_t next = now + MS_TO_NS(_interval);
_timer->expires_at(timestamp2ptime(next));
_pendingTasks++;
_timer->async_wait(std::bind(&SysfsSensor::readAsync, this));
}
_pendingTasks--;
}
void SysfsSensor::startPolling() {
......@@ -85,16 +87,15 @@ void SysfsSensor::startPolling() {
}
_keepRunning = 1;
_pendingTasks++;
_timer->async_wait(std::bind(&SysfsSensor::readAsync, this));
LOG(info) << "Sensor " << _name << " started.";
}
void SysfsSensor::stopPolling() {
_keepRunning = 0;
//cancel any outstanding readAsync()
_timer->cancel();
//wait until read() finished before closing _file
_timer->wait();
//wait before closing _file
wait();
if (_file != NULL) {
fclose(_file);
_file = NULL;
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment