Skip to content

Commit da57ee7

Browse files
authored
fix: don't block on creating the SSE stream (#553)
1 parent 76cdf48 commit da57ee7

File tree

1 file changed

+44
-40
lines changed

1 file changed

+44
-40
lines changed

crates/rmcp/src/transport/streamable_http_client.rs

Lines changed: 44 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -391,47 +391,51 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
391391
}
392392
let mut streams = tokio::task::JoinSet::new();
393393
if let Some(session_id) = &session_id {
394-
match self
395-
.client
396-
.get_stream(
397-
config.uri.clone(),
398-
session_id.clone(),
399-
None,
400-
config.auth_header.clone(),
401-
)
402-
.await
403-
{
404-
Ok(stream) => {
405-
let sse_stream = SseAutoReconnectStream::new(
406-
stream,
407-
StreamableHttpClientReconnect {
408-
client: self.client.clone(),
409-
session_id: session_id.clone(),
410-
uri: config.uri.clone(),
411-
auth_header: config.auth_header.clone(),
412-
},
413-
self.config.retry_config.clone(),
414-
);
415-
streams.spawn(Self::execute_sse_stream(
416-
sse_stream,
417-
sse_worker_tx.clone(),
418-
false,
419-
transport_task_ct.child_token(),
420-
));
421-
tracing::debug!("got common stream");
422-
}
423-
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
424-
tracing::debug!("server doesn't support sse, skip common stream");
425-
}
426-
Err(e) => {
427-
// fail to get common stream
428-
tracing::error!("fail to get common stream: {e}");
429-
return Err(WorkerQuitReason::fatal(
430-
e,
431-
"get general purpose event stream",
432-
));
394+
let client = self.client.clone();
395+
let uri = config.uri.clone();
396+
let session_id = session_id.clone();
397+
let auth_header = config.auth_header.clone();
398+
let retry_config = self.config.retry_config.clone();
399+
let sse_worker_tx = sse_worker_tx.clone();
400+
let transport_task_ct = transport_task_ct.clone();
401+
let config_uri = config.uri.clone();
402+
let config_auth_header = config.auth_header.clone();
403+
404+
streams.spawn(async move {
405+
match client
406+
.get_stream(uri.clone(), session_id.clone(), None, auth_header.clone())
407+
.await
408+
{
409+
Ok(stream) => {
410+
let sse_stream = SseAutoReconnectStream::new(
411+
stream,
412+
StreamableHttpClientReconnect {
413+
client: client.clone(),
414+
session_id: session_id.clone(),
415+
uri: config_uri,
416+
auth_header: config_auth_header,
417+
},
418+
retry_config,
419+
);
420+
Self::execute_sse_stream(
421+
sse_stream,
422+
sse_worker_tx,
423+
false,
424+
transport_task_ct.child_token(),
425+
)
426+
.await
427+
}
428+
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
429+
tracing::debug!("server doesn't support sse, skip common stream");
430+
Ok(())
431+
}
432+
Err(e) => {
433+
// fail to get common stream
434+
tracing::error!("fail to get common stream: {e}");
435+
Err(e)
436+
}
433437
}
434-
}
438+
});
435439
}
436440
loop {
437441
let event = tokio::select! {

0 commit comments

Comments
 (0)