Skip to content

Commit

Permalink
NA UCX: fix handling of unexpected messages without pre-posted recv
Browse files Browse the repository at this point in the history
NA SM: add perf warning message for unexpected messages without recv posted

NA perf: fix NA latency test to ensure recvs are always pre-posted
  • Loading branch information
soumagne committed May 24, 2023
1 parent 76ada12 commit ba50337
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 51 deletions.
73 changes: 43 additions & 30 deletions Testing/perf/na/na_perf_server.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,27 @@
/* Local Type and Struct Definition */
/************************************/

typedef na_return_t (*na_perf_recv_op_t)(na_class_t *na_class,
na_context_t *context, na_cb_t callback, void *arg, void *buf,
size_t buf_size, void *plugin_data, na_op_id_t *op_id);

struct na_perf_recv_info {
struct na_perf_info *info;
na_perf_recv_op_t recv_op;
na_cb_t recv_op_cb;
na_return_t ret;
const char *recv_op_name;
bool post_new_recv;
bool done;
};

typedef na_return_t (*na_perf_recv_op_t)(na_class_t *na_class,
na_context_t *context, na_cb_t callback, void *arg, void *buf,
size_t buf_size, void *plugin_data, na_op_id_t *op_id);

/********************/
/* Local Prototypes */
/********************/

static na_return_t
na_perf_loop(
struct na_perf_info *info, na_perf_recv_op_t recv_op, na_cb_t recv_op_cb);
na_perf_loop(struct na_perf_info *info, na_perf_recv_op_t recv_op,
na_cb_t recv_op_cb, const char *recv_op_name);

static void
na_perf_recv_cb(const struct na_cb_info *na_cb_info);
Expand All @@ -50,35 +53,33 @@ na_perf_process_recv(struct na_perf_recv_info *recv_info, void *actual_buf,

/*---------------------------------------------------------------------------*/
static na_return_t
na_perf_loop(
struct na_perf_info *info, na_perf_recv_op_t recv_op, na_cb_t recv_op_cb)
na_perf_loop(struct na_perf_info *info, na_perf_recv_op_t recv_op,
na_cb_t recv_op_cb, const char *recv_op_name)
{
struct na_perf_recv_info recv_info;
na_return_t ret;

memset(&recv_info, 0, sizeof(recv_info));
recv_info.info = info;
recv_info.post_new_recv = true;

recv_info.recv_op = recv_op;
recv_info.recv_op_cb = recv_op_cb;
recv_info.recv_op_name = recv_op_name;

/* Post initial recv */
ret = recv_op(info->na_class, info->context, recv_op_cb, &recv_info,
info->msg_unexp_buf, info->msg_unexp_size_max, info->msg_unexp_data,
info->msg_unexp_op_id);
NA_TEST_CHECK_NA_ERROR(
error, ret, "%s() failed (%s)", recv_op_name, NA_Error_to_string(ret));

/* Progress loop */
do {
unsigned int actual_count = 0;

if (recv_info.post_new_recv) {
recv_info.post_new_recv = false;

/* Post recv */
ret = recv_op(info->na_class, info->context, recv_op_cb, &recv_info,
info->msg_unexp_buf, info->msg_unexp_size_max,
info->msg_unexp_data, info->msg_unexp_op_id);
NA_TEST_CHECK_NA_ERROR(error, ret,
"NA_Msg_recv_unexpected() failed (%s)",
NA_Error_to_string(ret));
}

do {
ret = NA_Trigger(info->context, 1, &actual_count);
NA_TEST_CHECK_ERROR(recv_info.ret != NA_SUCCESS, error, ret,
recv_info.ret, "NA_Msg_recv_unexpected() failed (%s)",
recv_info.ret, "%s() failed (%s)", recv_op_name,
NA_Error_to_string(recv_info.ret));
} while ((ret == NA_SUCCESS) && actual_count);
NA_TEST_CHECK_ERROR_NORET(ret != NA_SUCCESS, error,
Expand Down Expand Up @@ -107,10 +108,9 @@ na_perf_recv_cb(const struct na_cb_info *na_cb_info)
const struct na_cb_info_recv_unexpected *msg_info =
&na_cb_info->info.recv_unexpected;

recv_info->post_new_recv = true;
na_perf_process_recv(recv_info, NULL, msg_info->actual_buf_size,
msg_info->source, msg_info->tag);

recv_info->post_new_recv = true;
}

/*---------------------------------------------------------------------------*/
Expand All @@ -122,10 +122,9 @@ na_perf_multi_recv_cb(const struct na_cb_info *na_cb_info)
const struct na_cb_info_multi_recv_unexpected *msg_info =
&na_cb_info->info.multi_recv_unexpected;

recv_info->post_new_recv = msg_info->last;
na_perf_process_recv(recv_info, msg_info->actual_buf,
msg_info->actual_buf_size, msg_info->source, msg_info->tag);

recv_info->post_new_recv = msg_info->last;
}

/*---------------------------------------------------------------------------*/
Expand All @@ -138,6 +137,19 @@ na_perf_process_recv(struct na_perf_recv_info *recv_info,
na_return_t ret = NA_SUCCESS;
size_t i;

/* Repost recv in advance to prevent buffering of unexpected msg */
if (recv_info->post_new_recv && tag != NA_PERF_TAG_DONE) {
recv_info->post_new_recv = false;

/* Post recv */
ret = recv_info->recv_op(info->na_class, info->context,
recv_info->recv_op_cb, recv_info, info->msg_unexp_buf,
info->msg_unexp_size_max, info->msg_unexp_data,
info->msg_unexp_op_id);
NA_TEST_CHECK_NA_ERROR(done, ret, "%s() failed (%s)",
recv_info->recv_op_name, NA_Error_to_string(ret));
}

switch (tag) {
case NA_PERF_TAG_LAT_INIT:
/* init data separately to avoid a memcpy */
Expand Down Expand Up @@ -201,10 +213,11 @@ main(int argc, char *argv[])
/* Loop */
if (NA_Has_opt_feature(info.na_class, NA_OPT_MULTI_RECV) &&
!info.na_test_info.no_multi_recv)
na_ret = na_perf_loop(
&info, NA_Msg_multi_recv_unexpected, na_perf_multi_recv_cb);
na_ret = na_perf_loop(&info, NA_Msg_multi_recv_unexpected,
na_perf_multi_recv_cb, "NA_Msg_multi_recv_unexpected");
else
na_ret = na_perf_loop(&info, NA_Msg_recv_unexpected, na_perf_recv_cb);
na_ret = na_perf_loop(&info, NA_Msg_recv_unexpected, na_perf_recv_cb,
"NA_Msg_recv_unexpected");
NA_TEST_CHECK_NA_ERROR(error, na_ret, "na_perf_loop() failed (%s)",
NA_Error_to_string(na_ret));

Expand Down
3 changes: 3 additions & 0 deletions src/na/na_sm.c
Original file line number Diff line number Diff line change
Expand Up @@ -4008,6 +4008,9 @@ na_sm_process_unexpected(struct na_sm_op_queue *unexpected_op_queue,
/* Complete operation (no need to notify) */
na_sm_complete(na_sm_op_id, NA_SUCCESS);
} else {
NA_LOG_SUBSYS_WARNING(
perf, "No operation was preposted, data must be copied");

/* If no error and message arrived, keep a copy of the struct in
* the unexpected message queue (should rarely happen) */
na_sm_unexpected_info = (struct na_sm_unexpected_info *) malloc(
Expand Down
107 changes: 86 additions & 21 deletions src/na/na_ucx.c
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ struct na_ucx_unexpected_info {
void *data;
size_t length;
ucp_tag_t tag;
bool data_alloc;
};

/* Msg queue */
Expand Down Expand Up @@ -649,6 +650,19 @@ na_ucx_addr_ref_incr(struct na_ucx_addr *na_ucx_addr);
static NA_INLINE void
na_ucx_addr_ref_decr(struct na_ucx_addr *na_ucx_addr);

/**
* Allocate unexpected info.
*/
static struct na_ucx_unexpected_info *
na_ucx_unexpected_info_alloc(void *data, size_t data_alloc_size);

/**
* Free unexpected info.
*/
static void
na_ucx_unexpected_info_free(
struct na_ucx_unexpected_info *na_ucx_unexpected_info);

/**
* Post RMA operation.
*/
Expand Down Expand Up @@ -1829,7 +1843,18 @@ na_ucp_am_recv(
HG_QUEUE_POP_HEAD(&unexpected_msg_queue->queue, entry);
hg_thread_spin_unlock(&unexpected_msg_queue->lock);

if (unlikely(na_ucx_unexpected_info)) {
if (likely(na_ucx_unexpected_info == NULL)) {
struct na_ucx_op_queue *unexpected_op_queue =
&na_ucx_class->unexpected_op_queue;

/* Nothing has been received yet so add op_id to progress queue */
hg_thread_spin_lock(&unexpected_op_queue->lock);
HG_QUEUE_PUSH_TAIL(&unexpected_op_queue->queue, na_ucx_op_id, entry);
hg_atomic_or32(&na_ucx_op_id->status, NA_UCX_OP_QUEUED);
hg_thread_spin_unlock(&unexpected_op_queue->lock);
} else {
NA_LOG_SUBSYS_DEBUG(msg, "Unexpected data was already received");

/* Copy buffers */
memcpy(na_ucx_op_id->info.msg.buf.ptr, na_ucx_unexpected_info->data,
na_ucx_unexpected_info->length);
Expand All @@ -1841,19 +1866,15 @@ na_ucp_am_recv(
.actual_buf_size = (size_t) na_ucx_unexpected_info->length,
.source = (na_addr_t *) na_ucx_unexpected_info->na_ucx_addr};

ucp_am_data_release(
na_ucx_class->ucp_worker, na_ucx_unexpected_info->data);
free(na_ucx_unexpected_info);
na_ucx_complete(na_ucx_op_id, NA_SUCCESS);
} else {
struct na_ucx_op_queue *unexpected_op_queue =
&na_ucx_class->unexpected_op_queue;
/* Release AM buffer if returned UCS_INPROGRESS */
if (!na_ucx_unexpected_info->data_alloc &&
na_ucx_unexpected_info->length > 0) {
ucp_am_data_release(
na_ucx_class->ucp_worker, na_ucx_unexpected_info->data);
}
na_ucx_unexpected_info_free(na_ucx_unexpected_info);

/* Nothing has been received yet so add op_id to progress queue */
hg_thread_spin_lock(&unexpected_op_queue->lock);
HG_QUEUE_PUSH_TAIL(&unexpected_op_queue->queue, na_ucx_op_id, entry);
hg_atomic_or32(&na_ucx_op_id->status, NA_UCX_OP_QUEUED);
hg_thread_spin_unlock(&unexpected_op_queue->lock);
na_ucx_complete(na_ucx_op_id, NA_SUCCESS);
}
}

Expand Down Expand Up @@ -1917,18 +1938,22 @@ na_ucp_am_recv_cb(void *arg, const void *header, size_t header_length,
struct na_ucx_unexpected_msg_queue *unexpected_msg_queue =
&na_ucx_class->unexpected_msg_queue;
struct na_ucx_unexpected_info *na_ucx_unexpected_info = NULL;
bool data_alloc = !(param->recv_attr & UCP_AM_RECV_ATTR_FLAG_DATA);

NA_LOG_SUBSYS_WARNING(perf,
"No operation was preposted, data will persist (data_alloc=%d)",
(int) data_alloc);

/* If no error and message arrived, keep a copy of the struct in
* the unexpected message queue (should rarely happen) */
na_ucx_unexpected_info = (struct na_ucx_unexpected_info *) malloc(
sizeof(*na_ucx_unexpected_info));
na_ucx_unexpected_info =
na_ucx_unexpected_info_alloc(data, data_alloc ? length : 0);
NA_CHECK_SUBSYS_ERROR(msg, na_ucx_unexpected_info == NULL, error, ret,
UCS_ERR_NO_MEMORY, "Could not allocate unexpected info");

*na_ucx_unexpected_info = (struct na_ucx_unexpected_info){.data = data,
.length = length,
.tag = tag,
.na_ucx_addr = source_addr};
na_ucx_unexpected_info->length = length;
na_ucx_unexpected_info->tag = tag;
na_ucx_unexpected_info->na_ucx_addr = source_addr;
na_ucx_addr_ref_incr(source_addr);

/* Otherwise push the unexpected message into our unexpected queue so
Expand All @@ -1939,8 +1964,8 @@ na_ucp_am_recv_cb(void *arg, const void *header, size_t header_length,
hg_thread_spin_unlock(&unexpected_msg_queue->lock);

/* If data is going to be used outside this callback, UCS_INPROGRESS
* should be returned */
return UCS_INPROGRESS;
* should be returned, otherwise return UCS_OK as a copy was made */
return (data_alloc) ? UCS_OK : UCS_INPROGRESS;
}

error:
Expand Down Expand Up @@ -2749,6 +2774,46 @@ na_ucx_addr_ref_decr(struct na_ucx_addr *na_ucx_addr)
}
}

/*---------------------------------------------------------------------------*/
static struct na_ucx_unexpected_info *
na_ucx_unexpected_info_alloc(void *data, size_t data_alloc_size)
{
struct na_ucx_unexpected_info *na_ucx_unexpected_info;

na_ucx_unexpected_info = (struct na_ucx_unexpected_info *) calloc(
1, sizeof(*na_ucx_unexpected_info));
NA_CHECK_SUBSYS_ERROR_NORET(msg, na_ucx_unexpected_info == NULL, error,
"Could not allocate unexpected info");

if (data_alloc_size > 0) {
na_ucx_unexpected_info->data = malloc(data_alloc_size);
NA_CHECK_SUBSYS_ERROR_NORET(msg, na_ucx_unexpected_info->data == NULL,
error, "Could not allocate data of size %zu", data_alloc_size);
na_ucx_unexpected_info->data_alloc = true;
memcpy(na_ucx_unexpected_info->data, data, data_alloc_size);
} else {
na_ucx_unexpected_info->data = data;
na_ucx_unexpected_info->data_alloc = false;
}

return na_ucx_unexpected_info;

error:
free(na_ucx_unexpected_info);

return NULL;
}

/*---------------------------------------------------------------------------*/
static void
na_ucx_unexpected_info_free(
struct na_ucx_unexpected_info *na_ucx_unexpected_info)
{
if (na_ucx_unexpected_info->data_alloc)
free(na_ucx_unexpected_info->data);
free(na_ucx_unexpected_info);
}

/*---------------------------------------------------------------------------*/
static na_return_t
na_ucx_rma(struct na_ucx_class NA_UNUSED *na_ucx_class, na_context_t *context,
Expand Down

0 comments on commit ba50337

Please sign in to comment.