Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions ompi/mca/coll/acoll/coll_acoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ END_C_DECLS
#define MCA_COLL_ACOLL_ROOT_CHANGE_THRESH 10
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST_LEN 6
#define MCA_COLL_ACOLL_SPLIT_FACTOR_LIST {2, 4, 8, 16, 32, 64}
/* Iteration count related to progress calls.
 * NOTE(review): not referenced within this header; presumably the number of
 * busy-wait polls performed between opal_progress() calls in shared-memory
 * wait loops -- confirm against the spin-wait helper that consumes it. */
#define MCA_COLL_ACOLL_PROGRESS_COUNT 10000

/* Hybrid backoff spin-wait thresholds for intra-node synchronization.
 * NOTE(review): the helper that consumes these constants is defined elsewhere
 * (presumably spin_wait_with_progress() -- confirm). The per-macro notes below
 * state the intended meaning; the exact semantics (e.g. whether a "frequency"
 * is "one progress call every N iterations") must be checked at the use site. */
#define MCA_COLL_ACOLL_SPIN_FAST_PATH_ITERS 200 /* Pure spinning iterations */
#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_ITERS 300 /* Moderate progress iterations */
#define MCA_COLL_ACOLL_SPIN_MEDIUM_PATH_FREQ 20 /* Progress call frequency in medium path */
#define MCA_COLL_ACOLL_SPIN_SLOW_PATH_MAX_FREQ 3 /* Max progress call frequency in slow path */

typedef enum MCA_COLL_ACOLL_SG_SIZES {
MCA_COLL_ACOLL_SG_SIZE_1 = 8,
Expand Down Expand Up @@ -234,6 +241,8 @@ struct mca_coll_acoll_module_t {
int force_numa;
int use_dyn_rules;
int disable_shmbcast;
int disable_fallback;
int red_algo;
// TODO: Group the API-specific fields below into per-API substructures
int use_mnode;
int use_lin0;
Expand Down
128 changes: 113 additions & 15 deletions ompi/mca/coll/acoll/coll_acoll_allreduce.c
Original file line number Diff line number Diff line change
Expand Up @@ -100,19 +100,19 @@ static inline int mca_coll_acoll_reduce_smsc_h(const void *sbuf, void *rbuf, siz
err = comm->c_coll->coll_allgather(sbuf_vaddr, sizeof(void *), MPI_BYTE, data->allshm_sbuf,
sizeof(void *), MPI_BYTE, comm,
comm->c_coll->coll_allgather_module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

err = comm->c_coll->coll_allgather(rbuf_vaddr, sizeof(void *), MPI_BYTE, data->allshm_rbuf,
sizeof(void *), MPI_BYTE, comm,
comm->c_coll->coll_allgather_module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

err = register_mem_with_smsc(rank, size, total_dsize, data, comm);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

Expand All @@ -121,7 +121,7 @@ static inline int mca_coll_acoll_reduce_smsc_h(const void *sbuf, void *rbuf, siz
size_t my_count_size = (l1_local_rank == (l1_gp_size - 1)) ? chunk + count % l1_gp_size : chunk;

if (rank == l1_gp[0]) {
if (sbuf != MPI_IN_PLACE)
if (MPI_IN_PLACE != sbuf)
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);

for (int i = 1; i < l1_gp_size; i++) {
Expand All @@ -144,7 +144,7 @@ static inline int mca_coll_acoll_reduce_smsc_h(const void *sbuf, void *rbuf, siz
}
}
err = ompi_coll_base_barrier_intra_tree(comm, module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

Expand Down Expand Up @@ -231,26 +231,26 @@ static inline int mca_coll_acoll_allreduce_smsc_f(const void *sbuf, void *rbuf,
err = comm->c_coll->coll_allgather(sbuf_vaddr, sizeof(void *), MPI_BYTE, data->allshm_sbuf,
sizeof(void *), MPI_BYTE, comm,
comm->c_coll->coll_allgather_module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}
err = comm->c_coll->coll_allgather(rbuf_vaddr, sizeof(void *), MPI_BYTE, data->allshm_rbuf,
sizeof(void *), MPI_BYTE, comm,
comm->c_coll->coll_allgather_module);

if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

err = register_mem_with_smsc(rank, size, total_dsize, data, comm);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

size_t chunk = count / size;
size_t my_count_size = (rank == (size - 1)) ? (count / size) + count % size : count / size;
if (0 == rank) {
if (sbuf != MPI_IN_PLACE)
if (MPI_IN_PLACE != sbuf)
memcpy(tmp_rbuf, sbuf, my_count_size * dsize);
} else {
ompi_3buff_op_reduce(op, (char *) data->smsc_saddr[0] + chunk * rank * dsize,
Expand All @@ -259,7 +259,7 @@ static inline int mca_coll_acoll_allreduce_smsc_f(const void *sbuf, void *rbuf,
}

err = ompi_coll_base_barrier_intra_tree(comm, module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

Expand All @@ -271,7 +271,7 @@ static inline int mca_coll_acoll_allreduce_smsc_f(const void *sbuf, void *rbuf,
(char *) tmp_rbuf + chunk * rank * dsize, my_count_size, dtype);
}
err = ompi_coll_base_barrier_intra_tree(comm, module);
if (err != MPI_SUCCESS) {
if (MPI_SUCCESS != err) {
return err;
}

Expand Down Expand Up @@ -509,6 +509,101 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,

alg = coll_allreduce_decision_fixed(size, total_dsize);

/* Try with socket/node based split */
if (num_nodes > 1) {
if (total_dsize > 16384) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
}
int use_socket = acoll_module->use_socket != -1 ? acoll_module->use_socket : 0;
coll_acoll_subcomms_t *soc_subc = NULL;
ompi_communicator_t *soc_comm = use_socket ? subc->socket_comm : subc->local_comm;
ompi_communicator_t *ldr_comm = use_socket ? subc->socket_ldr_comm : subc->leader_comm;
int ldr_root = use_socket ? subc->socket_ldr_root : subc->outer_grp_root;
int soc_root = use_socket ? subc->local_root[MCA_COLL_ACOLL_LYR_SOCKET] : subc->local_root[MCA_COLL_ACOLL_LYR_NODE];

/* Validate communicator hierarchy before proceeding */
if (NULL == soc_comm || NULL == ldr_comm) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype, op,
comm, module);
}

err = check_and_create_subc(soc_comm, acoll_module, &soc_subc);
if (NULL != soc_subc) {
if (!soc_subc->initialized || (soc_root != soc_subc->prev_init_root)) {
err = mca_coll_acoll_comm_split_init(soc_comm, acoll_module, soc_subc, soc_root);
if (MPI_SUCCESS != err)
return err;
}
char *inplacebuf_free = NULL, *inplacebuf = NULL;
void *tmp_rbuf = rbuf;
void *tmp_sbuf = (void *)sbuf;
/* Socket/Node level reduce */
if (ompi_comm_size(soc_comm) > 1) {
ptrdiff_t span, gap = 0;
span = opal_datatype_span(&dtype->super, count, &gap);
if (ompi_comm_rank(soc_comm) == soc_root) {
inplacebuf_free = (char*) malloc(span);
if (NULL == inplacebuf_free) { err = -1; return err; }
inplacebuf = inplacebuf_free - gap;
tmp_rbuf = (void *)inplacebuf;
tmp_sbuf = tmp_rbuf;
}

if((total_dsize > 8192) &&
((0 != subc->smsc_use_sr_buf) || (subc->smsc_buf_size > 2 * total_dsize)) &&
(1 != subc->without_smsc) && is_opt) {
err = mca_coll_acoll_reduce_smsc_h(sbuf, tmp_rbuf, count, dtype, op,
soc_comm, module, soc_subc);
} else {
acoll_module->red_algo = total_dsize <= 8192 ? 0 : 1;
err = mca_coll_acoll_reduce_intra(sbuf, tmp_rbuf, count, dtype, op,
soc_root, soc_comm, module);
acoll_module->red_algo = -1;
}

if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) {
free(inplacebuf_free);
}
return err;
}
}
/* Allreduce across socket/node leaders */
if (ompi_comm_size(ldr_comm) > 1 && -1 != ldr_root) {
if ((MPI_IN_PLACE == sbuf)) {
err = ompi_coll_base_allreduce_intra_recursivedoubling(MPI_IN_PLACE, rbuf, count, dtype, op,
ldr_comm, module);
} else {
err = ompi_coll_base_allreduce_intra_recursivedoubling(tmp_sbuf, rbuf, count, dtype, op,
ldr_comm, module);
}
if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) {
free(inplacebuf_free);
}
return err;
}
}
if (ompi_comm_size(soc_comm) > 1) {
acoll_module->disable_fallback = 1;
err = mca_coll_acoll_bcast(rbuf, count, dtype, soc_root,
soc_comm, module);
acoll_module->disable_fallback = 0;
if (MPI_SUCCESS != err) {
if (NULL != inplacebuf_free) {
free(inplacebuf_free);
}
return err;
}
}
if (NULL != inplacebuf_free) {
free(inplacebuf_free);
}
return err;
}
}

if (1 == num_nodes) {
if (total_dsize < 32) {
return ompi_coll_base_allreduce_intra_recursivedoubling(sbuf, rbuf, count, dtype, op,
Expand All @@ -526,27 +621,30 @@ int mca_coll_acoll_allreduce_intra(const void *sbuf, void *rbuf, size_t count,
} else if (2 == alg) {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
} else { /*alg == 3 */
} else { /*3 == alg */
return ompi_coll_base_allreduce_intra_ring_segmented(sbuf, rbuf, count, dtype, op,
comm, module, 0);
}
} else if (total_dsize < 4194304) {
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
if (((0 != subc->smsc_use_sr_buf) || (subc->smsc_buf_size > 2 * total_dsize))
&& (1 != subc->without_smsc) && is_opt) {
return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
}
} else if (total_dsize <= 16777216) {
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
if (((0 != subc->smsc_use_sr_buf) || (subc->smsc_buf_size > 2 * total_dsize))
&& (1 != subc->without_smsc) && is_opt) {
mca_coll_acoll_reduce_smsc_h(sbuf, rbuf, count, dtype, op, comm, module, subc);
return mca_coll_acoll_bcast(rbuf, count, dtype, 0, comm, module);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
op, comm, module);
}
} else {
if (((subc->smsc_use_sr_buf != 0) || (subc->smsc_buf_size > 2 * total_dsize)) && (subc->without_smsc != 1) && is_opt) {
if (((0 != subc->smsc_use_sr_buf) || (subc->smsc_buf_size > 2 * total_dsize))
&& (1 != subc->without_smsc) && is_opt) {
return mca_coll_acoll_allreduce_smsc_f(sbuf, rbuf, count, dtype, op, comm, module, subc);
} else {
return ompi_coll_base_allreduce_intra_redscat_allgather(sbuf, rbuf, count, dtype,
Expand Down
65 changes: 8 additions & 57 deletions ompi/mca/coll/acoll/coll_acoll_barrier.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,6 @@
#include "coll_acoll_utils.h"



#define PROGRESS_COUNT 10000

int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);
int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base_module_t *module, coll_acoll_subcomms_t *subc);

Expand Down Expand Up @@ -146,35 +143,22 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
my_leader_shm = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[0]);
int ready;
int count = 0;
if (rank == root) {
ready = *leader_shm;
for (int i = 0; i < l2_gp_size; i++) {
if (l2_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l2_gp[i]);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready + 1);
}
ready++;
for (int i = 0; i < l1_gp_size; i++) {
if (l1_gp[i] == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * l1_gp[i]);
while (*val != ready) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready);
}
*leader_shm = ready;
} else if (rank == l1_gp[0]) {
Expand All @@ -183,38 +167,18 @@ int mca_coll_acoll_barrier_shm_h(struct ompi_communicator_t *comm, mca_coll_base
if (l1_gp[i] == l1_gp[0])
continue;
volatile int *vali = (int *) ((char *) data->allshmmmap_sbuf[l1_gp[0]] + offset_barrier
+ CACHE_LINE_SIZE
* l1_gp[i]); // do we need atomic_load here?
while (*vali != val + 1) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
+ CACHE_LINE_SIZE * l1_gp[i]);
spin_wait_with_progress(vali, val + 1);
}
val++;
*root_rank_offset = val;
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)leader_shm, val);
*l1_rank_offset = val;
} else {

int done = *l1_rank_offset;
done++;
*l1_rank_offset = done;
while (done != *my_leader_shm) {
count++;
if (10000 == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)my_leader_shm, done);
}
return err;
}
Expand Down Expand Up @@ -246,31 +210,18 @@ int mca_coll_acoll_barrier_shm_f(struct ompi_communicator_t *comm, mca_coll_base
+ CACHE_LINE_SIZE * root);

int ready = *leader_shm;
int count = 0;
if (rank == root) {
for (int i = 0; i < size; i++) {
if (i == root)
continue;
volatile int *val = (int *) ((char *) data->allshmmmap_sbuf[root] + offset_barrier
+ CACHE_LINE_SIZE * i);
while (*val != ready + 1) {
count++;
if (count == PROGRESS_COUNT) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress(val, ready + 1);
}
(*leader_shm)++;
} else {
int val = ++(*root_rank_offset);
while (*leader_shm != val) {
count++;
if (PROGRESS_COUNT == count) {
count = 0;
opal_progress();
}
}
spin_wait_with_progress((volatile int *)leader_shm, val);
}
return err;
}
Expand Down
Loading
Loading