11.08., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

Commit a63c10c9 authored by Phillip Samfass

Merge branch 'philipp/replication_fix' into philipp/replication

parents 81bc1fdb 226dba38
......@@ -25,3 +25,6 @@
[submodule "Submodules/six"]
path = Submodules/six
url = https://github.com/benjaminp/six.git
[submodule "six"]
path = six
url = git@github.com:benjaminp/six.git
......@@ -297,6 +297,7 @@ void exahype::offloading::OffloadingManager::createRequestArray(
}
}
#if defined(DirtyCleanUp)
void exahype::offloading::OffloadingManager::cancelOutstandingRequests() {
std::vector<RequestType> types = {RequestType::send,
RequestType::sendBack,
......@@ -334,6 +335,7 @@ void exahype::offloading::OffloadingManager::cancelOutstandingRequests() {
}
lock.free();
}
#endif
bool exahype::offloading::OffloadingManager::hasOutstandingRequestOfType(RequestType requestType) {
logDebug("hasOutstandingRequestOfType",
......
......@@ -314,7 +314,9 @@ class exahype::offloading::OffloadingManager {
bool progressReceiveBackRequests();
bool hasOutstandingRequestOfType(RequestType requestType);
#if defined (DirtyCleanUp)
void cancelOutstandingRequests();
#endif
#if defined (TaskSharing)
void setTMPIInterTeamCommunicators(MPI_Comm comm, MPI_Comm commKey, MPI_Comm commAck);
......
......@@ -17,9 +17,9 @@
#include "tarch/services/ServiceFactory.h"
#include "tarch/multicore/Jobs.h"
#ifndef TaskSharing
//#ifndef TaskSharing
registerService(exahype::offloading::OffloadingProgressService);
#endif
//#endif
tarch::logging::Log exahype::offloading::OffloadingProgressService::_log("exahype::offloading::OffloadingProgressService");
......@@ -36,7 +36,7 @@ void exahype::offloading::OffloadingProgressService::receiveDanglingMessages() {
if(_isSet && _isEnabled) {
exahype::solvers::ADERDGSolver::setMaxNumberOfIprobesInProgressOffloading(1);
// ToDo (Philipp): pass number of iterations through progress engine directly
exahype::solvers::ADERDGSolver::progressOffloading(_solver, true);
exahype::solvers::ADERDGSolver::progressOffloading(_solver, true, 1);
exahype::solvers::ADERDGSolver::setMaxNumberOfIprobesInProgressOffloading(std::numeric_limits<int>::max());
}
}
......
......@@ -2478,14 +2478,14 @@ void exahype::solvers::ADERDGSolver::finishOutstandingInterTeamCommunication ()
while(exahype::offloading::OffloadingManager::getInstance().hasOutstandingRequestOfType(exahype::offloading::RequestType::sendReplica)
|| exahype::offloading::OffloadingManager::getInstance().hasOutstandingRequestOfType(exahype::offloading::RequestType::receiveReplica) ) {
progressOffloading(this, false);
progressOffloading(this, false, std::numeric_limits<int>::max());
}
MPI_Request request;
MPI_Ibarrier(interTeamComm, &request);
int finished = 0;
while(!finished) {
progressOffloading(this, false);
progressOffloading(this, false, std::numeric_limits<int>::max());
MPI_Test(&request, &finished, MPI_STATUS_IGNORE);
}
}
......@@ -2502,13 +2502,13 @@ void exahype::solvers::ADERDGSolver::cleanUpStaleReplicatedSTPs(bool isFinal) {
<<" allocated jobs send "<<AllocatedSTPsSend
<<" allocated jobs receive "<<AllocatedSTPsReceive
<<" estimated additional mem consumption "<<(double) getAdditionalCurrentMemoryUsageReplication()/1E9<<"GB"
<<" actual mem usage "<<peano::utils::UserInterface::getMemoryUsageMB()
<<" actual mem usage "<<peano::utils::UserInterface::getMemoryUsageMB()
<<" memory per stp "<< sizeof(StealablePredictionJobData) + sizeof(double) * ( getDataPerCell() + getUpdateSize() + getBndTotalSize() + getBndFluxTotalSize() )
<<" allocated stps (constructor) "<<AllocatedSTPs
<<" entrys in hash map "<<_jobDatabase.size()
<<" sent STPs "<<SentSTPs
<<" completed sends "<<CompletedSentSTPs
<<" outstanding requests "<<exahype::offloading::OffloadingManager::getInstance().getNumberOfOutstandingRequests(exahype::offloading::RequestType::sendReplica)
<<" entrys in hash map "<<_jobDatabase.size()
<<" sent STPs "<<SentSTPs
<<" completed sends "<<CompletedSentSTPs
<<" outstanding requests "<<exahype::offloading::OffloadingManager::getInstance().getNumberOfOutstandingRequests(exahype::offloading::RequestType::sendReplica)
+exahype::offloading::OffloadingManager::getInstance().getNumberOfOutstandingRequests(exahype::offloading::RequestType::receiveReplica)
);
......@@ -3034,7 +3034,7 @@ void exahype::solvers::ADERDGSolver::receiveTaskOutcome(int tag, int src, exahyp
}
#endif
void exahype::solvers::ADERDGSolver::pollForOutstandingCommunicationRequests(exahype::solvers::ADERDGSolver *solver, bool calledOnMaster) {
void exahype::solvers::ADERDGSolver::pollForOutstandingCommunicationRequests(exahype::solvers::ADERDGSolver *solver, bool calledOnMaster, int maxIts) {
MPI_Status stat, statMapped;
int receivedTask = 0;
int receivedTaskBack = 0;
......@@ -3084,7 +3084,8 @@ void exahype::solvers::ADERDGSolver::pollForOutstandingCommunicationRequests(exa
#if defined (TaskSharing)
while(
(receivedTask || receivedTaskBack || receivedReplicaTask || receivedReplicaAck || receivedReplicaKey)
&& (iprobesCounter<MaxIprobesInOffloadingProgress || receivedReplicaKey || receivedReplicaAck || receivedReplicaTask) && !terminateImmediately )
&& (iprobesCounter<MaxIprobesInOffloadingProgress || receivedReplicaKey || receivedReplicaAck || receivedReplicaTask) && !terminateImmediately
&& iprobesCounter<maxIts)
{
#else
while( (receivedTask || receivedTaskBack) && iprobesCounter<MaxIprobesInOffloadingProgress && !terminateImmediately ) {
......@@ -3239,21 +3240,21 @@ void exahype::solvers::ADERDGSolver::pollForOutstandingCommunicationRequests(exa
assertion( ierr==MPI_SUCCESS );
#ifndef OffloadingUseProgressThread
tarch::multicore::RecursiveLock lock( tarch::services::Service::receiveDanglingMessagesSemaphore, false );
if(lock.tryLock()) {
tarch::parallel::Node::getInstance().receiveDanglingMessages();
lock.free();
}
// tarch::multicore::RecursiveLock lock( tarch::services::Service::receiveDanglingMessagesSemaphore, false );
// if(lock.tryLock()) {
// tarch::parallel::Node::getInstance().receiveDanglingMessages();
// lock.free();
// }
#endif
#endif
exahype::offloading::OffloadingManager::getInstance().progressRequests();
if(calledOnMaster) break;
// if(calledOnMaster) break;
#endif
}
time+= MPI_Wtime();
}
void exahype::solvers::ADERDGSolver::progressOffloading(exahype::solvers::ADERDGSolver* solver, bool runOnMaster) {
void exahype::solvers::ADERDGSolver::progressOffloading(exahype::solvers::ADERDGSolver* solver, bool runOnMaster, int maxIts) {
bool canRun;
tarch::multicore::Lock lock(OffloadingSemaphore, false);
......@@ -3263,6 +3264,7 @@ void exahype::solvers::ADERDGSolver::progressOffloading(exahype::solvers::ADERDG
else
canRun = lock.tryLock();
#else
//assert(!runOnMaster);
// First, we ensure here that only one thread at a time progresses offloading
// this avoids multithreaded MPI problems
canRun = lock.tryLock();
......@@ -3275,6 +3277,12 @@ void exahype::solvers::ADERDGSolver::progressOffloading(exahype::solvers::ADERDG
//VT_begin(event_progress);
#endif
//if(tarch::multicore::Core::getInstance().getThreadNum()==0) {
//std::cout<<"Error!!!!!!"<<std::endl;
// lock.free();
// return;
// }
// 2. make progress on any outstanding MPI communication
//if(!runOnMaster)
exahype::offloading::OffloadingManager::getInstance().progressRequests();
......@@ -3283,8 +3291,8 @@ void exahype::solvers::ADERDGSolver::progressOffloading(exahype::solvers::ADERDG
exahype::offloading::PerformanceMonitor::getInstance().run();
// 4. detect whether local rank should anything
pollForOutstandingCommunicationRequests(solver, runOnMaster);
//if(!runOnMaster)
pollForOutstandingCommunicationRequests(solver, runOnMaster, maxIts);
lock.free();
#ifdef USE_ITAC
......@@ -3601,7 +3609,7 @@ bool exahype::solvers::ADERDGSolver::OffloadingManagerJob::run( bool isCalledOnM
// logInfo("run()", "WARNING: memory usage is quite high!");
//}
exahype::solvers::ADERDGSolver::progressOffloading(&_solver, false);
exahype::solvers::ADERDGSolver::progressOffloading(&_solver, false, std::numeric_limits<int>::max());
if(_solver._offloadingManagerJobTriggerTerminate) {
_state = State::Terminate;
......
......@@ -2813,7 +2813,7 @@ public:
/*
* Makes progress on all offloading-related MPI communication.
*/
static void progressOffloading(exahype::solvers::ADERDGSolver* solver, bool isCalledOnMaster);
static void progressOffloading(exahype::solvers::ADERDGSolver* solver, bool isCalledOnMaster, int maxIts);
static void receiveTaskOutcome(int tag, int src, exahype::solvers::ADERDGSolver *solver);
......@@ -2821,7 +2821,7 @@ public:
static void receiveBackMigratableJob(int tag, int src, exahype::solvers::ADERDGSolver *solver);
static void pollForOutstandingCommunicationRequests(exahype::solvers::ADERDGSolver *solver, bool calledOnMaster);
static void pollForOutstandingCommunicationRequests(exahype::solvers::ADERDGSolver *solver, bool calledOnMaster, int maxIts);
static void setMaxNumberOfIprobesInProgressOffloading(int maxNumIprobes);
......
......@@ -200,8 +200,8 @@ bool exahype::solvers::ADERDGSolver::MigratablePredictionJob::handleLocalExecuti
delete data;
}
else {
bool sendTaskOutcome = AllocatedSTPsSend<=exahype::offloading::PerformanceMonitor::getInstance().getTasksPerTimestep()
&& exahype::offloading::MemoryMonitor::getInstance().getFreeMemMB()>10000;
bool sendTaskOutcome = AllocatedSTPsSend<=exahype::offloading::PerformanceMonitor::getInstance().getTasksPerTimestep();
// && exahype::offloading::MemoryMonitor::getInstance().getFreeMemMB()>10000;
#if defined(TaskSharingUseHandshake)
if(sendTaskOutcome) {
_solver.sendKeyOfReplicatedSTPToOtherTeams(this);
......@@ -215,8 +215,8 @@ bool exahype::solvers::ADERDGSolver::MigratablePredictionJob::handleLocalExecuti
#endif
}
#ifndef OffloadingUseProgressThread
if(!isRunOnMaster)
exahype::solvers::ADERDGSolver::progressOffloading(&_solver, isRunOnMaster);
// if(!isRunOnMaster)
// exahype::solvers::ADERDGSolver::progressOffloading(&_solver, isRunOnMaster);
#endif
}
#endif
......@@ -244,6 +244,10 @@ bool exahype::solvers::ADERDGSolver::MigratablePredictionJob::handleExecution(bo
result = handleLocalExecution(isRunOnMaster);
NumberOfEnclaveJobs--;
assertion( NumberOfEnclaveJobs>=0 );
#ifndef OffloadingUseProgressThread
if(!isRunOnMaster)
exahype::solvers::ADERDGSolver::progressOffloading(&_solver, isRunOnMaster, std::numeric_limits<int>::max());
#endif
}
//remote task, need to execute and send it back
else {
......@@ -268,7 +272,7 @@ bool exahype::solvers::ADERDGSolver::MigratablePredictionJob::handleExecution(bo
MPI_Request sendBackRequests[4];
//logInfo("handleLocalExecution()", "postSendBack");
_solver.isendStealablePredictionJob(_luh,
_lduh,
_lduh,
_lQhbnd,
_lFhbnd,
_originRank,
......@@ -277,12 +281,12 @@ bool exahype::solvers::ADERDGSolver::MigratablePredictionJob::handleExecution(bo
sendBackRequests);
exahype::offloading::OffloadingManager::getInstance().submitRequests(
sendBackRequests,
4,
_tag,
_originRank,
sendBackHandler,
exahype::offloading::RequestType::sendBack,
&_solver);
4,
_tag,
_originRank,
sendBackHandler,
exahype::offloading::RequestType::sendBack,
&_solver);
}
return result;
}
......@@ -349,9 +353,9 @@ void exahype::solvers::ADERDGSolver::MigratablePredictionJob::receiveHandlerRepl
key.timestamp = data->_metadata[2*DIMENSIONS];
key.element = data->_metadata[2*DIMENSIONS+2];
bool criticalMemoryConsumption = exahype::offloading::MemoryMonitor::getInstance().getFreeMemMB()<1000;
//bool criticalMemoryConsumption = exahype::offloading::MemoryMonitor::getInstance().getFreeMemMB()<1000;
if(key.timestamp<static_cast<exahype::solvers::ADERDGSolver*> (solver)->getMinTimeStamp() || criticalMemoryConsumption) {
if(key.timestamp<static_cast<exahype::solvers::ADERDGSolver*> (solver)->getMinTimeStamp()) {// || criticalMemoryConsumption) {
exahype::offloading::ReplicationStatistics::getInstance().notifyLateTask();
delete data;
AllocatedSTPsReceive--;
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment