
Commit 1874d6eb authored by Phillip Samfass

some refactoring + made more robust for dynamic AMR

parent 9ce3e90a
@@ -31,6 +31,7 @@
#include "exahype/offloading/AggressiveDistributor.h"
#include "exahype/offloading/AggressiveCCPDistributor.h"
#include "exahype/offloading/AggressiveHybridDistributor.h"
#include "exahype/offloading/OffloadingAnalyser.h"
#endif
#if defined(TMPI_Heartbeats)
@@ -255,6 +256,11 @@ void exahype::mappings::FinaliseMeshRefinement::endIteration(
}
}
+#if defined(OffloadingStrategyAggressiveHybrid)
+  exahype::offloading::AggressiveHybridDistributor::getInstance().resetTasksToOffload();
+  exahype::offloading::OffloadingAnalyser::getInstance().resetMeasurements();
+  exahype::offloading::AggressiveHybridDistributor::getInstance().enable();
+#endif
#endif
#if defined(TMPI_Heartbeats)
@@ -36,6 +36,10 @@
#include "exahype/mappings/RefinementStatusSpreading.h"
+#if defined(DistributedOffloading)
+#include "exahype/offloading/AggressiveHybridDistributor.h"
+#endif
#include <sstream>
bool exahype::mappings::MeshRefinement::DynamicLoadBalancing = false;
@@ -147,6 +151,13 @@ void exahype::mappings::MeshRefinement::beginIteration( exahype::State& solverSt
if (! MetadataHeap::getInstance().validateThatIncomingJoinBuffersAreEmpty() ) {
exit(-1);
}
+#if defined(DistributedOffloading)
+#if defined(OffloadingStrategyAggressiveHybrid)
+  exahype::offloading::AggressiveHybridDistributor::getInstance().disable();
+#endif
+#endif
#endif
}
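Taken together, the two mapping changes bracket the load balancer around mesh refinement: `MeshRefinement::beginIteration` disables the hybrid distributor while the grid is changing, and `FinaliseMeshRefinement::endIteration` clears the per-rank offload counters and timing measurements before re-enabling it, so no offloading decision based on the old mesh survives an AMR step. A minimal sketch of that lifecycle, assuming a hypothetical driver (`runAdaptiveStep` and `refineMesh` are illustrative; only the singleton calls appear in this commit):

```cpp
#include "exahype/offloading/AggressiveHybridDistributor.h"
#include "exahype/offloading/OffloadingAnalyser.h"

// Hypothetical driver: not ExaHyPE code, just the order of operations
// implied by the two mappings above.
void runAdaptiveStep() {
  // Grid is about to change: stop acting on stale offloading decisions.
  exahype::offloading::AggressiveHybridDistributor::getInstance().disable();

  refineMesh();  // placeholder for Peano's mesh-refinement traversals

  // Grid has changed: old per-rank counters and timings are meaningless.
  exahype::offloading::AggressiveHybridDistributor::getInstance().resetTasksToOffload();
  exahype::offloading::OffloadingAnalyser::getInstance().resetMeasurements();

  // Resume load balancing from a clean slate.
  exahype::offloading::AggressiveHybridDistributor::getInstance().enable();
}
```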
@@ -316,6 +316,9 @@ void exahype::offloading::AggressiveHybridDistributor::configure(
}
void exahype::offloading::AggressiveHybridDistributor::printOffloadingStatistics() {
+  if(!_isEnabled) return;
int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
int myRank = tarch::parallel::Node::getInstance().getRank();
@@ -346,6 +349,16 @@ void exahype::offloading::AggressiveHybridDistributor::resetRemainingTasksToOffl
}
}
+void exahype::offloading::AggressiveHybridDistributor::resetTasksToOffload() {
+  int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
+  int myRank = tarch::parallel::Node::getInstance().getRank();
+  for(int i=0; i<nnodes; i++) {
+    _tasksToOffload[i] = 0;
+  }
+}
void exahype::offloading::AggressiveHybridDistributor::handleEmergencyOnRank(int rank) {
_emergencies[rank]++;
logDebug("handleEmergencyOnRank()","emergencies for rank:"<<_emergencies[rank]);
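The new `resetTasksToOffload` gives the distributor a clean slate after the mesh changes by zeroing the number of tasks scheduled for offloading to every rank, and `printOffloadingStatistics` now exits early while the distributor is disabled. A self-contained sketch of the same pattern, assuming `_tasksToOffload` is a plain per-rank container (the real member type in ExaHyPE may differ):

```cpp
#include <cstddef>
#include <vector>

// Illustrative stand-in for AggressiveHybridDistributor, not the real class.
class DistributorSketch {
public:
  explicit DistributorSketch(int nnodes) : _isEnabled(false), _tasksToOffload(nnodes, 0) {}

  void enable()  { _isEnabled = true;  }
  void disable() { _isEnabled = false; }

  // Forget all offloading decisions, e.g. after dynamic AMR changed the grid.
  void resetTasksToOffload() {
    for (std::size_t i = 0; i < _tasksToOffload.size(); i++)
      _tasksToOffload[i] = 0;
  }

  void printOffloadingStatistics() const {
    if (!_isEnabled) return;  // mirrors the early-out added by this commit
    // ... print per-rank counters ...
  }

private:
  bool _isEnabled;
  std::vector<int> _tasksToOffload;
};
```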
@@ -210,6 +210,8 @@ class exahype::offloading::AggressiveHybridDistributor {
*/
void resetRemainingTasksToOffload();
+  void resetTasksToOffload();
/**
* Print some useful statistics for offloading.
*/
@@ -307,6 +307,13 @@ void exahype::offloading::OffloadingAnalyser::endToReceiveDataFromWorker( int fr
}
}
+void exahype::offloading::OffloadingAnalyser::resetMeasurements() {
+  int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
+  for(int i=0; i<nnodes; i++)
+    //_waitForOtherRank.push_back(tarch::timing::GlidingAverageMeasurement(0.1,16));
+    _waitForOtherRank[i].erase();
+}
void exahype::offloading::OffloadingAnalyser::beginToReceiveDataFromMaster(int master) {
//if (_isSwitchedOn && !_waitForMasterDataWatch.isOn()) {
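`resetMeasurements` clears the per-rank wait-time statistics in place rather than reallocating the vector; the commented-out `push_back` suggests `_waitForOtherRank` holds `tarch::timing::GlidingAverageMeasurement` objects whose `erase()` discards the accumulated samples. A minimal illustration of that reset semantic (an assumption about the behavior, not the actual tarch API):

```cpp
// Toy gliding (exponentially weighted) average with an erase() that forgets
// all history -- the semantics assumed for GlidingAverageMeasurement here.
class GlidingAverageSketch {
public:
  explicit GlidingAverageSketch(double weight = 0.1)
      : _weight(weight), _value(0.0), _hasValue(false) {}

  void setValue(double sample) {
    // Each sample pulls the average towards itself by a fixed fraction.
    _value = _hasValue ? (1.0 - _weight) * _value + _weight * sample : sample;
    _hasValue = true;
  }

  double getValue() const { return _value; }

  // Reset, e.g. when timings measured on the old mesh are no longer valid.
  void erase() { _value = 0.0; _hasValue = false; }

private:
  double _weight;
  double _value;
  bool   _hasValue;
};
```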
@@ -87,6 +87,8 @@ class exahype::offloading::OffloadingAnalyser : public peano::performanceanalysi
virtual void beginIteration();
virtual void endIteration(double numberOfInnerLeafCells, double numberOfOuterLeafCells, double numberOfInnerCells, double numberOfOuterCells, double numberOfLocalCells, double numberOfLocalVertices);
+  void resetMeasurements();
virtual void enterCentralElementOfEnclosingSpacetree();
virtual void leaveCentralElementOfEnclosingSpacetree();
@@ -3915,15 +3915,13 @@ tarch::multicore::jobs::Job* exahype::solvers::ADERDGSolver::grabRecomputeJobFor
job = a_cellDescToJob->second;
_mapCellDescToRecompJob.erase(a_cellDescToJob);
}
-  //assert(found);
return job;
}
void exahype::solvers::ADERDGSolver::addRecomputeJobForCellDescription(tarch::multicore::jobs::Job* job, const CellDescription* cellDescription) {
tbb::concurrent_hash_map<const CellDescription*, tarch::multicore::jobs::Job* >::accessor a_cellDescToJob;
bool found = _mapCellDescToRecompJob.find(a_cellDescToJob, static_cast<const CellDescription*>(cellDescription));
-  assert(!found);
+  assertion(!found);
_mapCellDescToRecompJob.insert(std::make_pair(cellDescription, job));
}
@@ -4004,45 +4002,9 @@ exahype::solvers::ADERDGSolver::MigratablePredictionJob* exahype::solvers::ADERD
tag);
}
-#ifdef UseMPIOffloading
-void exahype::solvers::ADERDGSolver::sendMigratablePredictionJobOffload(
-    double *luh,
-    double *lduh,
-    double *lQhbnd,
-    double *lFhbnd,
-    int dest,
-    int tag,
-    MPI_Comm comm,
-    double *metadata) {
-  int i = 0;
-  int ierr;
-  //MPI_Comm comm = exahype::offloading::OffloadingManager::getInstance().getMPICommunicator();
-  if(metadata != nullptr) {
-    ierr = MPI_Send_offload(metadata, 2*DIMENSIONS+3, MPI_DOUBLE, dest, tag, comm);
-    assertion(ierr==MPI_SUCCESS);
-  }
-  assertion(luh!=NULL);
-  ierr = MPI_Send_offload(luh, getDataPerCell(), MPI_DOUBLE, dest, tag, comm);
-  assertion(ierr==MPI_SUCCESS);
-  assertion(lduh!=NULL);
-  ierr = MPI_Send_offload(lduh, getUpdateSize(), MPI_DOUBLE, dest, tag, comm);
-  assertion(ierr==MPI_SUCCESS);
-  assertion(lQhbnd!=NULL);
-  ierr = MPI_Send_offload(lQhbnd, getBndTotalSize(), MPI_DOUBLE, dest, tag, comm);
-  assertion(ierr==MPI_SUCCESS);
-  assertion(lFhbnd!=NULL);
-  ierr = MPI_Send_offload(lFhbnd, getBndFluxTotalSize(), MPI_DOUBLE, dest, tag, comm);
-  assertion(ierr==MPI_SUCCESS);
-};
-#endif
///////////////////////////////////
// COMMUNICATION_ROUTINES
///////////////////////////////////
void exahype::solvers::ADERDGSolver::isendMigratablePredictionJob(
double *luh,
@@ -4202,6 +4164,43 @@ void exahype::solvers::ADERDGSolver::recvMigratablePredictionJobOffload(
ierr = MPI_Recv_offload(lFhbnd, getBndFluxTotalSize(), MPI_DOUBLE, srcRank, tag, comm, &stat);
assertion(ierr==MPI_SUCCESS);
};
+void exahype::solvers::ADERDGSolver::sendMigratablePredictionJobOffload(
+    double *luh,
+    double *lduh,
+    double *lQhbnd,
+    double *lFhbnd,
+    int dest,
+    int tag,
+    MPI_Comm comm,
+    double *metadata) {
+  int i = 0;
+  int ierr;
+  //MPI_Comm comm = exahype::offloading::OffloadingManager::getInstance().getMPICommunicator();
+  if(metadata != nullptr) {
+    ierr = MPI_Send_offload(metadata, 2*DIMENSIONS+3, MPI_DOUBLE, dest, tag, comm);
+    assertion(ierr==MPI_SUCCESS);
+  }
+  assertion(luh!=NULL);
+  ierr = MPI_Send_offload(luh, getDataPerCell(), MPI_DOUBLE, dest, tag, comm);
+  assertion(ierr==MPI_SUCCESS);
+  assertion(lduh!=NULL);
+  ierr = MPI_Send_offload(lduh, getUpdateSize(), MPI_DOUBLE, dest, tag, comm);
+  assertion(ierr==MPI_SUCCESS);
+  assertion(lQhbnd!=NULL);
+  ierr = MPI_Send_offload(lQhbnd, getBndTotalSize(), MPI_DOUBLE, dest, tag, comm);
+  assertion(ierr==MPI_SUCCESS);
+  assertion(lFhbnd!=NULL);
+  ierr = MPI_Send_offload(lFhbnd, getBndFluxTotalSize(), MPI_DOUBLE, dest, tag, comm);
+  assertion(ierr==MPI_SUCCESS);
+};
#endif
@@ -4222,7 +4221,6 @@ exahype::solvers::ADERDGSolver::CompressionJob::CompressionJob(
}
}
bool exahype::solvers::ADERDGSolver::CompressionJob::run(bool runOnMasterThread) {
_solver.determineUnknownAverages(_cellDescription);
_solver.computeHierarchicalTransform(_cellDescription,-1.0);
@@ -4238,7 +4236,6 @@ bool exahype::solvers::ADERDGSolver::CompressionJob::run(bool runOnMasterThread)
return false;
}
void exahype::solvers::ADERDGSolver::compress( CellDescription& cellDescription, const bool isSkeletonCell ) const {
assertion1( cellDescription.getCompressionState() == CellDescription::Uncompressed, cellDescription.toString() );
if (CompressionAccuracy>0.0) {
@@ -4263,7 +4260,6 @@ void exahype::solvers::ADERDGSolver::compress( CellDescription& cellDescription,
}
}
void exahype::solvers::ADERDGSolver::uncompress(CellDescription& cellDescription) const {
#ifdef SharedMemoryParallelisation
bool madeDecision = CompressionAccuracy<=0.0;
@@ -4295,7 +4291,6 @@ void exahype::solvers::ADERDGSolver::uncompress(CellDescription& cellDescription
}
}
void exahype::solvers::ADERDGSolver::determineUnknownAverages(
CellDescription& cellDescription) const {
assertion1( DataHeap::getInstance().isValidIndex(cellDescription.getSolutionIndex()), cellDescription.toString() );
@@ -4373,7 +4368,6 @@ void exahype::solvers::ADERDGSolver::determineUnknownAverages(
}
}
void exahype::solvers::ADERDGSolver::computeHierarchicalTransform(
CellDescription& cellDescription, double sign) const {
const int dataPerNode = getNumberOfParameters()+getNumberOfVariables();
@@ -4698,7 +4692,6 @@ void exahype::solvers::ADERDGSolver::putUnknownsIntoByteStream(
);
}
void exahype::solvers::ADERDGSolver::pullUnknownsFromByteStream(
CellDescription& cellDescription) const {
assertion(CompressionAccuracy>0.0);
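The large hunk pair above is a pure move: `sendMigratablePredictionJobOffload` is relocated, with an unchanged body, so that it sits next to its receive counterpart inside the same `#ifdef UseMPIOffloading` region. The correctness constraint this code relies on is that sender and receiver post the same buffers in the same order and with the same sizes (`metadata`, then `luh`, `lduh`, `lQhbnd`, `lFhbnd`), since all messages share a single tag. A reduced sketch of that ordering contract in plain MPI, assuming the `MPI_Send_offload`/`MPI_Recv_offload` wrappers follow ordinary point-to-point semantics:

```cpp
#include <mpi.h>

// Both sides must agree on buffer order: all messages reuse the same tag,
// so any reordering on one side silently pairs the wrong buffers.
void sendJob(double* luh, int nLuh, double* lduh, int nLduh,
             int dest, int tag, MPI_Comm comm) {
  MPI_Send(luh,  nLuh,  MPI_DOUBLE, dest, tag, comm);  // 1st message
  MPI_Send(lduh, nLduh, MPI_DOUBLE, dest, tag, comm);  // 2nd message
}

void recvJob(double* luh, int nLuh, double* lduh, int nLduh,
             int src, int tag, MPI_Comm comm) {
  MPI_Recv(luh,  nLuh,  MPI_DOUBLE, src, tag, comm, MPI_STATUS_IGNORE);
  MPI_Recv(lduh, nLduh, MPI_DOUBLE, src, tag, comm, MPI_STATUS_IGNORE);
}
```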
@@ -1281,8 +1281,8 @@ private:
*/
MigratablePredictionJob* createFromData(
MigratablePredictionJobData *data,
-      const int origin,
-      const int tag);
+      const int origin,
+      const int tag);
/*
* Sends away data of a MigratablePredictionJob to a destination rank
@@ -1319,10 +1319,10 @@ private:
double *lduh,
double *lQhbnd,
double *lFhbnd,
-      int srcRank,
+      int srcRank,
int tag,
MPI_Comm comm,
-      MPI_Request *requests,
+      MPI_Request *requests,
double *metadata =nullptr);
/*
@@ -3329,16 +3329,15 @@ public:
if ( !cellDescription.getHasCompletedLastStep() ) {
peano::datatraversal::TaskSet::startToProcessBackgroundJobs();
#if !defined(OffloadingUseProgressThread)
-      if ( responsibleRank!=myRank) {
+      if ( responsibleRank != myRank) {
logInfo("waitUntil", "cell missing from responsible rank: "<<responsibleRank);
tryToReceiveTaskBack(this) ;
//const_cast<CellDescription*>(&cellDescription)->setHasCompletedLastStep(true);
}
#endif
}
while ( !cellDescription.getHasCompletedLastStep() ) {
#if !defined(OffloadingUseProgressThread)
-      if ( responsibleRank!=myRank) {
+      if ( responsibleRank != myRank ) {
tryToReceiveTaskBack(this);
//solver->spawnReceiveBackJob();
}
@@ -3368,100 +3367,53 @@ public:
tarch::la::Vector<DIMENSIONS, double> center;
center = cellDescription.getOffset()+0.5*cellDescription.getSize();
logInfo("waitUntil()", " looking for recompute job center[0] = "<< center[0]
<<" center[1] = "<< center[1]
<<" center[2] = "<< center[2]);
// if(NumberOfEnclaveJobs==NumberOfRemoteJobs && NumberOfRemoteJobs>0)
// assert(responsibleRank!=myRank || NumberOfSkeletonJobs>0);
//logInfo("waitUntil()", " looking for recompute job center[0] = "<< center[0]
// <<" center[1] = "<< center[1]
// <<" center[2] = "<< center[2]);
if( responsibleRank!=myRank
&& (exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs
== exahype::solvers::ADERDGSolver::NumberOfRemoteJobs)) {
#ifndef OffloadingDeactivateRecompute
-        tarch::multicore::Lock lock(exahype::solvers::ADERDGSolver::EmergencySemaphore);
-        if(!hasRecomputed) {
-          tarch::multicore::jobs::Job* recompJob = grabRecomputeJobForCellDescription((&cellDescription));
-          if(recompJob!=nullptr) {// got one
-            recompJob->run(true);
-            hasRecomputed = true;
-            exahype::offloading::JobTableStatistics::getInstance().notifyRecomputedTask();
-          }
-        }
+        if(!hasTriggeredEmergency) {
+          tarch::multicore::Lock lock(exahype::solvers::ADERDGSolver::EmergencySemaphore);
+          if(!hasRecomputed) {
+            tarch::multicore::jobs::Job* recompJob = grabRecomputeJobForCellDescription((&cellDescription));
+            if(recompJob!=nullptr) {// got one
+              recompJob->run(true);
+              hasRecomputed = true;
+              exahype::offloading::JobTableStatistics::getInstance().notifyRecomputedTask();
+            }
+          }
-        if(!hasTriggeredEmergency) {
-          //tarch::multicore::jobs::Job* recompJob = grabRecomputeJobForCellDescription((&cellDescription));//test
-          if(!exahype::solvers::ADERDGSolver::VetoEmergency && hasRecomputed) {
-            hasTriggeredEmergency = true;
-            logInfo("waitUntilCompletedTimeStep()","EMERGENCY: missing from rank "<<responsibleRank);
-            exahype::solvers::ADERDGSolver::VetoEmergency = true;
-            exahype::solvers::ADERDGSolver::LastEmergencyCell = &cellDescription;
-            exahype::offloading::OffloadingManager::getInstance().triggerEmergencyForRank(responsibleRank);
-          }
-        }
+          if(!exahype::solvers::ADERDGSolver::VetoEmergency && hasRecomputed) {
+            hasTriggeredEmergency = true;
+            logInfo("waitUntilCompletedTimeStep()","EMERGENCY: missing from rank "<<responsibleRank);
+            exahype::solvers::ADERDGSolver::VetoEmergency = true;
+            exahype::solvers::ADERDGSolver::LastEmergencyCell = &cellDescription;
+            exahype::offloading::OffloadingManager::getInstance().triggerEmergencyForRank(responsibleRank);
+          }
+        }
}
lock.free();
#else
-        if(!hasTriggeredEmergency) {
-          hasTriggeredEmergency = true;
-          logInfo("waitUntilCompletedTimeStep()","EMERGENCY: missing from rank "<<responsibleRank);
-          exahype::offloading::OffloadingManager::getInstance().triggerEmergencyForRank(responsibleRank);
-        }
+        if(!hasTriggeredEmergency) {
+          hasTriggeredEmergency = true;
+          logInfo("waitUntilCompletedTimeStep()","EMERGENCY: missing from rank "<<responsibleRank);
+          exahype::offloading::OffloadingManager::getInstance().triggerEmergencyForRank(responsibleRank);
+        }
#endif
}
#endif
-        /*#if defined(OffloadingLocalRecompute)
-        if ( responsibleRank!=myRank ) {
-          tarch::la::Vector<DIMENSIONS, double> center;
-          center = cellDescription.getOffset()+0.5*cellDescription.getSize();
-          logInfo("waitUntil()", " looking for recompute job center[0] = "<< center[0]
-                  <<" center[1] = "<< center[1]
-                  <<" center[2] = "<< center[2]);
-          if( (exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs
-              == exahype::solvers::ADERDGSolver::NumberOfRemoteJobs) && !hasTriggeredEmergency) {
-            hasTriggeredEmergency = true;
-            logInfo("waitUntilCompletedTimeStep()","EMERGENCY: missing from rank "<<responsibleRank);
-            exahype::offloading::OffloadingManager::getInstance().triggerEmergencyForRank(responsibleRank);
-          }
-        #ifndef OffloadingDeactivateRecompute
-          tarch::multicore::jobs::Job * recompJob = grabRecomputeJobForCellDescription((const void*) &cellDescription);
-          if(recompJob!=nullptr) {// got one
-            recompJob->run(true);
-          }
-          continue;
-        #endif
-        }
-        #endif*/
-        //     tarch::multicore::jobs::processBackgroundJobs( 1, -1, true );
-        //     break;
-        //   case JobSystemWaitBehaviourType::ProcessAnyJobs:
-        //     tarch::multicore::jobs::processBackgroundJobs( 1, -1, true );
-        //     break;
-        //   default:
-        //     break;
-        // }
-        //if((MPI_Wtime()-startTime)>0.0001) {// && responsibleRank!=myRank) {
-        //  startTime = MPI_Wtime();
-        //  logInfo("waitUntilCompletedTimeStep()","warning: rank waiting too long for missing task from rank "<<responsibleRank<< " outstanding jobs:"<<NumberOfRemoteJobs<< " outstanding enclave "<<NumberOfEnclaveJobs<< " outstanding skeleton "<<NumberOfSkeletonJobs<< " celldescription "<<cellDescription.toString() << " waiting jobs "<<tarch::multicore::jobs::getNumberOfWaitingBackgroundJobs());
-        //}
#if !defined(OffloadingLocalRecompute)
#if !defined(OffloadingUseProgressThread)
if( !cellDescription.getHasCompletedLastStep()
//&& tarch::multicore::jobs::getNumberOfWaitingBackgroundJobs()==1
&& !hasTriggeredEmergency
&& !progress
&& myRank!=responsibleRank
&& ( exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs
-exahype::solvers::ADERDGSolver::NumberOfRemoteJobs)==0
)
//&& exahype::solvers::ADERDGSolver::NumberOfReceiveBackJobs==0)
//&& !exahype::offloading::OffloadingManager::getInstance().getRunningAndReceivingBack())
#else
if( !cellDescription.getHasCompletedLastStep()
&& !hasTriggeredEmergency
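The reworked wait loop in the header implements a recover-then-blacklist fallback for offloaded tasks that never return: while holding the emergency semaphore, the waiting rank first tries to grab and run its locally retained recompute job, and only after a successful recompute, and if no emergency has been triggered or vetoed yet, does it raise an emergency that stops further offloading to the unresponsive rank. A condensed sketch of that control flow, with `std::mutex` standing in for tarch's semaphore and stubbed helpers replacing the methods from the diff:

```cpp
#include <mutex>

struct Job { void run() { /* recompute the migrated prediction locally */ } };

static std::mutex emergencyMutex;                      // stand-in for EmergencySemaphore
static Job* grabRecomputeJob() { return nullptr; }     // stub: real code queries a hash map
static void triggerEmergencyForRank(int /*rank*/) {}   // stub: real code vetoes offloading

void handleMissingTask(int responsibleRank, int myRank,
                       bool& hasRecomputed, bool& hasTriggeredEmergency) {
  if (responsibleRank == myRank) return;  // nothing was offloaded for this cell

  std::lock_guard<std::mutex> lock(emergencyMutex);

  // 1. Recover: run the locally retained copy of the offloaded job, if any.
  if (!hasRecomputed) {
    if (Job* recompJob = grabRecomputeJob()) {
      recompJob->run();
      hasRecomputed = true;
    }
  }

  // 2. Blacklist: after a successful recompute, raise the emergency exactly
  //    once so the distributor stops offloading to the unresponsive rank.
  if (!hasTriggeredEmergency && hasRecomputed) {
    hasTriggeredEmergency = true;
    triggerEmergencyForRank(responsibleRank);
  }
}
```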