Commit 43adae18 authored by Ben Hazelwood
Browse files

Mark when application sleeps

parent 073ea0a8
......@@ -10,8 +10,8 @@
#include <assert.h>
#include <cstdlib>
// Benchmark parameters.
// NOTE(review): NUM_TRIALS and NUM_PONGS are each defined twice below. This
// looks like the removed (100 / 2e4) and added (25 / 1e5) sides of the change
// shown together -- confirm that only one pair exists in the real file, since
// two definitions of the same constant in one scope will not compile.
// 2e4 and 1e5 are floating-point literals that are implicitly converted to int.
const int NUM_TRIALS = 100;
const int NUM_PONGS = 2e4;
const int NUM_TRIALS = 25;
const int NUM_PONGS = 1e5;
// Mutable globals initialised to 0; where they are updated is outside this excerpt.
int START = 0;
int FINISH = 0;
......
......@@ -11,8 +11,8 @@
#include "RankControl.h"
#include "Logging.h"
#include "Timing.h"
// File-local flag set to true in pauseThisRankSignalHandler and accessed
// through getShouldSleepRank / setShouldSleepRank.
// NOTE(review): this is a plain bool written from what appears to be a signal
// handler; consider volatile std::sig_atomic_t or a lock-free std::atomic.
static bool shouldSleepRank;
// File-local flag set to true by the corrupt-data signal handler and read
// through getShouldCorruptData. The same review note applies.
static bool shouldCorruptData;
......@@ -23,7 +23,10 @@ void registerSignalHandler() {
}
// Handler that pauses this rank: raises the sleep flag, records the time of
// the request via Timing::sleepRankRaised(), logs, then blocks for one second.
// signum is not used in the body.
// NOTE(review): presumably installed by registerSignalHandler(). If it runs in
// signal context, logDebug, usleep and the vector push_back performed by
// sleepRankRaised() are not on the POSIX async-signal-safe list -- confirm
// this is acceptable for the intended use.
void pauseThisRankSignalHandler( int signum ) {
shouldSleepRank = true;
// Timestamp the moment the sleep was triggered.
Timing::sleepRankRaised();
// usleep takes microseconds: 1.0 * 1e6 us == 1 s, matching the log message.
const double sleepLength = 1.0 * 1e6;
logDebug( "Signal received: sleep for 1s");
usleep(sleepLength);
}
void corruptThisRankSignalHandler( int signum ) {
......@@ -31,14 +34,6 @@ void corruptThisRankSignalHandler( int signum ) {
shouldCorruptData = true;
}
// Reports the current value of the file-local sleep flag.
bool getShouldSleepRank() {
    const bool sleepRequested = shouldSleepRank;
    return sleepRequested;
}
// Overwrites the file-local sleep flag with `toggle`.
void setShouldSleepRank(bool toggle) {
shouldSleepRank = toggle;
}
// Reports the current value of the file-local corrupt-data flag.
bool getShouldCorruptData() {
    const bool corruptionRequested = shouldCorruptData;
    return corruptionRequested;
}
......
......@@ -24,8 +24,4 @@ bool getShouldCorruptData();
// Sets the corrupt-data flag (definition not visible in this excerpt).
void setShouldCorruptData(bool toggle);
// Returns the sleep flag raised by pauseThisRankSignalHandler.
bool getShouldSleepRank();
// Overwrites the sleep flag with `toggle`.
void setShouldSleepRank(bool toggle);
#endif
\ No newline at end of file
......@@ -26,27 +26,30 @@ struct Timer {
// PMPI_Wtime at the end of this ranks execution
double endTime;
// Mark when an application sleeps
std::vector<double> sleepPoints;
// TODO change to heartbeat terminology
// Times for each heartbeat (per replica)
std::map< int, std::vector<double> > syncPoints;
std::map< int, std::vector<double> > heartbeatTimes;
// Store the MPI_Requests for each heartbeat (per replica) for calling MPI_Test
std::map< int, std::vector<MPI_Request> > syncRequests;
std::map< int, std::vector<MPI_Request> > heartbeatTimeRequests;
// Hash for each heartbeat buffer (per replica)
std::map<int, std::vector<std::size_t> > hashes;
std::map<int, std::vector<std::size_t> > heartbeatHashes;
// Store the MPI_Requests for each heartbeat (per replica) for calling MPI_Test
std::map<int, std::vector<MPI_Request> > hashRequests;
std::map<int, std::vector<MPI_Request> > heartbeatHashRequests;
} timer;
// Prepares the global `timer` for a run: synchronises the ranks in this team,
// stamps the start time, and registers an empty entry per team in every
// per-replica map so later `.at(team)` lookups succeed.
void Timing::initialiseTiming() {
  synchroniseRanksInTeam();
  timer.startTime = PMPI_Wtime();

  // One (initially empty) slot per team in each per-replica container.
  for (int team = 0; team < getNumberOfTeams(); ++team) {
    timer.syncPoints.emplace(team, std::vector<double>());
    timer.syncRequests.emplace(team, std::vector<MPI_Request>());
    timer.heartbeatTimes.emplace(team, std::vector<double>());
    timer.heartbeatTimeRequests.emplace(team, std::vector<MPI_Request>());

    timer.hashes.emplace(team, std::vector<std::size_t>());
    timer.hashRequests.emplace(team, std::vector<MPI_Request>());
    timer.heartbeatHashes.emplace(team, std::vector<std::size_t>());
    timer.heartbeatHashRequests.emplace(team, std::vector<MPI_Request>());
  }
}
......@@ -56,13 +59,7 @@ void Timing::finaliseTiming() {
}
// Records a heartbeat timestamp for this team and then exchanges progress
// with the other replicas via compareProgressWithReplicas().
// NOTE(review): this listing appears to interleave both sides of a diff. The
// sleep-on-flag block and the syncPoints push look like the removed side and
// the heartbeatTimes push like the added side -- confirm against the real file.
void Timing::markTimeline() {
// If a sleep was requested, block for one second and then clear the flag.
if (getShouldSleepRank()) {
// usleep takes microseconds: 1.0 * 1e6 us == 1 s.
const double sleepLength = 1.0 * 1e6;
logDebug( "Signal received: sleep for 1s");
usleep(sleepLength);
setShouldSleepRank(false);
}
// Append the current wall-clock time to this team's series.
timer.syncPoints.at(getTeam()).push_back(PMPI_Wtime());
timer.heartbeatTimes.at(getTeam()).push_back(PMPI_Wtime());
compareProgressWithReplicas();
}
......@@ -76,22 +73,22 @@ void Timing::compareProgressWithReplicas() {
if (r != getTeam()) {
// Send out this replica's times
MPI_Request request;
PMPI_Isend(&timer.syncPoints.at(getTeam()).back(), 1, MPI_DOUBLE,
PMPI_Isend(&timer.heartbeatTimes.at(getTeam()).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), getTeam(),
getLibComm(), &request);
MPI_Request_free(&request);
// Receive times from other replicas
timer.syncPoints.at(r).push_back(0.0);
timer.syncRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.syncPoints.at(r).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.syncRequests.at(r).back());
timer.heartbeatTimes.at(r).push_back(0.0);
timer.heartbeatTimeRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.heartbeatTimes.at(r).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatTimeRequests.at(r).back());
// Test for completion of Irecv's
int numPending = 0;
for (int i=0; i < timer.syncRequests.at(r).size(); i++) {
for (int i=0; i < timer.heartbeatTimeRequests.at(r).size(); i++) {
int flag = 0;
PMPI_Test(&timer.syncRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
PMPI_Test(&timer.heartbeatTimeRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
numPending += 1 - flag;
}
}
......@@ -111,28 +108,28 @@ void Timing::compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_D
std::string bits((const char*)sendbuf, sendcount*typeSize);
std::hash<std::string> hash_fn;
std::size_t hash = hash_fn(bits);
timer.hashes.at(getTeam()).push_back((std::size_t)hash);
timer.heartbeatHashes.at(getTeam()).push_back((std::size_t)hash);
for (int r=0; r < getNumberOfTeams(); r++) {
if (r != getTeam()) {
// Send out this replica's times
MPI_Request request;
PMPI_Isend(&timer.hashes.at(getTeam()).back(), 1, TMPI_SIZE_T,
PMPI_Isend(&timer.heartbeatHashes.at(getTeam()).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), getTeam(),
getLibComm(), &request);
MPI_Request_free(&request);
// Receive times from other replicas
timer.hashes.at(r).push_back(0);
timer.hashRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.hashes.at(r).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.hashRequests.at(r).back());
timer.heartbeatHashes.at(r).push_back(0);
timer.heartbeatHashRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.heartbeatHashes.at(r).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.heartbeatHashRequests.at(r).back());
// Test for completion of Irecv's
int numPending = 0;
for (int i=0; i < timer.hashRequests.at(r).size(); i++) {
for (int i=0; i < timer.heartbeatHashRequests.at(r).size(); i++) {
int flag = 0;
PMPI_Test(&timer.hashRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
PMPI_Test(&timer.heartbeatHashRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
numPending += 1 - flag;
}
std::cout << "Num pending: " << numPending << "\n";
......@@ -140,6 +137,11 @@ void Timing::compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_D
}
}
// Appends the current PMPI_Wtime() reading to timer.sleepPoints, marking the
// moment a sleep was raised for this rank.
// NOTE(review): called from pauseThisRankSignalHandler; push_back may allocate,
// which is not async-signal-safe if that handler runs in signal context.
void Timing::sleepRankRaised() {
  const double raisedAt = PMPI_Wtime();
  timer.sleepPoints.push_back(raisedAt);
}
void Timing::outputTiming() {
std::cout.flush();
PMPI_Barrier(MPI_COMM_WORLD);
......@@ -191,12 +193,16 @@ void Timing::outputTiming() {
f << "endTime" << sep << timer.endTime - timer.startTime << "\n";
f << "syncPoints";
for (const double& t : timer.syncPoints.at(getTeam())) {
f << "heartbeatTimes";
for (const double& t : timer.heartbeatTimes.at(getTeam())) {
f << sep << t - timer.startTime;
}
f << "\n";
f << "heartbeatTimes";
for (const double& t : timer.sleepPoints) {
f << sep << t - timer.startTime;
}
f.close();
}
......
......@@ -26,6 +26,8 @@ void compareProgressWithReplicas();
// Also compare a hash of a heartbeat buffer
// Hashes the bytes of `sendbuf` (sendcount elements of `sendtype`) and
// exchanges that hash with the other replicas.
void compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_Datatype sendtype);
// Records the current time as a point at which this rank was put to sleep.
void sleepRankRaised();
// Flushes stdout, waits on a barrier over MPI_COMM_WORLD and writes the
// recorded times to an output stream (see the definition for the format).
void outputTiming();
}
......
......@@ -37,11 +37,11 @@ while true; do
fi
if [ $1 = "increasing" ]; then
sleep $(python -c "print(max(2,25/$iteration))")
sleep $(python -c "print(max(2,20.0/$iteration))")
fi
if [ $1 = "random" ]; then
sleep `python3 -c "from random import uniform; print(uniform(2,20))"`
sleep `python3 -c "from random import uniform; print(uniform(2,10))"`
fi
((iteration++))
done
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment