Skip to content

Commit c4eb910

Browse files
Merge pull request #889 from stjepang/fix-incoming
Store a future inside Incoming
2 parents e812663 + f7aa962 commit c4eb910

File tree

2 files changed

+62
-19
lines changed

2 files changed

+62
-19
lines changed

src/net/tcp/listener.rs

+32-10
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::fmt;
12
use std::future::Future;
23
use std::net::SocketAddr;
34
use std::pin::Pin;
@@ -8,7 +9,7 @@ use crate::io;
89
use crate::net::{TcpStream, ToSocketAddrs};
910
use crate::stream::Stream;
1011
use crate::sync::Arc;
11-
use crate::task::{Context, Poll};
12+
use crate::task::{ready, Context, Poll};
1213

1314
/// A TCP socket server, listening for connections.
1415
///
@@ -146,7 +147,10 @@ impl TcpListener {
146147
/// # Ok(()) }) }
147148
/// ```
148149
pub fn incoming(&self) -> Incoming<'_> {
149-
Incoming(self)
150+
Incoming {
151+
listener: self,
152+
accept: None,
153+
}
150154
}
151155

152156
/// Returns the local address that this listener is bound to.
@@ -182,18 +186,36 @@ impl TcpListener {
182186
/// [`incoming`]: struct.TcpListener.html#method.incoming
183187
/// [`TcpListener`]: struct.TcpListener.html
184188
/// [`std::net::Incoming`]: https://doc.rust-lang.org/std/net/struct.Incoming.html
185-
#[derive(Debug)]
186-
pub struct Incoming<'a>(&'a TcpListener);
189+
pub struct Incoming<'a> {
190+
listener: &'a TcpListener,
191+
accept: Option<
192+
Pin<Box<dyn Future<Output = io::Result<(TcpStream, SocketAddr)>> + Send + Sync + 'a>>,
193+
>,
194+
}
187195

188-
impl<'a> Stream for Incoming<'a> {
196+
impl Stream for Incoming<'_> {
189197
type Item = io::Result<TcpStream>;
190198

191-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
192-
let future = self.0.accept();
193-
pin_utils::pin_mut!(future);
199+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
200+
loop {
201+
if self.accept.is_none() {
202+
self.accept = Some(Box::pin(self.listener.accept()));
203+
}
204+
205+
if let Some(f) = &mut self.accept {
206+
let res = ready!(f.as_mut().poll(cx));
207+
self.accept = None;
208+
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
209+
}
210+
}
211+
}
212+
}
194213

195-
let (socket, _) = futures_core::ready!(future.poll(cx))?;
196-
Poll::Ready(Some(Ok(socket)))
214+
impl fmt::Debug for Incoming<'_> {
215+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
216+
f.debug_struct("Incoming")
217+
.field("listener", self.listener)
218+
.finish()
197219
}
198220
}
199221

src/os/unix/net/listener.rs

+30-9
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ use crate::os::unix::io::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
1414
use crate::path::Path;
1515
use crate::stream::Stream;
1616
use crate::sync::Arc;
17-
use crate::task::{Context, Poll};
17+
use crate::task::{ready, Context, Poll};
1818

1919
/// A Unix domain socket server, listening for connections.
2020
///
@@ -128,7 +128,10 @@ impl UnixListener {
128128
/// # Ok(()) }) }
129129
/// ```
130130
pub fn incoming(&self) -> Incoming<'_> {
131-
Incoming(self)
131+
Incoming {
132+
listener: self,
133+
accept: None,
134+
}
132135
}
133136

134137
/// Returns the local socket address of this listener.
@@ -174,18 +177,36 @@ impl fmt::Debug for UnixListener {
174177
/// [`None`]: https://doc.rust-lang.org/std/option/enum.Option.html#variant.None
175178
/// [`incoming`]: struct.UnixListener.html#method.incoming
176179
/// [`UnixListener`]: struct.UnixListener.html
177-
#[derive(Debug)]
178-
pub struct Incoming<'a>(&'a UnixListener);
180+
pub struct Incoming<'a> {
181+
listener: &'a UnixListener,
182+
accept: Option<
183+
Pin<Box<dyn Future<Output = io::Result<(UnixStream, SocketAddr)>> + Send + Sync + 'a>>,
184+
>,
185+
}
179186

180187
impl Stream for Incoming<'_> {
181188
type Item = io::Result<UnixStream>;
182189

183-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
184-
let future = self.0.accept();
185-
pin_utils::pin_mut!(future);
190+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
191+
loop {
192+
if self.accept.is_none() {
193+
self.accept = Some(Box::pin(self.listener.accept()));
194+
}
195+
196+
if let Some(f) = &mut self.accept {
197+
let res = ready!(f.as_mut().poll(cx));
198+
self.accept = None;
199+
return Poll::Ready(Some(res.map(|(stream, _)| stream)));
200+
}
201+
}
202+
}
203+
}
186204

187-
let (socket, _) = futures_core::ready!(future.poll(cx))?;
188-
Poll::Ready(Some(Ok(socket)))
205+
impl fmt::Debug for Incoming<'_> {
206+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
207+
f.debug_struct("Incoming")
208+
.field("listener", self.listener)
209+
.finish()
189210
}
190211
}
191212

0 commit comments

Comments
 (0)