Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
190 changes: 189 additions & 1 deletion aya/src/maps/hash_map/per_cpu_hash_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,14 @@ use crate::{
maps::{
IterableMap, MapData, MapError, MapIter, MapKeys, PerCpuValues, check_kv_size, hash_map,
},
sys::{SyscallError, bpf_map_lookup_elem_per_cpu, bpf_map_update_elem_per_cpu},
sys::{
SyscallError, bpf_map_lookup_and_delete_batch_per_cpu, bpf_map_lookup_elem_per_cpu,
bpf_map_update_elem_per_cpu,
},
};

type BatchResult<K, V> = (Vec<K>, Vec<PerCpuValues<V>>, Option<K>);

/// Similar to [`HashMap`](crate::maps::HashMap) but each CPU holds a separate value for a given key. Typically used to
/// minimize lock contention in eBPF programs.
///
Expand Down Expand Up @@ -82,6 +87,77 @@ impl<T: Borrow<MapData>, K: Pod, V: Pod> PerCpuHashMap<T, K, V> {
pub fn keys(&self) -> MapKeys<'_, K> {
MapKeys::new(self.inner.borrow())
}

/// Looks up and removes up to `batch_size` entries from the map in a single syscall.
///
/// Compared to calling `get` and `remove` once per key, this batches the work
/// into one `BPF_MAP_LOOKUP_AND_DELETE_BATCH` operation, which is considerably
/// cheaper when draining many entries.
///
/// # Arguments
///
/// * `batch_size` - upper bound on how many entries to fetch in this call
/// * `in_batch` - cursor returned by a previous call (use `None` to start from the beginning)
/// * `flags` - operation flags passed through to the kernel
///
/// # Returns
///
/// A tuple `(keys, values, out_batch)`:
/// - `keys` - the keys that were retrieved and deleted
/// - `values` - one `PerCpuValues<V>` per retrieved key
/// - `out_batch` - cursor to feed back as `in_batch` on the next call
///
/// When `out_batch` is `None`, there are no more entries to retrieve.
///
/// # Minimum kernel version
///
/// The minimum kernel version required to use this feature is 5.6.
///
/// # Examples
///
/// ```no_run
/// # let mut bpf = aya::Ebpf::load(&[])?;
/// use aya::maps::PerCpuHashMap;
///
/// let mut hm = PerCpuHashMap::<_, u8, u32>::try_from(bpf.map_mut("PER_CPU_STORAGE").unwrap())?;
///
/// // Drain the map in batches of 64 entries.
/// let mut cursor = None;
/// loop {
///     let (keys, values, next_cursor) = hm.batch_lookup_and_delete(64, cursor.as_ref(), 0)?;
///
///     if keys.is_empty() {
///         break;
///     }
///
///     for (key, per_cpu_vals) in keys.iter().zip(values.iter()) {
///         println!("Key: {}, Values: {:?}", key, per_cpu_vals);
///     }
///
///     cursor = next_cursor;
///     if cursor.is_none() {
///         break;
///     }
/// }
/// # Ok::<(), aya::EbpfError>(())
/// ```
pub fn batch_lookup_and_delete(
    &self,
    batch_size: usize,
    in_batch: Option<&K>,
    flags: u64,
) -> Result<BatchResult<K, V>, MapError> {
    let map_fd = self.inner.borrow().fd().as_fd();

    // Delegate to the sys layer and unpack the struct into the public tuple,
    // wrapping any OS error in the map-level error type.
    match bpf_map_lookup_and_delete_batch_per_cpu(map_fd, in_batch, batch_size, flags) {
        Ok(batch) => Ok((batch.keys, batch.values, batch.out_batch)),
        Err(io_error) => Err(SyscallError {
            call: "bpf_map_lookup_and_delete_batch",
            io_error,
        }
        .into()),
    }
}
}

impl<T: BorrowMut<MapData>, K: Pod, V: Pod> PerCpuHashMap<T, K, V> {
Expand Down Expand Up @@ -194,4 +270,116 @@ mod tests {

assert_matches!(map.get(&1, 0), Err(MapError::KeyNotFound));
}

/// An empty map reports ENOENT with `count == 0`; the wrapper must translate
/// that into an empty, successful result with no continuation cursor.
#[test]
fn test_batch_lookup_and_delete_empty() {
    use aya_obj::generated::bpf_cmd;

    use crate::sys::Syscall;

    let new_map =
        || test_utils::new_map(test_utils::new_obj_map::<u32>(BPF_MAP_TYPE_PERCPU_HASH));
    let map = Map::PerCpuHashMap(new_map());
    let map = PerCpuHashMap::<_, u32, u32>::try_from(&map).unwrap();

    // Simulate an empty map: the kernel zeroes `count` and fails with ENOENT
    // when there is nothing to return.
    override_syscall(|call| {
        if let Syscall::Ebpf {
            cmd: bpf_cmd::BPF_MAP_LOOKUP_AND_DELETE_BATCH,
            attr,
        } = call
        {
            attr.batch.count = 0;
            sys_error(ENOENT)
        } else {
            sys_error(libc::EINVAL)
        }
    });

    assert_matches!(map.batch_lookup_and_delete(10, None, 0), Ok((keys, values, cursor)) => {
        assert!(keys.is_empty());
        assert!(values.is_empty());
        assert_eq!(cursor, None);
    });
}

/// Verifies that a successful batch call decodes keys, per-CPU values, and the
/// continuation cursor out of the raw buffers the syscall attr points at.
#[test]
fn test_batch_lookup_and_delete_with_entries() {
    use std::ptr;

    use aya_obj::generated::bpf_cmd;

    use crate::{sys::Syscall, util::nr_cpus};

    let map_data =
        || test_utils::new_map(test_utils::new_obj_map::<u32>(BPF_MAP_TYPE_PERCPU_HASH));
    let map = Map::PerCpuHashMap(map_data());
    let map = PerCpuHashMap::<_, u32, u32>::try_from(&map).unwrap();

    // Mock the syscall to return 2 entries. `attr.batch` carries raw pointers
    // into the caller's key/value buffers, so the mock writes results through
    // them exactly as the kernel would.
    override_syscall(|call| match call {
        Syscall::Ebpf {
            cmd: bpf_cmd::BPF_MAP_LOOKUP_AND_DELETE_BATCH,
            attr,
        } => unsafe {
            // Get nr_cpus inside the closure to avoid capturing
            let nr_cpus = nr_cpus().unwrap();

            // Fill in the keys
            let keys_ptr = attr.batch.keys as *mut u32;
            *keys_ptr.add(0) = 10;
            *keys_ptr.add(1) = 20;

            // Fill in the values (per-CPU). Each slot is rounded up to a
            // multiple of 8 bytes, matching the layout the sys layer allocates.
            let value_size = (std::mem::size_of::<u32>() + 7) & !7;
            let values_ptr = attr.batch.values as *mut u8;

            // For key 10: values [100, 101, 102, ...]
            for cpu in 0..nr_cpus {
                let offset = cpu * value_size;
                ptr::write_unaligned(values_ptr.add(offset).cast::<u32>(), 100 + cpu as u32);
            }

            // For key 20: values [200, 201, 202, ...] placed after key 10's
            // block of nr_cpus slots.
            for cpu in 0..nr_cpus {
                let offset = nr_cpus * value_size + cpu * value_size;
                ptr::write_unaligned(values_ptr.add(offset).cast::<u32>(), 200 + cpu as u32);
            }

            // Set the actual count
            attr.batch.count = 2;

            // Set out_batch (next cursor)
            let out_batch_ptr = attr.batch.out_batch as *mut u32;
            *out_batch_ptr = 30;

            Ok(0)
        },
        _ => sys_error(libc::EINVAL),
    });

    let result = map.batch_lookup_and_delete(10, None, 0);
    let nr_cpus = nr_cpus().unwrap();
    assert_matches!(result, Ok((keys, values, cursor)) => {
        assert_eq!(keys.len(), 2);
        assert_eq!(values.len(), 2);

        assert_eq!(keys[0], 10);
        assert_eq!(keys[1], 20);

        // Check per-CPU values for key 10
        assert_eq!(values[0].len(), nr_cpus);
        for (cpu, value) in values[0].iter().enumerate().take(nr_cpus) {
            assert_eq!(*value, 100 + cpu as u32);
        }

        // Check per-CPU values for key 20
        assert_eq!(values[1].len(), nr_cpus);
        for (cpu, value) in values[1].iter().enumerate().take(nr_cpus) {
            assert_eq!(*value, 200 + cpu as u32);
        }

        assert_eq!(cursor, Some(30));
    });
}
}
102 changes: 102 additions & 0 deletions aya/src/sys/bpf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,108 @@ pub(crate) fn bpf_map_lookup_elem_per_cpu<K: Pod, V: Pod>(
}
}

/// Result of a batch lookup-and-delete operation on a per-CPU map.
pub(crate) struct PerCpuBatch<K: Pod, V: Pod> {
    /// Keys that were retrieved (and deleted) by the batch call.
    pub(crate) keys: Vec<K>,
    /// Per-CPU values, one `PerCpuValues<V>` entry per key in `keys`.
    pub(crate) values: Vec<PerCpuValues<V>>,
    /// Cursor to pass as `in_batch` on the next call; `None` when the batch
    /// returned no entries.
    pub(crate) out_batch: Option<K>,
}

/// Shorthand for the fallible result of `bpf_map_lookup_and_delete_batch_per_cpu`.
type PerCpuBatchResult<K, V> = io::Result<PerCpuBatch<K, V>>;

/// Batch lookup and delete elements from a per-cpu map.
///
/// Issues a single `BPF_MAP_LOOKUP_AND_DELETE_BATCH` syscall that removes up to
/// `batch_size` entries and returns them. A kernel `ENOENT` is treated as
/// end-of-iteration rather than as an error: any entries copied out before the
/// map was exhausted are still returned to the caller.
///
/// # Arguments
///
/// * `fd` - The file descriptor of the map
/// * `in_batch` - Optional reference to the previous batch cursor (None for first call)
/// * `batch_size` - Maximum number of elements to retrieve
/// * `flags` - Operation flags
///
/// # Returns
///
/// A [`PerCpuBatch`] holding:
/// - `keys`: the keys that were retrieved and deleted
/// - `values`: one `PerCpuValues<V>` per retrieved key
/// - `out_batch`: cursor for the next call, `None` when nothing was returned
///
/// # Introduced in kernel v5.6
pub(crate) fn bpf_map_lookup_and_delete_batch_per_cpu<K: Pod, V: Pod>(
    fd: BorrowedFd<'_>,
    in_batch: Option<&K>,
    batch_size: usize,
    flags: u64,
) -> PerCpuBatchResult<K, V> {
    // A zero-sized batch can never return anything; skip the syscall entirely.
    if batch_size == 0 {
        return Ok(PerCpuBatch {
            keys: Vec::new(),
            values: Vec::new(),
            out_batch: None,
        });
    }

    let mut attr = unsafe { mem::zeroed::<bpf_attr>() };
    let mut out_batch = MaybeUninit::<K>::uninit();
    // `K: Pod` guarantees the all-zeroes bit pattern is a valid K, so
    // pre-filling the key buffer with zeroed values is sound.
    let mut keys = vec![unsafe { mem::zeroed() }; batch_size];

    // Each per-CPU value slot is rounded up to a multiple of 8 bytes.
    let value_size = (mem::size_of::<V>() + 7) & !7;
    let nr_cpus = crate::util::nr_cpus().map_err(|(_, error)| error)?;
    // value out buffer: batch_size entries, each nr_cpus padded slots wide
    let mut values_buffer = vec![0u8; batch_size * nr_cpus * value_size];

    // Point the batch attr at our buffers; the kernel writes results through
    // these raw addresses and updates `count` in place.
    let batch_attr = unsafe { &mut attr.batch };
    batch_attr.map_fd = fd.as_raw_fd() as u32;
    batch_attr.keys = keys.as_mut_ptr() as u64;
    batch_attr.values = values_buffer.as_mut_ptr() as u64;
    batch_attr.count = batch_size as u32;
    if let Some(batch) = in_batch {
        batch_attr.in_batch = ptr::from_ref(batch) as u64;
    }
    batch_attr.out_batch = ptr::from_mut(&mut out_batch) as u64;
    batch_attr.flags = flags;

    if let Err(e) = unit_sys_bpf(bpf_cmd::BPF_MAP_LOOKUP_AND_DELETE_BATCH, &mut attr) {
        // ENOENT is returned when the map is empty or we've reached the end of iteration.
        // This is not an error condition - we still return the partial results via attr.batch.count.
        if e.raw_os_error() != Some(ENOENT) {
            return Err(e);
        }
    }

    // The kernel overwrites `count` with the number of entries actually copied out.
    let actual_count = unsafe { attr.batch.count } as usize;
    keys.truncate(actual_count);
    let mut values = Vec::with_capacity(actual_count);
    for i in 0..actual_count {
        let offset = i * nr_cpus * value_size;
        let per_cpu_values: Vec<V> = (0..nr_cpus)
            .map(|cpu| {
                let value_offset = offset + cpu * value_size;
                // SAFETY:
                // 1. `values_buffer` is allocated with size `batch_size * nr_cpus * value_size`.
                // 2. The loop bounds ensure `i < actual_count` (<= batch_size) and `cpu < nr_cpus`.
                // 3. Therefore, `value_offset` is always within the bounds of `values_buffer`.
                // 4. `ptr::read_unaligned` allows reading potentially unaligned values from the byte buffer.
                unsafe { ptr::read_unaligned(values_buffer.as_ptr().add(value_offset).cast::<V>()) }
            })
            .collect();

        values.push(PerCpuValues::try_from(per_cpu_values).map_err(io::Error::other)?);
    }
    // NOTE(review): this assumes the kernel writes `out_batch` whenever it
    // reports at least one entry (count > 0) — confirm against the kernel's
    // map batch-op implementation before relying on `assume_init` here.
    let out_batch = if actual_count > 0 {
        Some(unsafe { out_batch.assume_init() })
    } else {
        None
    };
    Ok(PerCpuBatch {
        keys,
        values,
        out_batch,
    })
}

pub(crate) fn bpf_map_lookup_elem_ptr<K: Pod, V>(
fd: BorrowedFd<'_>,
key: Option<&K>,
Expand Down
2 changes: 2 additions & 0 deletions xtask/public-api/aya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ impl<T> core::convert::From<T> for aya::maps::hash_map::HashMap<T, K, V>
pub fn aya::maps::hash_map::HashMap<T, K, V>::from(t: T) -> T
pub struct aya::maps::hash_map::PerCpuHashMap<T, K: aya::Pod, V: aya::Pod>
impl<T: core::borrow::Borrow<aya::maps::MapData>, K: aya::Pod, V: aya::Pod> aya::maps::hash_map::PerCpuHashMap<T, K, V>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::batch_lookup_and_delete(&self, batch_size: usize, in_batch: core::option::Option<&K>, flags: u64) -> core::result::Result<(alloc::vec::Vec<K>, alloc::vec::Vec<aya::maps::PerCpuValues<V>>, core::option::Option<K>), aya::maps::MapError>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::get(&self, key: &K, flags: u64) -> core::result::Result<aya::maps::PerCpuValues<V>, aya::maps::MapError>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::iter(&self) -> aya::maps::MapIter<'_, K, aya::maps::PerCpuValues<V>, Self>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::keys(&self) -> aya::maps::MapKeys<'_, K>
Expand Down Expand Up @@ -1964,6 +1965,7 @@ impl<T> core::convert::From<T> for aya::maps::PerCpuArray<T, V>
pub fn aya::maps::PerCpuArray<T, V>::from(t: T) -> T
pub struct aya::maps::PerCpuHashMap<T, K: aya::Pod, V: aya::Pod>
impl<T: core::borrow::Borrow<aya::maps::MapData>, K: aya::Pod, V: aya::Pod> aya::maps::hash_map::PerCpuHashMap<T, K, V>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::batch_lookup_and_delete(&self, batch_size: usize, in_batch: core::option::Option<&K>, flags: u64) -> core::result::Result<(alloc::vec::Vec<K>, alloc::vec::Vec<aya::maps::PerCpuValues<V>>, core::option::Option<K>), aya::maps::MapError>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::get(&self, key: &K, flags: u64) -> core::result::Result<aya::maps::PerCpuValues<V>, aya::maps::MapError>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::iter(&self) -> aya::maps::MapIter<'_, K, aya::maps::PerCpuValues<V>, Self>
pub fn aya::maps::hash_map::PerCpuHashMap<T, K, V>::keys(&self) -> aya::maps::MapKeys<'_, K>
Expand Down