
Conversation


@chencjcj chencjcj commented Oct 15, 2025

@gemini-code-assist

Summary of Changes

Hello @chencjcj, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a comprehensive retransmission mechanism to the IBUC (InfiniBand Unreliable Connected) adaptor. The primary goal is to enhance the reliability of data transfers by detecting and retransmitting lost packets. This is achieved through the implementation of sequence numbering, an acknowledgment (ACK) system with Selective Acknowledgments (SACK), and dynamic Retransmission Timeout (RTO) calculation. The changes involve significant additions to the core InfiniBand common header, new dedicated retransmission header and implementation files, and modifications to the main adaptor logic to integrate retransmission state management, control Queue Pairs (QPs) for ACKs, and Shared Receive Queues (SRQs) for efficient receive buffer handling.

Highlights

  • Retransmission Mechanism: A comprehensive retransmission protocol has been implemented for IBUC, ensuring reliable data delivery by detecting and retransmitting lost packets.
  • Sequence Numbering and ACKs: The system now uses sequence numbers for packets and includes a dedicated acknowledgment (ACK) mechanism with Selective Acknowledgments (SACK) to confirm received data and handle out-of-order delivery.
  • Dynamic RTO Calculation: The Retransmission Timeout (RTO) is dynamically adjusted based on Round-Trip Time (RTT) and RTT variance, similar to established network protocols like TCP, to optimize retransmission timing.
  • Control Queue Pairs (UD QPs): Unreliable Datagram (UD) Queue Pairs are utilized for efficient out-of-band transmission and reception of ACK messages, separating control traffic from data traffic.
  • Shared Receive Queues (SRQs): Shared Receive Queues are integrated to manage receive buffers for retransmitted data, improving resource utilization and simplifying receive-side buffer management across multiple QPs.
  • Dynamic Symbol Loading for SRQ: SRQ-related functions are dynamically loaded from libibverbs.so, allowing the system to gracefully handle environments where SRQ might not be fully supported or available.
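The dynamic RTO highlight above follows the classic Jacobson/Karels estimator used by TCP (RFC 6298). A minimal sketch of that update with hypothetical names — the PR's flagcxIbRetransState keeps similar fields (srtt_us, rto_us), but its exact arithmetic is not shown in this diff:

```c
#include <assert.h>
#include <stdint.h>

/* Illustrative RTO state; names mirror the PR's fields but this is a sketch. */
struct rto_state {
  uint64_t srtt_us;   /* smoothed round-trip time */
  uint64_t rttvar_us; /* round-trip time variance */
  uint64_t rto_us;    /* current retransmission timeout */
};

/* Jacobson/Karels update: SRTT += (sample - SRTT)/8,
 * RTTVAR += (|sample - SRTT| - RTTVAR)/4, RTO = SRTT + 4*RTTVAR. */
static void rto_update(struct rto_state *s, uint64_t sample_us) {
  if (s->srtt_us == 0) { /* first RTT sample seeds the estimator */
    s->srtt_us = sample_us;
    s->rttvar_us = sample_us / 2;
  } else {
    int64_t err = (int64_t)sample_us - (int64_t)s->srtt_us;
    s->srtt_us += err / 8;
    uint64_t abserr = err < 0 ? (uint64_t)(-err) : (uint64_t)err;
    s->rttvar_us += ((int64_t)abserr - (int64_t)s->rttvar_us) / 4;
  }
  s->rto_us = s->srtt_us + 4 * s->rttvar_us;
}
```

On timeout, implementations typically also apply exponential backoff (doubling rto_us up to a maximum), which this PR does in flagcxIbRetransCheckTimeout.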
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature              | Command             | Description
---------------------|---------------------|------------------------------------------------
Code Review          | /gemini review      | Performs a code review for the current pull request in its current state.
Pull Request Summary | /gemini summary     | Provides a summary of the current pull request in its current state.
Comment              | @gemini-code-assist | Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help                 | /gemini help        | Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder at the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a significant new feature: a retransmission mechanism for the IBUC (InfiniBand Unreliable Connected) transport, effectively adding a layer of reliability. The implementation is comprehensive, touching connection setup, data transfer, and teardown, and includes logic for acknowledgements, timeouts, and flow control. My review has identified a few issues, including a critical bug in sequence number handling that could break the retransmission logic, an issue with ineffective flow control, and some opportunities for improving code clarity and correctness in multi-communicator scenarios.

immData = reqs[0]->send.size;
if (comm->retrans.enabled) {
  seq = comm->retrans.send_seq;
  comm->retrans.send_seq = (comm->retrans.send_seq + 1) & 0xFF;


critical

There is an inconsistency in the sequence number handling. The sequence number send_seq is wrapped at 8 bits (& 0xFF), but other parts of the retransmission logic, such as flagcxIbEncodeImmData and flagcxIbSeqLess, operate on a 16-bit sequence number space (& 0xFFFF). This mismatch will cause the retransmission mechanism to fail for sequence numbers greater than 255. The wrap-around logic should be consistently 16-bit.

      comm->retrans.send_seq = (comm->retrans.send_seq + 1) & 0xFFFF;
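The fix only works if every comparison helper agrees on the same space. For reference, a wrap-aware "less than" in a 16-bit sequence space is usually implemented as below (a hypothetical stand-in for flagcxIbSeqLess, whose body is not shown in this diff):

```c
#include <assert.h>
#include <stdint.h>

/* Serial-number comparison in a 16-bit sequence space (RFC 1982 style):
 * a is "less than" b if the forward distance from a to b is nonzero and
 * shorter than half the space, which tolerates wrap-around at 0xFFFF. */
static int seq16_less(uint16_t a, uint16_t b) {
  return (uint16_t)(b - a) != 0 && (uint16_t)(b - a) < 0x8000;
}
```

With an 8-bit wrap on the sender (& 0xFF), every sequence number above 255 would compare as "far behind" under this 16-bit rule, which is exactly the failure mode the comment describes.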

Comment on lines 1366 to 1369
if (comm->outstanding_sends >= comm->max_outstanding) {
  TRACE(FLAGCX_NET, "Flow control: outstanding=%d >= max=%d, deferring send",
        comm->outstanding_sends, comm->max_outstanding);
}


high

This flow control check is currently ineffective. It only logs a trace message but allows the send operation to proceed. This can lead to an overflow of the retransmission buffer if acknowledgements are delayed, potentially causing data loss when the buffer wraps around. For effective flow control, the send operation should be deferred. This check should ideally be moved to the calling function, flagcxIbucIsend, before a request is matched, which would allow deferring the send without losing the request.
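A sketch of the suggested deferral, assuming the usual convention that an isend-style call returns success with a NULL request handle to mean "retry later" (names and return convention are illustrative, not the PR's API):

```c
#include <assert.h>
#include <stddef.h>

/* Illustrative flow-control state; mirrors the fields named in the review. */
struct comm_fc { int outstanding_sends; int max_outstanding; };

static int isend_try(struct comm_fc *comm, void **request) {
  if (comm->outstanding_sends >= comm->max_outstanding) {
    *request = NULL; /* defer: consume no slot, post nothing */
    return 0;        /* success code, but no request was issued */
  }
  comm->outstanding_sends++;
  *request = comm;   /* stand-in for a real request object */
  return 0;
}
```

Deferring before the request is matched means the caller simply polls again later, instead of overrunning the retransmission ring.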

if (seq == state->recv_seq) {
  state->recv_seq = (state->recv_seq + 1) & 0xFF;

  state->buffer_tail = (state->buffer_tail + 1) % 16;


high

This line appears to be a bug, likely from a copy-paste error. The buffer_tail field is part of the sender's retransmission buffer management within flagcxIbRetransState. This function, flagcxIbRetransRecvPacket, executes on the receiver side, which does not use this retransmission buffer. Modifying buffer_tail here is incorrect and should be removed.

Comment on lines 114 to 115
flagcxResult_t flagcxIbRetransResendViaSend(struct flagcxIbSendComm *comm,
                                            struct flagcxIbRetransEntry *entry);


medium

This appears to be a duplicate and unused declaration of flagcxIbRetransResendViaSend. The function signature is different from the one declared on line 98, and the implementation in ibuc_retrans.cc matches the declaration on line 98 (which takes a uint32_t seq). This unused declaration should be removed to improve code clarity and avoid confusion.

return flagcxSuccess;
}

static int retrans_qp_index = 0;


medium

The use of a static int for retrans_qp_index makes it a global variable shared across all communicator instances. If multiple communicators are used concurrently, they will contend for this single index, leading to unpredictable QP selection for retransmissions rather than proper round-robin per communicator. This can impact performance and load balancing. This index should be part of the per-communicator state, for example, by adding it as a member to struct flagcxIbSendComm.

@chencjcj chencjcj force-pushed the main branch 2 times, most recently from 5c3dbec to 7660b43 Compare October 16, 2025 08:07
@@ -0,0 +1,117 @@
/*************************************************************************
* Copyright (c) 2024, FlagCX Inc.


Copyright (c) 2023 BAAI. All rights reserved.

@MC952-arch

@gemini-code-assist please review this pull request in detail


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a retransmission mechanism for the IBUC transport to enhance reliability, which is a significant and valuable addition. The implementation is comprehensive, touching connection setup, data transfer, and teardown, and introduces new logic for handling ACKs via UD QPs and retransmissions using SRQ. My review has identified several issues, including a critical compilation error, a bug in buffer management that could lead to state corruption, and a potential for silent data loss. I have also provided suggestions to improve thread safety, robustness of the timeout mechanism, and configurability, along with pointing out some dead code that could be cleaned up.

Comment on lines 114 to 115
flagcxResult_t flagcxIbRetransResendViaSend(struct flagcxIbSendComm *comm,
                                            struct flagcxIbRetransEntry *entry);


critical

This is a duplicate declaration of flagcxIbRetransResendViaSend with a different signature from the one on line 98. This will cause a compilation error. The function with this signature (taking struct flagcxIbRetransEntry *) is not implemented as a public function in ibuc_retrans.cc. This declaration should be removed.

Comment on lines 344 to 293
if (entry->retry_count >= state->max_retry) {
  WARN("Packet exceeded max retries: seq=%u, retry=%d, max=%d. "
       "Marking as delivered to continue.",
       entry->seq, entry->retry_count, state->max_retry);
  entry->valid = 0;
  state->buffer_count--;
  continue;
}


critical

When a packet exceeds the maximum number of retries, it is marked as invalid and effectively dropped. The WARN message says "Marking as delivered to continue", but this is misleading. The packet is lost, and the corresponding send request will never complete, causing the application to hang. This is a form of silent data loss. The transport layer should propagate this failure to the upper layer so that the application can handle the error, for example by aborting the operation.

"Marking as delivered to continue.",
entry->seq, entry->retry_count, state->max_retry);
entry->valid = 0;
state->buffer_count--;


critical

Decrementing state->buffer_count here is incorrect and can lead to corruption of the retransmission buffer's state. The entry is marked as invalid, and it will be cleaned up later when buffer_head advances over it. Decrementing the count here without adjusting buffer_head or buffer_tail creates an inconsistency in the circular buffer's state, which can lead to unpredictable behavior, such as overwriting packets that are still in flight. The buffer_count should only be decremented when an element is removed from the head of the buffer.
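One way to keep the circular buffer consistent is to invalidate entries lazily and decrement the count only while advancing over dead entries at the head, e.g. (illustrative sketch, not the PR's code):

```c
#include <assert.h>

#define RING_CAP 16

/* Minimal stand-in for the retransmission ring described in the review. */
struct ring {
  int valid[RING_CAP];
  int head, tail, count;
};

/* Mark an entry dead without touching count; ring geometry stays intact. */
static void ring_invalidate(struct ring *r, int idx) { r->valid[idx] = 0; }

/* Reclaim only from the head, so count always matches the span head..tail. */
static void ring_reclaim(struct ring *r) {
  while (r->count > 0 && !r->valid[r->head]) {
    r->head = (r->head + 1) % RING_CAP;
    r->count--;
  }
}
```

An out-of-order invalidation leaves a hole that is reclaimed later, once the head reaches it; decrementing count at invalidation time instead would let the tail lap entries still in flight.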

          

Comment on lines 1832 to 1835
static int timeout_check_count = 0;
if ((timeout_check_count++ % 500) == 0) {
  FLAGCXCHECK(flagcxIbRetransCheckTimeout(&sComm->retrans, sComm));
}


high

The timeout check for retransmissions is triggered based on the number of calls to flagcxIbucTest, which is not robust. The actual time between checks will vary depending on the application's behavior and how frequently it polls. This can lead to delayed or inconsistent timeout detection. It would be more reliable to use a time-based check.

      static __thread uint64_t last_timeout_check_us = 0;
      uint64_t now_us = flagcxIbGetTimeUs();
      if (now_us - last_timeout_check_us > 1000) { // Check every 1ms, for example
        FLAGCXCHECK(flagcxIbRetransCheckTimeout(&sComm->retrans, sComm));
        last_timeout_check_us = now_us;
      }

Comment on lines 802 to 804
static int retrans_qp_index = 0;
int qpIndex = retrans_qp_index % comm->base.nqps;
retrans_qp_index++;


high

The use of a static int retrans_qp_index to round-robin QPs for retransmissions is not thread-safe. If multiple threads call this function (e.g., for different communicators), they will contend for this single static variable, leading to race conditions. This index should be part of the communicator's state, for example, in the flagcxIbRetransState struct, to ensure thread-safety. This would also require adding int retrans_qp_index; to the flagcxIbRetransState struct in flagcx/adaptor/include/ib_common.h.

  int qpIndex = comm->retrans.retrans_qp_index % comm->base.nqps;
  comm->retrans.retrans_qp_index++;


comm->outstanding_sends = 0;
comm->outstanding_retrans = 0;
comm->max_outstanding = 16;


medium

The max_outstanding limit is hardcoded to 16. This value can have a significant impact on performance and might need tuning for different network environments. It would be better to make it configurable, for example via an environment variable using FLAGCX_PARAM, which is a common pattern in this codebase.

  comm->max_outstanding = flagcxParamIbMaxOutstanding();
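For context, the FLAGCX_PARAM pattern typically expands to a cached getenv lookup with a compiled-in default; a generic sketch (the FLAGCX_IB_MAX_OUTSTANDING variable name and this accessor are assumptions, not the PR's code):

```c
#include <assert.h>
#include <stdlib.h>

/* getenv-backed tunable with a default, caching the first lookup,
 * in the spirit of the FLAGCX_PARAM/NCCL_PARAM pattern. */
static int param_ib_max_outstanding(void) {
  static int cached = -1;
  if (cached < 0) {
    const char *v = getenv("FLAGCX_IB_MAX_OUTSTANDING"); /* hypothetical name */
    cached = (v && atoi(v) > 0) ? atoi(v) : 16; /* default matches the PR */
  }
  return cached;
}
```

Caching matters because the accessor may be called on the hot path of every send.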

Comment on lines 266 to 968
flagcxIbRetransResendPacket(struct flagcxIbSendComm *comm,
struct flagcxIbRetransEntry *entry) {

if (!comm || !entry || !entry->valid)
return flagcxInternalError;

TRACE(FLAGCX_NET,
"Retransmitting packet (RDMA Write): seq=%u, size=%u, retry=%d",
entry->seq, entry->size, entry->retry_count);

int qpIndex = comm->base.qpIndex;
struct flagcxIbQp *qp = &comm->base.qps[qpIndex];
int devIndex = qp->devIndex;
struct flagcxIbSendCommDev *commDev = &comm->devs[devIndex];

// Poll CQ
for (int poll_iter = 0; poll_iter < 16; poll_iter++) {
struct ibv_wc wcs[64];
int n_cqe = 0;
flagcxWrapIbvPollCq(commDev->base.cq, 64, wcs, &n_cqe);
if (n_cqe == 0)
break;
}

// RDMA Write with IMM
struct ibv_sge sge;
sge.addr = (uint64_t)entry->data;
sge.length = entry->size;
sge.lkey = entry->lkeys[devIndex];

struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = 0xFFFFFFFFULL;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_RDMA_WRITE_WITH_IMM;
wr.send_flags = IBV_SEND_SIGNALED;
wr.imm_data = flagcxIbEncodeImmData(entry->seq, entry->size);
wr.wr.rdma.remote_addr = entry->remote_addr;
wr.wr.rdma.rkey = entry->rkeys[qp->remDevIdx];

struct ibv_send_wr *bad_wr;
flagcxResult_t result = flagcxWrapIbvPostSend(qp->qp, &wr, &bad_wr);

if (result != flagcxSuccess) {
entry->send_time_us = flagcxIbGetTimeUs();
return flagcxSuccess;
}

entry->send_time_us = flagcxIbGetTimeUs();
entry->retry_count++;
comm->base.qpIndex = (comm->base.qpIndex + 1) % comm->base.nqps;

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransCheckTimeout(struct flagcxIbRetransState *state,
struct flagcxIbSendComm *comm) {

if (!state || !state->enabled || !comm)
return flagcxSuccess;
if (state->buffer_count == 0)
return flagcxSuccess;

uint64_t now_us = flagcxIbGetTimeUs();
int retrans_count = 0;
const int MAX_RETRANS_PER_CALL = 1;

int idx = state->buffer_head;
for (int i = 0;
i < state->buffer_count && retrans_count < MAX_RETRANS_PER_CALL; i++) {
struct flagcxIbRetransEntry *entry = &state->buffer[idx];

if (entry->valid) {
uint64_t elapsed_us = now_us - entry->send_time_us;

if (elapsed_us >= state->rto_us) {
if (entry->retry_count >= state->max_retry) {
WARN("Packet exceeded max retries: seq=%u, retry=%d, max=%d. "
"Marking as delivered to continue.",
entry->seq, entry->retry_count, state->max_retry);
entry->valid = 0;
state->buffer_count--;
continue;
}

flagcxResult_t retrans_result =
flagcxIbRetransResendViaSend(comm, entry->seq);
if (retrans_result == flagcxSuccess) {
state->total_timeout++;
retrans_count++;

state->rto_us = (state->rto_us * 2 > state->max_rto_us)
? state->max_rto_us
: state->rto_us * 2;
} else {
TRACE(FLAGCX_NET,
"Retransmit failed, stopping this round to allow queue drain");
break;
}
}
}

idx = (idx + 1) % FLAGCX_IB_RETRANS_MAX_INFLIGHT;
}

if (retrans_count > 0) {
TRACE(FLAGCX_NET, "Retransmitted %d packets, RTO=%luus, pending=%d",
retrans_count, (unsigned long)state->rto_us, state->buffer_count);
}

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransRecvPacket(struct flagcxIbRetransState *state,
uint32_t seq,
struct flagcxIbAckMsg *ack_msg,
int *should_ack) {

if (!state || !state->enabled || !ack_msg || !should_ack) {
*should_ack = 0;
return flagcxSuccess;
}

*should_ack = 0;
uint64_t now_us = flagcxIbGetTimeUs();

TRACE(FLAGCX_NET, "Received packet: seq=%u, expect=%u", seq, state->recv_seq);

if (seq == state->recv_seq) {
state->recv_seq = (state->recv_seq + 1) & 0xFFFF;

// Send ACK periodically based on recv_seq to avoid excessive ACK traffic
if ((state->recv_seq % state->ack_interval) == 0) {
*should_ack = 1;
}
} else if (flagcxIbSeqLess(seq, state->recv_seq)) {
TRACE(FLAGCX_NET, "Received duplicate packet: seq=%u, expect=%u", seq,
state->recv_seq);
*should_ack = 1;
} else {
TRACE(FLAGCX_NET, "Received out-of-order packet: seq=%u, expect=%u", seq,
state->recv_seq);
*should_ack = 1;

int gap = seq - state->recv_seq;
if (gap > 0 && gap < 64) {
ack_msg->sack_bitmap |= (1ULL << (gap - 1));

ack_msg->sack_bitmap_count = 0;
uint64_t bitmap = ack_msg->sack_bitmap;
while (bitmap) {
ack_msg->sack_bitmap_count += (bitmap & 1);
bitmap >>= 1;
}

TRACE(FLAGCX_NET, "SACK: gap=%d, bitmap=0x%lx, count=%u", gap,
(unsigned long)ack_msg->sack_bitmap, ack_msg->sack_bitmap_count);
}
}

if (*should_ack) {
ack_msg->ack_seq = (state->recv_seq - 1) & 0xFFFF;
ack_msg->timestamp_us = now_us;
ack_msg->peer_id = 0;
ack_msg->flow_id = 0;
ack_msg->path = 0;

TRACE(FLAGCX_NET,
"Generated ACK: ack_seq=%u, sack_count=%u, sack_bitmap=0x%lx",
ack_msg->ack_seq, ack_msg->sack_bitmap_count,
(unsigned long)ack_msg->sack_bitmap);
}

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransPiggybackAck(struct flagcxIbSendFifo *fifo_elem,
struct flagcxIbAckMsg *ack_msg) {
return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransExtractAck(struct flagcxIbSendFifo *fifo_elem,
struct flagcxIbAckMsg *ack_msg) {
return flagcxSuccess;
}

void flagcxIbRetransPrintStats(struct flagcxIbRetransState *state,
const char *prefix) {
if (!state || !state->enabled)
return;

INFO(
FLAGCX_NET,
"%s Retransmission Stats: Sent=%lu, Retrans=%lu, Acked=%lu, Timeout=%lu, "
"Pending=%d, RTO=%luus, SRTT=%luus",
prefix ? prefix : "", (unsigned long)state->total_sent,
(unsigned long)state->total_retrans, (unsigned long)state->total_acked,
(unsigned long)state->total_timeout, state->buffer_count,
(unsigned long)state->rto_us, (unsigned long)state->srtt_us);
}

flagcxResult_t flagcxIbCreateCtrlQp(struct ibv_context *context,
struct ibv_pd *pd, uint8_t port_num,
struct flagcxIbCtrlQp *ctrlQp) {

if (!context || !pd || !ctrlQp)
return flagcxInternalError;

memset(ctrlQp, 0, sizeof(struct flagcxIbCtrlQp));

FLAGCXCHECK(flagcxWrapIbvCreateCq(&ctrlQp->cq, context, 1024, NULL, NULL, 0));

struct ibv_qp_init_attr qp_init_attr;
memset(&qp_init_attr, 0, sizeof(qp_init_attr));
qp_init_attr.qp_type = IBV_QPT_UD;
qp_init_attr.send_cq = ctrlQp->cq;
qp_init_attr.recv_cq = ctrlQp->cq;
qp_init_attr.cap.max_send_wr = 512;
qp_init_attr.cap.max_recv_wr = 128;
qp_init_attr.cap.max_send_sge = 1;
qp_init_attr.cap.max_recv_sge = 1;
qp_init_attr.cap.max_inline_data = 64;

FLAGCXCHECK(flagcxWrapIbvCreateQp(&ctrlQp->qp, pd, &qp_init_attr));
if (!ctrlQp->qp) {
WARN("Failed to create control UD QP");
flagcxWrapIbvDestroyCq(ctrlQp->cq);
return flagcxInternalError;
}

struct ibv_qp_attr qp_attr;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_INIT;
qp_attr.pkey_index = 0;
qp_attr.port_num = port_num;
qp_attr.qkey = 0x11111111;

FLAGCXCHECK(flagcxWrapIbvModifyQp(ctrlQp->qp, &qp_attr,
IBV_QP_STATE | IBV_QP_PKEY_INDEX |
IBV_QP_PORT | IBV_QP_QKEY));

TRACE(FLAGCX_NET, "Created control UD QP: qpn=%u", ctrlQp->qp->qp_num);

return flagcxSuccess;
}

flagcxResult_t flagcxIbDestroyCtrlQp(struct flagcxIbCtrlQp *ctrlQp) {
if (!ctrlQp)
return flagcxSuccess;

if (ctrlQp->ah) {
if (ctrlQp->qp && ctrlQp->qp->context) {
ctrlQp->qp->context->ops.destroy_ah(ctrlQp->ah);
}
ctrlQp->ah = NULL;
}

if (ctrlQp->qp) {
FLAGCXCHECK(flagcxWrapIbvDestroyQp(ctrlQp->qp));
ctrlQp->qp = NULL;
}

if (ctrlQp->cq) {
FLAGCXCHECK(flagcxWrapIbvDestroyCq(ctrlQp->cq));
ctrlQp->cq = NULL;
}

return flagcxSuccess;
}

flagcxResult_t
flagcxIbSetupCtrlQpConnection(struct ibv_context *context, struct ibv_pd *pd,
struct flagcxIbCtrlQp *ctrlQp,
uint32_t remote_qpn, union ibv_gid *remote_gid,
uint16_t remote_lid, uint8_t port_num,
uint8_t link_layer, uint8_t local_gid_index) {

if (!ctrlQp || !ctrlQp->qp)
return flagcxInternalError;

ctrlQp->remote_qpn = remote_qpn;
ctrlQp->remote_qkey = 0x11111111;

struct ibv_qp_attr qp_attr;
memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_RTR;

FLAGCXCHECK(flagcxWrapIbvModifyQp(ctrlQp->qp, &qp_attr, IBV_QP_STATE));

memset(&qp_attr, 0, sizeof(qp_attr));
qp_attr.qp_state = IBV_QPS_RTS;
qp_attr.sq_psn = 0;

FLAGCXCHECK(flagcxWrapIbvModifyQp(ctrlQp->qp, &qp_attr,
IBV_QP_STATE | IBV_QP_SQ_PSN));

struct ibv_ah_attr ah_attr;
memset(&ah_attr, 0, sizeof(ah_attr));
ah_attr.port_num = port_num;

if (link_layer == IBV_LINK_LAYER_ETHERNET) {
if (!remote_gid) {
WARN("remote_gid is NULL for RoCE");
return flagcxInternalError;
}

TRACE(FLAGCX_NET,
"Creating AH for RoCE: remote_gid=%lx:%lx, local_gid_idx=%u, port=%u",
(unsigned long)remote_gid->global.subnet_prefix,
(unsigned long)remote_gid->global.interface_id, local_gid_index,
port_num);

ah_attr.is_global = 1;
ah_attr.grh.dgid = *remote_gid;
ah_attr.grh.sgid_index = local_gid_index;
ah_attr.grh.hop_limit = 255;
ah_attr.grh.traffic_class = 0;
ah_attr.grh.flow_label = 0;
} else {
TRACE(FLAGCX_NET, "Creating AH for IB: remote_lid=%u, port=%u", remote_lid,
port_num);

ah_attr.is_global = 0;
ah_attr.dlid = remote_lid;
}

ah_attr.sl = 0;
ah_attr.src_path_bits = 0;

ctrlQp->ah = context->ops.create_ah(pd, &ah_attr);
if (!ctrlQp->ah) {
WARN(" link_layer=%d (%s)", link_layer,
link_layer == IBV_LINK_LAYER_ETHERNET ? "RoCE" : "IB");
WARN(" remote_lid=%u", remote_lid);
if (link_layer == IBV_LINK_LAYER_ETHERNET && remote_gid) {
WARN(" remote_gid=%lx:%lx",
(unsigned long)remote_gid->global.subnet_prefix,
(unsigned long)remote_gid->global.interface_id);
WARN(" local_gid_index=%u", local_gid_index);
}
return flagcxSuccess;
}

INFO(FLAGCX_NET, "Control QP setup: local_qpn=%u, remote_qpn=%u",
ctrlQp->qp->qp_num, remote_qpn);

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransSendAckViaUd(struct flagcxIbRecvComm *comm,
struct flagcxIbAckMsg *ack_msg,
int devIndex) {

if (!comm || !ack_msg || devIndex >= comm->base.ndevs) {
WARN("Invalid parameters for sending ACK: comm=%p, ack_msg=%p, "
"devIndex=%d, ndevs=%d",
comm, ack_msg, devIndex, comm ? comm->base.ndevs : -1);
return flagcxInternalError;
}

struct flagcxIbRecvCommDev *commDev = &comm->devs[devIndex];
struct flagcxIbCtrlQp *ctrlQp = &commDev->ctrlQp;

if (!ctrlQp->qp || !ctrlQp->ah) {
WARN("Control QP not initialized: qp=%p, ah=%p", ctrlQp->qp, ctrlQp->ah);
return flagcxInternalError;
}

static int ack_send_count_debug = 0;
if ((ack_send_count_debug++ % 100) == 0) {
TRACE(FLAGCX_NET,
"Sending ACK #%d: ack_seq=%u, sack_bitmap=0x%lx, remote_qpn=%u",
ack_send_count_debug, ack_msg->ack_seq,
(unsigned long)ack_msg->sack_bitmap, ctrlQp->remote_qpn);
}

int total_polled = 0;
for (int poll_iter = 0; poll_iter < 8; poll_iter++) {
struct ibv_wc wcs[32];
int n_cqe = 0;
flagcxWrapIbvPollCq(ctrlQp->cq, 32, wcs, &n_cqe);
total_polled += n_cqe;
if (n_cqe == 0)
break;
}

if (total_polled > 0) {
TRACE(FLAGCX_NET, "Polled %d ACK completions before sending", total_polled);
}

struct flagcxIbAckMsg *ack_buf = (struct flagcxIbAckMsg *)commDev->ackBuffer;
if (!ack_buf)
return flagcxInternalError;

memcpy(ack_buf, ack_msg, sizeof(struct flagcxIbAckMsg));

struct ibv_sge sge;
sge.addr = (uint64_t)ack_buf;
sge.length = sizeof(struct flagcxIbAckMsg);
sge.lkey = commDev->ackMr->lkey;

struct ibv_send_wr wr;
memset(&wr, 0, sizeof(wr));
wr.wr_id = 0;
wr.next = NULL;
wr.sg_list = &sge;
wr.num_sge = 1;
wr.opcode = IBV_WR_SEND;

wr.send_flags = IBV_SEND_INLINE | IBV_SEND_SIGNALED;

wr.wr.ud.ah = ctrlQp->ah;
wr.wr.ud.remote_qpn = ctrlQp->remote_qpn;
wr.wr.ud.remote_qkey = ctrlQp->remote_qkey;

struct ibv_send_wr *bad_wr;
flagcxResult_t result = flagcxWrapIbvPostSend(ctrlQp->qp, &wr, &bad_wr);

if (result != flagcxSuccess) {
int total_polled = 0;
for (int i = 0; i < 4; i++) {
struct ibv_wc wcs[32];
int n = 0;
flagcxWrapIbvPollCq(ctrlQp->cq, 32, wcs, &n);
total_polled += n;
if (n == 0)
break;
}

TRACE(FLAGCX_NET, "ACK send failed, polled %d completions, retrying...",
total_polled);

result = flagcxWrapIbvPostSend(ctrlQp->qp, &wr, &bad_wr);
if (result != flagcxSuccess) {
WARN("Failed to send ACK after retry (polled %d completions)",
total_polled);
return result;
}
}

TRACE(FLAGCX_NET, "Sent ACK via UD: ack_seq=%u, sack_count=%u",
ack_msg->ack_seq, ack_msg->sack_bitmap_count);

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransRecvAckViaUd(struct flagcxIbSendComm *comm,
int devIndex) {

if (!comm || devIndex >= comm->base.ndevs) {
return flagcxInternalError;
}

struct flagcxIbSendCommDev *commDev = &comm->devs[devIndex];
struct flagcxIbCtrlQp *ctrlQp = &commDev->ctrlQp;

if (!ctrlQp->qp || !ctrlQp->cq)
return flagcxInternalError;

struct ibv_wc wc;
int n_cqe = 0;
flagcxResult_t result = flagcxWrapIbvPollCq(ctrlQp->cq, 1, &wc, &n_cqe);

if (result != flagcxSuccess || n_cqe == 0) {
return flagcxSuccess;
}

static int ack_recv_count_debug = 0;
if ((ack_recv_count_debug++ % 100) == 0) {
TRACE(FLAGCX_NET, "Polled ACK completion #%d: n_cqe=%d, status=%d",
ack_recv_count_debug, n_cqe, wc.status);
}

if (wc.status != IBV_WC_SUCCESS) {
WARN("ACK completion with error: status=%d", wc.status);
return flagcxRemoteError;
}

int buf_idx = (int)wc.wr_id;
size_t buf_entry_size = sizeof(struct flagcxIbAckMsg) + 40;
char *buf_base = (char *)commDev->ackBuffer + buf_idx * buf_entry_size;
struct flagcxIbAckMsg *ack_msg = (struct flagcxIbAckMsg *)(buf_base + 40);

TRACE(
FLAGCX_NET,
"Received ACK: ack_seq=%u, sack_count=%u, sack_bitmap=0x%lx, buf_idx=%d",
ack_msg->ack_seq, ack_msg->sack_bitmap_count,
(unsigned long)ack_msg->sack_bitmap, buf_idx);

FLAGCXCHECK(flagcxIbRetransProcessAck(&comm->retrans, ack_msg));

struct ibv_sge sge;
sge.addr = (uint64_t)buf_base;
sge.length = buf_entry_size;
sge.lkey = commDev->ackMr->lkey;

struct ibv_recv_wr recv_wr;
memset(&recv_wr, 0, sizeof(recv_wr));
recv_wr.wr_id = buf_idx;
recv_wr.next = NULL;
recv_wr.sg_list = &sge;
recv_wr.num_sge = 1;

struct ibv_recv_wr *bad_wr;
FLAGCXCHECK(flagcxWrapIbvPostRecv(ctrlQp->qp, &recv_wr, &bad_wr));

TRACE(FLAGCX_NET, "Re-posted recv WR for ACK buffer %d", buf_idx);

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransResendViaSend(struct flagcxIbSendComm *comm,
uint32_t seq) {

if (!comm || !comm->retrans.enabled)
return flagcxSuccess;

struct flagcxIbRetransEntry *entry = NULL;
int idx = comm->retrans.buffer_head;
for (int i = 0; i < comm->retrans.buffer_count; i++) {
if (comm->retrans.buffer[idx].valid &&
comm->retrans.buffer[idx].seq == seq) {
entry = &comm->retrans.buffer[idx];
break;
}
idx = (idx + 1) % FLAGCX_IB_RETRANS_MAX_INFLIGHT;
}

if (!entry) {
TRACE(FLAGCX_NET, "Retrans: seq=%u not found in buffer (already acked?)",
seq);
return flagcxSuccess;
}

static int retrans_qp_index = 0;
int qpIndex = retrans_qp_index % comm->base.nqps;
retrans_qp_index++;

struct flagcxIbQp *qp = &comm->base.qps[qpIndex];
int devIndex = qp->devIndex;

struct ibv_send_wr wr;
struct ibv_sge sge[2];
memset(&wr, 0, sizeof(wr));

struct flagcxIbRetransHdr *hdr = &comm->retrans_hdr_pool[seq % 32];
hdr->magic = 0xDEADBEEF;
hdr->seq = seq;
hdr->size = entry->size;
hdr->remote_addr = entry->remote_addr;

sge[0].addr = (uint64_t)hdr;
sge[0].length = sizeof(struct flagcxIbRetransHdr);
sge[0].lkey = comm->retrans_hdr_mr->lkey;

sge[1].addr = (uint64_t)entry->data;
sge[1].length = entry->size;
sge[1].lkey = entry->lkeys[devIndex];

wr.wr_id = 0xFFFFFFFEULL;
wr.sg_list = sge;
wr.num_sge = 2;
wr.opcode = IBV_WR_SEND;
wr.send_flags = IBV_SEND_SIGNALED;
wr.next = NULL;

struct ibv_send_wr *bad_wr;
flagcxResult_t result = flagcxWrapIbvPostSend(qp->qp, &wr, &bad_wr);

if (result != flagcxSuccess) {
TRACE(FLAGCX_NET, "Retrans SEND failed for seq=%u, qp=%d", seq, qpIndex);
return result;
}

entry->retry_count++;
entry->send_time_us = flagcxIbGetTimeUs();
comm->retrans.total_retrans++;
comm->outstanding_retrans++;

TRACE(FLAGCX_NET,
"Retransmitted via SEND: seq=%u, size=%u, qp=%d, devIndex=%d, "
"retry_count=%d",
seq, entry->size, qpIndex, devIndex, entry->retry_count);

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransInitRecvPool(struct flagcxIbRecvComm *comm) {

if (!comm || !comm->retrans.enabled)
return flagcxSuccess;

for (int dev = 0; dev < comm->base.ndevs; dev++) {
struct flagcxIbRecvCommDev *commDev = &comm->devs[dev];

size_t buf_size = sizeof(struct flagcxIbRetransHdr) + 65536;
size_t total_size = buf_size * 32;

void *buf_pool = malloc(total_size);
if (!buf_pool) {
WARN("Failed to allocate retrans recv buffer pool for dev %d", dev);
return flagcxInternalError;
}

FLAGCXCHECK(flagcxWrapIbvRegMr(&commDev->retransRecvMr, commDev->base.pd,
buf_pool, total_size,
IBV_ACCESS_LOCAL_WRITE));

for (int i = 0; i < 32; i++) {
commDev->retransRecvBufs[i] = (char *)buf_pool + i * buf_size;
}

for (int q = 0; q < comm->base.nqps; q++) {
if (comm->base.qps[q].devIndex != dev)
continue;

for (int i = 0; i < 32; i++) {
struct ibv_recv_wr recv_wr;
struct ibv_sge recv_sge;
memset(&recv_wr, 0, sizeof(recv_wr));

recv_sge.addr = (uint64_t)commDev->retransRecvBufs[i];
recv_sge.length = buf_size;
recv_sge.lkey = commDev->retransRecvMr->lkey;

recv_wr.wr_id =
0x8000000000000000ULL | (uint64_t)commDev->retransRecvBufs[i];
recv_wr.sg_list = &recv_sge;
recv_wr.num_sge = 1;
recv_wr.next = NULL;

struct ibv_recv_wr *bad_wr;
FLAGCXCHECK(
flagcxWrapIbvPostRecv(comm->base.qps[q].qp, &recv_wr, &bad_wr));
}

commDev->retransRecvBufCount = 32;

TRACE(FLAGCX_NET,
"Posted 32 recv WRs to UC QP %d (dev=%d) for retransmission", q,
dev);
}
}

return flagcxSuccess;
}

flagcxResult_t flagcxIbRetransHandleRecvSend(struct flagcxIbRecvComm *comm,
void *recv_buf, uint32_t byte_len,
int qpIndex) {

if (!comm || !recv_buf)
return flagcxInternalError;

struct flagcxIbRetransHdr *hdr = (struct flagcxIbRetransHdr *)recv_buf;

if (hdr->magic != 0xDEADBEEF) {
WARN("Received SEND with invalid magic: 0x%x", hdr->magic);
return flagcxInternalError;
}

uint32_t seq = hdr->seq;
uint32_t size = hdr->size;

TRACE(FLAGCX_NET, "Received SEND retransmission: seq=%u, size=%u, qp=%d", seq,
size, qpIndex);

struct flagcxIbAckMsg ack_msg = {0};
int should_ack = 0;
FLAGCXCHECK(
flagcxIbRetransRecvPacket(&comm->retrans, seq, &ack_msg, &should_ack));

if (should_ack) {
int devIndex = comm->base.qps[qpIndex].devIndex;
FLAGCXCHECK(flagcxIbRetransSendAckViaUd(comm, &ack_msg, devIndex));
}

struct ibv_recv_wr recv_wr;
struct ibv_sge recv_sge;
memset(&recv_wr, 0, sizeof(recv_wr));

size_t buf_size = sizeof(struct flagcxIbRetransHdr) + 65536;
recv_sge.addr = (uint64_t)recv_buf;
recv_sge.length = buf_size;

int devIndex = comm->base.qps[qpIndex].devIndex;
recv_sge.lkey = comm->devs[devIndex].retransRecvMr->lkey;

recv_wr.wr_id = 0x8000000000000000ULL | (uint64_t)recv_buf;
recv_wr.sg_list = &recv_sge;
recv_wr.num_sge = 1;
recv_wr.next = NULL;

struct ibv_recv_wr *bad_wr;
FLAGCXCHECK(
flagcxWrapIbvPostRecv(comm->base.qps[qpIndex].qp, &recv_wr, &bad_wr));

TRACE(FLAGCX_NET, "Re-posted recv WR for retrans buffer");

return flagcxSuccess;
}


medium

This file contains several functions that appear to be unused in the current implementation:

  • flagcxIbRetransResendPacket (defined at line 266)
  • flagcxIbRetransInitRecvPool (defined at line 855)
  • flagcxIbRetransHandleRecvSend (defined at line 915)

This dead code can be confusing for future maintenance and should be removed if it's not intended to be used.
