Commit 30e14065 authored by Christian Schulte zu Berge's avatar Christian Schulte zu Berge
Browse files

Started working on Issue implementing asynchroneous signals with signalslot:

* Moved campvis::Runnable interface to tgt namespace (since it's needed by sigslot, which only depends on tgt)
* Introduced sigslot::signal_manager singleton class that will manage the dispatching of signals in its own thread
* Started proof-of-concept implementation of asynchroneous signals for signal0<> and signal1<>. Both classes define their own signal_handleN deriving from _signal_handle_base, which defines the signal to dispatch.

Proof-of-concept implementation seems to work so far.

refs #384

Conflicts:
	core/tools/opengljobprocessor.h
	ext/tgt/runnable.h

Conflicts:
	application/CMakeLists.txt
	core/tools/opengljobprocessor.h
parent de26ca34
...@@ -45,6 +45,7 @@ MESSAGE(STATUS "---------------------------------------------------------------- ...@@ -45,6 +45,7 @@ MESSAGE(STATUS "----------------------------------------------------------------
IF(CAMPVIS_BUILD_LIB_TGT) IF(CAMPVIS_BUILD_LIB_TGT)
ADD_SUBDIRECTORY(ext/tgt) ADD_SUBDIRECTORY(ext/tgt)
ADD_SUBDIRECTORY(ext/sigslot)
ENDIF() ENDIF()
IF(CAMPVIS_BUILD_LIB_LUA) IF(CAMPVIS_BUILD_LIB_LUA)
......
...@@ -94,7 +94,7 @@ LIST(APPEND CampvisApplicationSources ${CampvisApplicationFormsHeaders}) ...@@ -94,7 +94,7 @@ LIST(APPEND CampvisApplicationSources ${CampvisApplicationFormsHeaders})
LINK_DIRECTORIES(${CampvisGlobalLinkDirectories} ${CampvisModulesLinkDirectories}) LINK_DIRECTORIES(${CampvisGlobalLinkDirectories} ${CampvisModulesLinkDirectories})
SET(CampvisMainLibs campvis-core campvis-modules tgt) SET(CampvisMainLibs campvis-core campvis-modules tgt sigslot)
IF(CAMPVIS_ENABLE_SCRIPTING) IF(CAMPVIS_ENABLE_SCRIPTING)
LIST(APPEND CampvisMainLibs campvis-scripting) LIST(APPEND CampvisMainLibs campvis-scripting)
......
...@@ -67,6 +67,8 @@ namespace campvis { ...@@ -67,6 +67,8 @@ namespace campvis {
// Make Xlib and GLX thread safe under X11 // Make Xlib and GLX thread safe under X11
QApplication::setAttribute(Qt::AA_X11InitThreads); QApplication::setAttribute(Qt::AA_X11InitThreads);
sigslot::signal_manager::init();
sigslot::signal_manager::getRef().start();
_mainWindow = new MainWindow(this); _mainWindow = new MainWindow(this);
tgt::GlContextManager::init(); tgt::GlContextManager::init();
...@@ -85,6 +87,9 @@ namespace campvis { ...@@ -85,6 +87,9 @@ namespace campvis {
for (std::vector<DataContainer*>::iterator it = _dataContainers.begin(); it != _dataContainers.end(); ++it) { for (std::vector<DataContainer*>::iterator it = _dataContainers.begin(); it != _dataContainers.end(); ++it) {
delete *it; delete *it;
} }
sigslot::signal_manager::getRef().stop();
sigslot::signal_manager::deinit();
} }
void CampVisApplication::init() { void CampVisApplication::init() {
......
...@@ -53,7 +53,7 @@ ADD_LIBRARY(campvis-core ...@@ -53,7 +53,7 @@ ADD_LIBRARY(campvis-core
) )
ADD_DEFINITIONS(${CampvisGlobalDefinitions}) ADD_DEFINITIONS(${CampvisGlobalDefinitions})
INCLUDE_DIRECTORIES(${CampvisGlobalIncludeDirs}) INCLUDE_DIRECTORIES(${CampvisGlobalIncludeDirs})
TARGET_LINK_LIBRARIES(campvis-core tgt ${CampvisGlobalExternalLibs}) TARGET_LINK_LIBRARIES(campvis-core tgt sigslot ${CampvisGlobalExternalLibs})
# if campvis-core is built as a shared library, CMake will define the following flag to instruct # if campvis-core is built as a shared library, CMake will define the following flag to instruct
# the code to export DLL symbols # the code to export DLL symbols
......
...@@ -26,6 +26,8 @@ ...@@ -26,6 +26,8 @@
#define OPENGLJOBPROCESSOR_H__ #define OPENGLJOBPROCESSOR_H__
#include "sigslot/sigslot.h" #include "sigslot/sigslot.h"
#include "tgt/runnable.h"
#include "tgt/singleton.h"
#include <ext/threading.h> #include <ext/threading.h>
...@@ -39,7 +41,6 @@ ...@@ -39,7 +41,6 @@
#include "core/coreapi.h" #include "core/coreapi.h"
#include "core/tools/job.h" #include "core/tools/job.h"
#include "core/tools/runnable.h"
#include <ctime> #include <ctime>
...@@ -68,7 +69,7 @@ namespace campvis { ...@@ -68,7 +69,7 @@ namespace campvis {
* *
* This class is to be considered as thread-safe. * This class is to be considered as thread-safe.
*/ */
class CAMPVIS_CORE_API OpenGLJobProcessor : public tgt::Singleton<OpenGLJobProcessor>, public Runnable, public sigslot::has_slots<> { class CAMPVIS_CORE_API OpenGLJobProcessor : public tgt::Singleton<OpenGLJobProcessor>, public tgt::Runnable, public sigslot::has_slots<> {
friend class tgt::Singleton<OpenGLJobProcessor>; friend class tgt::Singleton<OpenGLJobProcessor>;
public: public:
......
################################################################################
# Project file for the sigslot library
################################################################################
PROJECT(sigslot)
CMAKE_MINIMUM_REQUIRED(VERSION 2.8.0 FATAL_ERROR)
INCLUDE(../../cmake/commonconf.cmake)
MESSAGE(STATUS "Configuring sigslot Library")
# configuration
# headers
SET(SIGSLOT_HEADERS sigslot.h)
# sources
SET(SIGSLOT_SOURCES sigslot.cpp)
################################################################################
# define library target
################################################################################
ADD_LIBRARY(sigslot ${SIGSLOT_HEADERS} ${SIGSLOT_SOURCES})
ADD_DEFINITIONS(${CampvisGlobalDefinitions})
IF(CAMPVIS_SHARED_LIBS AND MSVC)
ADD_DEFINITIONS("-DTGT_BUILD_DLL")
ENDIF()
INCLUDE_DIRECTORIES(${CampvisGlobalIncludeDirs})
TARGET_LINK_LIBRARIES(sigslot tgt)
################################################################################
# deployment
################################################################################
IF(CAMPVIS_ADD_INSTALL_TARGET)
INSTALL(TARGETS sigslot
RUNTIME DESTINATION .
)
ENDIF()
#include "sigslot.h"
namespace sigslot {
signal_manager::signal_manager() {
}
signal_manager::~signal_manager() {
}
void signal_manager::triggerSignal(_signal_handle_base* signal) const {
signal->emitSignal();
delete signal;
}
bool signal_manager::queueSignal(_signal_handle_base* signal) {
tgtAssert(signal != 0, "Signal must not be 0.");
if (signal == 0)
return false;
_signalQueue.push(signal);
_evaluationCondition.notify_all();
return true;
}
void signal_manager::run() {
std::unique_lock<tbb::mutex> lock(_ecMutex);
while (! _stopExecution) {
// try pop the next event from the event queue
_signal_handle_base* signal;
if (_signalQueue.try_pop(signal)) {
signal->emitSignal();
delete signal;
}
else {
// there currently is no event in this queue -> go sleep
_evaluationCondition.wait(lock);
}
}
}
void signal_manager::stop() {
_evaluationCondition.notify_all();
tgt::Runnable::stop();
}
const std::string signal_manager::loggerCat_;
}
...@@ -92,8 +92,14 @@ ...@@ -92,8 +92,14 @@
#include <set> #include <set>
#include <list> #include <list>
#include <tbb/compat/condition_variable>
#include <tbb/concurrent_queue.h>
#include <tbb/spin_rw_mutex.h> #include <tbb/spin_rw_mutex.h>
#include "ext/tgt/runnable.h"
#include "ext/tgt/singleton.h"
#ifndef SIGSLOT_DEFAULT_MT_POLICY #ifndef SIGSLOT_DEFAULT_MT_POLICY
# ifdef _SIGSLOT_SINGLE_THREADED # ifdef _SIGSLOT_SINGLE_THREADED
# define SIGSLOT_DEFAULT_MT_POLICY single_threaded # define SIGSLOT_DEFAULT_MT_POLICY single_threaded
...@@ -104,6 +110,53 @@ ...@@ -104,6 +110,53 @@
namespace sigslot { namespace sigslot {
class _signal_handle_base {
public:
virtual ~_signal_handle_base() {};
virtual void emitSignal() const = 0;
};
class signal_manager : public tgt::Singleton<signal_manager>, public tgt::Runnable {
friend class tgt::Singleton<signal_manager>;
public:
/**
* Directly dispatches the signal \a signal to all currently registered listeners.
* \note For using threaded signal dispatching use queueSignal()
* \param signal signal to dispatch
*/
void triggerSignal(_signal_handle_base* signal) const;
/**
* Enqueue \a signal into the list of signals to be dispatched.
* \note Dispatch will be perfomed in signal_mananger thread. For direct dispatch in
* caller thread use triggerSignal().
* \param signal signal to dispatch
* \return True, if \a signal was successfully enqueued to signal queue.
*/
bool queueSignal(_signal_handle_base* signal);
/// \see Runnable:run
virtual void run();
/// \see Runnable:stop
virtual void stop();
private:
/// Private constructor only for singleton
signal_manager();
/// Private destructor only for singleton
~signal_manager();
typedef tbb::concurrent_queue<_signal_handle_base*> SignalQueue;
SignalQueue _signalQueue; ///< Queue for signals to be dispatched
std::condition_variable _evaluationCondition; ///< conditional wait to be used when there are currently no jobs to process
tbb::mutex _ecMutex; ///< Mutex for protecting _evaluationCondition
static const std::string loggerCat_;
};
typedef tbb::spin_rw_mutex tbb_mutex; typedef tbb::spin_rw_mutex tbb_mutex;
// The multi threading policies only get compiled in if they are enabled. // The multi threading policies only get compiled in if they are enabled.
...@@ -182,6 +235,7 @@ namespace sigslot { ...@@ -182,6 +235,7 @@ namespace sigslot {
} }
}; };
template<class mt_policy> template<class mt_policy>
class has_slots; class has_slots;
...@@ -1737,7 +1791,33 @@ namespace sigslot { ...@@ -1737,7 +1791,33 @@ namespace sigslot {
typedef _signal_base0<mt_policy> base; typedef _signal_base0<mt_policy> base;
typedef typename base::connections_list connections_list; typedef typename base::connections_list connections_list;
using base::m_connected_slots; using base::m_connected_slots;
class signal_handle0 : public _signal_handle_base {
public:
signal_handle0(signal0<mt_policy>* sender)
: _sender(sender)
{};
~signal_handle0() {};
// override
void emitSignal() const {
lock_block_read<mt_policy> lock(_sender);
typename connections_list::const_iterator itNext, it = _sender->m_connected_slots.begin();
typename connections_list::const_iterator itEnd = _sender->m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal();
it = itNext;
}
};
signal0<mt_policy>* _sender;
};
signal0() {} signal0() {}
signal0(const signal0<mt_policy>& s) signal0(const signal0<mt_policy>& s)
...@@ -1755,30 +1835,14 @@ namespace sigslot { ...@@ -1755,30 +1835,14 @@ namespace sigslot {
void emitSignal() void emitSignal()
{ {
lock_block_read<mt_policy> lock(this); signal_handle0* sh = new signal_handle0(this);
typename connections_list::const_iterator itNext, it = m_connected_slots.begin(); signal_manager::getRef().triggerSignal(sh);
typename connections_list::const_iterator itEnd = m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal();
it = itNext;
}
} }
void operator()() void operator()()
{ {
lock_block_read<mt_policy> lock(this); signal_handle0* sh = new signal_handle0(this);
typename connections_list::const_iterator itNext, it = m_connected_slots.begin(); signal_manager::getRef().queueSignal(sh);
typename connections_list::const_iterator itEnd = m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal();
it = itNext;
}
} }
}; };
...@@ -1789,7 +1853,34 @@ namespace sigslot { ...@@ -1789,7 +1853,34 @@ namespace sigslot {
typedef _signal_base1<arg1_type, mt_policy> base; typedef _signal_base1<arg1_type, mt_policy> base;
typedef typename base::connections_list connections_list; typedef typename base::connections_list connections_list;
using base::m_connected_slots; using base::m_connected_slots;
class signal_handle1 : public _signal_handle_base {
public:
signal_handle1(signal1<arg1_type, mt_policy>* sender, arg1_type a1)
: _sender(sender)
, _a1(a1)
{};
~signal_handle1() {};
// override
void emitSignal() const {
lock_block_read<mt_policy> lock(_sender);
typename connections_list::const_iterator itNext, it = _sender->m_connected_slots.begin();
typename connections_list::const_iterator itEnd = _sender->m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal(_a1);
it = itNext;
}
};
signal1<arg1_type, mt_policy>* _sender;
arg1_type _a1;
};
signal1() {} signal1() {}
signal1(const signal1<arg1_type, mt_policy>& s) signal1(const signal1<arg1_type, mt_policy>& s)
...@@ -1807,30 +1898,14 @@ namespace sigslot { ...@@ -1807,30 +1898,14 @@ namespace sigslot {
void emitSignal(arg1_type a1) void emitSignal(arg1_type a1)
{ {
lock_block_read<mt_policy> lock(this); signal_handle1* sh = new signal_handle1(this, a1);
typename connections_list::const_iterator itNext, it = m_connected_slots.begin(); signal_manager::getRef().triggerSignal(sh);
typename connections_list::const_iterator itEnd = m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal(a1);
it = itNext;
}
} }
void operator()(arg1_type a1) void operator()(arg1_type a1)
{ {
lock_block_read<mt_policy> lock(this); signal_handle1* sh = new signal_handle1(this, a1);
typename connections_list::const_iterator itNext, it = m_connected_slots.begin(); signal_manager::getRef().queueSignal(sh);
typename connections_list::const_iterator itEnd = m_connected_slots.end();
while (it != itEnd) {
itNext = it;
++itNext;
(*it)->emitSignal(a1);
it = itNext;
}
} }
}; };
......
...@@ -43,6 +43,7 @@ SET(TGT_SOURCES ...@@ -43,6 +43,7 @@ SET(TGT_SOURCES
naturalcubicspline.cpp naturalcubicspline.cpp
openglgarbagecollector.cpp openglgarbagecollector.cpp
painter.cpp painter.cpp
runnable.cpp
shadermanager.cpp shadermanager.cpp
spline.cpp spline.cpp
stopwatch.cpp stopwatch.cpp
......
...@@ -24,7 +24,7 @@ ...@@ -24,7 +24,7 @@
#include "runnable.h" #include "runnable.h"
namespace campvis { namespace tgt {
namespace { namespace {
void invokeThread(Runnable* r) { void invokeThread(Runnable* r) {
r->run(); r->run();
...@@ -43,7 +43,7 @@ namespace campvis { ...@@ -43,7 +43,7 @@ namespace campvis {
if (_running) if (_running)
stop(); stop();
delete _thread; delete _thread;
} }
void Runnable::stop() { void Runnable::stop() {
......
...@@ -29,7 +29,6 @@ ...@@ -29,7 +29,6 @@
#include <ext/threading.h> #include <ext/threading.h>
#include <tbb/atomic.h> #include <tbb/atomic.h>
#include "core/coreapi.h" #include "core/coreapi.h"
namespace campvis { namespace campvis {
......
...@@ -22,7 +22,7 @@ ADD_LIBRARY(campvis-modules ${CampvisSharedStaticModulesFix} ...@@ -22,7 +22,7 @@ ADD_LIBRARY(campvis-modules ${CampvisSharedStaticModulesFix}
ADD_DEFINITIONS(${CampvisGlobalDefinitions} ${CampvisModulesDefinitions}) ADD_DEFINITIONS(${CampvisGlobalDefinitions} ${CampvisModulesDefinitions})
INCLUDE_DIRECTORIES(${CampvisGlobalIncludeDirs} ${CampvisModulesIncludeDirs}) INCLUDE_DIRECTORIES(${CampvisGlobalIncludeDirs} ${CampvisModulesIncludeDirs})
TARGET_LINK_LIBRARIES(campvis-modules campvis-core tgt ${CampvisGlobalExternalLibs} ${CampvisModulesExternalLibs}) TARGET_LINK_LIBRARIES(campvis-modules campvis-core tgt sigslot ${CampvisGlobalExternalLibs} ${CampvisModulesExternalLibs})
IF(CAMPVIS_GROUP_SOURCE_FILES) IF(CAMPVIS_GROUP_SOURCE_FILES)
DEFINE_SOURCE_GROUPS_FROM_SUBDIR(CampvisModulesSources ${CampvisHome} "") DEFINE_SOURCE_GROUPS_FROM_SUBDIR(CampvisModulesSources ${CampvisHome} "")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment