Commit 68c9c879 authored by Phillip Samfass

cleaned up

parent e831b597
......@@ -41,6 +41,7 @@ void NoiseGenerationStrategyChaseVictim::generateNoise(int rank, std::chrono::sy
pid_t pid = getpid();
static int cnt = 0;
#if defined(DistributedOffloading)
static bool triggeredVictim = false;
static const int kRecover = 10;
......@@ -66,12 +67,15 @@ void NoiseGenerationStrategyChaseVictim::generateNoise(int rank, std::chrono::sy
}
cnt++;
#else
//todo
#endif
}
void NoiseGenerationStrategyChaseVictim::generateNoiseSTP(int rank, std::chrono::system_clock::time_point timestamp) {
int nranks = tarch::parallel::Node::getInstance().getNumberOfNodes();
/* int nranks = tarch::parallel::Node::getInstance().getNumberOfNodes();
#if defined(DistributedOffloading)
if( OffloadingManager::getInstance().isVictim()) {
double timePerSTP = exahype::offloading::OffloadingAnalyser::getInstance().getTimePerSTP();
double timeToWait = timePerSTP*_factor*1e06;
......@@ -83,7 +87,9 @@ void NoiseGenerationStrategyChaseVictim::generateNoiseSTP(int rank, std::chrono:
logInfo("generateNoiseSTP()", "slept "<<std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now()-timestamp).count()<<std::endl);
}
#else
//todo
#endif */
}
} /* namespace offloading */
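The body of generateNoiseSTP() is commented out above; for reference, a minimal sketch of the sleep-based noise injection it describes, assuming getTimePerSTP() returns seconds. The helper name sleepForNoiseSTP is hypothetical and only illustrates the seconds-to-microseconds conversion.

#include <unistd.h>   // usleep

// Illustrative sketch, not part of this commit: delay a victim rank by a
// configurable fraction of the measured time per space-time predictor (STP).
void sleepForNoiseSTP(double timePerSTPSeconds, double factor) {
  // factor scales the measured STP duration; 1e06 converts seconds to microseconds.
  const double timeToWaitUs = timePerSTPSeconds * factor * 1e06;
  usleep(static_cast<useconds_t>(timeToWaitUs));
}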
......
......@@ -925,7 +925,7 @@ void exahype::runners::Runner::initHPCEnvironment() {
//
#ifdef USE_ITAC
int ierr=0;
ierr=VT_funcdef("FusedTimeStep::noiseHandle" , VT_NOCLASS, &exahype::mappings::FusedTimeStep::noiseHandle ); assertion(ierr==0);
ierr=VT_funcdef("Empty::iterationHandle" , VT_NOCLASS, &exahype::mappings::Empty::iterationHandle ); assertion(ierr==0);
ierr=VT_funcdef("Solver::waitUntilCompletedLastStepHandle" , VT_NOCLASS, &exahype::solvers::Solver::waitUntilCompletedLastStepHandle ); assertion(ierr==0);
ierr=VT_funcdef("Solver::ensureAllJobsHaveTerminatedHandle" , VT_NOCLASS, &exahype::solvers::Solver::ensureAllJobsHaveTerminatedHandle ); assertion(ierr==0);
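For context, a hedged sketch of how handles registered via VT_funcdef() are typically used with ITAC; the handle name and region below are illustrative examples, not taken from this commit.

#ifdef USE_ITAC
#include <VT.h>
#include <cassert>

static int exampleHandle = 0;   // hypothetical handle, analogous to noiseHandle above

void registerExampleHandle() {
  int ierr = VT_funcdef("Example::instrumentedRegion", VT_NOCLASS, &exampleHandle);
  assert(ierr == 0);
}

void instrumentedRegion() {
  VT_begin(exampleHandle);   // attribute the enclosed work to this region in the trace
  // ... work that should show up under "Example::instrumentedRegion" in ITAC ...
  VT_end(exampleHandle);
}
#endif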
......@@ -1028,17 +1028,6 @@ int exahype::runners::Runner::run() {
if ( _parser.isValid() )
initSharedMemoryConfiguration();
/* #if defined(DistributedOffloading)
for (auto* solver : exahype::solvers::RegisteredSolvers) {
if (solver->getType()==exahype::solvers::Solver::Type::ADERDG) {
static_cast<exahype::solvers::ADERDGSolver*>(solver)->startOffloadingManager();
}
if (solver->getType()==exahype::solvers::Solver::Type::LimitingADERDG) {
static_cast<exahype::solvers::LimitingADERDGSolver*>(solver)->startOffloadingManager();
}
}
#endif
*/
if ( _parser.isValid() )
initDataCompression();
if ( _parser.isValid() )
......@@ -1097,10 +1086,6 @@ int exahype::runners::Runner::run() {
exahype::offloading::JobTableStatistics::getInstance().printStatistics();
#endif
// exahype::offloading::OffloadingProfiler::getInstance().endPhase();
// logInfo("shutdownDistributedMemoryConfiguration()","ended profiling phase");
// exahype::offloading::OffloadingProfiler::getInstance().printStatistics();
// logInfo("shutdownDistributedMemoryConfiguration()","printed stats");
#endif
#if defined(MemoryMonitoring) && defined(MemoryMonitoringTrack)
......@@ -1321,14 +1306,6 @@ int exahype::runners::Runner::runAsMaster(exahype::repositories::Repository& rep
timeStep < simulationTimeSteps
) {
#ifdef USE_ITAC
if(timeStep>18 && timeStep<21) {
VT_traceon(); // enable ITAC tracing only within this time-step window (it stays off otherwise, see VT_traceoff below)
}
else
VT_traceoff();
#endif
bool plot = exahype::plotters::checkWhetherPlotterBecomesActive(
solvers::Solver::getMinTimeStampOfAllSolvers()); // has no side effects
......
......@@ -995,25 +995,25 @@ private:
Running,
Resume,
Paused,
Terminate,
Terminated
};
OffloadingManagerJob(ADERDGSolver& solver);
~OffloadingManagerJob();
bool run( bool isCalledOnMaster );
#ifdef OffloadingUseProgressThread
tbb::task* execute();
#endif
#ifndef OffloadingUseProgressThread
void pause();
void resume();
#endif
void terminate();
public:
State _state;
private:
ADERDGSolver& _solver;
bool _started;
};
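The OffloadingManagerJob above is essentially a small state machine. Below is one plausible mapping of pause()/resume()/terminate() onto the State enum; the transition logic is an assumption for illustration only, not the solver's actual implementation.

// Illustrative only: plausible transitions for the State enum declared above.
enum class SketchState { Running, Resume, Paused, Terminate, Terminated };

struct OffloadingManagerJobSketch {
  SketchState _state = SketchState::Running;

  void pause()     { if (_state == SketchState::Running) _state = SketchState::Paused; }
  void resume()    { if (_state == SketchState::Paused)  _state = SketchState::Resume; }
  void terminate() { _state = SketchState::Terminate; }

  // run() is re-scheduled as long as it returns true; a Terminate request is
  // acknowledged by moving to Terminated and returning false.
  bool run() {
    if (_state == SketchState::Resume)    _state = SketchState::Running;
    if (_state == SketchState::Terminate) { _state = SketchState::Terminated; return false; }
    return _state != SketchState::Terminated;
  }
};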
class ReceiveJob : public tarch::multicore::jobs::Job {
......@@ -1051,9 +1051,11 @@ private:
double _metadata[2*DIMENSIONS+3];
MigratablePredictionJobData(ADERDGSolver& solver);
~MigratablePredictionJobData();
MigratablePredictionJobData(const MigratablePredictionJobData&) = delete; //deleted copy constructor
MigratablePredictionJobData& operator = (const MigratablePredictionJobData &) = delete; //deleted copy assignment operator
//deleted copy constructor
MigratablePredictionJobData(const MigratablePredictionJobData&) = delete;
//deleted copy assignment operator
MigratablePredictionJobData& operator = (const MigratablePredictionJobData &) = delete;
};
/**
......@@ -1070,19 +1072,15 @@ private:
tbb::concurrent_hash_map<const CellDescription*, std::pair<int,int>> _mapCellDescToTagRank;
tbb::concurrent_hash_map<const CellDescription*, tarch::multicore::jobs::Job*> _mapCellDescToRecompJob;
tbb::concurrent_hash_map<int, double*> _mapTagToMetaData;
//tbb::concurrent_hash_map<int, MigratablePredictionJobData*> _mapTagToSTPData;
tbb::concurrent_hash_map<int, MigratablePredictionJobData*> _mapTagToSTPData;
// Used to time offloaded tasks.
tbb::concurrent_hash_map<int, double> _mapTagToOffloadTime;
//#if defined(TaskSharing)
tbb::concurrent_hash_map<int, MigratablePredictionJobData*> _mapTagToSTPData;
//#endif
/**
* These vectors are used to avoid duplicate
* receives that may be induced by MPI_Iprobe.
*/
std::vector<int> _lastReceiveTag;
#if defined(TaskSharing)
std::vector<int> _lastReceiveReplicaTag;
#endif
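The comment above on duplicate receives deserves a concrete illustration. A hedged sketch of the bookkeeping idea behind _lastReceiveTag, assuming one remembered tag per source rank; the actual scheme in the solver may differ.

#include <mpi.h>
#include <vector>

// Illustrative sketch: MPI_Iprobe can report the same pending message on
// consecutive polls before the matching receive has consumed it. Remembering
// the tag of the last receive posted per source rank avoids posting a
// duplicate receive for the same message.
void pollOnceSketch(std::vector<int>& lastReceiveTag, MPI_Comm comm) {
  int flag = 0;
  MPI_Status status;
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);
  if (flag) {
    const int src = status.MPI_SOURCE;
    const int tag = status.MPI_TAG;
    if (lastReceiveTag[src] != tag) {  // only post a receive if this (src, tag) is new
      lastReceiveTag[src] = tag;
      // ... post the matching non-blocking receive for (src, tag) here ...
    }
  }
}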
......@@ -1100,20 +1098,20 @@ private:
class MigratablePredictionJob : public tarch::multicore::jobs::Job {
friend class exahype::solvers::ADERDGSolver;
private:
ADERDGSolver& _solver;
const int _cellDescriptionsIndex;
const int _element;
const double _predictorTimeStamp;
const double _predictorTimeStepSize;
const int _originRank;
const int _tag;
double* _luh; // ndata *ndof^DIM
double* _lduh; // nvar *ndof^DIM
double* _lQhbnd;
double* _lFhbnd;
double _center[DIMENSIONS];
double _dx[DIMENSIONS];
bool _isLocalReplica;
static std::atomic<int> JobCounter;
......@@ -1124,24 +1122,24 @@ private:
public:
// constructor for local jobs that can be stolen
MigratablePredictionJob(
ADERDGSolver& solver,
const int cellDescriptionsIndex,
const int element,
const double predictorTimeStamp,
const double predictorTimeStepSize
);
// constructor for remote jobs that were received from another rank
MigratablePredictionJob(
ADERDGSolver& solver,
int cellDescriptionsIndex,
int element,
const double predictorTimeStamp,
const double predictorTimeStepSize,
double *luh, double *lduh,
double *lQhbnd, double *lFhbnd,
double *dx, double *center,
const int originRank,
const int tag
);
MigratablePredictionJob(const MigratablePredictionJob& stp) = delete;
......@@ -1149,58 +1147,57 @@ private:
// call-back method: called when a job has been sent back to its origin rank
static void sendBackHandler(
exahype::solvers::Solver* solver,
int tag,
int rank);
// call-back method: called when a job has been successfully offloaded to another rank
static void sendHandler(
exahype::solvers::Solver* solver,
int tag,
int rank);
#if defined(TaskSharing)
static void sendKeyHandlerTaskSharing(
exahype::solvers::Solver* solver,
int tag,
int rank);
static void sendAckHandlerTaskSharing(
exahype::solvers::Solver* solver,
int tag,
int rank);
static void sendHandlerTaskSharing(
exahype::solvers::Solver* solver,
int tag,
int rank);
#endif
// call-back method: called when the result of a remotely executed job has been received back
static void receiveBackHandler(
exahype::solvers::Solver* solver,
int tag,
int rank);
// call-back method: called when a job has been received from another rank
static void receiveHandler(
exahype::solvers::Solver* solver,
int tag,
int rank);
#if defined(TaskSharing)
static void receiveKeyHandlerTaskSharing(
exahype::solvers::Solver* solver,
int tag,
int rank);
// call-back method: called when a job has been received from another rank
static void receiveHandlerTaskSharing(
exahype::solvers::Solver* solver,
int tag,
int rank);
#endif
bool run(bool calledFromMaster) override;
};
#if defined(TaskSharing) || defined(OffloadingLocalRecompute)
#if defined(TaskSharing) || defined(OffloadingLocalRecompute) //Todo(Philipp): do we still need this for local recompute?
static int REQUEST_JOB_CANCEL;
static int REQUEST_JOB_ACK;
......@@ -1225,12 +1222,15 @@ private:
return result;
}
/**
* Hash function for a JobTableKey, i.e. the key identifying a task outcome
*/
operator std::size_t() const {
using std::hash;
std::size_t result = 0;
std::hash<double> hash_fn_db;
std::hash<int> hash_fn_int;
//size_t str_hash = hash_fn(str);
for( int i=0; i<DIMENSIONS; i++) {
result ^= hash_fn_db(center[i]);
}
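The conversion operator above XOR-combines std::hash values of the key's fields; this hunk cuts the function off, so the reduced key below is a hypothetical example of the same pattern (the timeStamp field is an assumption).

#include <cstddef>
#include <functional>

constexpr int kDim = 3;  // stands in for DIMENSIONS

// Hypothetical reduced key illustrating the XOR-combine hashing pattern above.
struct JobTableKeySketch {
  double center[kDim];
  double timeStamp;  // assumed additional field, not visible in this hunk

  operator std::size_t() const {
    std::size_t result = 0;
    std::hash<double> hash_fn_db;
    for (int i = 0; i < kDim; i++) {
      result ^= hash_fn_db(center[i]);  // combine per-coordinate hashes
    }
    result ^= hash_fn_db(timeStamp);
    return result;
  }
};

Note that a plain XOR combine is symmetric in its inputs, so permuted coordinates hash to the same value; a boost-style hash_combine with extra mixing would avoid that if collision rates ever matter here.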
......@@ -1260,18 +1260,16 @@ private:
* If a task decides to send itself away, an offload entry is generated and pushed into
* a concurrent TBB queue; the offloading manager then drains this queue and sends the
* corresponding tasks to their destination ranks.
* Todo (Philipp): outdated, no longer necessary
*/
struct OffloadEntry {
int destRank;
int cellDescriptionsIndex;
int element;
double predictorTimeStamp;
double predictorTimeStepSize;
};
// queue for outstanding offloads
//tbb::concurrent_queue<OffloadEntry> _outstandingOffloads;
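Although the queue member above is commented out and marked outdated, the surrounding comment describes a standard producer/consumer hand-off. A brief sketch under that assumption; names suffixed with Sketch are hypothetical.

#include <tbb/concurrent_queue.h>

struct OffloadEntrySketch {
  int destRank;
  int cellDescriptionsIndex;
  int element;
  double predictorTimeStamp;
  double predictorTimeStepSize;
};

static tbb::concurrent_queue<OffloadEntrySketch> outstandingOffloadsSketch;

// Producer side: a task that decides to offload itself enqueues an entry.
void submitOffloadSketch(const OffloadEntrySketch& entry) {
  outstandingOffloadsSketch.push(entry);
}

// Consumer side: the offloading manager drains the queue and ships the work.
void drainOffloadsSketch() {
  OffloadEntrySketch entry;
  while (outstandingOffloadsSketch.try_pop(entry)) {
    // ... send the task described by 'entry' to entry.destRank via MPI ...
  }
}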
/*
* Packs metadata into a contiguous buffer.
*/
......@@ -1365,6 +1363,7 @@ private:
std::atomic<bool> _offloadingManagerJobTriggerTerminate;
// limit the maximum number of iprobes in progressOffloading()
// Todo(Philipp): probably outdated
static std::atomic<int> MaxIprobesInOffloadingProgress;
#endif
......@@ -2826,19 +2825,36 @@ public:
/*
* Makes progress on all offloading-related MPI communication.
*/
static void progressOffloading(exahype::solvers::ADERDGSolver* solver, bool isCalledOnMaster, int maxIts);
static void progressOffloading(
exahype::solvers::ADERDGSolver* solver,
bool isCalledOnMaster,
int maxIts);
static void receiveTaskOutcome(int tag, int src, exahype::solvers::ADERDGSolver *solver);
static void receiveTaskOutcome(
int tag,
int src,
exahype::solvers::ADERDGSolver *solver);
static void receiveMigratableJob(int tag, int src, exahype::solvers::ADERDGSolver *solver);
static void receiveMigratableJob(
int tag,
int src,
exahype::solvers::ADERDGSolver *solver);
static void receiveBackMigratableJob(int tag, int src, exahype::solvers::ADERDGSolver *solver);
static void receiveBackMigratableJob(
int tag,
int src,
exahype::solvers::ADERDGSolver *solver);
static void pollForOutstandingCommunicationRequests(exahype::solvers::ADERDGSolver *solver, bool calledOnMaster, int maxIts);
static void pollForOutstandingCommunicationRequests(
exahype::solvers::ADERDGSolver *solver,
bool calledOnMaster,
int maxIts);
static void setMaxNumberOfIprobesInProgressOffloading(int maxNumIprobes);
static bool tryToReceiveTaskBack(exahype::solvers::ADERDGSolver* solver, const void* cellDescription = nullptr);
static bool tryToReceiveTaskBack(
exahype::solvers::ADERDGSolver* solver,
const void* cellDescription = nullptr);
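A hedged sketch of the maxIts-bounded probe loop suggested by progressOffloading() and pollForOutstandingCommunicationRequests(); the dispatch to receiveMigratableJob()/receiveTaskOutcome()/receiveBackMigratableJob() is only indicated by a comment, since this hunk does not show the real loop body.

#include <mpi.h>

// Illustrative only: bound the number of MPI_Iprobe calls per invocation so
// that communication progress does not starve the computation.
void progressOffloadingSketch(MPI_Comm comm, int maxIts) {
  for (int it = 0; it < maxIts; it++) {
    int flag = 0;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, comm, &flag, &status);
    if (!flag) break;  // nothing pending, return to useful work
    // ... dispatch on status.MPI_TAG / status.MPI_SOURCE to the matching
    //     routine (migratable job, task outcome, sent-back result, ...) ...
  }
}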
size_t getAdditionalCurrentMemoryUsageReplication();
......@@ -2871,7 +2887,7 @@ public:
void addRecomputeJobForCellDescription(tarch::multicore::jobs::Job* job, const CellDescription* cellDescription);
#endif
#endif
#endif /*DistributedOffloading*/
#endif
......