Description
Version
hyper = { version = "1.1.0", features = ["full"] }
tokio = { version = "1", features = ["full"] }
http-body-util = "0.1"
hyper-util = { version = "0.1", features = ["full"] }
futures-channel = { version = "0.3", features = ["sink"] }
futures-util = { version = "0.3", default-features = false, features = ["alloc", "sink"] }
bytes = "1.6.0"
h2 = "0.4"
http = "1.1.0"
Platform
Darwin Kernel Version 23.4.0: Fri Mar 15 00:12:49 PDT 2024; root:xnu-10063.101.17~1/RELEASE_ARM64_T6020 arm64
Description
Silent failure on HTTP2 server reading request body when client sends RST_STREAM(CANCEL)
on the stream containing request body.
In the example repro, the client is sending request body containing payload, followed by RST_STREAM(CANCEL)
frame. Since the server is reading canceled stream, request.body_mut().collect().await.unwrap().to_bytes()
is expected to trigger a panic via unwrap()
. However, the function call returns the payload successfully without any indication of error.
After digging through the PR history, the changes to ignore CANCEL
and NO_ERROR
code on RST_STREAM
was introduced in #3275 in response to issue #2872. While this may be an expected behavior, is there a way to read stream more gracefully on the server such that the error on RST_STREAM
is propagated?
Repro
https://github.com/sirahd/hyper-http2-rst-stream-cancel-repro
server code snippet:
use std::convert::Infallible;
use std::future::Future;
use std::net::SocketAddr;
use http_body_util::BodyExt;
use http_body_util::Full;
use hyper::{Method, Request, Response};
use hyper::body::Bytes;
use hyper::rt::Executor;
use hyper::server::conn::http2;
use hyper::service::service_fn;
use hyper_util::rt::TokioIo;
use tokio::net::TcpListener;
async fn hello(mut request: Request<hyper::body::Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
println!("receiving request {:?}", request);
match *request.method() {
Method::POST => {
let data = request.body_mut().collect().await.unwrap().to_bytes();
let transfer_bytes = data.len();
println!("{:?}", transfer_bytes);
Ok(Response::new(Full::new(Bytes::from(transfer_bytes.to_string()))))
}
_ => Ok(Response::new(Full::new(Bytes::new())))
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let addr = SocketAddr::from(([127, 0, 0, 1], 4034));
let listener = TcpListener::bind(addr).await?;
loop {
let (stream, _) = listener.accept().await?;
let io = TokioIo::new(stream);
tokio::task::spawn(async move {
if let Err(err) = http2::Builder::new(TokioExecutor)
.serve_connection(io, service_fn(hello))
.await
{
eprintln!("Error serving connection: {:?}", err);
}
});
}
}
#[derive(Clone)]
struct TokioExecutor;
impl<F> Executor<F> for TokioExecutor
where
F: Future + Send + 'static,
F::Output: Send + 'static,
{
fn execute(&self, future: F) {
tokio::spawn(future);
}
}
client code snippet:
use std::error::Error;
use std::time::Duration;
use bytes::Bytes;
use h2::{client, Reason};
use http::{Method, Request};
use tokio::net::TcpStream;
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
let tcp = TcpStream::connect("127.0.0.1:4034").await?;
let (h2, connection) = client::handshake(tcp).await?;
tokio::spawn(async move {
connection.await.unwrap();
});
let mut send_request = h2.ready().await?;
let request = Request::get("http://127.0.0.1:4034")
.method(Method::POST)
.body(())
.unwrap();
let (response, mut send_stream) = send_request
.send_request(request, false).unwrap();
send_stream.send_data(Bytes::from(vec![5; 1000]), false)?;
tokio::time::sleep(Duration::from_secs(5)).await;
// sending RST_STREAM(CANCEL) here
send_stream.send_reset(Reason::CANCEL);
let response = response.await?;
println!("{:?}", response);
Ok(())
}