Skip to content

Commit aeeff75

Browse files
min-mweiminwei
andauthored
Streamable HTTP: drain SSE frames until the initialize response, ignoring early notifications to prevent handshake timeouts (#467)
Co-authored-by: Min Wei <[email protected]>
1 parent 87fcc7c commit aeeff75

File tree

1 file changed

+23
-10
lines changed

1 file changed

+23
-10
lines changed

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 23 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ pub use sse_stream::Error as SseError;
55
use sse_stream::Sse;
66
use thiserror::Error;
77
use tokio_util::sync::CancellationToken;
8+
use tracing::debug;
89

910
use super::common::client_side_sse::{ExponentialBackoff, SseRetryPolicy, SseStreamReconnect};
1011
use crate::{
@@ -90,16 +91,28 @@ impl StreamableHttpPostResponse {
9091
match self {
9192
Self::Json(message, session_id) => Ok((message, session_id)),
9293
Self::Sse(mut stream, session_id) => {
93-
let event =
94-
stream
95-
.next()
96-
.await
97-
.ok_or(StreamableHttpError::UnexpectedServerResponse(
98-
"empty sse stream".into(),
99-
))??;
100-
let message: ServerJsonRpcMessage =
101-
serde_json::from_str(&event.data.unwrap_or_default())?;
102-
Ok((message, session_id))
94+
while let Some(event) = stream.next().await {
95+
let event = event?;
96+
let payload = event.data.unwrap_or_default();
97+
if payload.trim().is_empty() {
98+
continue;
99+
}
100+
101+
let message: ServerJsonRpcMessage = serde_json::from_str(&payload)?;
102+
103+
if matches!(message, ServerJsonRpcMessage::Response(_)) {
104+
return Ok((message, session_id));
105+
}
106+
107+
debug!(
108+
?message,
109+
"received message before initialize response; continuing to drain stream"
110+
);
111+
}
112+
113+
Err(StreamableHttpError::UnexpectedServerResponse(
114+
"empty sse stream".into(),
115+
))
103116
}
104117
_ => Err(StreamableHttpError::UnexpectedServerResponse(
105118
"expect initialized, accepted".into(),

0 commit comments

Comments
 (0)