diff --git a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java index 154eb4703..f6eeb0b1f 100644 --- a/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java +++ b/mcp-spring/mcp-spring-webflux/src/main/java/io/modelcontextprotocol/client/transport/WebClientStreamableHttpTransport.java @@ -18,6 +18,7 @@ import org.springframework.http.HttpStatus; import org.springframework.http.MediaType; import org.springframework.http.codec.ServerSentEvent; +import org.springframework.util.StringUtils; import org.springframework.web.reactive.function.client.ClientResponse; import org.springframework.web.reactive.function.client.WebClient; import org.springframework.web.reactive.function.client.WebClientResponseException; @@ -279,8 +280,8 @@ public Mono sendMessage(McpSchema.JSONRPCMessage message) { }) .bodyValue(message) .exchangeToFlux(response -> { - if (transportSession - .markInitialized(response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID))) { + String mcpSessionId = response.headers().asHttpHeaders().getFirst(HttpHeaders.MCP_SESSION_ID); + if (StringUtils.hasText(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { // Once we have a session, we try to open an async stream for // the server to send notifications and requests out-of-band. reconnect(null).contextWrite(sink.contextView()).subscribe(); diff --git a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java index c73515938..fff8d5d97 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java +++ b/mcp/src/main/java/io/modelcontextprotocol/client/transport/HttpClientStreamableHttpTransport.java @@ -12,6 +12,7 @@ import java.net.http.HttpResponse.BodyHandler; import java.time.Duration; import java.util.List; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicReference; @@ -442,8 +443,8 @@ public Mono sendMessage(McpSchema.JSONRPCMessage sentMessage) { })).onErrorMap(CompletionException.class, t -> t.getCause()).onErrorComplete().subscribe(); })).flatMap(responseEvent -> { - if (transportSession.markInitialized( - responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null))) { + String mcpSessionId = responseEvent.responseInfo().headers().firstValue("mcp-session-id").orElseGet(() -> null); + if (Objects.nonNull(mcpSessionId) && transportSession.markInitialized(mcpSessionId)) { // Once we have a session, we try to open an async stream for // the server to send notifications and requests out-of-band.