Commit 4d204e8f authored by Ben Hazelwood
Browse files

Merge branch 'master' of bitbucket.org:benjaminhazelwood/tmpi

parents 55de6c80 0a66fa36
...@@ -11,7 +11,7 @@ ...@@ -11,7 +11,7 @@
#include <cstdlib> #include <cstdlib>
#include <math.h> #include <math.h>
const int NUM_TRIALS = 100; const int NUM_TRIALS = 100 ;
const int NUM_COMPUTATIONS = 5e7; const int NUM_COMPUTATIONS = 5e7;
int main(int argc, char *argv[]) int main(int argc, char *argv[])
...@@ -30,7 +30,7 @@ int main(int argc, char *argv[]) ...@@ -30,7 +30,7 @@ int main(int argc, char *argv[])
} }
#ifdef COMPARE_PROGRESS #ifdef COMPARE_PROGRESS
MPI_Sendrecv(MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 1, MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_COMM_SELF, MPI_STATUS_IGNORE); MPI_Sendrecv(MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_COMM_SELF, MPI_STATUS_IGNORE);
#endif #endif
MPI_Barrier(MPI_COMM_WORLD); MPI_Barrier(MPI_COMM_WORLD);
} }
......
...@@ -19,6 +19,8 @@ ...@@ -19,6 +19,8 @@
#include <stddef.h> #include <stddef.h>
#include <bitset> #include <bitset>
#include <unistd.h> #include <unistd.h>
#include <list>
#include <vector>
struct Timer { struct Timer {
// PMPI_Wtime at start of execution // PMPI_Wtime at start of execution
...@@ -29,27 +31,28 @@ struct Timer { ...@@ -29,27 +31,28 @@ struct Timer {
// Mark when an application sleeps // Mark when an application sleeps
std::vector<double> sleepPoints; std::vector<double> sleepPoints;
// TODO change to heartbeat terminology
// Times for each heartbeat (per replica) // TODO: add support for multiple tags (or do we need this?)
std::map< int, std::vector<double> > heartbeatTimes; // Delta times for each heartbeat (per replica)
// Store the MPI_Requests for each heartbeat (per replica) for calling MPI_Test std::map< int, std::list<double> > heartbeatTimes;
std::map< int, std::vector<MPI_Request> > heartbeatTimeRequests; // Store the MPI_Requests for each heartbeat delta (per replica)
std::map< int, std::list<MPI_Request> > heartbeatTimeRequests;
// Hash for each heartbeat buffer (per replica) // Hash for each heartbeat buffer (per replica)
std::map<int, std::vector<std::size_t> > heartbeatHashes; std::map<int, std::list<std::size_t> > heartbeatHashes;
// Store the MPI_Requests for each heartbeat (per replica) for calling MPI_Test // Store the MPI_Requests for each heartbeat (per replica)
std::map<int, std::vector<MPI_Request> > heartbeatHashRequests; std::map<int, std::list<MPI_Request> > heartbeatHashRequests;
} timer; } timer;
void Timing::initialiseTiming() { void Timing::initialiseTiming() {
synchroniseRanksInTeam(); synchroniseRanksInTeam();
timer.startTime = PMPI_Wtime(); timer.startTime = PMPI_Wtime();
for (int i=0; i < getNumberOfTeams(); i++) { for (int i=0; i < getNumberOfTeams(); i++) {
timer.heartbeatTimes.insert(std::make_pair(i,std::vector<double>())); timer.heartbeatTimes.insert({i, std::list<double>()});
timer.heartbeatTimeRequests.insert(std::make_pair(i,std::vector<MPI_Request>())); timer.heartbeatTimeRequests.insert({i, std::list<MPI_Request>()});
timer.heartbeatHashes.insert(std::make_pair(i,std::vector<std::size_t>())); timer.heartbeatHashes.insert({i, std::list<std::size_t>()});
timer.heartbeatHashRequests.insert(std::make_pair(i,std::vector<MPI_Request>())); timer.heartbeatHashRequests.insert({i, std::list<MPI_Request>()});
} }
} }
...@@ -59,10 +62,16 @@ void Timing::finaliseTiming() { ...@@ -59,10 +62,16 @@ void Timing::finaliseTiming() {
} }
void Timing::markTimeline(int tag) { void Timing::markTimeline(int tag) {
if (tag > 0) {
timer.heartbeatTimes.at(getTeam()).push_back(PMPI_Wtime()); timer.heartbeatTimes.at(getTeam()).push_back(PMPI_Wtime());
if (tag > 0) { } else if (tag < 0) {
if (timer.heartbeatTimes.at(getTeam()).size()) {
timer.heartbeatTimes.at(getTeam()).back() = PMPI_Wtime() - timer.heartbeatTimes.at(getTeam()).back();
compareProgressWithReplicas(); compareProgressWithReplicas();
} }
} else {
// TODO: if tag == 0 then single heartbeat mode not deltas
}
} }
void Timing::markTimeline(int tag, const void *sendbuf, int sendcount, MPI_Datatype sendtype) { void Timing::markTimeline(int tag, const void *sendbuf, int sendcount, MPI_Datatype sendtype) {
...@@ -73,27 +82,30 @@ void Timing::markTimeline(int tag, const void *sendbuf, int sendcount, MPI_Datat ...@@ -73,27 +82,30 @@ void Timing::markTimeline(int tag, const void *sendbuf, int sendcount, MPI_Datat
void Timing::compareProgressWithReplicas() { void Timing::compareProgressWithReplicas() {
for (int r=0; r < getNumberOfTeams(); r++) { for (int r=0; r < getNumberOfTeams(); r++) {
if (r != getTeam()) { if (r != getTeam()) {
// Send out this replica's times // Send out this replica's delta
MPI_Request request; timer.heartbeatTimeRequests.at(r).push_back(MPI_Request());
PMPI_Isend(timer.heartbeatTimes.at(getTeam()).data()+timer.heartbeatTimes.at(getTeam()).size() - 2, 2, MPI_DOUBLE, PMPI_Isend(&timer.heartbeatTimes.at(getTeam()).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), getTeam(), mapTeamToWorldRank(getTeamRank(), r), getTeam(),
getLibComm(), &request); getLibComm(), &timer.heartbeatTimeRequests.at(r).back());
MPI_Request_free(&request);
// Receive times from other replicas
timer.heartbeatTimes.at(r).push_back(0.0); // Receive deltas from other replicas
timer.heartbeatTimes.at(r).push_back(0.0); timer.heartbeatTimes.at(r).push_back(0.0);
timer.heartbeatTimeRequests.at(r).push_back(MPI_Request()); timer.heartbeatTimeRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(timer.heartbeatTimes.at(getTeam()).data()+timer.heartbeatTimes.at(getTeam()).size() - 2, 2, MPI_DOUBLE, PMPI_Irecv(&timer.heartbeatTimes.at(getTeam()).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatTimeRequests.at(r).back()); mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatTimeRequests.at(r).back());
// Test for completion of Irecv's auto it = timer.heartbeatTimeRequests.at(r).begin();
int numPending = 0; while (it != timer.heartbeatTimeRequests.at(r).end()) {
for (int i=0; i < timer.heartbeatTimeRequests.at(r).size(); i++) { int flag;
int flag = 0; PMPI_Test(&(*it), &flag, MPI_STATUS_IGNORE);
logDebug("Sucess") if (flag) {
PMPI_Test(&timer.heartbeatTimeRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE); if (!((*it) == MPI_REQUEST_NULL)){
numPending += 1 - flag; MPI_Request_free(&(*it));
}
it = timer.heartbeatTimeRequests.at(r).erase(it);
}
++it;
} }
} }
} }
...@@ -129,14 +141,14 @@ void Timing::compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_D ...@@ -129,14 +141,14 @@ void Timing::compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_D
PMPI_Irecv(&timer.heartbeatHashes.at(r).back(), 1, TMPI_SIZE_T, PMPI_Irecv(&timer.heartbeatHashes.at(r).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatHashRequests.at(r).back()); mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatHashRequests.at(r).back());
// Test for completion of Irecv's // // Test for completion of Irecv's
int numPending = 0; // int numPending = 0;
for (int i=0; i < timer.heartbeatHashRequests.at(r).size(); i++) { // for (int i=0; i < timer.heartbeatHashRequests.at(r).size(); i++) {
int flag = 0; // int flag = 0;
PMPI_Test(&timer.heartbeatHashRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE); // PMPI_Test(&timer.heartbeatHashRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
numPending += 1 - flag; // numPending += 1 - flag;
} // }
std::cout << "Num pending: " << numPending << "\n"; // std::cout << "Num pending: " << numPending << "\n";
} }
} }
} }
...@@ -199,7 +211,7 @@ void Timing::outputTiming() { ...@@ -199,7 +211,7 @@ void Timing::outputTiming() {
f << "heartbeatTimes"; f << "heartbeatTimes";
for (const double& t : timer.heartbeatTimes.at(getTeam())) { for (const double& t : timer.heartbeatTimes.at(getTeam())) {
f << sep << t - timer.startTime; f << sep << t;
} }
f << "\n"; f << "\n";
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
#define TIMING_H_ #define TIMING_H_
#include <mpi.h> #include <mpi.h>
#include <vector>
namespace Timing { namespace Timing {
......
#!/bin/bash #!/bin/bash
if (( $# < 4)); then if (( $# < 3)); then
echo "ERROR: At least four parameters are required" echo "ERROR: At least three parameters are required"
echo "Usage: { constant | increasing | random } { single | rr | random } application [application args...]" echo "Usage: { constant | increasing | random } { single | rr | random } application [application args...]"
exit 1 exit 1
fi fi
mpirun -np 4 -l ${@:3} & mpirun -np 4 ${@:3} &
sleep 2 sleep 2
pids=($(pgrep Latency)) pids=($(pgrep PerfSimulator))
iteration=1 iteration=1
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment