Commit 691ebd93 authored by Phillip Samfass

more refactoring + complete any outstanding requests before termination

parent b25f5f57
...@@ -98,6 +98,17 @@ void Timing::progressOutstandingRequests(int targetTeam) { ...@@ -98,6 +98,17 @@ void Timing::progressOutstandingRequests(int targetTeam) {
} }
} }
void Timing::pollForAndReceiveHeartbeat(int targetTeam) {
int received = 0;
PMPI_Iprobe(mapTeamToWorldRank(getTeamRank(),targetTeam), targetTeam, getLibComm(), &received, MPI_STATUS_IGNORE);
if(received) {
timer.heartbeatTimes.at(targetTeam).push_back(0.0);
timer.heartbeatTimeRequests.at(targetTeam).push_back(MPI_Request());
PMPI_Irecv(&timer.heartbeatTimes.at(targetTeam).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), targetTeam), targetTeam, getLibComm(), &timer.heartbeatTimeRequests.at(targetTeam).back());
}
}
void Timing::compareProgressWithReplicas() { void Timing::compareProgressWithReplicas() {
for (int r=0; r < getNumberOfTeams(); r++) { for (int r=0; r < getNumberOfTeams(); r++) {
if (r != getTeam()) { if (r != getTeam()) {
...@@ -109,15 +120,7 @@ void Timing::compareProgressWithReplicas() { ...@@ -109,15 +120,7 @@ void Timing::compareProgressWithReplicas() {
// Receive deltas from other replicas // Receive deltas from other replicas
int received = 0; pollForAndReceiveHeartbeat(r);
PMPI_Iprobe(mapTeamToWorldRank(getTeamRank(),r), r, getLibComm(), &received, MPI_STATUS_IGNORE);
if(received) {
timer.heartbeatTimes.at(r).push_back(0.0);
timer.heartbeatTimeRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.heartbeatTimes.at(r).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatTimeRequests.at(r).back());
}
progressOutstandingRequests(r); progressOutstandingRequests(r);
} }
} }
...@@ -177,11 +180,13 @@ void Timing::outputTiming() { ...@@ -177,11 +180,13 @@ void Timing::outputTiming() {
//finish outstanding communication requests //finish outstanding communication requests
bool finished_all = false; bool finished_all = false;
while(!finished_all) { while(!finished_all) {
finished_all = true;
for(int r=0; r<getNumberOfTeams(); r++) { for(int r=0; r<getNumberOfTeams(); r++) {
finished_all &= timer.heartbeatTimeRequests.at(r).empty(); if(r!=getTeam()) {
} pollForAndReceiveHeartbeat(r);
if(!finished_all) { progressOutstandingRequests(r);
finished_all &= timer.heartbeatTimeRequests.at(r).empty();
}
} }
} }
......
...@@ -26,6 +26,8 @@ void compareProgressWithReplicas(); ...@@ -26,6 +26,8 @@ void compareProgressWithReplicas();
// Also compare a hash of a heartbeat buffer // Also compare a hash of a heartbeat buffer
void compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_Datatype sendtype); void compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_Datatype sendtype);
void pollForAndReceiveHeartbeat(int targetTeam);
void progressOutstandingRequests(int targetTeam); void progressOutstandingRequests(int targetTeam);
void sleepRankRaised(); void sleepRankRaised();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment