Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 12 additions & 25 deletions bsp/src/main/java/edu/ucsb/cs/bsp/MPIFunctionCallHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,35 +35,22 @@ public void run() {
OutputStream out = socket.getOutputStream();
byte[] data = new byte[8096];
int length;
while (true) {
boolean dirty = false;
MPIFunctionCall function = new MPIFunctionCall();
while((length = in.read(data)) != -1) {
dirty = true;
function.consume(data, 0, length);
if (function.isComplete()) {
dirty = false;
if (!function.execute(peer, out)) {
closeSilently();
return;
}
break;
}
}

if (!function.isComplete()) {
if (dirty) {
// We were cut off while in the middle of reading
// something. Not good.
throw new IOException("Socket closed prematurely: " +
function.toString());
} else {
// Remote process exited gracefully - Not bad
socket.close();
MPIFunctionCall function = new MPIFunctionCall();
while((length = in.read(data)) != -1) {
function.consume(data, 0, length);
if (function.isComplete()) {
if (!function.execute(peer, out)) {
closeSilently();
return;
}
break;
}
}

if (!function.isComplete()) {
throw new IOException("Socket closed prematurely: " + function.toString());
}
socket.close();
} catch (Exception e) {
closeSilently();
}
Expand Down
118 changes: 111 additions & 7 deletions dist/mpirun
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,116 @@ BASEDIR=$(dirname $0)
HAMA_HOME=$BASEDIR/hama-0.6.0
HADOOP_HOME=$BASEDIR/hadoop-1.1.1

./start-hama.sh
MASTER_NODE=`head -n 1 $PBS_NODEFILE`
if [ "`hostname`" == "$MASTER_NODE.local" ]; then
$HAMA_HOME/bin/hama jar $BASEDIR/mpi2bsp.jar $2 $1
function print_usage {
echo "Usage: ./mpirun -np TASKS -machinefile MACHINEFILE -f [ -e ] EXECUTABLE"
echo " -np: Number of bsp-mpi tasks to run"
echo " -machinefile: list of hostnames to use for the cluster"
echo " the first node in the list is used as the namenode"
echo " default:\$PBS_NODEFILE"
echo " -f: run all commands even if this is not the master node"
echo " by default only the master runs the job so all nodes can "
echo " run the same script"
echo " -e: the mpi executable to run"
echo " if -e is not present, the first non-option argument is "
echo " used as the executable"
}

# initialize arguments
num_tasks=""
job_exec=""
machine_file="$PBS_NODEFILE"
force="false"

# parse arguments
while :
do
case "$1" in
-h | --help | -\?)
print_usage
exit 0
;;
-np | -n)
num_tasks=$2
shift 2
;;
-np=*)
num_tasks=${1#*=}
shift
;;
-m | -machinefile | --machinefile | -hostfile | --hostfile)
if [ ! -e "$2" ]; then
echo "-m must specify a machine file"
print_usage
exit 1
else
machine_file="$2"
fi
shift 2
;;
-e | -exec)
job_exec="$2"
shift 2
;;
-f)
force="true"
shift
;;
--)
shift
break
;;
-*)
echo "unknown option to mpirun: $1" >&2
print_usage
exit 1
;;
*) #we just parsed the last option so break
break
;;
esac
done

#do argument checking
if [ -z "$num_tasks" -o -z "$machine_file" ]; then
echo "missing required argument";
print_usage
exit 1
elif [ -z "$1" -a -z "job_exec" ]; then
echo "must specify an executable to run";
print_usage
exit 1;
fi

if [ -z "$job_exec" ]; then
job_exec="$1"
fi

if [ ! -x "$job_exec" ]; then
echo "program argument must be an executable file";
print_usage
exit 1
fi

mkdir -p ~/tmp/output/`hostname`
cp -r /tmp/output/* ~/tmp/output/`hostname`
./stop-hama.sh
if [ force == "true" ]; then
./start-hama.sh -f -m "$machine_file" > /dev/null
else
./start-hama.sh -m "$machine_file" > /dev/null
fi

MASTER_NODE=`head -n 1 $machine_file`
THIS_NODE=`hostname -s`

if [ "$THIS_NODE" == "$MASTER_NODE" -o "$force" == "true" ]; then
$HAMA_HOME/bin/hama jar $BASEDIR/mpi2bsp.jar $job_exec $num_tasks
#echo "output========";
cat /tmp/output/*
#echo "========output";
mkdir -p ~/tmp/output/$THIS_NODE
cp -r /tmp/output/* ~/tmp/output/$THIS_NODE
fi

if [ force == "true" ]; then
./stop-hama.sh -f -m "$machine_file" > /dev/null
else
./stop-hama.sh -m "$machine_file" > /dev/null
fi
14 changes: 8 additions & 6 deletions mpi/mpi.c
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ int MPI_Init(int *argc, char ***argv) {
}

int mpi2bsp(char* input, void* input_data, int in_length, void* output, int out_length, int binary) {
int sockfd = get_connection(POOL, "127.0.0.1", LOCAL_PORT);
int sockfd = open_connection("localhost", LOCAL_PORT);
if (sockfd < 0) {
printf("Failed to open a socket to the parent process\n");
exit(1);
Expand Down Expand Up @@ -111,7 +111,8 @@ int mpi2bsp(char* input, void* input_data, int in_length, void* output, int out_

}

release_connection(POOL, "127.0.0.1", LOCAL_PORT, sockfd);
close(sockfd);
//release_connection(POOL, "localhost", LOCAL_PORT, sockfd);
return bytes_read;
}

Expand Down Expand Up @@ -165,7 +166,7 @@ int MPI_Send(void* buffer, int count, MPI_Datatype type, int dest, int tag, MPI_
}

struct connection_info* info = CONNECTION_INFO + dest;
int sockfd = get_connection(POOL, info->host, info->port);
int sockfd = open_connection(info->host, info->port);
if (sockfd < 0) {
printf("Failed to open a socket to remote process\n");
exit(1);
Expand All @@ -190,7 +191,8 @@ int MPI_Send(void* buffer, int count, MPI_Datatype type, int dest, int tag, MPI_
}
}

release_connection(POOL, info->host, info->port, sockfd);
close(sockfd);
//release_connection(POOL, info->host, info->port, sockfd);
return 0;
}

Expand Down Expand Up @@ -251,7 +253,7 @@ int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_O
}

sprintf(input, "MPI_Reduce\nrcount=%d\nsource=%d\ncomm=%d\ntype=%d\n\n", size, root, comm, type);
int sockfd = get_connection(POOL, "127.0.0.1", LOCAL_PORT);
int sockfd = open_connection("localhost", LOCAL_PORT);
if (sockfd < 0) {
printf("Failed to open a socket to the parent process\n");
exit(1);
Expand All @@ -272,7 +274,7 @@ int MPI_Reduce(void *sendbuf, void *recvbuf, int count, MPI_Datatype type, MPI_O
}
mpi_reduce(recvbuf, temp, count, type, op);
}
release_connection(POOL, "127.0.0.1", LOCAL_PORT, sockfd);
close(sockfd);
free(temp);
}
return 0;
Expand Down