Commit fd7dbb41 authored by Micha Müller's avatar Micha Müller
Browse files

Pusher: Further work on Caliper plugin:

- use std::atomic_flag based spin lock for thread safety
- minor fixes and improvements
parent 2a1cb43b
......@@ -561,7 +561,7 @@ protected:
group->setEntity(&sEntity);
SB_Ptr sensor;
//perhaps one sensor is already present because it was copied from the template group
if (group->aquireSensors().size() != 0) {
if (group->acquireSensors().size() != 0) {
group->releaseSensors();
sensor = std::dynamic_pointer_cast<SBase>(group->acquireSensors()[0]);
group->releaseSensors();
......@@ -579,6 +579,7 @@ protected:
<< val.second.data() << " had a type mismatch when casting! Omitting";
}
} else {
group->releaseSensors();
sensor = std::make_shared<SBase>(val.second.data());
if (readSensorBase(*sensor, val.second)) {
group->pushBackSensor(sensor);
......
......@@ -68,11 +68,11 @@ public:
_groupName(other._groupName),
_mqttPart(other._mqttPart),
_sync(other._sync),
_keepRunning(other._keepRunning),
_keepRunning(false),
_minValues(other._minValues),
_interval(other._interval),
_pendingTasks(0),
_timer(nullptr) {
_pendingTasks.store(other._pendingTasks.load());
}
virtual ~SensorGroupInterface() {}
......@@ -81,10 +81,10 @@ public:
_groupName = other._groupName;
_mqttPart = other._mqttPart;
_sync = other._sync;
_keepRunning = other._keepRunning;
_keepRunning = false;
_minValues = other._minValues;
_interval = other._interval;
_pendingTasks.store(other._pendingTasks.load());
_pendingTasks.store(0);
_timer = nullptr;
return *this;
......
......@@ -34,25 +34,43 @@
#include "timestamp.h"
CaliperSensorGroup::CaliperSensorGroup(const std::string& name) :
SensorGroupTemplate(name),
_socket(-1),
_connection(-1) {
SensorGroupTemplate(name),
_socket(-1),
_connection(-1),
_globalMqttPrefix("") {
_lock.clear();
}
CaliperSensorGroup::CaliperSensorGroup(const CaliperSensorGroup& other) :
SensorGroupTemplate(other),
_socket(-1),
_connection(-1) {
SensorGroupTemplate(other),
_socket(-1),
_connection(-1),
_globalMqttPrefix(other._globalMqttPrefix) {
_lock.clear();
//SensorGroupTemplate already copy constructed _sensor
for (auto& s : _sensors) {
_sensorIndex.insert(std::make_pair(s->getName(), s));
}
}
CaliperSensorGroup::~CaliperSensorGroup() {}
CaliperSensorGroup::~CaliperSensorGroup() {
_sensorIndex.clear();
}
CaliperSensorGroup& CaliperSensorGroup::operator=(const CaliperSensorGroup& other) {
SensorGroupTemplate::operator=(other);
_socket = -1;
_connection = -1;
return *this;
SensorGroupTemplate::operator=(other);
_socket = -1;
_connection = -1;
_globalMqttPrefix = other._globalMqttPrefix;
_lock.clear();
//SensorGroupTemplate already copied _sensor
for (auto& s : _sensors) {
_sensorIndex.insert(std::make_pair(s->getName(), s));
}
return *this;
}
bool CaliperSensorGroup::execOnStart() {
......@@ -97,7 +115,6 @@ void CaliperSensorGroup::execOnStop() {
}
}
//TODO y not terminating?
void CaliperSensorGroup::read() {
if (_connection == -1) {
_connection = accept(_socket, NULL, NULL);
......@@ -125,9 +142,14 @@ void CaliperSensorGroup::read() {
close(_connection);
_connection = -1;
LOG(debug) << _groupName << ": Connection closed";
acquireSensors();
_sensors.clear();
_baseSensors.clear();
releaseSensors();
_sensorIndex.clear();
return;
//nrec==-1 indicates an error during recv()
//if errno==EAGAIN or errno==EWOULDBLOCK there are currently no more messages available to receive
//if errno==EAGAIN or errno==EWOULDBLOCK there are currently just no more messages available to receive
} else if (nrec == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
LOG(error) << _groupName << ": Recv failed: " << strerror(errno);
......@@ -158,9 +180,10 @@ void CaliperSensorGroup::read() {
s->setName(s->getMqtt());
s->initSensor(_interval);
//TODO lock access!
acquireSensors();
_sensors.push_back(s);
_baseSensors.push_back(s);
releaseSensors();
_sensorIndex.insert(std::make_pair(feName, s));
}
s->storeReading(reading);
......
......@@ -31,6 +31,7 @@
#include "CaliperSensorBase.h"
#include <atomic>
#include <unordered_map>
/**
......@@ -48,7 +49,15 @@ public:
bool execOnStart() final override;
void execOnStop() final override;
//TODO overwrite acquire/release-sensors()
//TODO spins forever ?! why !?!? probably some evil spirit does not release sensors after acquiring them
std::vector<SBasePtr>& acquireSensors() final override {
while (_lock.test_and_set(std::memory_order_acquire)) {}
return _baseSensors;
}
void releaseSensors() final override {
_lock.clear(std::memory_order_release);
}
void printGroupConfig(LOG_LEVEL ll) final override;
......@@ -61,6 +70,7 @@ private:
int _connection;
std::string _globalMqttPrefix;
std::atomic_flag _lock; ///< Lock to synchronize access to associated sensors
std::unordered_map<std::string, S_Ptr> _sensorIndex; ///< Additional sensor storage for fast lookup
};
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment