From e6529a940d26e19369c6aa64a7f5dece7b0aaa15 Mon Sep 17 00:00:00 2001 From: ForestFairy Date: Thu, 17 Apr 2025 14:24:48 +0800 Subject: [PATCH 1/2] feat: the last request timestamp api for McpSession --- .../io/modelcontextprotocol/spec/McpClientSession.java | 9 +++++++++ .../io/modelcontextprotocol/spec/McpServerSession.java | 9 +++++++++ .../java/io/modelcontextprotocol/spec/McpSession.java | 5 +++++ 3 files changed, 23 insertions(+) diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java index c1f42e3f..ce2b2cda 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpClientSession.java @@ -61,6 +61,9 @@ public class McpClientSession implements McpSession { /** Atomic counter for generating unique request IDs */ private final AtomicLong requestCounter = new AtomicLong(0); + /** To record the last request timestamp */ + private final AtomicLong lastRequestTs = new AtomicLong(System.currentTimeMillis()); + private final Disposable connection; /** @@ -135,6 +138,7 @@ public McpClientSession(Duration requestTimeout, McpClientTransport transport, } else if (message instanceof McpSchema.JSONRPCRequest request) { logger.debug("Received request: {}", request); + lastRequestTs.set(System.currentTimeMillis()); handleIncomingRequest(request).subscribe(response -> transport.sendMessage(response).subscribe(), error -> { var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), @@ -286,4 +290,9 @@ public void close() { transport.close(); } + @Override + public long lastRequestTimestamp() { + return this.lastRequestTs.get(); + } + } diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java index 46c356cd..26f6007a 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpServerSession.java @@ -56,6 +56,9 @@ public class McpServerSession implements McpSession { private final AtomicInteger state = new AtomicInteger(STATE_UNINITIALIZED); + /** To record the last request timestamp */ + private final AtomicLong lastRequestTs = new AtomicLong(System.currentTimeMillis()); + /** * Creates a new server session with the given parameters and the transport to use. * @param id session id @@ -169,6 +172,7 @@ public Mono handle(McpSchema.JSONRPCMessage message) { } else if (message instanceof McpSchema.JSONRPCRequest request) { logger.debug("Received request: {}", request); + lastRequestTs.set(System.currentTimeMillis()); return handleIncomingRequest(request).onErrorResume(error -> { var errorResponse = new McpSchema.JSONRPCResponse(McpSchema.JSONRPC_VERSION, request.id(), null, new McpSchema.JSONRPCResponse.JSONRPCError(McpSchema.ErrorCodes.INTERNAL_ERROR, @@ -277,6 +281,11 @@ public void close() { this.transport.close(); } + @Override + public long lastRequestTimestamp() { + return lastRequestTs.get(); + } + /** * Request handler for the initialization request. */ diff --git a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSession.java b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSession.java index 473a860c..a8036ea3 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/spec/McpSession.java +++ b/mcp/src/main/java/io/modelcontextprotocol/spec/McpSession.java @@ -79,4 +79,9 @@ default Mono sendNotification(String method) { */ void close(); + /** + * @return get the timestamp of the last request the session received. + */ + long lastRequestTimestamp(); + } From b00bf762538beadeaafb7fec5d78cd8356268eb6 Mon Sep 17 00:00:00 2001 From: ForestFairy Date: Fri, 18 Apr 2025 09:29:54 +0800 Subject: [PATCH 2/2] fix: add the scheduled thread to fix #162 --- ...HttpServletSseServerTransportProvider.java | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java b/mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java index afdbff47..64ab3db2 100644 --- a/mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java +++ b/mcp/src/main/java/io/modelcontextprotocol/server/transport/HttpServletSseServerTransportProvider.java @@ -6,9 +6,14 @@ import java.io.BufferedReader; import java.io.IOException; import java.io.PrintWriter; +import java.time.Duration; +import java.util.ArrayList; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import com.fasterxml.jackson.core.type.TypeReference; @@ -102,6 +107,7 @@ public class HttpServletSseServerTransportProvider extends HttpServlet implement /** Session factory for creating new sessions */ private McpServerSession.Factory sessionFactory; + private ScheduledFuture removeSessionsDeprecarted; /** * Creates a new HttpServletSseServerTransportProvider instance with a custom SSE @@ -131,6 +137,18 @@ public HttpServletSseServerTransportProvider(ObjectMapper objectMapper, String b this.baseUrl = baseUrl; this.messageEndpoint = messageEndpoint; this.sseEndpoint = sseEndpoint; + this.removeSessionsDeprecarted = Executors.newScheduledThreadPool(1) + .scheduleAtFixedRate(() -> new ArrayList<>(sessions.values()) + .forEach(session -> { + if (TimeUnit.MINUTES.convert(Duration.ofMillis(System.currentTimeMillis() - session.lastRequestTimestamp())) > 30L) { + // close the session if it has not received a request in the last 30 minutes + session.closeGracefully() + .doOnError(error -> + logger.warn("Failed to gracefully close the session {} while it has not received any request in the last 30 minutes." + + "Here is the following error msg: {}", session.getId(), error.getMessage())) + .subscribe(); + } + }), 0, 1, TimeUnit.MINUTES); } /** @@ -323,8 +341,8 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response) public Mono closeGracefully() { isClosing.set(true); logger.debug("Initiating graceful shutdown with {} active sessions", sessions.size()); - - return Flux.fromIterable(sessions.values()).flatMap(McpServerSession::closeGracefully).then(); + return Flux.fromIterable(sessions.values()).flatMap(McpServerSession::closeGracefully) + .doOnTerminate(() -> this.removeSessionsDeprecarted.cancel(true)).then(); } /**