From c9b2f369a1c16a97339f39c2204857d5ec3025ce Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Wed, 18 Feb 2026 14:43:03 -0500 Subject: [PATCH 1/3] maps: rehome read_item on ConsumerPos This makes it easier to reuse later in new experimental APIs. --- aya/src/maps/ring_buf.rs | 81 ++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 41 deletions(-) diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index 94c8eda8c..134cad4e9 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -243,6 +243,45 @@ impl ConsumerPos { // [2]: https://github.com/torvalds/linux/blob/2772d7df/kernel/bpf/ringbuf.c#L494 metadata.as_ref().store(*pos, Ordering::SeqCst); } + + fn read_item<'data>(&self, data: &'data [u8], mask: u32) -> Item<'data> { + let Self { pos, .. } = self; + let offset = pos & usize::try_from(mask).unwrap(); + #[expect( + clippy::panic, + reason = "invalid ring buffer layout is a fatal internal error" + )] + let must_get_data = |offset, len| { + data.get(offset..offset + len) + .unwrap_or_else(|| panic!("{:?} not in {:?}", offset..offset + len, 0..data.len())) + }; + let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::()) + .as_ptr() + .cast(); + // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This + // ensures data written by the producer will be visible. 
+ // + // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 + let header = unsafe { &*header_ptr }.load(Ordering::Acquire); + if header & BPF_RINGBUF_BUSY_BIT != 0 { + Item::Busy + } else { + let len = usize::try_from(header & mask).unwrap(); + if header & BPF_RINGBUF_DISCARD_BIT != 0 { + Item::Discard { len } + } else { + let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); + let data = must_get_data(data_offset, len); + Item::Data(data) + } + } + } +} + +enum Item<'a> { + Busy, + Discard { len: usize }, + Data(&'a [u8]), } struct ProducerData { @@ -335,7 +374,7 @@ impl ProducerData { ) }); while data_available(mmap, pos_cache, consumer) { - match read_item(data_pages, *mask, consumer) { + match consumer.read_item(data_pages, *mask) { Item::Busy => return None, Item::Discard { len } => consumer.consume(len), Item::Data(data) => return Some(RingBufItem { data, consumer }), @@ -343,12 +382,6 @@ impl ProducerData { } return None; - enum Item<'a> { - Busy, - Discard { len: usize }, - Data(&'a [u8]), - } - fn data_available( producer: &MMap, producer_cache: &mut usize, @@ -372,40 +405,6 @@ impl ProducerData { // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 consumer != producer_cache } - - fn read_item<'data>(data: &'data [u8], mask: u32, pos: &ConsumerPos) -> Item<'data> { - let ConsumerPos { pos, .. } = pos; - let offset = pos & usize::try_from(mask).unwrap(); - #[expect( - clippy::panic, - reason = "invalid ring buffer layout is a fatal internal error" - )] - let must_get_data = |offset, len| { - data.get(offset..offset + len).unwrap_or_else(|| { - panic!("{:?} not in {:?}", offset..offset + len, 0..data.len()) - }) - }; - let header_ptr: *const AtomicU32 = must_get_data(offset, size_of::()) - .as_ptr() - .cast(); - // Pair the kernel's SeqCst write (implies Release) [1] with an Acquire load. This - // ensures data written by the producer will be visible. 
- // - // [1]: https://github.com/torvalds/linux/blob/eb26cbb1/kernel/bpf/ringbuf.c#L488 - let header = unsafe { &*header_ptr }.load(Ordering::Acquire); - if header & BPF_RINGBUF_BUSY_BIT != 0 { - Item::Busy - } else { - let len = usize::try_from(header & mask).unwrap(); - if header & BPF_RINGBUF_DISCARD_BIT != 0 { - Item::Discard { len } - } else { - let data_offset = offset + usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap(); - let data = must_get_data(data_offset, len); - Item::Data(data) - } - } - } } } From f47bda86736071d8ffa4218f33b6ee38bd073307 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 19 Feb 2026 11:21:31 -0500 Subject: [PATCH 2/3] misc: avoid some allocations --- test-distro/src/depmod.rs | 7 ++++--- test-distro/src/init.rs | 15 ++++++--------- test-distro/src/lib.rs | 10 +++++----- xtask/src/codegen/aya.rs | 9 +++------ xtask/src/codegen/aya_ebpf_bindings.rs | 4 ++-- xtask/src/codegen/helpers.rs | 14 ++++++++++---- xtask/src/run.rs | 14 ++++++++------ 7 files changed, 38 insertions(+), 35 deletions(-) diff --git a/test-distro/src/depmod.rs b/test-distro/src/depmod.rs index 2e782d34c..f75672af2 100644 --- a/test-distro/src/depmod.rs +++ b/test-distro/src/depmod.rs @@ -5,9 +5,10 @@ //! the constraints of the test environment. Not for production use. use std::{ + borrow::Cow, fs::File, io::{BufWriter, Write as _}, - path::PathBuf, + path::{Path, PathBuf}, }; use anyhow::{Context as _, anyhow}; @@ -25,8 +26,8 @@ struct Args { fn main() -> anyhow::Result<()> { let Args { base_dir } = Parser::parse(); - let modules_dir = if let Some(base_dir) = base_dir { - base_dir + let modules_dir: Cow<'static, Path> = if let Some(base_dir) = base_dir { + base_dir.into() } else { resolve_modules_dir().context("failed to resolve modules dir")? 
}; diff --git a/test-distro/src/init.rs b/test-distro/src/init.rs index 17352fa1c..5196ca643 100644 --- a/test-distro/src/init.rs +++ b/test-distro/src/init.rs @@ -143,14 +143,11 @@ fn run() -> anyhow::Result<()> { // Read kernel parameters from /proc/cmdline. They're space separated on a single line. let cmdline = std::fs::read_to_string("/proc/cmdline") .with_context(|| "read_to_string(/proc/cmdline) failed")?; - let args = cmdline - .split_whitespace() - .filter_map(|parameter| { - parameter - .strip_prefix("init.arg=") - .map(std::ffi::OsString::from) - }) - .collect::>(); + let args = cmdline.split_whitespace().filter_map(|parameter| { + parameter + .strip_prefix("init.arg=") + .map(std::ffi::OsStr::new) + }); // Iterate files in /bin. let read_dir = std::fs::read_dir("/bin").context("read_dir(/bin) failed")?; @@ -159,7 +156,7 @@ fn run() -> anyhow::Result<()> { let entry = entry.context("read_dir(/bin) failed")?; let path = entry.path(); let mut cmd = std::process::Command::new(&path); - cmd.args(&args) + cmd.args(args.clone()) .env("RUST_BACKTRACE", "1") .env("RUST_LOG", "debug"); diff --git a/test-distro/src/lib.rs b/test-distro/src/lib.rs index 77a9b299c..5191be89a 100644 --- a/test-distro/src/lib.rs +++ b/test-distro/src/lib.rs @@ -1,4 +1,4 @@ -use std::path::PathBuf; +use std::{borrow::Cow, path::Path}; use anyhow::Context as _; use nix::sys::utsname::uname; @@ -6,13 +6,13 @@ use nix::sys::utsname::uname; /// Kernel modules are in `/lib/modules`. /// They may be in the root of this directory, /// or in subdirectory named after the kernel release. 
-pub fn resolve_modules_dir() -> anyhow::Result { - let modules_dir = PathBuf::from("/lib/modules"); +pub fn resolve_modules_dir() -> anyhow::Result> { + let modules_dir = Path::new("/lib/modules"); let stat = modules_dir .metadata() .with_context(|| format!("stat(): {}", modules_dir.display()))?; if stat.is_dir() { - return Ok(modules_dir); + return Ok(modules_dir.into()); } let utsname = uname().context("uname()")?; @@ -26,7 +26,7 @@ pub fn resolve_modules_dir() -> anyhow::Result { "{} is not a directory", modules_dir.display() ); - Ok(modules_dir) + Ok(modules_dir.into()) } pub fn read_to_end(path: &std::path::Path, compressed: bool) -> anyhow::Result> { diff --git a/xtask/src/codegen/aya.rs b/xtask/src/codegen/aya.rs index 042ea9d5b..95761cd64 100644 --- a/xtask/src/codegen/aya.rs +++ b/xtask/src/codegen/aya.rs @@ -1,7 +1,4 @@ -use std::{ - fs::create_dir_all, - path::{Path, PathBuf}, -}; +use std::{fs::create_dir_all, path::Path}; use anyhow::{Context as _, Result}; use aya_tool::bindgen; @@ -14,7 +11,7 @@ pub(crate) fn codegen(opts: &SysrootOptions, libbpf_dir: &Path) -> Result<()> { } fn codegen_internal_btf_bindings(libbpf_dir: &Path) -> Result<()> { - let dir = PathBuf::from("aya-obj"); + let dir = Path::new("aya-obj"); let generated = dir.join("src/generated"); let mut bindgen = bindgen::user_builder() @@ -48,7 +45,7 @@ fn codegen_bindings(opts: &SysrootOptions, libbpf_dir: &Path) -> Result<()> { s390x_sysroot, x86_64_sysroot, } = opts; - let dir = PathBuf::from("aya-obj"); + let dir = Path::new("aya-obj"); let generated = dir.join("src/generated"); create_dir_all(&generated)?; diff --git a/xtask/src/codegen/aya_ebpf_bindings.rs b/xtask/src/codegen/aya_ebpf_bindings.rs index 1d816f594..ef58f2500 100644 --- a/xtask/src/codegen/aya_ebpf_bindings.rs +++ b/xtask/src/codegen/aya_ebpf_bindings.rs @@ -1,6 +1,6 @@ use std::{ fs::{File, create_dir_all}, - path::{Path, PathBuf}, + path::Path, }; use anyhow::{Context as _, Result}; @@ -33,7 +33,7 @@ pub(crate) fn 
codegen(opts: &SysrootOptions, libbpf_dir: &Path) -> Result<()> { let mut cmd = install_libbpf_headers_cmd(libbpf_dir, &libbpf_headers_dir); exec(&mut cmd)?; - let dir = PathBuf::from("ebpf/aya-ebpf-bindings"); + let dir = Path::new("ebpf/aya-ebpf-bindings"); let builder = || { let mut bindgen = bindgen::bpf_builder() diff --git a/xtask/src/codegen/helpers.rs b/xtask/src/codegen/helpers.rs index b431e4dd0..254f47d82 100644 --- a/xtask/src/codegen/helpers.rs +++ b/xtask/src/codegen/helpers.rs @@ -76,10 +76,16 @@ pub(crate) fn expand_helper(helper: &Helper<'_>) -> TokenStream { call_index, } = helper; - let args = inputs - .iter() - .map(|arg| &arg.name.as_ref().unwrap().0) - .collect::>(); + let args = inputs.iter().map( + |BareFnArg { + attrs: _, + name, + ty: _, + }| { + let (name, _) = name.as_ref().unwrap(); + name + }, + ); let helper = quote! { pub unsafe fn #ident(#inputs) #output { diff --git a/xtask/src/run.rs b/xtask/src/run.rs index 92d5b71c2..ac9093d85 100644 --- a/xtask/src/run.rs +++ b/xtask/src/run.rs @@ -254,14 +254,16 @@ pub(crate) fn run(opts: Options, workspace_root: &Path) -> Result<()> { // Use --test-threads=1 to prevent tests from interacting with shared // kernel state due to the lack of inter-test isolation. 
- let default_args = [OsString::from("--test-threads=1")];
- let run_args = default_args.iter().chain(run_args.iter());
+ let default_args = ["--test-threads=1"];
+ let run_args = default_args
+ .iter()
+ .map(OsStr::new)
+ .chain(run_args.iter().map(OsString::as_os_str));
 match environment {
 Environment::Local { runner } => {
 let mut args = runner.trim().split_terminator(' ');
 let runner = args.next().ok_or_else(|| anyhow!("no first argument"))?;
- let args = args.collect::>();
 let binaries = binaries(&package, None, &[])?;
@@ -269,7 +271,7 @@ pub(crate) fn run(opts: Options, workspace_root: &Path) -> Result<()> {
 for (profile, binaries) in binaries {
 for (name, binary) in binaries {
 let mut cmd = Command::new(runner);
- cmd.args(args.iter())
+ cmd.args(args.clone())
 .arg(binary)
 .args(run_args.clone())
 .env("RUST_BACKTRACE", "1")
@@ -798,8 +800,8 @@ pub(crate) fn run(opts: Options, workspace_root: &Path) -> Result<()> {
 for accel in ["kvm", "hvf", "tcg"] {
 qemu.args(["-accel", accel]);
 }
- let console = OsString::from(console);
- let mut kernel_args = std::iter::once(("console", &console))
+ let console = OsStr::new(console);
+ let mut kernel_args = std::iter::once(("console", console))
 .chain(run_args.clone().map(|run_arg| ("init.arg", run_arg)))
 .enumerate()
 .fold(OsString::new(), |mut acc, (i, (k, v))| {

From 403a44fe6d8e9a96cff5cc05233f79fe1df41df8 Mon Sep 17 00:00:00 2001
From: Tamir Duberstein
Date: Tue, 17 Feb 2026 16:43:37 -0500
Subject: [PATCH 3/3] maps: add `fold/try_fold/for_each` ring buffer API

This implementation should have lower overhead than `RingBuf::next` by
avoiding the overhead associated with `RingBufItem`. 
--- aya-log/src/lib.rs | 4 +- aya/Cargo.toml | 1 + aya/src/maps/ring_buf.rs | 252 ++++++++++++++---- test/integration-test/src/tests/load.rs | 11 +- test/integration-test/src/tests/ring_buf.rs | 125 ++++----- .../src/tests/uprobe_cookie.rs | 13 +- xtask/public-api/aya.txt | 6 + 7 files changed, 284 insertions(+), 128 deletions(-) diff --git a/aya-log/src/lib.rs b/aya-log/src/lib.rs index 3b6040c7e..b067dcb2f 100644 --- a/aya-log/src/lib.rs +++ b/aya-log/src/lib.rs @@ -182,9 +182,7 @@ impl EbpfLogger { /// Reads log records from eBPF and writes them to the logger. pub fn flush(&mut self) { let Self { ring_buf, logger } = self; - while let Some(buf) = ring_buf.next() { - log_buf(buf.as_ref(), logger).unwrap(); - } + ring_buf.for_each(|buf| log_buf(buf, logger).unwrap()); } } diff --git a/aya/Cargo.toml b/aya/Cargo.toml index 09dbcf5bf..4416b9e47 100644 --- a/aya/Cargo.toml +++ b/aya/Cargo.toml @@ -26,6 +26,7 @@ libc = { workspace = true } log = { workspace = true } object = { workspace = true, features = ["elf", "read_core", "std", "write"] } once_cell = { workspace = true } +scopeguard = { workspace = true } thiserror = { workspace = true } [dev-dependencies] diff --git a/aya/src/maps/ring_buf.rs b/aya/src/maps/ring_buf.rs index 134cad4e9..353c86184 100644 --- a/aya/src/maps/ring_buf.rs +++ b/aya/src/maps/ring_buf.rs @@ -6,8 +6,9 @@ use std::{ borrow::Borrow, + convert::Infallible, fmt::{self, Debug, Formatter}, - ops::Deref, + ops::{ControlFlow, Deref}, os::fd::{AsFd, AsRawFd, BorrowedFd, RawFd}, sync::atomic::{AtomicU32, AtomicUsize, Ordering}, }; @@ -37,7 +38,9 @@ use crate::{ /// /// To receive events you need to: /// * Construct [`RingBuf`] using [`RingBuf::try_from`]. -/// * Call [`RingBuf::next`] to poll events from the [`RingBuf`]. +/// * Call [`RingBuf::next`] to read one entry at a time, or +/// [`RingBuf::try_fold`], [`RingBuf::fold`], or [`RingBuf::for_each`] to +/// process available entries in bulk. 
/// /// To receive async notifications of data availability, you may construct an /// [`tokio::io::unix::AsyncFd`] from the [`RingBuf`]'s file descriptor and poll it for readiness. @@ -71,9 +74,9 @@ use crate::{ /// loop { /// let mut guard = poll.readable(); /// let ring_buf = guard.inner_mut(); -/// while let Some(item) = ring_buf.next() { +/// ring_buf.for_each(|item| { /// println!("received: {:?}", item); -/// } +/// }); /// guard.clear_ready(); /// } /// # Ok::<(), aya::EbpfError>(()) @@ -130,7 +133,53 @@ impl RingBuf { let Self { consumer, producer, .. } = self; - producer.next(consumer) + producer.next(consumer, None) + } + + /// Processes entries in the ring buffer with `f`. + /// + /// For each available data entry, `f` receives the accumulator and + /// entry: + /// * [`ControlFlow::Continue(next)`](ControlFlow::Continue) keeps draining with `next`. + /// * [`ControlFlow::Break(break_value)`](ControlFlow::Break) stops early and returns `break_value`. + /// + /// If the ring buffer is fully drained, returns [`ControlFlow::Continue`] + /// containing the final accumulator. + pub fn try_fold(&mut self, init: C, f: F) -> ControlFlow + where + F: FnMut(C, &[u8]) -> ControlFlow, + { + let Self { + consumer, producer, .. + } = self; + producer.try_fold(consumer, init, f) + } + + /// Processes entries in the ring buffer with `f`. + /// + /// For each available data entry, `f` receives the accumulator and entry, + /// and returns the next accumulator. + /// + /// Unlike [`RingBuf::try_fold`], this function cannot short-circuit: it + /// always processes entries until the ring buffer is fully drained, then + /// returns the final accumulator. + pub fn fold(&mut self, init: C, mut f: F) -> C + where + F: FnMut(C, &[u8]) -> C, + { + let ControlFlow::Continue(acc) = self + .try_fold::(init, |acc, data| ControlFlow::Continue(f(acc, data))); + acc + } + + /// Processes entries in the ring buffer with `f`. 
+ /// + /// For each available data entry, `f` receives the entry. + pub fn for_each(&mut self, mut f: F) + where + F: FnMut(&[u8]), + { + self.fold((), |(), data| f(data)) } } @@ -151,10 +200,11 @@ impl> AsRawFd for RingBuf { } } -/// The current outstanding item read from the ringbuf. +/// An item read from the ring buffer. pub struct RingBufItem<'a> { data: &'a [u8], consumer: &'a mut ConsumerPos, + advanced: Option<&'a mut bool>, } impl Deref for RingBufItem<'_> { @@ -168,8 +218,17 @@ impl Deref for RingBufItem<'_> { impl Drop for RingBufItem<'_> { fn drop(&mut self) { - let Self { consumer, data } = self; - consumer.consume(data.len()) + let Self { + data, + consumer, + advanced, + } = self; + consumer.consume(data.len()); + if let Some(advanced) = advanced { + **advanced = true; + } else { + consumer.commit(); + } } } @@ -182,6 +241,7 @@ impl Debug for RingBufItem<'_> { pos, metadata: ConsumerMetadata { mmap: _ }, }, + advanced: _, } = self; // In general Relaxed here is sufficient, for debugging, it certainly is. f.debug_struct("RingBufItem") @@ -228,10 +288,13 @@ impl ConsumerPos { } fn consume(&mut self, len: usize) { - let Self { pos, metadata } = self; + let Self { pos, metadata: _ } = self; *pos += (usize::try_from(BPF_RINGBUF_HDR_SZ).unwrap() + len).next_multiple_of(8); + } + fn commit(&self) { + let Self { pos, metadata } = self; // Write operation needs to be properly ordered with respect to the producer committing new // data to the ringbuf. The producer uses xchg (SeqCst) to commit new data [1]. The producer // reads the consumer offset after clearing the busy bit on a new entry [2]. 
By using SeqCst @@ -353,58 +416,155 @@ impl ProducerData { }) } - fn next<'a>(&'a mut self, consumer: &'a mut ConsumerPos) -> Option> { - let Self { - mmap, - data_offset, - pos_cache, - mask, - } = self; - let mmap = &*mmap; + fn data_pages(mmap: &MMap, data_offset: usize) -> &[u8] { let mmap_data = mmap.as_ref(); #[expect( clippy::panic, reason = "invalid ring buffer layout is a fatal internal error" )] - let data_pages = mmap_data.get(*data_offset..).unwrap_or_else(|| { + mmap_data.get(data_offset..).unwrap_or_else(|| { panic!( "offset {} out of bounds, data len {}", data_offset, mmap_data.len() ) - }); - while data_available(mmap, pos_cache, consumer) { - match consumer.read_item(data_pages, *mask) { + }) + } + + fn data_available( + mmap: &MMap, + pos_cache: &mut usize, + consumer: &mut ConsumerPos, + mut flush_consumer: F, + ) -> bool + where + F: FnMut(&mut ConsumerPos), + { + // Refresh the producer position cache if it appears that the consumer is caught up + // with the producer position. + if consumer.pos == *pos_cache { + // Persist the consumer position to avoid starving the producer. + flush_consumer(consumer); + *pos_cache = load_producer_pos(mmap); + } + + // Note that we don't compare the order of the values because the producer position may + // overflow u32 and wrap around to 0. Instead we just compare equality and assume that + // the consumer position is always logically less than the producer position. + // + // Note also that the kernel, at the time of writing [1], doesn't seem to handle this + // overflow correctly at all, and it's not clear that one can produce events after the + // producer position has wrapped around. + // + // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 + consumer.pos != *pos_cache + } + + fn next<'a>( + &'a mut self, + consumer: &'a mut ConsumerPos, + mut advanced: Option<&'a mut bool>, + ) -> Option> { + let Self { + mmap, + data_offset, + pos_cache, + .. 
+ } = self; + let data_pages = Self::data_pages(mmap, *data_offset); + loop { + if !Self::data_available(mmap, pos_cache, consumer, |consumer| { + if let Some(advanced) = advanced.as_mut() { + if std::mem::replace(*advanced, false) { + consumer.commit(); + } + } + }) { + return None; + } + match consumer.read_item(data_pages, self.mask) { Item::Busy => return None, - Item::Discard { len } => consumer.consume(len), - Item::Data(data) => return Some(RingBufItem { data, consumer }), + Item::Discard { len } => { + consumer.consume(len); + if let Some(advanced) = advanced.as_mut() { + **advanced = true; + } else { + consumer.commit(); + } + } + Item::Data(data) => { + return Some(RingBufItem { + data, + consumer, + advanced, + }); + } } } - return None; - - fn data_available( - producer: &MMap, - producer_cache: &mut usize, - consumer: &ConsumerPos, - ) -> bool { - let ConsumerPos { pos: consumer, .. } = consumer; - // Refresh the producer position cache if it appears that the consumer is caught up - // with the producer position. - if consumer == producer_cache { - *producer_cache = load_producer_pos(producer); - } + } - // Note that we don't compare the order of the values because the producer position may - // overflow u32 and wrap around to 0. Instead we just compare equality and assume that - // the consumer position is always logically less than the producer position. - // - // Note also that the kernel, at the time of writing [1], doesn't seem to handle this - // overflow correctly at all, and it's not clear that one can produce events after the - // producer position has wrapped around. - // - // [1]: https://github.com/torvalds/linux/blob/4b810bf0/kernel/bpf/ringbuf.c#L434-L440 - consumer != producer_cache + fn try_fold( + &mut self, + consumer: &mut ConsumerPos, + init: C, + mut f: F, + ) -> ControlFlow + where + F: FnMut(C, &[u8]) -> ControlFlow, + { + let Self { + mmap, + data_offset, + pos_cache, + .. 
+ } = self; + let data_pages = Self::data_pages(mmap, *data_offset); + let mut acc = init; + let mut advanced = false; + let consume = |consumer: &mut ConsumerPos, advanced: &mut bool, len: usize| { + consumer.consume(len); + *advanced = true; + }; + let flush = |consumer: &mut ConsumerPos, advanced: &mut bool| -> bool { + let flush = std::mem::replace(advanced, false); + if flush { + consumer.commit(); + } + flush + }; + // This must be deferred in case `f` panics. + let mut guard = scopeguard::guard((consumer, &mut advanced), |(consumer, advanced)| { + flush(consumer, advanced); + }); + loop { + let (consumer, advanced) = &mut *guard; + if !Self::data_available(mmap, pos_cache, consumer, |consumer| { + flush(consumer, advanced); + }) { + break; + } + match consumer.read_item(data_pages, self.mask) { + Item::Busy => { + if !flush(consumer, advanced) { + break; + } + } + Item::Discard { len } => consume(consumer, advanced, len), + Item::Data(data) => { + // This must be deferred in case `f` panics. + scopeguard::defer! 
{ consume(consumer, advanced, data.len()) }; + match f(acc, data) { + ControlFlow::Continue(next) => { + acc = next; + } + ControlFlow::Break(v) => { + return ControlFlow::Break(v); + } + } + } + } } + ControlFlow::Continue(acc) } } diff --git a/test/integration-test/src/tests/load.rs b/test/integration-test/src/tests/load.rs index 8ff890975..ae430bb37 100644 --- a/test/integration-test/src/tests/load.rs +++ b/test/integration-test/src/tests/load.rs @@ -60,10 +60,13 @@ fn ringbuffer_btf_map() { trigger_bpf_program(); - let item = ring_buf.next().unwrap(); - let item: [u8; 4] = (*item).try_into().unwrap(); - let val = u32::from_ne_bytes(item); - assert_eq!(val, 0xdeadbeef); + let mut items = Vec::new(); + ring_buf.for_each(|item| { + let item = item.try_into().unwrap(); + let val = u32::from_ne_bytes(item); + items.push(val); + }); + assert_eq!(items, &[0xdeadbeef]); } #[test_log::test] diff --git a/test/integration-test/src/tests/ring_buf.rs b/test/integration-test/src/tests/ring_buf.rs index 800407f41..ba61a3dec 100644 --- a/test/integration-test/src/tests/ring_buf.rs +++ b/test/integration-test/src/tests/ring_buf.rs @@ -1,4 +1,5 @@ use std::{ + ops::ControlFlow, os::fd::AsRawFd as _, path::Path, sync::{ @@ -9,7 +10,6 @@ use std::{ time::Duration, }; -use anyhow::Context as _; use assert_matches::assert_matches; use aya::{ Ebpf, EbpfLoader, @@ -147,21 +147,13 @@ fn ring_buf(n: usize) { } } - let mut seen = Vec::::new(); - while seen.len() < expected.len() { - if let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); - let arg = u64::from_ne_bytes(read); - assert_eq!(arg % 2, 0, "got {arg} from probe"); - seen.push(arg); - } - } - - // Make sure that there is nothing else in the ring_buf. 
- assert_matches!(ring_buf.next(), None); + let mut seen = Vec::with_capacity(expected.len()); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); + let arg = u64::from_ne_bytes(read); + assert_eq!(arg % 2, 0, "got {arg} from probe"); + seen.push(arg); + }); // Ensure that the data that was read matches what was passed, and the rejected count was set // properly. @@ -202,13 +194,13 @@ fn ring_buf_mismatch_size( prog.attach(trigger_symbol, "/proc/self/exe", None).unwrap(); trigger(value.into()); - { - let read = ring_buf.next().unwrap(); + let mut items = Vec::new(); + ring_buf.for_each(|read| { assert_eq!(read.len(), size_of::()); - let decoded = decode(read.as_ref()); - assert_eq!(decoded, value); - } - assert_matches!(ring_buf.next(), None); + let decoded = decode(read); + items.push(decoded); + }); + assert_eq!(items, &[value]); } #[test_log::test] @@ -221,8 +213,8 @@ fn ring_buf_mismatch_small() { ring_buf_trigger_ebpf_program, value, |read| { - let bytes: [u8; 2] = read.try_into().unwrap(); - u16::from_ne_bytes(bytes) + let read = read.try_into().unwrap(); + u16::from_ne_bytes(read) }, ); } @@ -237,8 +229,8 @@ fn ring_buf_mismatch_large() { ring_buf_trigger_ebpf_program, value, |read| { - let bytes: [u8; 8] = read.try_into().unwrap(); - u64::from_ne_bytes(bytes) + let read = read.try_into().unwrap(); + u64::from_ne_bytes(read) }, ); } @@ -266,15 +258,12 @@ async fn ring_buf_async_with_drops() { // Construct an AsyncFd from the RingBuf in order to receive readiness notifications. 
let mut seen = 0; let mut process_ring_buf = |ring_buf: &mut RingBuf<_>| { - while let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); let arg = u64::from_ne_bytes(read); assert_eq!(arg % 2, 0, "got {arg} from probe"); seen += 1; - } + }); }; let mut writer = futures::future::try_join_all(data.chunks(8).map(ToOwned::to_owned).map(|v| { @@ -315,9 +304,6 @@ async fn ring_buf_async_with_drops() { } } - // Make sure that there is nothing else in the ring_buf. - assert_matches!(async_fd.into_inner().next(), None); - let max_dropped: u64 = u64::try_from(data.len().saturating_sub(RING_BUF_MAX_ENTRIES - 1)).unwrap(); let max_seen = u64::try_from(data.iter().filter(|v| *v % 2 == 0).count()).unwrap(); @@ -388,24 +374,18 @@ async fn ring_buf_async_no_drop() { while seen.len() < expected_len { let mut guard = async_fd.readable_mut().await.unwrap(); let ring_buf = guard.get_inner_mut(); - while let Some(read) = ring_buf.next() { - let read: [u8; 8] = (*read) - .try_into() - .with_context(|| format!("data: {:?}", read.len())) - .unwrap(); + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); let arg = u64::from_ne_bytes(read); seen.push(arg); - } + }); guard.clear_ready(); } - (seen, async_fd.into_inner()) + seen }; - let (writer, (seen, mut ring_buf)) = futures::future::join(writer, reader).await; + let (writer, seen) = futures::future::join(writer, reader).await; writer.unwrap(); - // Make sure that there is nothing else in the ring_buf. - assert_matches!(ring_buf.next(), None); - // Ensure that the data that was read matches what was passed. 
assert_eq!(&seen, &expected); let Registers { dropped, rejected } = regs.get(&0, 0).unwrap().iter().sum(); @@ -443,10 +423,10 @@ fn ring_buf_epoll_wakeup() { let writer = WriterThread::spawn(); while total_events < WriterThread::NUM_MESSAGES { epoll::wait(epoll_fd, -1, &mut epoll_event_buf).unwrap(); - while let Some(read) = ring_buf.next() { + ring_buf.for_each(|read| { assert_eq!(read.len(), 8); total_events += 1; - } + }); } writer.join(); } @@ -468,11 +448,11 @@ async fn ring_buf_asyncfd_events() { let writer = WriterThread::spawn(); while total_events < WriterThread::NUM_MESSAGES { let mut guard = async_fd.readable_mut().await.unwrap(); - let rb = guard.get_inner_mut(); - while let Some(read) = rb.next() { + let ring_buf = guard.get_inner_mut(); + ring_buf.for_each(|read| { assert_eq!(read.len(), 8); total_events += 1; - } + }); guard.clear_ready(); } writer.join(); @@ -546,11 +526,22 @@ async fn ring_buf_pinned() { ring_buf_trigger_ebpf_program(v); } let (to_read_before_reopen, to_read_after_reopen) = to_write_before_reopen.split_at(2); - for v in to_read_before_reopen { - let item = ring_buf.next().unwrap(); - let item: [u8; 8] = item.as_ref().try_into().unwrap(); - assert_eq!(item, v.to_ne_bytes()); - } + + assert_matches!( + ring_buf.try_fold(to_read_before_reopen, |to_read, item| { + assert_matches!(to_read, [v, to_read @ ..] => { + let item: [u8; 8] = item.as_ref().try_into().unwrap(); + assert_eq!(item, v.to_ne_bytes()); + if to_read.is_empty() { + ControlFlow::Break(()) + } else { + ControlFlow::Continue(to_read) + } + }) + }), + ControlFlow::Break(()) + ); + drop(ring_buf); drop(bpf); @@ -574,15 +565,17 @@ async fn ring_buf_pinned() { } // Read both the data that was written before the ring buffer was reopened and the data that // was written after it was reopened. 
- for v in to_read_after_reopen - .iter() - .chain(to_write_after_reopen.iter()) - { - let item = ring_buf.next().unwrap(); - let item: [u8; 8] = item.as_ref().try_into().unwrap(); - assert_eq!(item, v.to_ne_bytes()); - } - // Make sure there is nothing else in the ring buffer. - assert_matches!(ring_buf.next(), None); + let mut iter = ring_buf.fold( + to_read_after_reopen + .iter() + .chain(to_write_after_reopen.iter()), + |mut iter, item| { + let v = iter.next().unwrap(); + let item = item.try_into().unwrap(); + assert_eq!(u64::from_ne_bytes(item), *v); + iter + }, + ); + assert_eq!(iter.next(), None); } } diff --git a/test/integration-test/src/tests/uprobe_cookie.rs b/test/integration-test/src/tests/uprobe_cookie.rs index edc6c2b0b..074b4d781 100644 --- a/test/integration-test/src/tests/uprobe_cookie.rs +++ b/test/integration-test/src/tests/uprobe_cookie.rs @@ -60,15 +60,10 @@ fn test_uprobe_cookie() { const EXP: &[u64] = &[1, 2, 1, 3]; let mut seen = Vec::new(); - while let Some(read) = ring_buf.next() { - let read = read.as_ref(); - match read.try_into() { - Ok(read) => seen.push(u64::from_le_bytes(read)), - Err(std::array::TryFromSliceError { .. 
}) => { - panic!("invalid ring buffer data: {read:x?}") - } - } - } + ring_buf.for_each(|read| { + let read = read.try_into().unwrap(); + seen.push(u64::from_le_bytes(read)); + }); assert_eq!(seen, EXP); } diff --git a/xtask/public-api/aya.txt b/xtask/public-api/aya.txt index fd6a16eeb..37b323493 100644 --- a/xtask/public-api/aya.txt +++ b/xtask/public-api/aya.txt @@ -563,7 +563,10 @@ pub fn aya::maps::queue::Queue::from(t: T) -> T pub mod aya::maps::ring_buf pub struct aya::maps::ring_buf::RingBuf impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, &[u8]) -> C +pub fn aya::maps::ring_buf::RingBuf::for_each(&mut self, f: F) where F: core::ops::function::FnMut(&[u8]) pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +pub fn aya::maps::ring_buf::RingBuf::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, &[u8]) -> core::ops::control_flow::ControlFlow impl core::convert::TryFrom for aya::maps::ring_buf::RingBuf pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result @@ -2246,7 +2249,10 @@ impl core::convert::From for aya::maps::queue::Queue pub fn aya::maps::queue::Queue::from(t: T) -> T pub struct aya::maps::RingBuf impl aya::maps::ring_buf::RingBuf +pub fn aya::maps::ring_buf::RingBuf::fold(&mut self, init: C, f: F) -> C where F: core::ops::function::FnMut(C, &[u8]) -> C +pub fn aya::maps::ring_buf::RingBuf::for_each(&mut self, f: F) where F: core::ops::function::FnMut(&[u8]) pub fn aya::maps::ring_buf::RingBuf::next(&mut self) -> core::option::Option> +pub fn aya::maps::ring_buf::RingBuf::try_fold(&mut self, init: C, f: F) -> core::ops::control_flow::ControlFlow where F: core::ops::function::FnMut(C, &[u8]) -> core::ops::control_flow::ControlFlow impl core::convert::TryFrom for 
aya::maps::ring_buf::RingBuf pub type aya::maps::ring_buf::RingBuf::Error = aya::maps::MapError pub fn aya::maps::ring_buf::RingBuf::try_from(map: aya::maps::Map) -> core::result::Result