From 9a092a2f0bc8dbe2e6a846ef71ddfa1ceff07852 Mon Sep 17 00:00:00 2001 From: Jamie Turner Date: Mon, 1 Feb 2021 04:20:02 +0000 Subject: [PATCH 1/2] `send_to` and `recv_from` methods utilizing associated SENDMSG and RECVMSG ops. A couple of structural changes to enable this: 1. Instead of a simple io_uring_cqe as the only result on successful execution of the entry, we also pass through any address information that was written to the `msghdr`. This lets us convert these results into `SocketAddr` on sessionless transports, like UDP. The `in_flight` system keeps an array of `Option<SocketAddr>` to act as the storage location for these socket addresses when necessary. 2. Instead of a bool indicating whether the `with_sqe` method should use a `msghdr`, an enum is now used to allow the specification of an address that should be copied into this `msghdr` in addition to just `iovec` information. This is so that `send_to`/`SENDMSG` operations without an implicit peer can know the delivery destination of the datagram. --- examples/udp_echo.rs | 24 ++++++++ src/completion.rs | 24 ++++---- src/io_uring/cq.rs | 6 +- src/io_uring/in_flight.rs | 29 ++++++++++ src/io_uring/mod.rs | 36 ++++++++++-- src/io_uring/uring.rs | 119 +++++++++++++++++++++++++++----------- src/lib.rs | 23 +++++--- 7 files changed, 202 insertions(+), 59 deletions(-) create mode 100644 examples/udp_echo.rs diff --git a/examples/udp_echo.rs b/examples/udp_echo.rs new file mode 100644 index 0000000..0f968fd --- /dev/null +++ b/examples/udp_echo.rs @@ -0,0 +1,24 @@ +use std::{ + io::self, + net::{UdpSocket}, +}; + +fn main() -> io::Result<()> { + let mut config = rio::Config::default(); + config.depth = 4096; + let ring = config.start()?; + let socket = UdpSocket::bind("0.0.0.0:34254")?; + + extreme::run(async { + let buffer = &mut [0u8; 1024]; + loop { + let (amt, peer) = ring.recv_from(&socket, &buffer).await?; + let peer_bstr = &buffer[..amt]; + println!("Got bytes: {} with bytestring {:?} from peer {:?}", + amt, &peer_bstr, peer); + 
let sent = ring.send_to(&socket, &peer_bstr, &peer).await?; + println!("Sent bytes: {} to peer", sent); + assert_eq!(amt, sent); + } + }) +} diff --git a/src/completion.rs b/src/completion.rs index 16112b8..cf1e1ee 100644 --- a/src/completion.rs +++ b/src/completion.rs @@ -8,13 +8,13 @@ use std::{ }; use super::{ - io_uring::io_uring_cqe, FromCqe, Measure, Uring, M, + CqeData, FromCqeData, Measure, Uring, M, }; #[derive(Debug)] struct CompletionState { done: bool, - item: Option>, + item: Option>, waker: Option, } @@ -38,7 +38,7 @@ impl Default for CompletionState { /// happen with `std::mem::forget`, cycles in /// `Arc` or `Rc`, and in other ways. #[derive(Debug)] -pub struct Completion<'a, C: FromCqe> { +pub struct Completion<'a, C: FromCqeData> { lifetime: PhantomData<&'a C>, mu: Arc>, cv: Arc, @@ -55,7 +55,7 @@ pub struct Filler { /// Create a new `Filler` and the `Completion` /// that will be filled by its completion. -pub fn pair<'a, C: FromCqe>( +pub fn pair<'a, C: FromCqeData>( uring: &'a Uring, ) -> (Completion<'a, C>, Filler) { let mu = @@ -73,19 +73,19 @@ pub fn pair<'a, C: FromCqe>( (future, filler) } -impl<'a, C: FromCqe> Completion<'a, C> { +impl<'a, C: FromCqeData> Completion<'a, C> { /// Block on the `Completion`'s completion /// or dropping of the `Filler` pub fn wait(self) -> io::Result where - C: FromCqe, + C: FromCqeData, { self.wait_inner().unwrap() } fn wait_inner(&self) -> Option> where - C: FromCqe, + C: FromCqeData, { debug_assert_ne!( self.sqe_id, @@ -106,18 +106,18 @@ impl<'a, C: FromCqe> Completion<'a, C> { } inner.item.take().map(|io_result| { - io_result.map(FromCqe::from_cqe) + io_result.map(FromCqeData::from_cqe_data) }) } } -impl<'a, C: FromCqe> Drop for Completion<'a, C> { +impl<'a, C: FromCqeData> Drop for Completion<'a, C> { fn drop(&mut self) { self.wait_inner(); } } -impl<'a, C: FromCqe> Future for Completion<'a, C> { +impl<'a, C: FromCqeData> Future for Completion<'a, C> { type Output = io::Result; fn poll( @@ -135,7 +135,7 @@ 
impl<'a, C: FromCqe> Future for Completion<'a, C> { .item .take() .unwrap() - .map(FromCqe::from_cqe), + .map(FromCqeData::from_cqe_data), ) } else { if !state.done { @@ -148,7 +148,7 @@ impl<'a, C: FromCqe> Future for Completion<'a, C> { impl Filler { /// Complete the `Completion` - pub fn fill(self, inner: io::Result) { + pub fn fill(self, inner: io::Result) { let mut state = self.mu.lock().unwrap(); if let Some(waker) = state.waker.take() { diff --git a/src/io_uring/cq.rs b/src/io_uring/cq.rs index 606798b..c7973e6 100644 --- a/src/io_uring/cq.rs +++ b/src/io_uring/cq.rs @@ -151,7 +151,11 @@ impl Cq { let result = if res < 0 { Err(io::Error::from_raw_os_error(res.neg())) } else { - Ok(*cqe) + let address = cq.in_flight.take_address(ticket as usize); + Ok(CqeData{ + cqe: *cqe, + address: address, + }) }; completion_filler.fill(result); diff --git a/src/io_uring/in_flight.rs b/src/io_uring/in_flight.rs index f90baf3..af6058e 100644 --- a/src/io_uring/in_flight.rs +++ b/src/io_uring/in_flight.rs @@ -1,4 +1,5 @@ use std::ptr::null_mut; +use std::net::{SocketAddr, IpAddr, Ipv4Addr}; use super::*; @@ -6,6 +7,7 @@ pub(crate) struct InFlight { iovecs: UnsafeCell>, msghdrs: UnsafeCell>, fillers: UnsafeCell>>, + addresses: UnsafeCell>>, } impl std::fmt::Debug for InFlight { @@ -33,14 +35,18 @@ impl InFlight { ]); let mut filler_vec = Vec::with_capacity(size); + let mut addresses_vec = Vec::with_capacity(size); for _ in 0..size { filler_vec.push(None); + addresses_vec.push(None); } let fillers = UnsafeCell::new(filler_vec); + let addresses = UnsafeCell::new(addresses_vec); InFlight { iovecs, msghdrs, fillers, + addresses, } } @@ -48,6 +54,7 @@ impl InFlight { &self, ticket: usize, iovec: Option, + address: Option<(*const libc::sockaddr, libc::socklen_t)>, msghdr: bool, filler: Filler, ) -> u64 { @@ -55,6 +62,7 @@ impl InFlight { unsafe { let iovec_ptr = self.iovecs.get(); let msghdr_ptr = self.msghdrs.get(); + let addresses_ptr = self.addresses.get(); if let Some(iovec) = 
iovec { (*iovec_ptr)[ticket] = iovec; @@ -64,6 +72,17 @@ impl InFlight { .as_mut_ptr() .add(ticket); (*msghdr_ptr)[ticket].msg_iovlen = 1; + if let Some((sname, slen)) = address { + (*addresses_ptr)[ticket] = None; + (*msghdr_ptr)[ticket].msg_name = sname as *mut libc::c_void; + (*msghdr_ptr)[ticket].msg_namelen = slen; + } else { + (*addresses_ptr)[ticket] = + Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)); + let (sname, slen) = addr2raw((*addresses_ptr)[ticket].as_ref().unwrap()); + (*msghdr_ptr)[ticket].msg_name = sname as *mut libc::c_void; + (*msghdr_ptr)[ticket].msg_namelen = slen; + } } } (*self.fillers.get())[ticket] = Some(filler); @@ -90,4 +109,14 @@ impl InFlight { (*self.fillers.get())[ticket].take().unwrap() } } + + pub(crate) fn take_address( + &self, + ticket: usize, + ) -> Option { + #[allow(unsafe_code)] + unsafe { + (*self.addresses.get())[ticket].take() + } + } } diff --git a/src/io_uring/mod.rs b/src/io_uring/mod.rs index 0cb4092..dfe0fe6 100644 --- a/src/io_uring/mod.rs +++ b/src/io_uring/mod.rs @@ -17,7 +17,7 @@ use std::{ }; use super::{ - pair, AsIoVec, AsIoVecMut, Completion, Filler, FromCqe, + pair, AsIoVec, AsIoVecMut, Completion, CqeData, Filler, FromCqeData, Measure, M, }; @@ -107,11 +107,39 @@ fn uring_mmap( Ok(ptr) } -impl FromCqe for TcpStream { - fn from_cqe(cqe: io_uring_cqe) -> TcpStream { +impl FromCqeData for TcpStream { + fn from_cqe_data(data: CqeData) -> TcpStream { #[allow(unsafe_code)] unsafe { - TcpStream::from_raw_fd(cqe.res) + TcpStream::from_raw_fd(data.cqe.res) + } + } +} + +impl FromCqeData for (usize, ::std::net::SocketAddr) { + fn from_cqe_data(data: CqeData) -> (usize, ::std::net::SocketAddr) { + let bytes = usize::try_from(data.cqe.res).unwrap(); + (bytes, data.address.unwrap()) + } +} + +fn addr2raw( + addr: &std::net::SocketAddr, +) -> (*const libc::sockaddr, libc::socklen_t) { + match *addr { + std::net::SocketAddr::V4(ref a) => { + let b: *const std::net::SocketAddrV4 = a; + ( + b as *const _, 
+ std::mem::size_of_val(a) as libc::socklen_t, + ) + } + std::net::SocketAddr::V6(ref a) => { + let b: *const std::net::SocketAddrV6 = a; + ( + b as *const _, + std::mem::size_of_val(a) as libc::socklen_t, + ) } } } diff --git a/src/io_uring/uring.rs b/src/io_uring/uring.rs index 3e53a74..b611c49 100644 --- a/src/io_uring/uring.rs +++ b/src/io_uring/uring.rs @@ -25,6 +25,12 @@ pub struct Uring { submitted: AtomicU64, } +enum MsgInfo<'a> { + None, + VecOnly, + VecAndAddress(&'a ::std::net::SocketAddr), +} + #[allow(unsafe_code)] unsafe impl Send for Uring {} @@ -34,7 +40,7 @@ unsafe impl Sync for Uring {} impl Drop for Uring { fn drop(&mut self) { let poison_pill_res = - self.with_sqe::<_, ()>(None, false, |sqe| { + self.with_sqe::<_, ()>(None, MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_NOP, 0, @@ -123,7 +129,7 @@ impl Uring { &'a self, tcp_listener: &'a TcpListener, ) -> Completion<'a, TcpStream> { - self.with_sqe(None, false, |sqe| { + self.with_sqe(None, MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_ACCEPT, tcp_listener.as_raw_fd(), @@ -151,7 +157,7 @@ impl Uring { F: AsRawFd, { let (addr, len) = addr2raw(address); - self.with_sqe(None, false, |sqe| { + self.with_sqe(None, MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_CONNECT, socket.as_raw_fd(), @@ -186,6 +192,34 @@ impl Uring { self.send_ordered(stream, iov, Ordering::None) } + /// Send a message to the specified address on the + /// given sessionless datagram socket. + /// + /// Returns the length that was successfully + /// written. + pub fn send_to<'a, F, B>( + &'a self, + socket: &'a F, + iov: &'a B, + address: &'a ::std::net::SocketAddr, + ) -> Completion<'a, usize> + where + F: AsRawFd, + B: 'a + AsIoVec, + { + let iov = iov.into_new_iovec(); + + self.with_sqe(Some(iov), MsgInfo::VecAndAddress(address), |sqe| { + sqe.prep_rw( + IORING_OP_SENDMSG, + socket.as_raw_fd(), + 0, + 0, + Ordering::None, + ); + }) + } + /// Send a buffer to the target socket /// or file-like destination. 
/// @@ -210,7 +244,7 @@ impl Uring { { let iov = iov.into_new_iovec(); - self.with_sqe(None, true, |sqe| { + self.with_sqe(None, MsgInfo::VecOnly, |sqe| { sqe.prep_rw( IORING_OP_SEND, stream.as_raw_fd(), @@ -246,6 +280,33 @@ impl Uring { self.recv_ordered(stream, iov, Ordering::None) } + /// Receive a message from a sessionless datagram + /// socket. + /// + /// Returns the length that was successfully read and + /// the socket address of the peer. + pub fn recv_from<'a, F, B>( + &'a self, + socket: &'a F, + iov: &'a B, + ) -> Completion<'a, (usize, ::std::net::SocketAddr)> + where + F: AsRawFd, + B: AsIoVec + AsIoVecMut, + { + let iov = iov.into_new_iovec(); + + self.with_sqe(Some(iov), MsgInfo::VecOnly, |sqe| { + sqe.prep_rw( + IORING_OP_RECVMSG, + socket.as_raw_fd(), + 0, + 0, + Ordering::None, + ); + }) + } + /// Receive data from the target socket /// or file-like destination, and place /// it in the given buffer. @@ -271,7 +332,7 @@ impl Uring { { let iov = iov.into_new_iovec(); - self.with_sqe(Some(iov), true, |sqe| { + self.with_sqe(Some(iov), MsgInfo::VecOnly, |sqe| { sqe.prep_rw( IORING_OP_RECV, stream.as_raw_fd(), @@ -343,7 +404,7 @@ impl Uring { file: &'a File, ordering: Ordering, ) -> Completion<'a, ()> { - self.with_sqe(None, false, |sqe| { + self.with_sqe(None, MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_FSYNC, file.as_raw_fd(), @@ -413,7 +474,7 @@ impl Uring { file: &'a File, ordering: Ordering, ) -> Completion<'a, ()> { - self.with_sqe(None, false, |mut sqe| { + self.with_sqe(None, MsgInfo::None, |mut sqe| { sqe.prep_rw( IORING_OP_FSYNC, file.as_raw_fd(), @@ -471,7 +532,7 @@ impl Uring { len: usize, ordering: Ordering, ) -> Completion<'a, ()> { - self.with_sqe(None, false, |mut sqe| { + self.with_sqe(None, MsgInfo::None, |mut sqe| { sqe.prep_rw( IORING_OP_SYNC_FILE_RANGE, file.as_raw_fd(), @@ -551,7 +612,7 @@ impl Uring { { self.with_sqe( Some(iov.into_new_iovec()), - false, + MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_WRITEV, @@ -621,7 +682,7 
@@ impl Uring { { self.with_sqe( Some(iov.into_new_iovec()), - false, + MsgInfo::None, |sqe| { sqe.prep_rw( IORING_OP_READV, @@ -646,7 +707,7 @@ impl Uring { &'a self, ordering: Ordering, ) -> Completion<'a, ()> { - self.with_sqe(None, false, |sqe| { + self.with_sqe(None, MsgInfo::None, |sqe| { sqe.prep_rw(IORING_OP_NOP, 0, 1, 0, ordering) }) } @@ -681,19 +742,28 @@ impl Uring { fn with_sqe<'a, F, C>( &'a self, iovec: Option, - msghdr: bool, + msg: MsgInfo<'a>, f: F, ) -> Completion<'a, C> where F: FnOnce(&mut io_uring_sqe), - C: FromCqe, + C: FromCqeData, { let ticket = self.ticket_queue.pop(); let (mut completion, filler) = pair(self); + let (msghdr, address) = match msg { + MsgInfo::None => (false, None), + MsgInfo::VecOnly => (true, None), + MsgInfo::VecAndAddress(address) => { + let address = addr2raw(address); + (true, Some(address)) + }, + }; + let data_ptr = self .in_flight - .insert(ticket, iovec, msghdr, filler); + .insert(ticket, iovec, address, msghdr, filler); let mut sq = { let _get_sq_mu = Measure::new(&M.sq_mu_wait); @@ -729,24 +799,3 @@ impl Uring { completion } } - -fn addr2raw( - addr: &std::net::SocketAddr, -) -> (*const libc::sockaddr, libc::socklen_t) { - match *addr { - std::net::SocketAddr::V4(ref a) => { - let b: *const std::net::SocketAddrV4 = a; - ( - b as *const _, - std::mem::size_of_val(a) as libc::socklen_t, - ) - } - std::net::SocketAddr::V6(ref a) => { - let b: *const std::net::SocketAddrV6 = a; - ( - b as *const _, - std::mem::size_of_val(a) as libc::socklen_t, - ) - } - } -} diff --git a/src/lib.rs b/src/lib.rs index 540d6fc..e2d8924 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -289,22 +289,31 @@ pub trait AsIoVecMut {} impl> AsIoVecMut for A {} +/// Successful operation return data from a completed CQ entry, +/// including both the low-level `io_uring_cqe` as well as any +/// socket address information received from the `msghdr`. 
+#[derive(Debug, Copy, Clone)] +pub struct CqeData { + cqe: io_uring::io_uring_cqe, + address: Option<::std::net::SocketAddr>, +} + /// A trait for describing transformations from the /// `io_uring_cqe` type into an expected meaningful /// high-level result. -pub trait FromCqe { +pub trait FromCqeData { /// Describes a conversion from a successful /// `io_uring_cqe` to a desired output type. - fn from_cqe(cqe: io_uring::io_uring_cqe) -> Self; + fn from_cqe_data(data: CqeData) -> Self; } -impl FromCqe for usize { - fn from_cqe(cqe: io_uring::io_uring_cqe) -> usize { +impl FromCqeData for usize { + fn from_cqe_data(data: CqeData) -> usize { use std::convert::TryFrom; - usize::try_from(cqe.res).unwrap() + usize::try_from(data.cqe.res).unwrap() } } -impl FromCqe for () { - fn from_cqe(_: io_uring::io_uring_cqe) {} +impl FromCqeData for () { + fn from_cqe_data(_: CqeData) {} } From 7fbce22841e7c3923b7fd0a676ed9881f0b4a93a Mon Sep 17 00:00:00 2001 From: Jamie Turner Date: Mon, 1 Feb 2021 04:33:19 +0000 Subject: [PATCH 2/2] No need to tweak configs for simple example. --- examples/udp_echo.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/examples/udp_echo.rs b/examples/udp_echo.rs index 0f968fd..2662aac 100644 --- a/examples/udp_echo.rs +++ b/examples/udp_echo.rs @@ -4,9 +4,7 @@ use std::{ }; fn main() -> io::Result<()> { - let mut config = rio::Config::default(); - config.depth = 4096; - let ring = config.start()?; + let ring = rio::new(); let socket = UdpSocket::bind("0.0.0.0:34254")?; extreme::run(async {