2.12.2021, 9:00 - 11:00: Due to updates GitLab may be unavailable for some minutes between 09:00 and 11:00.

Commit 4ea9ea32 authored by CAMP C++ Builder's avatar CAMP C++ Builder
Browse files

fixed reconnect problems by removing inheritance to Runnable and implementing own logic

parent 3424e596
......@@ -52,7 +52,13 @@ namespace campvis {
, p_voxelSize("VoxelSize", "Voxel Size in mm", tgt::vec3(1.f), tgt::vec3(-100.f), tgt::vec3(100.f), tgt::vec3(0.1f))
, p_receivePositions("ReceivePositions", "Receive POSITION Messages", true)
, p_targetPositionPrefix("targetPositionsPrefix", "Target Position Prefix", "IGTL.position.")
, _stopExecution()
, _receiverThread(nullptr)
, _receiverRunning()
{
_stopExecution = false;
_receiverRunning = false;
addProperty(p_address, VALID);
addProperty(p_port, VALID);
addProperty(p_deviceName, VALID);
......@@ -70,20 +76,28 @@ namespace campvis {
addProperty(p_targetPositionPrefix, VALID);
invalidate(INVALID_PROPERTIES);
}
OpenIGTLinkClient::~OpenIGTLinkClient() {
if (_receiverRunning)
stopReceiver();
if (_receiverThread)
delete _receiverThread;
}
void OpenIGTLinkClient::init() {
p_connect.s_clicked.connect(this, &OpenIGTLinkClient::connectToServer);
p_connect.s_clicked.connect(this, &OpenIGTLinkClient::connect);
p_disconnect.s_clicked.connect(this, &OpenIGTLinkClient::disconnect);
p_disconnect.setVisible(false);
}
void OpenIGTLinkClient::deinit() {
stopReceiver();
p_connect.s_clicked.disconnect(this);
stop(); //stop the receiving thread
p_disconnect.s_clicked.disconnect(this);
}
void OpenIGTLinkClient::updateResult(DataContainer& data) {
......@@ -111,7 +125,7 @@ namespace campvis {
igtl::ImageMessage::Pointer imageMessage = it->second;
WeaklyTypedPointer wtp;
wtp._pointer = new uint8_t[imageMessage->GetImageSize()];
#ifdef IGTL_CLIENT_EBUGGING
#ifdef IGTL_CLIENT_DEBUGGING
LDEBUG("Image has " << imageMessage->GetNumComponents() << " components and is of size " << imageMessage->GetImageSize());
#endif
memcpy(wtp._pointer, imageMessage->GetScalarPointer(), imageMessage->GetImageSize());
......@@ -183,14 +197,13 @@ namespace campvis {
validate(INVALID_PROPERTIES);
}
void OpenIGTLinkClient::connectToServer() {
void OpenIGTLinkClient::connect() {
if(_socket && _socket->GetConnected()) {
LWARNING("Already connected!");
return;
}
_socket = igtl::ClientSocket::New();
int r = _socket->ConnectToServer(p_address.getValue().c_str(), p_port.getValue());
......@@ -203,23 +216,21 @@ namespace campvis {
LINFO("Connected to server " << p_address.getValue() << ":" << p_port.getValue());
start(); //start receiving data in a new thread
startReceiver(); //start receiving data in a new thread
validate(INVALID_RESULT);
}
void OpenIGTLinkClient::disconnect() {
stop();
_socket = nullptr;
stopReceiver(); //sends the stop message to the receiver thread
LINFO("Disconnected.");
}
int OpenIGTLinkClient::ReceiveTransform(igtl::Socket * socket, igtl::MessageHeader::Pointer& header)
{
#ifdef IGTL_CLIENT_DEBUGGING
LDEBUG("Receiving TRANSFORM data type.");
#endif
// Create a message buffer to receive transform data
igtl::TransformMessage::Pointer transMsg;
transMsg = igtl::TransformMessage::New();
......@@ -238,8 +249,10 @@ namespace campvis {
tgt::mat4 mtx;
// Retrieve the transform data (this cast is a bit dubious but should be ok judging from the respective class internals)
transMsg->GetMatrix(*reinterpret_cast<igtl::Matrix4x4*>(mtx.elem));
#ifdef IGTL_CLIENT_DEBUGGING
igtl::PrintMatrix(*reinterpret_cast<igtl::Matrix4x4*>(mtx.elem));
std::cerr << std::endl;
#endif
_transformMutex.lock();
_receivedTransforms[transMsg->GetDeviceName()] = mtx;
......@@ -254,7 +267,9 @@ namespace campvis {
int OpenIGTLinkClient::ReceivePosition(igtl::Socket * socket, igtl::MessageHeader::Pointer& header)
{
#ifdef IGTL_CLIENT_DEBUGGING
LDEBUG("Receiving POSITION data type.");
#endif
// Create a message buffer to receive position data
igtl::PositionMessage::Pointer positionMsg;
......@@ -281,9 +296,12 @@ namespace campvis {
_receivedPositions[positionMsg->GetDeviceName()] = pmd;
_positionMutex.unlock();
#ifdef IGTL_CLIENT_DEBUGGING
std::cerr << "position = (" << pmd._position[0] << ", " << pmd._position[1] << ", " << pmd._position[2] << ")" << std::endl;
std::cerr << "quaternion = (" << pmd._quaternion[0] << ", " << pmd._quaternion[1] << ", "
<< pmd._quaternion[2] << ", " << pmd._quaternion[3] << ")" << std::endl << std::endl;
#endif
invalidate(INVALID_RESULT);
return 1;
......@@ -294,7 +312,9 @@ namespace campvis {
int OpenIGTLinkClient::ReceiveImage(igtl::Socket * socket, igtl::MessageHeader::Pointer& header)
{
#ifdef IGTL_CLIENT_DEBUGGING
LDEBUG("Receiving IMAGE data type.");
#endif
// Create a message buffer to receive transform data
igtl::ImageMessage::Pointer imgMsg;
......@@ -305,7 +325,7 @@ namespace campvis {
// Receive image data from the socket
socket->Receive(imgMsg->GetPackBodyPointer(), imgMsg->GetPackBodySize());
// Deserialize the transform data
// Deserialize the data
// If you want to skip CRC check, call Unpack() without argument.
int c = imgMsg->Unpack(1);
......@@ -336,6 +356,7 @@ namespace campvis {
_receivedTransforms[imgMsg->GetDeviceName()] = mtx;
_transformMutex.unlock();
#ifdef IGTL_CLIENT_DEBUGGING
std::cerr << "Device Name : " << imgMsg->GetDeviceName() << std::endl;
std::cerr << "Scalar Type : " << scalarType << std::endl;
std::cerr << "Dimensions : ("
......@@ -347,8 +368,10 @@ namespace campvis {
std::cerr << "Sub-Volume offset : ("
<< svoffset[0] << ", " << svoffset[1] << ", " << svoffset[2] << ")" << std::endl << std::endl;
igtl::PrintMatrix(*reinterpret_cast<igtl::Matrix4x4*>(mtx.elem));
std::cout << std::endl;
#endif
invalidate(INVALID_RESULT);
return 1;
......@@ -358,7 +381,7 @@ namespace campvis {
}
void OpenIGTLinkClient::run()
void OpenIGTLinkClient::runReceiverThread()
{
igtl::MessageHeader::Pointer headerMsg;
igtl::TimeStamp::Pointer ts;
......@@ -369,7 +392,7 @@ namespace campvis {
p_connect.setVisible(false);
p_disconnect.setVisible(true);
while (!_stopExecution && _socket)
while (!_stopExecution && _socket && _socket->GetConnected())
{
// Initialize receive buffer
headerMsg->InitPack();
......@@ -463,7 +486,7 @@ namespace campvis {
//#endif //OpenIGTLink_PROTOCOL_VERSION >= 2
else
{
LDEBUG("Receiving : " << headerMsg->GetDeviceType() << " which is not Handled!");
LDEBUG("Received IGTL Message Type " << headerMsg->GetDeviceType() << " which is not Handled!");
_socket->Skip(headerMsg->GetBodySizeToRead(), 0);
}
}
......@@ -473,4 +496,31 @@ namespace campvis {
p_disconnect.setVisible(false);
p_connect.setVisible(true);
}
void OpenIGTLinkClient::stopReceiver() {
if (!_receiverRunning || _receiverThread == 0)
return;
_stopExecution = true;
try {
if (_receiverThread->joinable())
_receiverThread->join();
_receiverRunning = false;
}
catch (std::exception& e) {
LERRORC("CAMPVis.modules.base.MatrixProcessor", "Caught exception during _thread.join: " << e.what());
}
}
void OpenIGTLinkClient::startReceiver() {
stopReceiver(); //make sure we are stopped and reset
_stopExecution = false;
_receiverThread = new std::thread([this]() {
runReceiverThread();
});
_receiverRunning = true;
}
}
\ No newline at end of file
......@@ -37,6 +37,7 @@
#include <tgt/matrix.h>
#include <tbb/compat/thread>
#include <tbb/atomic.h>
#include <tbb/mutex.h>
......@@ -45,7 +46,6 @@
#include "core/properties/datanameproperty.h"
#include "core/properties/floatingpointproperty.h"
#include "core/tools/runnable.h"
#include "core/datastructures/imagedata.h"
......@@ -56,7 +56,7 @@ namespace campvis {
* and p_receiveImage and puts them into the received data into the respective data containers.
* This Class contains modified code from the OpenIGTLink ReceiveClient example.
*/
class OpenIGTLinkClient : public AbstractProcessor, public Runnable {
class OpenIGTLinkClient : public AbstractProcessor {
public:
/**
* Constructs a new CampcomMhdReceiver Processor
......@@ -111,7 +111,7 @@ namespace campvis {
virtual void updateProperties(DataContainer& dataContainer);
/// Callback slot for connect button. can also be called from outside.
void connectToServer();
void connect();
/// Callback slot for disconnect button. can also be called from outside.
void disconnect();
......@@ -122,9 +122,9 @@ namespace campvis {
tgt::vec4 _quaternion;
};
/// Implements the \a Runnable::run() method to execute a new thread. The new thread will
/// Main method for the receiver thread. The new thread will
/// go into a receive loop to receive the OpenIGTLink messages asynchronously
virtual void run();
virtual void runReceiverThread();
/// Receive a TRANSFORM message from the OpenIGTLink socket and put the data into the local buffers
int ReceiveTransform(igtl::Socket* socket, igtl::MessageHeader::Pointer& header);
......@@ -135,7 +135,7 @@ namespace campvis {
/// Receive a IMAGE message from the OpenIGTLink socket and put into the local buffers
int ReceiveImage(igtl::Socket* socket, igtl::MessageHeader::Pointer& header);
//connection
//igtl connection
igtl::ClientSocket::Pointer _socket;
//data
......@@ -143,11 +143,23 @@ namespace campvis {
std::map<std::string, igtl::ImageMessage::Pointer> _receivedImages; ///< the image messages received by the igtl worker thread, mapped by device name
std::map<std::string, PositionMessageData> _receivedPositions; ///< position message data received by the igtl worker thread, mapped by device name
tbb::mutex _transformMutex; ///< mutex to control access to the _receivedTransforms pointer
tbb::mutex _imageMutex; ///< mutex to control access to the _receivedImages pointer
tbb::mutex _transformMutex; ///< mutex to control access to _receivedTransforms
tbb::mutex _imageMutex; ///< mutex to control access to _receivedImages
tbb::mutex _positionMutex; ///< mutex to control access to _receivedPositions
static const std::string loggerCat_;
// this is thread management stuff
// very similar to the Runnable base class
private:
/// Start the receiver thread
void startReceiver();
/// stop the receiver thread
void stopReceiver();
tbb::atomic<bool> _stopExecution; ///< Flag whether the thread should stop
std::thread* _receiverThread; ///< Thread of the Runnable
tbb::atomic<bool> _receiverRunning;
};
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment