11.08., 9:00 - 11:00: Due to updates GitLab will be unavailable for some minutes between 09:00 and 11:00.

Commit 4b849c6c authored by Philipp Samfaß's avatar Philipp Samfaß

Merge branch 'philipp/replication_fix' into scorep

parents 18640855 1874d6eb
......@@ -31,6 +31,7 @@
#include "exahype/offloading/AggressiveDistributor.h"
#include "exahype/offloading/AggressiveCCPDistributor.h"
#include "exahype/offloading/AggressiveHybridDistributor.h"
#include "exahype/offloading/OffloadingAnalyser.h"
#endif
#if defined(TMPI_Heartbeats)
......@@ -255,6 +256,11 @@ void exahype::mappings::FinaliseMeshRefinement::endIteration(
}
}
#if defined(OffloadingStrategyAggressiveHybrid)
exahype::offloading::AggressiveHybridDistributor::getInstance().resetTasksToOffload();
exahype::offloading::OffloadingAnalyser::getInstance().resetMeasurements();
exahype::offloading::AggressiveHybridDistributor::getInstance().enable();
#endif
#endif
#if defined(TMPI_Heartbeats)
......
......@@ -37,6 +37,16 @@
#include "exahype/offloading/OffloadingAnalyser.h"
#endif
#include "exahype/offloading/NoiseGenerator.h"
#ifdef USE_ITAC
#include "VT.h"
#endif
#ifdef USE_ITAC
int exahype::mappings::FusedTimeStep::noiseHandle = 0;
#endif
tarch::logging::Log exahype::mappings::FusedTimeStep::_log("exahype::mappings::FusedTimeStep");
bool exahype::mappings::FusedTimeStep::issuePredictionJobsInThisIteration() {
......@@ -165,7 +175,6 @@ void exahype::mappings::FusedTimeStep::beginIteration(
}
#ifdef Parallel
#ifdef DistributedOffloading
static bool isFirst = true;
isFirst = false;
......@@ -190,13 +199,10 @@ void exahype::mappings::FusedTimeStep::beginIteration(
if(issuePredictionJobsInThisIteration()) {
exahype::offloading::StaticDistributor::getInstance().resetRemainingTasksToOffload();
}
//exahype::offloading::OffloadingAnalyser::getInstance().beginIteration();
#elif defined(OffloadingStrategyAggressive) || defined(OffloadingStrategyAggressiveHybrid) || defined(OffloadingStrategyAggressiveDiffusive)
//exahype::offloading::OffloadingAnalyser::getInstance().beginIteration();
#endif
#endif
#endif
// ensure reductions are initiated from worker side
// ensure reductions are initiated from worker side
solverState.setReduceStateAndCell( exahype::State::isLastIterationOfBatchOrNoBatch() );
#endif
......@@ -207,6 +213,18 @@ void exahype::mappings::FusedTimeStep::endIteration(
exahype::State& state) {
logTraceInWith1Argument("endIteration(State)", state);
#if defined(GenerateNoise)
if(issuePredictionJobsInThisIteration()) {
#ifdef USE_ITAC
VT_begin(noiseHandle);
#endif
exahype::offloading::NoiseGenerator::getInstance().generateNoise();
#ifdef USE_ITAC
VT_end(noiseHandle);
#endif
}
#endif
if ( sendOutRiemannDataInThisIteration() ) {
exahype::plotters::finishedPlotting();
......@@ -230,7 +248,6 @@ void exahype::mappings::FusedTimeStep::endIteration(
#if defined(Parallel) && defined(DistributedOffloading)
#if defined(OffloadingStrategyAggressive) || defined(OffloadingStrategyAggressiveHybrid) || defined(OffloadingStrategyAggressiveDiffusive) || defined(OffloadingStrategyStaticHardcoded)
//exahype::offloading::OffloadingAnalyser::getInstance().endIteration();
#ifdef OffloadingUseProgressTask
if(issuePredictionJobsInThisIteration() ) {
exahype::offloading::OffloadingManager::getInstance().notifyAllVictimsSendCompletedIfNotNotified();
......
......@@ -86,6 +86,13 @@ private:
static bool issuePredictionJobsInThisIteration();
public:
#ifdef USE_ITAC
/**
* A handle to measure the noise generated by the noise generator.
*/
static int noiseHandle;
#endif
/**
* Run through the whole tree. Run concurrently on the fine grid.
*
......
......@@ -36,6 +36,10 @@
#include "exahype/mappings/RefinementStatusSpreading.h"
#if defined(DistributedOffloading)
#include "exahype/offloading/AggressiveHybridDistributor.h"
#endif
#include <sstream>
bool exahype::mappings::MeshRefinement::DynamicLoadBalancing = false;
......@@ -147,6 +151,13 @@ void exahype::mappings::MeshRefinement::beginIteration( exahype::State& solverSt
if (! MetadataHeap::getInstance().validateThatIncomingJoinBuffersAreEmpty() ) {
exit(-1);
}
#if defined(DistributedOffloading)
#if defined(OffloadingStrategyAggressiveHybrid)
exahype::offloading::AggressiveHybridDistributor::getInstance().disable();
#endif
#endif
#endif
}
......
......@@ -43,7 +43,9 @@ exahype::offloading::AggressiveHybridDistributor::AggressiveHybridDistributor()
_adaptTemperature(false),
_CCPFrequency(0),
_CCPStepsPerPhase(0),
_thresholdTempAdaptation(1)
_thresholdTempAdaptation(1),
_currentOptimalVictim(-1),
_currentCriticalRank(-1)
{
int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
......@@ -314,6 +316,9 @@ void exahype::offloading::AggressiveHybridDistributor::configure(
}
void exahype::offloading::AggressiveHybridDistributor::printOffloadingStatistics() {
if(!_isEnabled) return;
int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
int myRank = tarch::parallel::Node::getInstance().getRank();
......@@ -344,6 +349,16 @@ void exahype::offloading::AggressiveHybridDistributor::resetRemainingTasksToOffl
}
}
/**
 * Clears the offloading plan: the entry of _tasksToOffload is set to
 * zero for every index from 0 up to (number of nodes - 1).
 */
void exahype::offloading::AggressiveHybridDistributor::resetTasksToOffload() {
  const int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();

  for(int i=0; i<nnodes; i++) {
    _tasksToOffload[i] = 0;
  }
}
void exahype::offloading::AggressiveHybridDistributor::handleEmergencyOnRank(int rank) {
_emergencies[rank]++;
logDebug("handleEmergencyOnRank()","emergencies for rank:"<<_emergencies[rank]);
......@@ -487,6 +502,11 @@ void exahype::offloading::AggressiveHybridDistributor::updateLoadDistributionDif
_tasksToOffload[currentCriticalRank] = std::max( (1-_temperatureDiffusion), 0.0)*_tasksToOffload[currentCriticalRank];
}
#endif
//store decision
_currentOptimalVictim = currentOptimalVictim;
_currentCriticalRank = currentCriticalRank;
resetRemainingTasksToOffload();
}
......@@ -498,6 +518,14 @@ void exahype::offloading::AggressiveHybridDistributor::getAllVictimRanks(std::ve
}
}
int exahype::offloading::AggressiveHybridDistributor::getCurrentOptimalVictim() {
return _currentOptimalVictim;
}
int exahype::offloading::AggressiveHybridDistributor::getCurrentCriticalRank() {
return _currentCriticalRank;
}
bool exahype::offloading::AggressiveHybridDistributor::selectVictimRank(int& victim, bool& last) {
int nnodes = tarch::parallel::Node::getInstance().getNumberOfNodes();
int myRank = tarch::parallel::Node::getInstance().getRank();
......@@ -531,13 +559,14 @@ bool exahype::offloading::AggressiveHybridDistributor::selectVictimRank(int& vic
int threshold = 1
+ std::max(1, tarch::multicore::Core::getInstance().getNumberOfThreads()-1)
* tarch::multicore::jobs::internal::_minimalNumberOfJobsPerConsumerRun;
logDebug("selectVictimRank", "threshold "<<threshold
<< " there are "<<exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs-exahype::solvers::ADERDGSolver::NumberOfRemoteJobs
<<" jobs "<< " and "<<tarch::multicore::jobs::getNumberOfWaitingBackgroundJobs());
threshold = std::max(threshold, 20);
threshold = std::max(threshold, 20);
if(exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs-exahype::solvers::ADERDGSolver::NumberOfRemoteJobs<
threshold) {
logInfo("selectVictimRank", "threshold "<<threshold
<< " there are "<<exahype::solvers::ADERDGSolver::NumberOfEnclaveJobs-exahype::solvers::ADERDGSolver::NumberOfRemoteJobs
<<" jobs "<< " and "<<tarch::multicore::jobs::getNumberOfWaitingBackgroundJobs());
_tasksNotOffloaded[victim]++;
victim = myRank;
}
......
......@@ -148,6 +148,9 @@ class exahype::offloading::AggressiveHybridDistributor {
*/
bool _isEnabled;
int _currentOptimalVictim;
int _currentCriticalRank;
/**
* Conducts a CCP step.
*/
......@@ -207,6 +210,8 @@ class exahype::offloading::AggressiveHybridDistributor {
*/
void resetRemainingTasksToOffload();
void resetTasksToOffload();
/**
* Print some useful statistics for offloading.
*/
......@@ -228,6 +233,16 @@ class exahype::offloading::AggressiveHybridDistributor {
* @param victimRanks Result
*/
void getAllVictimRanks(std::vector<int>& victimRanks);
/**
* Returns current optimal victim
*/
int getCurrentOptimalVictim();
/**
* Returns current critical rank.
*/
int getCurrentCriticalRank();
/**
* Conducts another update of the load re-distribution. This is
......
......@@ -11,17 +11,17 @@
* For the full license text, see LICENSE.txt
**/
#include "ReplicationStatistics.h"
#include "JobTableStatistics.h"
#include "exahype/offloading/OffloadingManager.h"
namespace exahype {
namespace offloading {
tarch::logging::Log ReplicationStatistics::_log( "exahype::offloading::ReplicationStatistics" );
tarch::logging::Log JobTableStatistics::_log( "exahype::offloading::JobTableStatistics" );
ReplicationStatistics::ReplicationStatistics() :
JobTableStatistics::JobTableStatistics() :
_spawnedTasks(0),
_executedTasks(0),
_savedTasks(0),
......@@ -30,70 +30,76 @@ ReplicationStatistics::ReplicationStatistics() :
_declinedTasks(0),
_lateTasks(0),
_sentKeys(0),
_receivedKeys(0)
_receivedKeys(0),
_recomputedTasks(0)
{
// TODO Auto-generated constructor stub
// TODO Auto-generated constructor stub
}
ReplicationStatistics::~ReplicationStatistics() {
// TODO Auto-generated destructor stub
JobTableStatistics::~JobTableStatistics() {
// TODO Auto-generated destructor stub
}
ReplicationStatistics& ReplicationStatistics::getInstance() {
static ReplicationStatistics replicationStats;
return replicationStats;
JobTableStatistics& JobTableStatistics::getInstance() {
static JobTableStatistics replicationStats;
return replicationStats;
}
void ReplicationStatistics::notifyLateTask() {
_lateTasks++;
void JobTableStatistics::notifyLateTask() {
_lateTasks++;
}
void ReplicationStatistics::notifyDeclinedTask() {
_declinedTasks++;
void JobTableStatistics::notifyDeclinedTask() {
_declinedTasks++;
}
void ReplicationStatistics::notifyReceivedTask(){
_receivedTasks++;
void JobTableStatistics::notifyReceivedTask(){
_receivedTasks++;
}
void ReplicationStatistics::notifySentTask(){
_sentTasks++;
void JobTableStatistics::notifySentTask(){
_sentTasks++;
}
void ReplicationStatistics::notifySavedTask(){
_savedTasks++;
void JobTableStatistics::notifySavedTask(){
_savedTasks++;
}
void ReplicationStatistics::notifySpawnedTask(){
void JobTableStatistics::notifySpawnedTask(){
_spawnedTasks++;
}
void ReplicationStatistics::notifyExecutedTask(){
void JobTableStatistics::notifyExecutedTask(){
_executedTasks++;
}
void ReplicationStatistics::notifySentKey() {
_sentKeys++;
void JobTableStatistics::notifySentKey() {
_sentKeys++;
}
void ReplicationStatistics::notifyReceivedKey() {
_receivedKeys++;
void JobTableStatistics::notifyReceivedKey() {
_receivedKeys++;
}
void ReplicationStatistics::printStatistics() {
#if defined(TaskSharing)
/**
 * Records one more recomputed task by incrementing the
 * _recomputedTasks counter.
 */
void JobTableStatistics::notifyRecomputedTask() {
  ++_recomputedTasks;
}
void JobTableStatistics::printStatistics() {
#if defined(TaskSharing) || defined(OffloadingLocalRecompute)
int team = exahype::offloading::OffloadingManager::getInstance().getTMPIInterTeamRank();
logInfo("printStatistics", " team "<<team
<<" spawned tasks = "<<_spawnedTasks
<<" executed tasks = "<<_executedTasks
<<" saved tasks = "<<_savedTasks
<<" sent tasks = "<<_sentTasks
<<" received tasks = "<<_receivedTasks
<<" received keys= "<<_receivedKeys
<<" sent keys= "<<_sentKeys
<<" declined tasks = "<<_declinedTasks
<<" late tasks = "<<_lateTasks);
<<" spawned tasks = "<<_spawnedTasks
<<" executed tasks = "<<_executedTasks
<<" saved tasks = "<<_savedTasks
<<" sent tasks = "<<_sentTasks
<<" received tasks = "<<_receivedTasks
<<" received keys= "<<_receivedKeys
<<" sent keys= "<<_sentKeys
<<" declined tasks = "<<_declinedTasks
<<" late tasks = "<<_lateTasks
<<" recomputed tasks = "<<_recomputedTasks);
#endif
}
......
......@@ -20,39 +20,44 @@
namespace exahype {
namespace offloading {
class ReplicationStatistics {
/**
* Gathers statistics about job execution and job migration at runtime (only with local recompute or task sharing).
*/
class JobTableStatistics {
private:
static tarch::logging::Log _log;
static tarch::logging::Log _log;
std::atomic<int> _spawnedTasks;
std::atomic<int> _executedTasks;
std::atomic<int> _savedTasks;
std::atomic<int> _receivedTasks;
std::atomic<int> _sentTasks;
std::atomic<int> _sentKeys;
std::atomic<int> _receivedKeys;
std::atomic<int> _declinedTasks;
std::atomic<int> _lateTasks;
std::atomic<int> _spawnedTasks;
std::atomic<int> _executedTasks;
std::atomic<int> _savedTasks;
std::atomic<int> _receivedTasks;
std::atomic<int> _sentTasks;
std::atomic<int> _sentKeys;
std::atomic<int> _receivedKeys;
std::atomic<int> _declinedTasks;
std::atomic<int> _lateTasks;
std::atomic<int> _recomputedTasks;
ReplicationStatistics();
virtual ~ReplicationStatistics();
JobTableStatistics();
virtual ~JobTableStatistics();
public:
static ReplicationStatistics& getInstance();
static JobTableStatistics& getInstance();
void printStatistics();
void printStatistics();
void notifyLateTask();
void notifyDeclinedTask();
void notifyReceivedTask();
void notifySentTask();
void notifySentKey();
void notifyReceivedKey();
void notifySavedTask();
void notifySpawnedTask();
void notifyExecutedTask();
void notifyLateTask();
void notifyDeclinedTask();
void notifyReceivedTask();
void notifySentTask();
void notifySentKey();
void notifyReceivedKey();
void notifySavedTask();
void notifySpawnedTask();
void notifyExecutedTask();
void notifyRecomputedTask();
};
} /* namespace offloading */
......
/**
* This file is part of the ExaHyPE project.
* Copyright (c) 2016 http://exahype.eu
* All rights reserved.
*
* The project has received funding from the European Union's Horizon
* 2020 research and innovation programme under grant agreement
* No 671698. For copyrights and licensing, please consult the webpage.
*
* Released under the BSD 3 Open Source License.
* For the full license text, see LICENSE.txt
**/
#ifndef EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGY_H_
#define EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGY_H_
#include <chrono>
namespace exahype {
namespace offloading {
/**
 * Generic strategy pattern for noise generation. A specific noise
 * generation strategy needs to implement this interface. The
 * noise generator context class will call the contained
 * noise generation methods during the simulation.
 */
class NoiseGenerationStrategy {
  public:
  /**
   * Virtual destructor: concrete strategies are used through this
   * interface, so destroying one via a base pointer must run the
   * derived destructor.
   */
  virtual ~NoiseGenerationStrategy() = default;

  /**
   * Generates noise at the beginning of a time step.
   *
   * @param rank      Rank handed in by the caller (presumably the calling
   *                  MPI rank - confirm against the noise generator).
   * @param timestamp Time point handed in by the caller.
   */
  virtual void generateNoise(int rank, std::chrono::system_clock::time_point timestamp ) = 0;

  /**
   * Generates noise when executing STPs.
   *
   * @param rank      Rank handed in by the caller (presumably the calling
   *                  MPI rank - confirm against the noise generator).
   * @param timestamp Time point handed in by the caller.
   */
  virtual void generateNoiseSTP(int rank, std::chrono::system_clock::time_point timestamp ) = 0;
};
} /* namespace offloading */
} /* namespace exahype */
#endif /* EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGY_H_ */
/**
* This file is part of the ExaHyPE project.
* Copyright (c) 2016 http://exahype.eu
* All rights reserved.
*
* The project has received funding from the European Union's Horizon
* 2020 research and innovation programme under grant agreement
* No 671698. For copyrights and licensing, please consult the webpage.
*
* Released under the BSD 3 Open Source License.
* For the full license text, see LICENSE.txt
**/
#include "NoiseGenerationStrategyChaseVictim.h"
#include "OffloadingAnalyser.h"
#include "tarch/parallel/Node.h"
#include "AggressiveHybridDistributor.h"
#include "OffloadingManager.h"
#include <unistd.h>
#include <cstdlib>
#include <string>
namespace exahype {
namespace offloading {
tarch::logging::Log exahype::offloading::NoiseGenerationStrategyChaseVictim::_log( "exahype::offloading::NoiseGenerationStrategyChaseVictim" );
/**
 * Default configuration: factor 0.5 and base noise 1.
 */
NoiseGenerationStrategyChaseVictim::NoiseGenerationStrategyChaseVictim()
  : NoiseGenerationStrategyChaseVictim(0.5, 1) {
}
NoiseGenerationStrategyChaseVictim::NoiseGenerationStrategyChaseVictim(double factor, double baseNoise)
: _factor(factor), _baseNoise(baseNoise) {
}
/** Nothing to release: the strategy only holds two doubles. */
NoiseGenerationStrategyChaseVictim::~NoiseGenerationStrategyChaseVictim() = default;
/**
 * Disturbs this process if it currently is an offloading victim.
 *
 * With DistributedOffloading defined, the process suspends itself via a
 * shell command (kill -STOP, sleep, kill -CONT on its own pid) for
 * _baseNoise*_factor, passed as the argument of `sleep`. After a
 * disturbance that left the process a victim, further disturbances are
 * suppressed until kRecover (10) more invocations of this routine have
 * happened. Without DistributedOffloading nothing is done.
 *
 * @param rank      Not used by this strategy.
 * @param timestamp Not used by this strategy.
 */
void NoiseGenerationStrategyChaseVictim::generateNoise(int rank, std::chrono::system_clock::time_point timestamp) {
  const pid_t ownPid = getpid();
  static int invocations = 0;

#if defined(DistributedOffloading)
  static const int kRecover = 10;
  static bool victimDisturbed = false;
  static int disturbedAtInvocation = -1;

  // Re-arm exactly kRecover invocations after the last disturbance.
  if (victimDisturbed && (invocations - disturbedAtInvocation) == kRecover) {
    victimDisturbed = false;
    disturbedAtInvocation = -1;
  }

  if (OffloadingManager::getInstance().isVictim() && !victimDisturbed) {
    // The wait time is a fixed product; it is not derived from the
    // measured time per time step.
    const double timeToWait = _baseNoise * _factor;
    const std::string pidText = std::to_string(ownPid);

    std::string shellCommand = " kill -STOP ";
    shellCommand += pidText;
    shellCommand += " ; sleep ";
    shellCommand += std::to_string(timeToWait);
    shellCommand += "; kill -CONT ";
    shellCommand += pidText;

    logInfo("generateNoise()", "running cmd "<<shellCommand<<std::endl);
    std::system( shellCommand.c_str() );

    // Only remember the disturbance if we are still a victim afterwards.
    if (OffloadingManager::getInstance().isVictim()) {
      victimDisturbed = true;
      disturbedAtInvocation = invocations;
    }
  }

  ++invocations;
#else
  //todo
#endif
}
/**
 * STP noise hook of the chase-victim strategy. Currently a no-op: the
 * whole body below is commented out, so neither rank nor timestamp is
 * used. The disabled code busy-waits on victim ranks for a multiple of
 * the measured time per STP.
 */
void NoiseGenerationStrategyChaseVictim::generateNoiseSTP(int rank, std::chrono::system_clock::time_point timestamp) {
/* int nranks = tarch::parallel::Node::getInstance().getNumberOfNodes();
#if defined(DistributedOffloading)
if( OffloadingManager::getInstance().isVictim()) {
double timePerSTP = exahype::offloading::OffloadingAnalyser::getInstance().getTimePerSTP();
double timeToWait = timePerSTP*_factor*1e06;
logInfo("generateNoiseSTP()", "sleeping "<<timeToWait<<std::endl);
while( std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now()-timestamp).count()<timeToWait) {
}
logInfo("generateNoiseSTP()", "slept "<<std::chrono::duration_cast<std::chrono::microseconds>(std::chrono::system_clock::now()-timestamp).count()<<std::endl);
}
#else
//todo
#endif */
}
} /* namespace offloading */
} /* namespace exahype */
/**
* This file is part of the ExaHyPE project.
* Copyright (c) 2016 http://exahype.eu
* All rights reserved.
*
* The project has received funding from the European Union's Horizon
* 2020 research and innovation programme under grant agreement
* No 671698. For copyrights and licensing, please consult the webpage.
*
* Released under the BSD 3 Open Source License.
* For the full license text, see LICENSE.txt
**/
#ifndef EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGYCHASEVICTIM_H_
#define EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGYCHASEVICTIM_H_
#include "NoiseGenerationStrategy.h"
#include "tarch/logging/Log.h"
namespace exahype {
namespace offloading {
/**
* Implements a noise generation strategy where victim ranks are disturbed.
*/
class NoiseGenerationStrategyChaseVictim : public NoiseGenerationStrategy {
public:
static tarch::logging::Log _log;
NoiseGenerationStrategyChaseVictim();
NoiseGenerationStrategyChaseVictim(double factor, double baseNoise);
virtual ~NoiseGenerationStrategyChaseVictim();
virtual void generateNoise(int rank, std::chrono::system_clock::time_point timestamp);
virtual void generateNoiseSTP(int rank, std::chrono::system_clock::time_point timestamp);
private:
double _factor;
double _baseNoise;
};
} /* namespace offloading */
} /* namespace exahype */
#endif /* EXAHYPE_EXAHYPE_OFFLOADING_NOISEGENERATIONSTRATEGYCHASEVICTIM_H_ */
/**
* This file is part of the ExaHyPE project.
* Copyright (c) 2016 http://exahype.eu
* All rights reserved.
*
* The project has received funding from the European Union's Horizon
* 2020 research and innovation programme under grant agreement
* No 671698. For copyrights and licensing, please consult the webpage.
*
* Released under the BSD 3 Open Source License.
* For the full license text, see LICENSE.txt
**/
#include "NoiseGenerationStrategyRoundRobin.h"
#include "OffloadingAnalyser.h"
#include "tarch/parallel/Node.h"
#include <unistd.h>
#include <cstdlib>
#include <string>
namespace exahype {
namespace offloading {
tarch::logging::Log exahype::offloading::NoiseGenerationStrategyRoundRobin::_log( "exahype::offloading::NoiseGenerationStrategyRoundRobin" );
/**
 * Default configuration: frequency 1 and factor 0.5.
 */
NoiseGenerationStrategyRoundRobin::NoiseGenerationStrategyRoundRobin()
  : NoiseGenerationStrategyRoundRobin(1, 0.5) {
}
/**
 * Creates a round-robin strategy with an explicit configuration.
 *
 * @param frequency Stored in _frequency (its use is not visible in this
 *                  constructor - TODO confirm semantics).
 * @param factor    Stored in _factor; generateNoise() multiplies it with
 *                  the measured time per time step to get the sleep time.
 */
NoiseGenerationStrategyRoundRobin::NoiseGenerationStrategyRoundRobin(int frequency, double factor)
: _frequency(frequency), _factor(factor){
}
/** The destructor body is empty; no explicit cleanup is performed. */
NoiseGenerationStrategyRoundRobin::~NoiseGenerationStrategyRoundRobin() = default;
void NoiseGenerationStrategyRoundRobin::generateNoise(int rank, std::chrono::system_clock::time_point timestamp) {
pid_t pid = getpid();
static int cnt = 0;
static int phase_cnt = 0;
int nranks = tarch::parallel::Node::getInstance().getNumberOfNodes();
if(phase_cnt==rank) {
double timePerTimeStep = exahype::offloading::OffloadingAnalyser::getInstance().getTimePerTimeStep();
double timeToWait = timePerTimeStep*_factor;
std::string call = " kill -STOP "+std::to_string(pid)+" ; sleep "+std::to_string(timeToWait)+"; kill -CONT "+std::to_string(pid);
logInfo("generateNoise()", "running cmd "<<call<<std::endl);
std::system( call.c_str() );
}
cnt = cnt + 1;