Skip to content

Commit 7660b43

Browse files
author
chencjcj
committed
Add retrans for IBUC
1 parent 11dc7e4 commit 7660b43

File tree

8 files changed

+1969
-54
lines changed

8 files changed

+1969
-54
lines changed

flagcx/adaptor/include/ib_common.h

Lines changed: 124 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,101 @@ struct flagcxIbMrHandle {
125125
#define FLAGCX_NET_IB_REQ_SEND 1
126126
#define FLAGCX_NET_IB_REQ_RECV 2
127127
#define FLAGCX_NET_IB_REQ_FLUSH 3
128+
#define FLAGCX_NET_IB_REQ_ACK 4
128129

129130
extern const char *reqTypeStr[];
130131

132+
#define FLAGCX_IB_RETRANS_MAX_INFLIGHT 2048
133+
#define FLAGCX_IB_RETRANS_BUFFER_SIZE 1024
134+
#define FLAGCX_IB_RETRANS_MAX_CHUNK_SIZE (8 * 1024 * 1024)
135+
#define FLAGCX_IB_SRQ_SIZE 1024
136+
137+
struct flagcxIbRetransHdr {
138+
uint32_t magic;
139+
uint32_t seq;
140+
uint32_t size;
141+
uint32_t rkey;
142+
uint64_t remote_addr;
143+
uint32_t imm_data;
144+
uint32_t padding;
145+
} __attribute__((packed));
146+
147+
struct flagcxIbAckMsg {
148+
uint16_t peer_id;
149+
uint16_t flow_id;
150+
uint16_t path;
151+
uint16_t ack_seq;
152+
uint16_t sack_bitmap_count;
153+
uint16_t padding;
154+
uint64_t timestamp_us;
155+
uint64_t sack_bitmap;
156+
} __attribute__((packed));
157+
158+
struct flagcxIbCtrlQp {
159+
struct ibv_qp *qp;
160+
struct ibv_cq *cq;
161+
struct ibv_ah *ah;
162+
uint32_t remote_qpn;
163+
uint32_t remote_qkey;
164+
};
165+
166+
struct flagcxIbRetransRecvBuf {
167+
void *buffer;
168+
struct ibv_mr *mr;
169+
size_t size;
170+
int in_use;
171+
};
172+
173+
struct flagcxIbSrqMgr {
174+
void *srq;
175+
struct ibv_cq *cq;
176+
struct flagcxIbRetransRecvBuf bufs[FLAGCX_IB_SRQ_SIZE];
177+
int buf_count;
178+
// Buffer management for SRQ (similar to UCCL)
179+
int free_buf_indices[FLAGCX_IB_SRQ_SIZE]; // Stack of free buffer indices
180+
int free_buf_count; // Number of free buffers available
181+
int post_srq_count; // Number of recv WRs that need to be posted to SRQ
182+
};
183+
184+
struct flagcxIbRetransEntry {
185+
uint32_t seq;
186+
uint32_t size;
187+
uint64_t send_time_us;
188+
uint64_t remote_addr;
189+
void *data;
190+
uint32_t lkeys[FLAGCX_IB_MAX_DEVS_PER_NIC];
191+
uint32_t rkeys[FLAGCX_IB_MAX_DEVS_PER_NIC];
192+
int retry_count;
193+
int valid;
194+
};
195+
196+
struct flagcxIbRetransState {
197+
uint32_t send_seq;
198+
uint32_t send_una;
199+
uint32_t recv_seq;
200+
201+
struct flagcxIbRetransEntry buffer[FLAGCX_IB_RETRANS_MAX_INFLIGHT];
202+
int buffer_head;
203+
int buffer_tail;
204+
int buffer_count;
205+
206+
uint64_t last_ack_time_us;
207+
uint64_t rto_us;
208+
uint64_t srtt_us;
209+
uint64_t rttvar_us;
210+
211+
uint64_t total_sent;
212+
uint64_t total_retrans;
213+
uint64_t total_acked;
214+
uint64_t total_timeout;
215+
216+
int enabled;
217+
int max_retry;
218+
int ack_interval;
219+
uint32_t min_rto_us;
220+
uint32_t max_rto_us;
221+
};
222+
131223
struct flagcxIbQp {
132224
struct ibv_qp *qp;
133225
int devIndex;
@@ -176,6 +268,11 @@ struct flagcxIbConnectionMetadata {
176268
char devName[MAX_MERGED_DEV_NAME];
177269
uint64_t fifoAddr;
178270
int ndevs;
271+
272+
uint32_t ctrlQpn[FLAGCX_IB_MAX_DEVS_PER_NIC];
273+
union ibv_gid ctrlGid[FLAGCX_IB_MAX_DEVS_PER_NIC];
274+
uint16_t ctrlLid[FLAGCX_IB_MAX_DEVS_PER_NIC];
275+
int retransEnabled;
179276
};
180277

181278
struct flagcxIbNetCommDevBase {
@@ -199,6 +296,10 @@ struct flagcxIbRemSizesFifo {
199296
struct flagcxIbSendCommDev {
200297
struct flagcxIbNetCommDevBase base;
201298
struct ibv_mr *fifoMr;
299+
300+
struct flagcxIbCtrlQp ctrlQp;
301+
struct ibv_mr *ackMr;
302+
void *ackBuffer;
202303
};
203304

204305
struct alignas(32) flagcxIbNetCommBase {
@@ -226,7 +327,16 @@ struct flagcxIbSendComm {
226327
struct ibv_send_wr wrs[FLAGCX_NET_IB_MAX_RECVS + 1];
227328
struct flagcxIbRemSizesFifo remSizesFifo;
228329
uint64_t fifoHead;
229-
int ar; // Use adaptive routing when all merged devices have it enabled
330+
int ar;
331+
332+
struct flagcxIbRetransState retrans;
333+
334+
int outstanding_sends;
335+
int outstanding_retrans;
336+
int max_outstanding;
337+
338+
struct flagcxIbRetransHdr retrans_hdr_pool[32];
339+
struct ibv_mr *retrans_hdr_mr;
230340
};
231341

232342
struct flagcxIbGpuFlush {
@@ -248,7 +358,16 @@ struct alignas(16) flagcxIbRecvCommDev {
248358
uint32_t fifoRkey;
249359
struct ibv_mr *fifoMr;
250360
struct ibv_sge fifoSge;
361+
251362
struct ibv_mr *sizesFifoMr;
363+
364+
struct flagcxIbCtrlQp ctrlQp;
365+
struct ibv_mr *ackMr;
366+
void *ackBuffer;
367+
368+
void *retransRecvBufs[32];
369+
struct ibv_mr *retransRecvMr;
370+
int retransRecvBufCount;
252371
};
253372

254373
struct alignas(32) flagcxIbRecvComm {
@@ -258,6 +377,10 @@ struct alignas(32) flagcxIbRecvComm {
258377
int sizesFifo[MAX_REQUESTS][FLAGCX_NET_IB_MAX_RECVS];
259378
int gpuFlushHostMem;
260379
int flushEnabled;
380+
381+
struct flagcxIbRetransState retrans;
382+
383+
struct flagcxIbSrqMgr srqMgr;
261384
};
262385

263386
// Global arrays (declared as extern, defined in adaptor files)
Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
/*************************************************************************
2+
* Copyright (c) 2024, FlagCX Inc.
3+
* All rights reserved.
4+
*
5+
* IBUC Retransmission Support - Header
6+
************************************************************************/
7+
8+
#ifndef FLAGCX_IBUC_RETRANS_H_
9+
#define FLAGCX_IBUC_RETRANS_H_
10+
11+
#include "flagcx_common.h"
12+
#include "ib_common.h"
13+
#include <stdint.h>
14+
#include <time.h>
15+
16+
extern int64_t flagcxParamIbRetransEnable(void);
17+
extern int64_t flagcxParamIbRetransTimeout(void);
18+
extern int64_t flagcxParamIbRetransMaxRetry(void);
19+
extern int64_t flagcxParamIbRetransAckInterval(void);
20+
21+
static inline uint64_t flagcxIbGetTimeUs(void) {
22+
struct timespec ts;
23+
clock_gettime(CLOCK_MONOTONIC, &ts);
24+
return (uint64_t)ts.tv_sec * 1000000ULL + (uint64_t)ts.tv_nsec / 1000ULL;
25+
}
26+
27+
static inline int flagcxIbSeqLess(uint32_t a, uint32_t b) {
28+
uint16_t a16 = a & 0xFFFF;
29+
uint16_t b16 = b & 0xFFFF;
30+
return (int16_t)(a16 - b16) < 0;
31+
}
32+
33+
static inline int flagcxIbSeqLeq(uint32_t a, uint32_t b) {
34+
uint16_t a16 = a & 0xFFFF;
35+
uint16_t b16 = b & 0xFFFF;
36+
return (int16_t)(a16 - b16) <= 0;
37+
}
38+
39+
flagcxResult_t flagcxIbRetransInit(struct flagcxIbRetransState *state);
40+
41+
flagcxResult_t flagcxIbRetransDestroy(struct flagcxIbRetransState *state);
42+
43+
flagcxResult_t flagcxIbRetransAddPacket(struct flagcxIbRetransState *state,
44+
uint32_t seq, uint32_t size, void *data,
45+
uint64_t remote_addr, uint32_t *lkeys,
46+
uint32_t *rkeys);
47+
48+
flagcxResult_t flagcxIbRetransProcessAck(struct flagcxIbRetransState *state,
49+
struct flagcxIbAckMsg *ack_msg);
50+
51+
flagcxResult_t flagcxIbRetransCheckTimeout(struct flagcxIbRetransState *state,
52+
struct flagcxIbSendComm *comm);
53+
54+
flagcxResult_t flagcxIbRetransRecvPacket(struct flagcxIbRetransState *state,
55+
uint32_t seq,
56+
struct flagcxIbAckMsg *ack_msg,
57+
int *should_ack);
58+
59+
flagcxResult_t flagcxIbRetransPiggybackAck(struct flagcxIbSendFifo *fifo_elem,
60+
struct flagcxIbAckMsg *ack_msg);
61+
62+
flagcxResult_t flagcxIbRetransExtractAck(struct flagcxIbSendFifo *fifo_elem,
63+
struct flagcxIbAckMsg *ack_msg);
64+
65+
static inline uint32_t flagcxIbEncodeImmData(uint32_t seq, uint32_t size) {
66+
return ((seq & 0xFFFF) << 16) | (size & 0xFFFF);
67+
}
68+
69+
static inline void flagcxIbDecodeImmData(uint32_t imm_data, uint32_t *seq,
70+
uint32_t *size) {
71+
*seq = (imm_data >> 16) & 0xFFFF;
72+
*size = imm_data & 0xFFFF;
73+
}
74+
75+
void flagcxIbRetransPrintStats(struct flagcxIbRetransState *state,
76+
const char *prefix);
77+
78+
flagcxResult_t flagcxIbCreateCtrlQp(struct ibv_context *context,
79+
struct ibv_pd *pd, uint8_t port_num,
80+
struct flagcxIbCtrlQp *ctrlQp);
81+
82+
flagcxResult_t flagcxIbDestroyCtrlQp(struct flagcxIbCtrlQp *ctrlQp);
83+
84+
flagcxResult_t
85+
flagcxIbSetupCtrlQpConnection(struct ibv_context *context, struct ibv_pd *pd,
86+
struct flagcxIbCtrlQp *ctrlQp,
87+
uint32_t remote_qpn, union ibv_gid *remote_gid,
88+
uint16_t remote_lid, uint8_t port_num,
89+
uint8_t link_layer, uint8_t local_gid_index);
90+
91+
flagcxResult_t flagcxIbRetransSendAckViaUd(struct flagcxIbRecvComm *comm,
92+
struct flagcxIbAckMsg *ack_msg,
93+
int devIndex);
94+
95+
flagcxResult_t flagcxIbRetransRecvAckViaUd(struct flagcxIbSendComm *comm,
96+
int devIndex);
97+
98+
flagcxResult_t flagcxIbRetransResendViaSend(struct flagcxIbSendComm *comm,
99+
uint32_t seq);
100+
101+
flagcxResult_t flagcxIbRetransInitRecvPool(struct flagcxIbRecvComm *comm);
102+
103+
flagcxResult_t flagcxIbRetransHandleRecvSend(struct flagcxIbRecvComm *comm,
104+
void *recv_buf, uint32_t byte_len,
105+
int qpIndex);
106+
107+
flagcxResult_t flagcxIbCreateSrq(struct ibv_context *context, struct ibv_pd *pd,
108+
struct flagcxIbSrqMgr *srqMgr);
109+
110+
flagcxResult_t flagcxIbDestroySrq(struct flagcxIbSrqMgr *srqMgr);
111+
112+
flagcxResult_t flagcxIbSrqPostRecv(struct flagcxIbSrqMgr *srqMgr, int count);
113+
114+
flagcxResult_t flagcxIbRetransResendViaSend(struct flagcxIbSendComm *comm,
115+
struct flagcxIbRetransEntry *entry);
116+
117+
#endif // FLAGCX_IBUC_RETRANS_H_

0 commit comments

Comments
 (0)