
Commit 36d256c

mfijalko authored and Kernel Patches Daemon committed
xsk: fix immature cq descriptor production
Eryk reported an issue, put under the Closes: tag below, where umem addresses were prematurely produced onto the pool's completion queue. Let us make the skb's destructor responsible for producing all addresses that a given skb used.

The commit in the Fixes: tag introduced the buggy behavior; it was not broken from day 1, but rather when XSK multi-buffer support got introduced.

To mitigate the performance impact as much as possible, mimic the linear and frag parts of the skb: store the first address from the XSK descriptor at sk_buff::destructor_arg and, for fragments, store them in a list at ::cb. The list nodes are allocated from a kmem_cache. xsk_destruct_skb() consumes the address stored at ::destructor_arg and optionally walks the list at ::cb if the count of descriptors associated with this particular skb is bigger than 1.

The previous approach, where a whole array for storing UMEM addresses from XSK descriptors was pre-allocated during first fragment processing, yielded too big a performance regression for 64b traffic. With the current approach the impact is much reduced in my tests, and for jumbo frames I observed traffic being slower by at most 9%.

Magnus suggested special-casing this way of processing for XDP_SHARED_UMEM, so that we would identify it during bind and set different hooks for the 'backpressure mechanism' on the CQ and for the skb destructor. But given that the results looked promising on my side, I decided to keep a single data path for XSK generic Tx. I suppose other auxiliary stuff, such as the helpers introduced in this patch, would have had to land as well to make that work, so we might have ended up with a noisier diff.

Fixes: b7f72a3 ("xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path")
Reported-by: Eryk Kubanski <[email protected]>
Closes: https://lore.kernel.org/netdev/[email protected]/
Signed-off-by: Maciej Fijalkowski <[email protected]>
1 parent a387b48 commit 36d256c
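A rough standalone illustration of the idea (plain C, not part of the patch; struct fake_skb, record_addr() and publish_to_cq() are made-up names standing in for the real destructor_arg/XSKCB bookkeeping and xsk_cq_submit_addr_locked()): the first UMEM address travels with the skb itself, any further fragment addresses go onto a per-skb list, and nothing is produced to the completion queue until the destructor runs.

/*
 * Illustrative sketch only; plain C, not kernel code.
 */
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

struct addr_node {			/* stands in for struct xsk_addr_node */
	uint64_t addr;
	struct addr_node *next;
};

struct fake_skb {
	uint64_t first_addr;		/* stands in for destructor_arg */
	uint32_t num_descs;		/* stands in for XSKCB(skb)->num_descs */
	struct addr_node *extra;	/* stands in for XSKCB(skb)->addrs_list */
};

/* Called once per Tx descriptor while the skb is being built. */
static void record_addr(struct fake_skb *skb, uint64_t addr)
{
	if (skb->num_descs++ == 0) {
		skb->first_addr = addr;	/* linear part: kept on the skb itself */
		return;
	}
	/* frag: goes onto the side list (kmem_cache-backed in the patch) */
	struct addr_node *node = malloc(sizeof(*node));

	node->addr = addr;
	node->next = skb->extra;	/* order not kept here; the patch uses list_add_tail() */
	skb->extra = node;
}

/* Stand-in for the skb destructor: only now do addresses reach the CQ. */
static void publish_to_cq(struct fake_skb *skb)
{
	printf("cq <- %#llx\n", (unsigned long long)skb->first_addr);
	for (struct addr_node *n = skb->extra; n;) {
		struct addr_node *next = n->next;

		printf("cq <- %#llx\n", (unsigned long long)n->addr);
		free(n);
		n = next;
	}
}

int main(void)
{
	struct fake_skb skb = { 0 };

	record_addr(&skb, 0x1000);	/* first descriptor */
	record_addr(&skb, 0x2000);	/* multi-buffer frag */
	publish_to_cq(&skb);		/* all addresses produced together */
	return 0;
}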

2 files changed: +97, -14 lines

net/xdp/xsk.c

Lines changed: 85 additions & 14 deletions
@@ -36,6 +36,20 @@
 #define TX_BATCH_SIZE 32
 #define MAX_PER_SOCKET_BUDGET 32

+struct xsk_addr_node {
+	u64 addr;
+	struct list_head addr_node;
+};
+
+struct xsk_addr_head {
+	u32 num_descs;
+	struct list_head addrs_list;
+};
+
+static struct kmem_cache *xsk_tx_generic_cache;
+
+#define XSKCB(skb) ((struct xsk_addr_head *)((skb)->cb))
+
 void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
 {
 	if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -532,24 +546,41 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }

-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
 {
 	unsigned long flags;
 	int ret;

 	spin_lock_irqsave(&pool->cq_lock, flags);
-	ret = xskq_prod_reserve_addr(pool->cq, addr);
+	ret = xskq_prod_reserve(pool->cq);
 	spin_unlock_irqrestore(&pool->cq_lock, flags);

 	return ret;
 }

-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
+				      struct sk_buff *skb)
 {
+	struct xsk_addr_node *pos, *tmp;
 	unsigned long flags;
+	u32 i = 0;
+	u32 idx;

 	spin_lock_irqsave(&pool->cq_lock, flags);
-	xskq_prod_submit_n(pool->cq, n);
+	idx = xskq_get_prod(pool->cq);
+
+	xskq_prod_write_addr(pool->cq, idx, (u64)skb_shinfo(skb)->destructor_arg);
+	i++;
+
+	if (unlikely(XSKCB(skb)->num_descs > 1)) {
+		list_for_each_entry_safe(pos, tmp, &XSKCB(skb)->addrs_list, addr_node) {
+			xskq_prod_write_addr(pool->cq, idx + i, pos->addr);
+			i++;
+			list_del(&pos->addr_node);
+			kmem_cache_free(xsk_tx_generic_cache, pos);
+		}
+	}
+	xskq_prod_submit_n(pool->cq, i);
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }


@@ -562,9 +593,14 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }

+static void xsk_inc_num_desc(struct sk_buff *skb)
+{
+	XSKCB(skb)->num_descs++;
+}
+
 static u32 xsk_get_num_desc(struct sk_buff *skb)
 {
-	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+	return XSKCB(skb)->num_descs;
 }

 static void xsk_destruct_skb(struct sk_buff *skb)
@@ -576,23 +612,32 @@ static void xsk_destruct_skb(struct sk_buff *skb)
 		*compl->tx_timestamp = ktime_get_tai_fast_ns();
 	}

-	xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+	xsk_cq_submit_addr_locked(xdp_sk(skb->sk)->pool, skb);
 	sock_wfree(skb);
 }

-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static void xsk_set_destructor_arg(struct sk_buff *skb, u64 addr)
 {
-	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
-
-	skb_shinfo(skb)->destructor_arg = (void *)num;
+	INIT_LIST_HEAD(&XSKCB(skb)->addrs_list);
+	XSKCB(skb)->num_descs = 0;
+	skb_shinfo(skb)->destructor_arg = (void *)addr;
 }

 static void xsk_consume_skb(struct sk_buff *skb)
 {
 	struct xdp_sock *xs = xdp_sk(skb->sk);
+	u32 num_descs = xsk_get_num_desc(skb);
+	struct xsk_addr_node *pos, *tmp;
+
+	if (unlikely(num_descs > 1)) {
+		list_for_each_entry_safe(pos, tmp, &XSKCB(skb)->addrs_list, addr_node) {
+			list_del(&pos->addr_node);
+			kmem_cache_free(xsk_tx_generic_cache, pos);
+		}
+	}

 	skb->destructor = sock_wfree;
-	xsk_cq_cancel_locked(xs->pool, xsk_get_num_desc(skb));
+	xsk_cq_cancel_locked(xs->pool, num_descs);
 	/* Free skb without triggering the perf drop trace */
 	consume_skb(skb);
 	xs->skb = NULL;
@@ -623,6 +668,8 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 			return ERR_PTR(err);

 		skb_reserve(skb, hr);
+
+		xsk_set_destructor_arg(skb, desc->addr);
 	}

 	addr = desc->addr;
@@ -694,6 +741,8 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		err = skb_store_bits(skb, 0, buffer, len);
 		if (unlikely(err))
 			goto free_err;
+
+		xsk_set_destructor_arg(skb, desc->addr);
 	} else {
 		int nr_frags = skb_shinfo(skb)->nr_frags;
 		struct page *page;
@@ -759,7 +808,19 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 	skb->mark = READ_ONCE(xs->sk.sk_mark);
 	skb->destructor = xsk_destruct_skb;
 	xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
-	xsk_set_destructor_arg(skb);
+
+	xsk_inc_num_desc(skb);
+	if (unlikely(xsk_get_num_desc(skb) > 1)) {
+		struct xsk_addr_node *xsk_addr;
+
+		xsk_addr = kmem_cache_zalloc(xsk_tx_generic_cache, GFP_KERNEL);
+		if (!xsk_addr) {
+			err = -ENOMEM;
+			goto free_err;
+		}
+		xsk_addr->addr = desc->addr;
+		list_add_tail(&xsk_addr->addr_node, &XSKCB(skb)->addrs_list);
+	}

 	return skb;

@@ -769,7 +830,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,

 	if (err == -EOVERFLOW) {
 		/* Drop the packet */
-		xsk_set_destructor_arg(xs->skb);
+		xsk_inc_num_desc(xs->skb);
 		xsk_drop_skb(xs->skb);
 		xskq_cons_release(xs->tx);
 	} else {
@@ -812,7 +873,7 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+		err = xsk_cq_reserve_locked(xs->pool);
 		if (err) {
 			err = -EAGAIN;
 			goto out;
@@ -1815,8 +1876,18 @@ static int __init xsk_init(void)
 	if (err)
 		goto out_pernet;

+	xsk_tx_generic_cache = kmem_cache_create("xsk_generic_xmit_cache",
+						 sizeof(struct xsk_addr_node),
+						 0, SLAB_HWCACHE_ALIGN, NULL);
+	if (!xsk_tx_generic_cache) {
+		err = -ENOMEM;
+		goto out_unreg_notif;
+	}
+
 	return 0;

+out_unreg_notif:
+	unregister_netdevice_notifier(&xsk_netdev_notifier);
 out_pernet:
 	unregister_pernet_subsys(&xsk_net_ops);
 out_sk:

net/xdp/xsk_queue.h

Lines changed: 12 additions & 0 deletions
@@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)

 /* Functions for producers */

+static inline u32 xskq_get_prod(struct xsk_queue *q)
+{
+	return READ_ONCE(q->ring->producer);
+}
+
 static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
 {
 	u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
@@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
 	return 0;
 }

+static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
+{
+	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+	ring->desc[idx & q->ring_mask] = addr;
+}
+
 static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
 					      u32 nb_entries)
 {
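Taken together, these helpers support the caller pattern of xsk_cq_submit_addr_locked() in the xsk.c hunk above; roughly the following sketch (not additional patch code; first_addr and frag_addr are placeholder variables, pool->cq_lock is assumed to be held, and the slots are assumed to have been reserved earlier via xskq_prod_reserve()):

	/* sketch only: fill previously reserved CQ slots, then submit them */
	u32 idx = xskq_get_prod(pool->cq);

	xskq_prod_write_addr(pool->cq, idx, first_addr);	/* linear part */
	xskq_prod_write_addr(pool->cq, idx + 1, frag_addr);	/* one fragment */
	xskq_prod_submit_n(pool->cq, 2);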
