Skip to content

Commit 9665259

Browse files
committed
make Reactor::sleep return associated Reactor::Sleep type
1 parent 5495708 commit 9665259

File tree

9 files changed

+25
-45
lines changed

9 files changed

+25
-45
lines changed

src/implementors/async_io.rs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,4 @@
1-
use crate::{
2-
sys::AsSysFd,
3-
traits::Reactor,
4-
util::{IOHandle, UnitFuture},
5-
};
1+
use crate::{sys::AsSysFd, traits::Reactor, util::IOHandle};
62
use async_io::{Async, Timer};
73
use futures_core::Stream;
84
use futures_io::{AsyncRead, AsyncWrite};
@@ -19,6 +15,7 @@ pub struct AsyncIO;
1915

2016
impl Reactor for AsyncIO {
2117
type TcpStream = Async<TcpStream>;
18+
type Sleep = Timer;
2219

2320
fn register<H: Read + Write + AsSysFd + Send + 'static>(
2421
&self,
@@ -27,8 +24,8 @@ impl Reactor for AsyncIO {
2724
Async::new(IOHandle::new(socket))
2825
}
2926

30-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
31-
UnitFuture(Timer::after(dur))
27+
fn sleep(&self, dur: Duration) -> Self::Sleep {
28+
Timer::after(dur)
3229
}
3330

3431
fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {

src/implementors/noop.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::{
99
use futures_core::Stream;
1010
use futures_io::{AsyncRead, AsyncWrite};
1111
use std::{
12-
future::Future,
12+
future::{self, Future, Ready},
1313
io::{self, Read, Write},
1414
marker::PhantomData,
1515
net::SocketAddr,
@@ -60,6 +60,7 @@ impl Executor for Noop {
6060

6161
impl Reactor for Noop {
6262
type TcpStream = DummyIO;
63+
type Sleep = Ready<()>;
6364

6465
fn register<H: Read + Write + AsSysFd + Send + 'static>(
6566
&self,
@@ -68,8 +69,8 @@ impl Reactor for Noop {
6869
Ok(DummyIO)
6970
}
7071

71-
fn sleep(&self, _dur: Duration) -> impl Future<Output = ()> + Send + 'static {
72-
async {}
72+
fn sleep(&self, _dur: Duration) -> Self::Sleep {
73+
future::ready(())
7374
}
7475

7576
fn interval(&self, _dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {

src/implementors/smol.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use crate::{
44
Runtime,
55
sys::AsSysFd,
66
traits::{Executor, Reactor, RuntimeKit},
7-
util::{IOHandle, Task, UnitFuture},
7+
util::{IOHandle, Task},
88
};
99
use futures_core::Stream;
1010
use futures_io::{AsyncRead, AsyncWrite};
@@ -58,6 +58,7 @@ impl Executor for Smol {
5858

5959
impl Reactor for Smol {
6060
type TcpStream = Async<TcpStream>;
61+
type Sleep = Timer;
6162

6263
fn register<H: Read + Write + AsSysFd + Send + 'static>(
6364
&self,
@@ -66,8 +67,8 @@ impl Reactor for Smol {
6667
Async::new(IOHandle::new(socket))
6768
}
6869

69-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
70-
UnitFuture(Timer::after(dur))
70+
fn sleep(&self, dur: Duration) -> Self::Sleep {
71+
Timer::after(dur)
7172
}
7273

7374
fn interval(&self, dur: Duration) -> impl Stream<Item = Instant> + Send + 'static {

src/implementors/tokio.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use std::{
2222
use tokio::{
2323
net::TcpStream,
2424
runtime::{EnterGuard, Handle, Runtime as TokioRT},
25+
time::Sleep,
2526
};
2627
use tokio_stream::{StreamExt, wrappers::IntervalStream};
2728

@@ -137,6 +138,7 @@ impl Executor for Tokio {
137138

138139
impl Reactor for Tokio {
139140
type TcpStream = Compat<TcpStream>;
141+
type Sleep = Sleep;
140142

141143
fn register<H: Read + Write + AsSysFd + Send + 'static>(
142144
&self,
@@ -156,7 +158,7 @@ impl Reactor for Tokio {
156158
}
157159
}
158160

159-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
161+
fn sleep(&self, dur: Duration) -> Self::Sleep {
160162
let _enter = self.enter();
161163
tokio::time::sleep(dur)
162164
}

src/runtime.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ impl<RK: RuntimeKit> Executor for Runtime<RK> {
6969

7070
impl<RK: RuntimeKit> Reactor for Runtime<RK> {
7171
type TcpStream = <RK as Reactor>::TcpStream;
72+
type Sleep = <RK as Reactor>::Sleep;
7273

7374
fn register<H: Read + Write + AsSysFd + Send + 'static>(
7475
&self,
@@ -77,7 +78,7 @@ impl<RK: RuntimeKit> Reactor for Runtime<RK> {
7778
self.kit.register(socket)
7879
}
7980

80-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
81+
fn sleep(&self, dur: Duration) -> Self::Sleep {
8182
self.kit.sleep(dur)
8283
}
8384

src/traits/reactor.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,9 @@ pub trait Reactor {
1515
/// The type representing a TCP stream (after tcp_connect) for this reactor
1616
type TcpStream: AsyncRead + AsyncWrite + Send + Unpin + 'static;
1717

18+
/// The type representing a Sleep for this reactor
19+
type Sleep: Future + Send + 'static;
20+
1821
/// Register a synchronous handle, returning an asynchronous one
1922
fn register<H: Read + Write + AsSysFd + Send + 'static>(
2023
&self,
@@ -24,7 +27,7 @@ pub trait Reactor {
2427
Self: Sized;
2528

2629
/// Sleep for the given duration
27-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static
30+
fn sleep(&self, dur: Duration) -> Self::Sleep
2831
where
2932
Self: Sized;
3033

@@ -69,6 +72,7 @@ where
6972
R::Target: Reactor + Sized,
7073
{
7174
type TcpStream = <<R as Deref>::Target as Reactor>::TcpStream;
75+
type Sleep = <<R as Deref>::Target as Reactor>::Sleep;
7276

7377
fn register<H: Read + Write + AsSysFd + Send + 'static>(
7478
&self,
@@ -77,7 +81,7 @@ where
7781
self.deref().register(socket)
7882
}
7983

80-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
84+
fn sleep(&self, dur: Duration) -> Self::Sleep {
8185
self.deref().sleep(dur)
8286
}
8387

src/util/future.rs

Lines changed: 0 additions & 24 deletions
This file was deleted.

src/util/mod.rs

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,6 @@ pub use block_on::*;
99
mod dummy;
1010
pub use dummy::*;
1111

12-
mod future;
13-
pub use future::*;
14-
1512
#[cfg(feature = "async-io")]
1613
mod io;
1714
#[cfg(feature = "async-io")]

src/util/runtime.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ impl<E: Executor, R: Reactor> Executor for RuntimeParts<E, R> {
5353

5454
impl<E: Executor, R: Reactor> Reactor for RuntimeParts<E, R> {
5555
type TcpStream = R::TcpStream;
56+
type Sleep = R::Sleep;
5657

5758
fn register<H: Read + Write + AsSysFd + Send + 'static>(
5859
&self,
@@ -61,7 +62,7 @@ impl<E: Executor, R: Reactor> Reactor for RuntimeParts<E, R> {
6162
self.reactor.register(socket)
6263
}
6364

64-
fn sleep(&self, dur: Duration) -> impl Future<Output = ()> + Send + 'static {
65+
fn sleep(&self, dur: Duration) -> Self::Sleep {
6566
self.reactor.sleep(dur)
6667
}
6768

0 commit comments

Comments
 (0)