Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion src/body/incoming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,16 @@ impl Body for Incoming {
ping.record_non_data();
Poll::Ready(Ok(t.map(Frame::trailers)).transpose())
}
Err(e) => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
Err(e) => {
match e.reason() {
// These reasons should cause reading the trailers to stop, but not fail it.
// The same logic as for `Read for H2Upgraded` is applied here.
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I realize this is just bring down the logic that already exists for data frames, and the code itself looks fine. And the unit tests make sense why they want to suppress the error.

But, reading it all again, I have a worry: couldn't this also be swallowing real cancellations, and the user thinks they got a full body, when they got a truncated one?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looking over RFC 7540, I believe you're right that both this and the above branch should only be looking at Reason::NO_ERROR, and shouldn't be dropping the Reason::CANCEL. The section on early responses in 8.1 only mentions NO_ERROR:

An HTTP response is complete after the server sends -- or the client
receives -- a frame with the END_STREAM flag set (including any
CONTINUATION frames needed to complete a header block). A server can
send a complete response prior to the client sending an entire
request if the response does not depend on any portion of the request
that has not been sent and received. When this is true, a server MAY
request that the client abort transmission of a request without error
by sending a RST_STREAM with an error code of NO_ERROR after sending
a complete response (i.e., a frame with the END_STREAM flag).
Clients MUST NOT discard responses as a result of receiving such a
RST_STREAM, though clients can always discard responses at their
discretion for other reasons.

The only time that sending CANCEL is mentioned is under the description of push responses in 8.2.2:

If the client determines, for any reason, that it does not wish to
receive the pushed response from the server or if the server takes
too long to begin sending the promised response, the client can send
a RST_STREAM frame, using either the CANCEL or REFUSED_STREAM code
and referencing the pushed stream's identifier.

This section may be why the code was added to the match in #3275, but I agree that it does sound like it should be propagated so that the server is aware of the cancellation, as opposed to NO_ERROR where the RFC says that clients must not discard the response. I'll update this and #3999 to not filter out the Reason::CANCEL error.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in e1a501f.

Some(h2::Reason::NO_ERROR) | Some(h2::Reason::CANCEL) => {
Poll::Ready(None)
}
_ => Poll::Ready(Some(Err(crate::Error::new_h2(e)))),
}
}
}
}

Expand Down
97 changes: 89 additions & 8 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2552,7 +2552,7 @@ mod conn {
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body() {
async fn http2_responds_before_consuming_request_body_no_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::service::service_fn;
Expand Down Expand Up @@ -2596,15 +2596,96 @@ mod conn {
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let mut body = String::new();
concat(resp.into_body())
let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();
assert_eq!(body.as_ref(), b"No bread for you!");
assert!(trailers.is_none());
}

#[tokio::test]
async fn http2_responds_before_consuming_request_body_with_trailers() {
// Test that a early-response from server works correctly (request body wasn't fully consumed).
// https://github.com/hyperium/hyper/issues/2872
use hyper::body::{Body, Frame, SizeHint};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::service::service_fn;

let _ = pretty_env_logger::try_init();

let (listener, addr) = setup_tk_test_server().await;

/// An `HttpBody` implementation whose `is_end_stream()` will
/// return `true` after sending trailers.
pub struct TrailersBody(Option<HeaderMap>);

impl Body for TrailersBody {
type Data = bytes::Bytes;
type Error = hyper::Error;

fn poll_frame(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
if let Some(trailers) = self.0.take() {
Poll::Ready(Some(Ok(Frame::trailers(trailers))))
} else {
Poll::Ready(None)
}
}

fn is_end_stream(&self) -> bool {
self.0.is_none()
}

fn size_hint(&self) -> SizeHint {
SizeHint::with_exact(0)
}
}

// Spawn an HTTP2 server that responds before reading the whole request body.
// It's normal case to decline the request due to headers or size of the body.
tokio::spawn(async move {
let sock = TokioIo::new(listener.accept().await.unwrap().0);
hyper::server::conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.serve_connection(
sock,
service_fn(|_req| async move {
let mut trailers = HeaderMap::new();
trailers.insert("grpc", HeaderValue::from_static("0"));
let body = TrailersBody(Some(trailers));
Ok::<_, hyper::Error>(http::Response::new(body))
}),
)
.await
.expect("serve_connection");
});

let io = tcp_connect(&addr).await.expect("tcp connect");
let (mut client, conn) = conn::http2::Builder::new(TokioExecutor)
.timer(TokioTimer)
.handshake(io)
.await
.unwrap()
.reader()
.read_to_string(&mut body)
.unwrap();
.expect("http handshake");

tokio::spawn(async move {
conn.await.expect("client conn shouldn't error");
});

// Use a channel to keep request stream open
let (_tx, recv) = mpsc::channel::<Result<Frame<Bytes>, Box<dyn Error + Send + Sync>>>(0);
let req = Request::post("/a").body(StreamBody::new(recv)).unwrap();
let resp = client.send_request(req).await.expect("send_request");
assert!(resp.status().is_success());

let (body, trailers) = crate::concat_with_trailers(resp.into_body()).await.unwrap();

// No body:
assert!(body.is_empty());

assert_eq!(&body, "No bread for you!");
// Have our `grpc` trailer:
let trailers = trailers.expect("response has trailers");
assert_eq!(trailers.len(), 1);
assert_eq!(trailers.get("grpc").unwrap(), "0");
}

#[tokio::test]
Expand Down