Skip to content

Commit b12f119

Browse files
committed
Drop dependency on futures-channel
Tokio already provides mpsc and oneshot, and it is an unconditional dependency. Use those implementations and drop a superfluous dependency.
1 parent f9f8f44 commit b12f119

6 files changed

Lines changed: 19 additions & 21 deletions

File tree

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@ tokio = { version = "1", features = ["sync"] }
2828
# Optional
2929

3030
atomic-waker = { version = "1.1.2", optional = true }
31-
futures-channel = { version = "0.3", optional = true }
3231
futures-core = { version = "0.3.31", optional = true }
3332
futures-util = { version = "0.3", default-features = false, features = ["alloc"], optional = true }
3433
h2 = { version = "0.4.2", optional = true }
@@ -44,7 +43,6 @@ want = { version = "0.3", optional = true }
4443

4544
[dev-dependencies]
4645
form_urlencoded = "1"
47-
futures-channel = { version = "0.3", features = ["sink"] }
4846
futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] }
4947
http-body-util = "0.1"
5048
pretty_env_logger = "0.5"
@@ -80,8 +78,8 @@ full = [
8078
]
8179

8280
# HTTP versions
83-
http1 = ["dep:atomic-waker", "dep:futures-channel", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
84-
http2 = ["dep:futures-channel", "dep:futures-core", "dep:h2"]
81+
http1 = ["dep:atomic-waker", "dep:futures-core", "dep:httparse", "dep:itoa", "dep:pin-utils"]
82+
http2 = ["dep:futures-core", "dep:h2"]
8583

8684
# Client/Server
8785
client = ["dep:want", "dep:pin-project-lite", "dep:smallvec"]

benches/end_to_end.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -341,7 +341,7 @@ impl Opts {
341341
let make_request = || {
342342
let chunk_cnt = self.request_chunks;
343343
let body = if chunk_cnt > 0 {
344-
let (mut tx, rx) = futures_channel::mpsc::channel(0);
344+
let (mut tx, rx) = tokio::sync::mpsc::channel(0);
345345

346346
let chunk = self
347347
.request_body

src/body/incoming.rs

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,16 @@ use std::pin::Pin;
55
use std::task::{Context, Poll};
66

77
use bytes::Bytes;
8-
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
9-
use futures_channel::{mpsc, oneshot};
108
#[cfg(all(
119
any(feature = "http1", feature = "http2"),
1210
any(feature = "client", feature = "server")
1311
))]
1412
use futures_core::ready;
1513
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
16-
use futures_core::{stream::FusedStream, Stream}; // for mpsc::Receiver
17-
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
1814
use http::HeaderMap;
1915
use http_body::{Body, Frame, SizeHint};
16+
#[cfg(all(feature = "http1", any(feature = "client", feature = "server")))]
17+
use tokio::sync::{mpsc, oneshot};
2018

2119
#[cfg(all(
2220
any(feature = "http1", feature = "http2"),
@@ -219,8 +217,8 @@ impl Body for Incoming {
219217
} => {
220218
want_tx.send(WANT_READY);
221219

222-
if !data_rx.is_terminated() {
223-
if let Some(chunk) = ready!(Pin::new(data_rx).poll_next(cx)?) {
220+
if !data_rx.is_closed() {
221+
if let Some(chunk) = ready!(Pin::new(data_rx).poll_recv(cx)?) {
224222
len.sub_if(chunk.len() as u64);
225223
return Poll::Ready(Some(Ok(Frame::data(chunk))));
226224
}
@@ -353,9 +351,11 @@ impl Sender {
353351
pub(crate) fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {
354352
// Check if the receiver end has tried polling for the body yet
355353
ready!(self.poll_want(cx)?);
356-
self.data_tx
357-
.poll_ready(cx)
358-
.map_err(|_| crate::Error::new_closed())
354+
if self.data_tx.capacity() > 0 {
355+
Poll::Ready(Ok(()))
356+
} else {
357+
Poll::Pending
358+
}
359359
}
360360

361361
fn poll_want(&mut self, cx: &mut Context<'_>) -> Poll<crate::Result<()>> {

src/proto/h2/client.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,13 @@ use std::{
99

1010
use crate::rt::{Read, Write};
1111
use bytes::Bytes;
12-
use futures_channel::mpsc::{Receiver, Sender};
13-
use futures_channel::{mpsc, oneshot};
14-
use futures_core::{ready, FusedFuture, FusedStream, Stream};
12+
use futures_core::{ready, FusedFuture};
1513
use h2::client::{Builder, Connection, SendRequest};
1614
use h2::SendStream;
1715
use http::{Method, StatusCode};
1816
use pin_project_lite::pin_project;
17+
use tokio::sync::mpsc::{Receiver, Sender};
18+
use tokio::sync::{mpsc, oneshot};
1919

2020
use super::ping::{Ponger, Recorder};
2121
use super::{ping, H2Upgraded, PipeToSendStream, SendBuf};
@@ -343,7 +343,7 @@ where
343343
return Poll::Ready(());
344344
}
345345

346-
if !this.drop_rx.is_terminated() && Pin::new(&mut this.drop_rx).poll_next(cx).is_ready() {
346+
if !this.drop_rx.is_closed() && Pin::new(&mut this.drop_rx).poll_recv(cx).is_ready() {
347347
// mpsc has been dropped, hopefully polling
348348
// the connection some more should start shutdown
349349
// and then close.

tests/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,10 @@ use hyper::header::{HeaderMap, HeaderName, HeaderValue};
1717
use hyper::{Method, Request, StatusCode, Uri, Version};
1818

1919
use bytes::Bytes;
20-
use futures_channel::oneshot;
2120
use futures_util::future::{self, FutureExt, TryFuture, TryFutureExt};
2221
use support::TokioIo;
2322
use tokio::net::TcpStream;
23+
use tokio::sync::oneshot;
2424
mod support;
2525

2626
fn s(buf: &[u8]) -> &str {
@@ -1494,12 +1494,12 @@ mod conn {
14941494
use std::time::Duration;
14951495

14961496
use bytes::{Buf, Bytes};
1497-
use futures_channel::{mpsc, oneshot};
14981497
use futures_util::future::{self, poll_fn, FutureExt, TryFutureExt};
14991498
use http_body_util::{BodyExt, Empty, Full, StreamBody};
15001499
use hyper::rt::Timer;
15011500
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _, DuplexStream};
15021501
use tokio::net::{TcpListener as TkTcpListener, TcpStream};
1502+
use tokio::sync::{mpsc, oneshot};
15031503

15041504
use hyper::body::{Body, Frame};
15051505
use hyper::client::conn;

tests/server.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ use std::thread;
1414
use std::time::Duration;
1515

1616
use bytes::Bytes;
17-
use futures_channel::oneshot;
1817
use futures_util::future::{self, Either, FutureExt};
1918
use h2::client::SendRequest;
2019
use h2::{RecvStream, SendStream};
@@ -25,6 +24,7 @@ use hyper::rt::{Read as AsyncRead, Write as AsyncWrite};
2524
use support::{TokioExecutor, TokioIo, TokioTimer};
2625
use tokio::io::{AsyncReadExt, AsyncWriteExt};
2726
use tokio::net::{TcpListener as TkTcpListener, TcpListener, TcpStream as TkTcpStream};
27+
use tokio::sync::oneshot;
2828

2929
use hyper::body::{Body, Incoming as IncomingBody};
3030
use hyper::server::conn::{http1, http2};

0 commit comments

Comments
 (0)