Commit c8d1ed4e authored by Ben Hazelwood's avatar Ben Hazelwood
Browse files

Add support for corrupting data and detecting this

parent 5bc59626
......@@ -10,8 +10,10 @@
#include <assert.h>
#include <cstdlib>
#define COMPARE_HASH
const int NUM_TRIALS = 100;
const int NUM_TRIALS = 1000;
const int NUM_PONGS = 1e4;
int START = 0;
......@@ -65,13 +67,15 @@ int main(int argc, char* argv[]) {
MPI_Status status;
int counter;
for (int t = 0; t < NUM_TRIALS; t++) {
#ifdef COMPARE_PROGRESS
MPI_Sendrecv(MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_COMM_SELF, MPI_STATUS_IGNORE);
#endif
MPI_Barrier(MPI_COMM_WORLD);
counter = 0;
for (int n = START; n <= FINISH; n += INCR) {
if (rank == source) {
// Handshake
MPI_Recv(&m, 1, MPI_CHAR, dest, 0, MPI_COMM_WORLD, &status);
// MPI_Recv(&m, 1, MPI_CHAR, dest, 0, MPI_COMM_WORLD, &status);
t1 = MPI_Wtime();
for (int p = 0; p < NUM_PONGS; p++) {
......@@ -88,7 +92,7 @@ int main(int argc, char* argv[]) {
if (rank == dest) {
// Handshake
MPI_Send(&m, 1, MPI_CHAR, source, 0, MPI_COMM_WORLD);
// MPI_Send(&m, 1, MPI_CHAR, source, 0, MPI_COMM_WORLD);
for (int p = 0; p < NUM_PONGS; p++) {
MPI_Recv(&m, n, MPI_CHAR, source, 0, MPI_COMM_WORLD, &status);
MPI_Send(&m, n, MPI_CHAR, source, 1, MPI_COMM_WORLD);
......@@ -96,6 +100,11 @@ int main(int argc, char* argv[]) {
}
counter++;
}
#ifdef COMPARE_HASH
// if (rank == source) {
MPI_Sendrecv(&m, 1, MPI_CHAR, MPI_PROC_NULL, 0, MPI_IN_PLACE, 0, MPI_BYTE, MPI_PROC_NULL, 0, MPI_COMM_SELF, MPI_STATUS_IGNORE);
// }
#endif
}
......
......@@ -26,14 +26,15 @@ static int numTeams;
static MPI_Comm TMPI_COMM_TEAM;
static MPI_Comm TMPI_COMM_DUP;
static bool shouldCorruptData;
int initialiseTMPI() {
/**
* The application should have no knowledge of the world_size or world_rank
*/
signal(SIGUSR1, pauseThisRankSignalHandler);
signal(SIGUSR2, corruptThisRankSignalHandler);
shouldCorruptData = false;
setEnvironment();
PMPI_Comm_size(MPI_COMM_WORLD, &worldSize);
......@@ -57,7 +58,7 @@ int initialiseTMPI() {
#ifndef REPLICAS_OUTPUT
// Disable output for all but master replica (0)
if (getTeam() > 0) {
disableLogging();
// disableLogging();
}
#endif
......@@ -160,6 +161,19 @@ void pauseThisRankSignalHandler( int signum ) {
sleepLength *= multiplier;
}
void corruptThisRankSignalHandler( int signum ) {
logDebug("Signal received: corrupt this rank");
shouldCorruptData = true;
}
bool getShouldCorruptData() {
return shouldCorruptData;
}
void setShouldCorruptData(bool toggle) {
shouldCorruptData = toggle;
}
int mapRankToTeamNumber(int rank) {
return rank / getTeamSize();
}
......
......@@ -10,9 +10,26 @@
#include <mpi.h>
#include <string>
#include <stdint.h>
#include <limits.h>
#define MASTER 0
// From https://stackoverflow.com/questions/40807833/sending-size-t-type-data-with-mpi
#if SIZE_MAX == UCHAR_MAX
#define TMPI_SIZE_T MPI_UNSIGNED_CHAR
#elif SIZE_MAX == USHRT_MAX
#define TMPI_SIZE_T MPI_UNSIGNED_SHORT
#elif SIZE_MAX == UINT_MAX
#define TMPI_SIZE_T MPI_UNSIGNED
#elif SIZE_MAX == ULONG_MAX
#define TMPI_SIZE_T MPI_UNSIGNED_LONG
#elif SIZE_MAX == ULLONG_MAX
#define TMPI_SIZE_T MPI_UNSIGNED_LONG_LONG
#else
#error "what is happening here?"
#endif
int initialiseTMPI();
int getWorldRank();
......@@ -44,6 +61,11 @@ void outputTiming();
void pauseThisRankSignalHandler(int signum);
void corruptThisRankSignalHandler(int signum);
bool getShouldCorruptData();
void setShouldCorruptData(bool toggle);
int mapRankToTeamNumber(int rank);
int mapWorldToTeamRank(int rank);
......
......@@ -14,6 +14,7 @@
#include <string>
#include <utility>
#include <stddef.h>
#include <bitset>
#include "Logging.h"
#include "Rank.h"
......@@ -23,7 +24,10 @@ struct Timer {
double endTime;
std::map< int, std::vector<double> > syncPoints;
std::map< int, std::vector<MPI_Request> > requests;
std::map< int, std::vector<MPI_Request> > syncRequests;
std::map<int, std::vector<std::size_t> > hashes;
std::map<int, std::vector<MPI_Request> > hashRequests;
} timer;
void Timing::initialiseTiming() {
......@@ -31,7 +35,10 @@ void Timing::initialiseTiming() {
timer.startTime = PMPI_Wtime();
for (int i=0; i < getNumberOfTeams(); i++) {
timer.syncPoints.insert(std::make_pair(i,std::vector<double>()));
timer.requests.insert(std::make_pair(i,std::vector<MPI_Request>()));
timer.syncRequests.insert(std::make_pair(i,std::vector<MPI_Request>()));
timer.hashes.insert(std::make_pair(i,std::vector<std::size_t>()));
timer.hashRequests.insert(std::make_pair(i,std::vector<MPI_Request>()));
}
}
......@@ -45,6 +52,11 @@ void Timing::markTimeline() {
compareProgressWithReplicas();
}
void Timing::markTimeline(const void *sendbuf, int sendcount, MPI_Datatype sendtype) {
markTimeline();
compareBufferWithReplicas(sendbuf, sendcount, sendtype);
}
void Timing::compareProgressWithReplicas() {
for (int r=0; r < getNumberOfTeams(); r++) {
if (r != getTeam()) {
......@@ -57,17 +69,58 @@ void Timing::compareProgressWithReplicas() {
// Receive times from other replicas
timer.syncPoints.at(r).push_back(0.0);
timer.requests.at(r).push_back(MPI_Request());
timer.syncRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.syncPoints.at(r).back(), 1, MPI_DOUBLE,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.requests.at(r).back());
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.syncRequests.at(r).back());
// Test for completion of Irecv's
int numPending = 0;
for (int i=0; i < timer.syncRequests.at(r).size(); i++) {
int flag = 0;
PMPI_Test(&timer.syncRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
numPending += 1 - flag;
}
}
}
}
void Timing::compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_Datatype sendtype) {
if (getShouldCorruptData()) {
sendcount++;
setShouldCorruptData(false);
}
int typeSize;
MPI_Type_size(sendtype, &typeSize);
std::string bits((const char*)sendbuf, sendcount*typeSize);
std::hash<std::string> hash_fn;
std::size_t hash = hash_fn(bits);
timer.hashes.at(getTeam()).push_back((std::size_t)hash);
for (int r=0; r < getNumberOfTeams(); r++) {
if (r != getTeam()) {
// Send out this replica's times
MPI_Request request;
PMPI_Isend(&timer.hashes.at(getTeam()).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), getTeam(),
getLibComm(), &request);
MPI_Request_free(&request);
// Receive times from other replicas
timer.hashes.at(r).push_back(0);
timer.hashRequests.at(r).push_back(MPI_Request());
PMPI_Irecv(&timer.hashes.at(r).back(), 1, TMPI_SIZE_T,
mapTeamToWorldRank(getTeamRank(), r), r, getLibComm(), &timer.hashRequests.at(r).back());
// Test for completion of Irecv's
int numPending = 0;
for (int i=0; i < timer.requests.at(r).size(); i++) {
for (int i=0; i < timer.hashRequests.at(r).size(); i++) {
int flag = 0;
PMPI_Test(&timer.requests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
PMPI_Test(&timer.hashRequests.at(r).at(i), &flag, MPI_STATUS_IGNORE);
numPending += 1 - flag;
}
std::cout << "Num pending: " << numPending << "\n";
}
}
}
......
......@@ -14,11 +14,13 @@
namespace Timing {
void markTimeline();
void markTimeline(const void *sendbuf, int sendcount, MPI_Datatype sendtype);
void initialiseTiming();
void finaliseTiming();
void compareProgressWithReplicas();
void compareBufferWithReplicas(const void *sendbuf, int sendcount, MPI_Datatype sendtype);
const std::vector<double>& getSyncPoints();
......
......@@ -279,7 +279,7 @@ double MPI_Wtime() {
const double t = PMPI_Wtime();
// This was a bad idea
// Apparently Wtime is called maybe even before MPI_Init internally!
// Timing::markTimeline(Timing::markType::Generic);
// Timing::markTimeline();
return t;
}
......@@ -289,7 +289,11 @@ int MPI_Sendrecv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
int source, int recvtag,
MPI_Comm comm, MPI_Status *status) {
if (comm == MPI_COMM_SELF) {
Timing::markTimeline();
if (sendcount == 0) {
Timing::markTimeline();
} else {
Timing::markTimeline(sendbuf, sendcount, sendtype);
}
} else {
assert(comm == MPI_COMM_WORLD);
//TODO remap status?
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment