Skip to content

Commit 070bbc2

Browse files
committed
UCS/ARBITER: Allow same-group rescheduling from dispatch callback
Port the PR below from the master branch: openucx#5021. Signed-off-by: Liu, Changcheng <jerrliu@nvidia.com>
1 parent 997c8d4 commit 070bbc2

File tree

5 files changed

+190
-96
lines changed

5 files changed

+190
-96
lines changed

src/ucs/datastruct/arbiter.c

Lines changed: 143 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -46,23 +46,28 @@ static inline void ucs_arbiter_group_head_reset(ucs_arbiter_elem_t *head)
4646
head->list.next = NULL; /* Not scheduled yet */
4747
}
4848

49+
static inline void ucs_arbiter_elem_set_scheduled(ucs_arbiter_elem_t *elem,
50+
ucs_arbiter_group_t *group)
51+
{
52+
elem->group = group;
53+
}
54+
4955
void ucs_arbiter_group_push_elem_always(ucs_arbiter_group_t *group,
5056
ucs_arbiter_elem_t *elem)
5157
{
5258
ucs_arbiter_elem_t *tail = group->tail;
5359

54-
UCS_ARBITER_GROUP_GUARD_CHECK(group);
55-
5660
if (tail == NULL) {
61+
/* group is empty */
5762
ucs_arbiter_group_head_reset(elem);
5863
elem->next = elem; /* Connect to itself */
5964
} else {
6065
elem->next = tail->next; /* Point to first element */
6166
tail->next = elem; /* Point previous element to new one */
6267
}
6368

64-
elem->group = group; /* Always point to group */
6569
group->tail = elem; /* Update group tail */
70+
ucs_arbiter_elem_set_scheduled(elem, group);
6671
}
6772

6873
void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
@@ -74,8 +79,8 @@ void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
7479

7580
UCS_ARBITER_GROUP_GUARD_CHECK(group);
7681

77-
elem->group = group; /* Always point to group */
7882
ucs_arbiter_group_head_reset(elem);
83+
ucs_arbiter_elem_set_scheduled(elem, group);
7984

8085
if (tail == NULL) {
8186
elem->next = elem; /* Connect to itself */
@@ -94,14 +99,6 @@ void ucs_arbiter_group_push_head_elem_always(ucs_arbiter_t *arbiter,
9499
ucs_list_replace(&head->list, &elem->list);
95100
}
96101

97-
void ucs_arbiter_group_head_desched(ucs_arbiter_t *arbiter,
98-
ucs_arbiter_elem_t *head)
99-
{
100-
if (ucs_arbiter_group_head_is_scheduled(head)) {
101-
ucs_list_del(&head->list);
102-
}
103-
}
104-
105102
void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
106103
ucs_arbiter_group_t *group,
107104
ucs_arbiter_callback_t cb, void *cb_arg)
@@ -133,8 +130,8 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
133130
ptr = next;
134131
next = ptr->next;
135132
/* Can't touch the element after cb is called if it gets removed. But it
136-
* can be reused later as well, so it's next should be NULL. */
137-
ptr->next = NULL;
133+
* can be reused later as well, so its group should be NULL. */
134+
ucs_arbiter_elem_init(ptr);
138135
result = cb(arbiter, group, ptr, cb_arg);
139136

140137
if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
@@ -158,8 +155,8 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
158155
prev->next = next;
159156
} else {
160157
/* keep the element */
161-
ptr->next = next; /* Restore next pointer */
162-
prev = ptr;
158+
ucs_arbiter_elem_set_scheduled(ptr, group);
159+
prev = ptr;
163160
}
164161
} while (ptr != tail);
165162

@@ -174,6 +171,25 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter,
174171
}
175172
}
176173

174+
size_t ucs_arbiter_group_num_elems(ucs_arbiter_group_t *group)
175+
{
176+
ucs_arbiter_elem_t *elem = group->tail;
177+
size_t num_elems;
178+
179+
if (elem == NULL) {
180+
return 0;
181+
}
182+
183+
num_elems = 0;
184+
do {
185+
++num_elems;
186+
elem = elem->next;
187+
} while (elem != group->tail);
188+
189+
return num_elems;
190+
}
191+
192+
177193
int ucs_arbiter_group_is_scheduled(ucs_arbiter_group_t *group)
178194
{
179195
ucs_arbiter_elem_t *head;
@@ -204,13 +220,7 @@ void ucs_arbiter_group_schedule_nonempty(ucs_arbiter_t *arbiter,
204220
ucs_assert(tail != NULL);
205221
head = tail->next;
206222

207-
if (head == NULL) {
208-
/* It means that 1 element group is scheduled during dispatch.
209-
* Restore next pointer.
210-
*/
211-
head = tail;
212-
}
213-
223+
ucs_assert(head != NULL);
214224
ucs_arbiter_schedule_head_if_not_scheduled(arbiter, head);
215225
UCS_ARBITER_GROUP_ARBITER_SET(group, arbiter);
216226
}
@@ -230,19 +240,47 @@ void ucs_arbiter_group_desched_nonempty(ucs_arbiter_t *arbiter,
230240
ucs_arbiter_group_head_reset(head);
231241
}
232242

243+
static inline void
244+
ucs_arbiter_remove_and_reset_if_scheduled(ucs_arbiter_elem_t *elem)
245+
{
246+
if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(elem))) {
247+
ucs_list_del(&elem->list);
248+
ucs_arbiter_group_head_reset(elem);
249+
}
250+
}
251+
252+
static inline void
253+
ucs_arbiter_group_head_replace(ucs_arbiter_group_t *group,
254+
ucs_arbiter_elem_t *group_head,
255+
ucs_arbiter_elem_t *new_group_head)
256+
{
257+
/* check if this is really the group head */
258+
ucs_assert(!ucs_arbiter_group_is_empty(group));
259+
ucs_assert(group->tail->next == group_head);
260+
261+
if (group_head->next == group_head) {
262+
group->tail = new_group_head;
263+
} else {
264+
new_group_head->next = group_head->next;
265+
}
266+
group->tail->next = new_group_head;
267+
}
268+
233269
void ucs_arbiter_dispatch_nonempty(ucs_arbiter_t *arbiter, unsigned per_group,
234270
ucs_arbiter_callback_t cb, void *cb_arg)
235271
{
236-
ucs_arbiter_elem_t *group_head, *group_tail, *next_elem;
272+
ucs_arbiter_elem_t *group_head;
237273
ucs_arbiter_cb_result_t result;
238274
unsigned group_dispatch_count;
239275
ucs_arbiter_group_t *group;
240276
UCS_LIST_HEAD(resched_list);
241-
int sched_group;
277+
ucs_arbiter_elem_t dummy;
242278

243279
ucs_assert(!ucs_list_is_empty(&arbiter->list));
244280

245-
for (;;) {
281+
ucs_arbiter_group_head_reset(&dummy);
282+
283+
do {
246284
group_head = ucs_list_extract_head(&arbiter->list, ucs_arbiter_elem_t,
247285
list);
248286
ucs_assert(group_head != NULL);
@@ -254,80 +292,111 @@ void ucs_arbiter_dispatch_nonempty(ucs_arbiter_t *arbiter, unsigned per_group,
254292
ucs_arbiter_group_head_reset(group_head);
255293

256294
group_dispatch_count = 0;
257-
sched_group = 1;
258295
group = group_head->group;
296+
dummy.group = group;
259297
UCS_ARBITER_GROUP_GUARD_CHECK(group);
260298

261-
do {
262-
/* zero pointer to next elem here because:
299+
for (;;) {
300+
ucs_assert(group_head->group == group);
301+
ucs_assert(dummy.group == group);
302+
ucs_assert(group_dispatch_count < per_group);
303+
304+
/* reset the dispatched element here because:
263305
* 1. if the element is removed from the arbiter it must be kept in
264306
* initialized state otherwise push will fail
265-
* 2. we can't zero the pointer after calling the callback because
307+
* 2. we can't reset the element after calling the callback because
266308
* the callback could release the element.
267309
*/
268-
next_elem = group_head->next;
269-
group_head->next = NULL;
270-
ucs_assert(group_head->group == group);
310+
ucs_arbiter_elem_init(group_head);
311+
ucs_assert(!ucs_arbiter_group_head_is_scheduled(group_head));
271312

313+
/* replace group head by a dummy element, to allow scheduling more
314+
* elements on this group from the dispatch callback.
315+
*/
316+
ucs_arbiter_group_head_replace(group, group_head, &dummy);
317+
318+
/* dispatch the element */
272319
ucs_trace_poll("dispatching arbiter element %p", group_head);
273320
UCS_ARBITER_GROUP_GUARD_ENTER(group);
274321
result = cb(arbiter, group, group_head, cb_arg);
275322
UCS_ARBITER_GROUP_GUARD_EXIT(group);
276323
ucs_trace_poll("dispatch result: %d", result);
277324
++group_dispatch_count;
278325

279-
if (result == UCS_ARBITER_CB_RESULT_REMOVE_ELEM) {
280-
group_tail = group->tail;
281-
if (group_head == group_tail) {
282-
/* Last element */
283-
group->tail = NULL; /* Group is empty now */
284-
sched_group = 0;
285-
group_head = NULL; /* for debugging */
326+
/* recursive push to head (during dispatch) is not allowed */
327+
ucs_assert(group->tail->next == &dummy);
328+
329+
/* element is not removed */
330+
if (ucs_unlikely(result != UCS_ARBITER_CB_RESULT_REMOVE_ELEM)) {
331+
/* restore group pointer */
332+
ucs_arbiter_elem_set_scheduled(group_head, group);
333+
334+
/* the head should not move, since dummy replaces it */
335+
ucs_assert(!ucs_arbiter_group_head_is_scheduled(group_head));
336+
337+
/* replace dummy element by group_head */
338+
ucs_arbiter_group_head_replace(group, &dummy, group_head);
339+
340+
if (result == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) {
341+
/* take over a recursively scheduled group */
342+
if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(&dummy))) {
343+
ucs_list_replace(&dummy.list, &group_head->list);
344+
ucs_arbiter_group_head_reset(&dummy);
345+
}
286346
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
287-
break;
288347
} else {
289-
/* Not last element */
290-
ucs_assert(group_head == group_tail->next);
291-
ucs_assert(group_head != next_elem);
292-
group_head = next_elem; /* Update group head */
293-
group_tail->next = group_head; /* Tail points to new head */
294-
ucs_arbiter_group_head_reset(group_head);
295-
}
296-
} else {
297-
/* element is not removed, restore next pointer */
298-
group_head->next = next_elem;
299-
300-
/* group must still be active */
301-
ucs_assert(sched_group == 1);
302-
303-
if (result == UCS_ARBITER_CB_RESULT_STOP) {
304-
/* exit the outmost loop and make sure that next dispatch()
305-
* will continue from the current group */
306-
ucs_list_add_head(&arbiter->list, &group_head->list);
307-
goto out;
308-
} else if (result != UCS_ARBITER_CB_RESULT_NEXT_GROUP) {
309-
/* resched/desched must avoid adding the group to the arbiter */
310-
sched_group = 0;
311-
if (result == UCS_ARBITER_CB_RESULT_DESCHED_GROUP) {
312-
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
348+
/* remove a recursively scheduled group, give priority
349+
* to the original order */
350+
ucs_arbiter_remove_and_reset_if_scheduled(&dummy);
351+
352+
if (result == UCS_ARBITER_CB_RESULT_NEXT_GROUP) {
353+
/* add to arbiter tail */
354+
ucs_list_add_tail(&arbiter->list, &group_head->list);
313355
} else if (result == UCS_ARBITER_CB_RESULT_RESCHED_GROUP) {
356+
/* add to resched list */
314357
ucs_list_add_tail(&resched_list, &group_head->list);
358+
} else if (result == UCS_ARBITER_CB_RESULT_STOP) {
359+
/* exit the outermost loop and make sure that the next dispatch()
360+
* will continue from the current group */
361+
ucs_list_add_head(&arbiter->list, &group_head->list);
362+
goto out;
315363
} else {
316364
ucs_bug("unexpected return value from arbiter callback");
317365
}
318-
break;
319366
}
367+
368+
break;
320369
}
321-
} while (group_dispatch_count < per_group);
322370

323-
if (sched_group) {
324-
/* the group could be scheduled again from dispatch callback */
325-
ucs_arbiter_schedule_head_if_not_scheduled(arbiter, group_head);
326-
ucs_assert(!ucs_list_is_empty(&arbiter->list));
327-
} else if (ucs_list_is_empty(&arbiter->list)) {
328-
break;
371+
/* last element removed */
372+
if (dummy.next == &dummy) {
373+
group->tail = NULL; /* group is empty now */
374+
group_head = NULL; /* for debugging */
375+
ucs_arbiter_remove_and_reset_if_scheduled(&dummy);
376+
UCS_ARBITER_GROUP_ARBITER_SET(group, NULL);
377+
break;
378+
}
379+
380+
/* non-last element removed */
381+
group_head = dummy.next; /* Update group head */
382+
group->tail->next = group_head; /* Tail points to new head */
383+
384+
if (ucs_unlikely(ucs_arbiter_group_head_is_scheduled(&dummy))) {
385+
/* take over a recursively scheduled group */
386+
ucs_list_replace(&dummy.list, &group_head->list);
387+
ucs_arbiter_group_head_reset(&dummy);
388+
/* the group is already scheduled, continue to next group */
389+
break;
390+
} else if (group_dispatch_count >= per_group) {
391+
/* add to arbiter tail and continue to next group */
392+
ucs_list_add_tail(&arbiter->list, &group_head->list);
393+
break;
394+
}
395+
396+
/* continue with new group head */
397+
ucs_arbiter_group_head_reset(group_head);
329398
}
330-
}
399+
} while (!ucs_list_is_empty(&arbiter->list));
331400

332401
out:
333402
ucs_list_splice_tail(&arbiter->list, &resched_list);

src/ucs/datastruct/arbiter.h

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ void ucs_arbiter_group_cleanup(ucs_arbiter_group_t *group);
195195
*/
196196
static inline void ucs_arbiter_elem_init(ucs_arbiter_elem_t *elem)
197197
{
198-
elem->next = NULL;
198+
elem->group = NULL;
199199
}
200200

201201

@@ -233,6 +233,12 @@ void ucs_arbiter_group_purge(ucs_arbiter_t *arbiter, ucs_arbiter_group_t *group,
233233
ucs_arbiter_callback_t cb, void *cb_arg);
234234

235235

236+
/**
237+
* @return Number of elements in the group
238+
*/
239+
size_t ucs_arbiter_group_num_elems(ucs_arbiter_group_t *group);
240+
241+
236242
void ucs_arbiter_dump(ucs_arbiter_t *arbiter, FILE *stream);
237243

238244

@@ -306,11 +312,10 @@ static inline void ucs_arbiter_group_desched(ucs_arbiter_t *arbiter,
306312
/**
307313
* @return Whether the element is queued in an arbiter group.
308314
* (an element can't be queued more than once)
309-
*
310315
*/
311316
static inline int ucs_arbiter_elem_is_scheduled(ucs_arbiter_elem_t *elem)
312317
{
313-
return elem->next != NULL;
318+
return elem->group != NULL;
314319
}
315320

316321

@@ -381,9 +386,9 @@ ucs_arbiter_dispatch(ucs_arbiter_t *arbiter, unsigned per_group,
381386
* @return true if element is the only one in the group
382387
*/
383388
static inline int
384-
ucs_arbiter_elem_is_only(ucs_arbiter_group_t *group, ucs_arbiter_elem_t *elem)
389+
ucs_arbiter_elem_is_only(ucs_arbiter_elem_t *elem)
385390
{
386-
return (group->tail == elem) && ((elem->next == elem) || (elem->next == NULL));
391+
return elem->next == elem;
387392
}
388393

389394
#endif

src/uct/ib/dc/dc_mlx5_ep.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1144,7 +1144,7 @@ uct_dc_mlx5_iface_dci_do_dcs_pending_tx(ucs_arbiter_t *arbiter,
11441144
arb_group);
11451145
uct_dc_mlx5_iface_t *iface = ucs_derived_of(ep->super.super.iface,
11461146
uct_dc_mlx5_iface_t);
1147-
int is_only = ucs_arbiter_elem_is_only(group, elem);
1147+
int is_only = ucs_arbiter_elem_is_only(elem);
11481148
ucs_arbiter_cb_result_t res;
11491149

11501150
res = uct_dc_mlx5_iface_dci_do_common_pending_tx(ep, elem);

src/uct/ib/ud/base/ud_inl.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ uct_ud_am_skb_common(uct_ud_iface_t *iface, uct_ud_ep_t *ep, uint8_t id,
168168
*/
169169
ucs_assertv((ep->flags & UCT_UD_EP_FLAG_IN_PENDING) ||
170170
ucs_arbiter_group_is_empty(&ep->tx.pending.group) ||
171-
ucs_arbiter_elem_is_only(&ep->tx.pending.group, &ep->tx.pending.elem),
171+
ucs_arbiter_elem_is_only(&ep->tx.pending.elem),
172172
"out-of-order send detected for ep %p am %d ep_pending %d arbelem %p",
173173
ep, id, (ep->flags & UCT_UD_EP_FLAG_IN_PENDING),
174174
&ep->tx.pending.elem);

0 commit comments

Comments
 (0)