Skip to content

More efficient http #156 #157

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ else if (MESSAGE_EVENT_TYPE.equals(event.event())) {
JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(this.objectMapper, event.data());
s.next(message);
}
catch (IOException ioException) {
s.error(ioException);
catch (RuntimeException ioOrIllegalException) {
s.error(ioOrIllegalException);
}
}
else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.modelcontextprotocol.server.transport;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -323,8 +324,8 @@ private Mono<ServerResponse> handleMessage(ServerRequest request) {
.bodyValue(new McpError(error.getMessage()));
});
}
catch (IllegalArgumentException | IOException e) {
logger.error("Failed to deserialize message: {}", e.getMessage());
catch (IllegalArgumentException | UncheckedIOException e) {
logger.error("Failed to deserialize message", e);
return ServerResponse.badRequest().bodyValue(new McpError("Invalid message format"));
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

Expand Down Expand Up @@ -300,7 +299,7 @@ private ServerResponse handleMessage(ServerRequest request) {
return ServerResponse.status(HttpStatus.SERVICE_UNAVAILABLE).body("Server is shutting down");
}

if (!request.param("sessionId").isPresent()) {
if (request.param("sessionId").isEmpty()) {
return ServerResponse.badRequest().body(new McpError("Session ID missing in message endpoint"));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,6 @@
*/
package io.modelcontextprotocol.server;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;

import org.apache.catalina.Context;
import org.apache.catalina.startup.Tomcat;

Expand Down Expand Up @@ -53,15 +49,8 @@ public static TomcatServer createTomcatServer(String contextPath, int port, Clas
wrapper.setAsyncSupported(true);
context.addServletMappingDecoded("/*", "dispatcherServlet");

try {
// Configure and start the connector with async support
var connector = tomcat.getConnector();
connector.setAsyncTimeout(3000); // 3 seconds timeout for async requests
}
catch (Exception e) {
throw new RuntimeException("Failed to start Tomcat", e);
}

var connector = tomcat.getConnector();
connector.setAsyncTimeout(3000); // 3 seconds timeout for async requests
return new TomcatServer(tomcat, appContext);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,6 @@ protected McpServerTransportProvider createMcpTransportProvider() {
return transportProvider;
}

@Override
protected void onStart() {
}

@Override
protected void onClose() {
if (transportProvider != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,6 @@ protected WebMvcSseServerTransportProvider createMcpTransportProvider() {
return transportProvider;
}

@Override
protected void onStart() {
}

@Override
protected void onClose() {
if (transportProvider != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,14 +77,14 @@ void testConstructorWithInvalidArguments() {
void testGracefulShutdown() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
void testImmediateClose() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer.close()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::close).doesNotThrowAnyException();
}

@Test
Expand All @@ -93,7 +93,7 @@ void testGetAsyncServer() {

assertThat(mcpSyncServer.getAsyncServer()).isNotNull();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

// ---------------------------------------
Expand Down Expand Up @@ -138,7 +138,7 @@ void testAddDuplicateTool() {
.isInstanceOf(McpError.class)
.hasMessage("Tool with name '" + TEST_TOOL_NAME + "' already exists");

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
Expand All @@ -153,7 +153,7 @@ void testRemoveTool() {

assertThatCode(() -> mcpSyncServer.removeTool(TEST_TOOL_NAME)).doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
Expand All @@ -173,9 +173,9 @@ void testRemoveNonexistentTool() {
void testNotifyToolsListChanged() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer.notifyToolsListChanged()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::notifyToolsListChanged).doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

// ---------------------------------------
Expand All @@ -186,9 +186,9 @@ void testNotifyToolsListChanged() {
void testNotifyResourcesListChanged() {
var mcpSyncServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThatCode(() -> mcpSyncServer.notifyResourcesListChanged()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::notifyResourcesListChanged).doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
Expand Down Expand Up @@ -219,7 +219,7 @@ void testAddResourceWithNullSpecification() {
.isInstanceOf(McpError.class)
.hasMessage("Resource must not be null");

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
Expand Down Expand Up @@ -312,7 +312,7 @@ void testRemovePrompt() {

assertThatCode(() -> mcpSyncServer.removePrompt(TEST_PROMPT_NAME)).doesNotThrowAnyException();

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

@Test
Expand All @@ -325,7 +325,7 @@ void testRemoveNonexistentPrompt() {
assertThatThrownBy(() -> mcpSyncServer.removePrompt("nonexistent-prompt")).isInstanceOf(McpError.class)
.hasMessage("Prompt with name 'nonexistent-prompt' not found");

assertThatCode(() -> mcpSyncServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(mcpSyncServer::closeGracefully).doesNotThrowAnyException();
}

// ---------------------------------------
Expand Down Expand Up @@ -366,7 +366,7 @@ void testRootsChangeHandlers() {
.build();

assertThat(multipleConsumersServer).isNotNull();
assertThatCode(() -> multipleConsumersServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(multipleConsumersServer::closeGracefully).doesNotThrowAnyException();
onClose();

// Test error handling
Expand All @@ -378,14 +378,14 @@ void testRootsChangeHandlers() {
.build();

assertThat(errorHandlingServer).isNotNull();
assertThatCode(() -> errorHandlingServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(errorHandlingServer::closeGracefully).doesNotThrowAnyException();
onClose();

// Test without consumers
var noConsumersServer = McpServer.sync(createMcpTransportProvider()).serverInfo("test-server", "1.0.0").build();

assertThat(noConsumersServer).isNotNull();
assertThatCode(() -> noConsumersServer.closeGracefully()).doesNotThrowAnyException();
assertThatCode(noConsumersServer::closeGracefully).doesNotThrowAnyException();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ else if (MESSAGE_EVENT_TYPE.equals(event.type())) {
logger.error("Received unrecognized SSE event type: {}", event.type());
}
}
catch (IOException e) {
catch (RuntimeException e) {
logger.error("Error processing SSE event", e);
future.completeExceptionally(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ private static class AsyncServerImpl extends McpAsyncServer {

private final ConcurrentHashMap<String, McpServerFeatures.AsyncPromptSpecification> prompts = new ConcurrentHashMap<>();

// FIXME: this field is deprecated and should be remvoed together with the
// FIXME: this field is deprecated and should be removed together with the
// broadcasting loggingNotification.
private LoggingLevel minLoggingLevel = LoggingLevel.DEBUG;

Expand Down Expand Up @@ -334,6 +334,7 @@ private static class AsyncServerImpl extends McpAsyncServer {
mcpTransportProvider.setSessionFactory(
transport -> new McpServerSession(UUID.randomUUID().toString(), requestTimeout, transport,
this::asyncInitializeRequestHandler, Mono::empty, requestHandlers, notificationHandlers));

}

// ---------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
*/
package io.modelcontextprotocol.server.transport;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
Expand Down Expand Up @@ -234,24 +234,20 @@ protected void doGet(HttpServletRequest request, HttpServletResponse response)
* and formats error responses according to the MCP specification.
* @param request The HTTP servlet request
* @param response The HTTP servlet response
* @throws ServletException If a servlet-specific error occurs
* @throws IOException If an I/O error occurs
*/
@Override
protected void doPost(HttpServletRequest request, HttpServletResponse response)
throws ServletException, IOException {

throws IOException {
if (isClosing.get()) {
response.sendError(HttpServletResponse.SC_SERVICE_UNAVAILABLE, "Server is shutting down");
return;
}

String requestURI = request.getRequestURI();
if (!requestURI.endsWith(messageEndpoint)) {
response.sendError(HttpServletResponse.SC_NOT_FOUND);
return;
}

// Get the session ID from the request parameter
String sessionId = request.getParameter("sessionId");
if (sessionId == null) {
Expand All @@ -277,24 +273,29 @@ protected void doPost(HttpServletRequest request, HttpServletResponse response)
writer.flush();
return;
}

try {
BufferedReader reader = request.getReader();
StringBuilder body = new StringBuilder();
String line;
while ((line = reader.readLine()) != null) {
body.append(line);
}

McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, body.toString());

// Process the message through the session's handle method
McpSchema.JSONRPCMessage message = McpSchema.deserializeJsonRpcMessage(objectMapper, request.getReader());
session.handle(message).block(); // Block for Servlet compatibility

response.setStatus(HttpServletResponse.SC_OK);
}
catch (IllegalArgumentException | UncheckedIOException ex) {
try {
McpError mcpError = new McpError(ex.getMessage());
response.setContentType(APPLICATION_JSON);
response.setCharacterEncoding(UTF_8);
response.setStatus(HttpServletResponse.SC_BAD_REQUEST);
String jsonError = objectMapper.writeValueAsString(mcpError);
PrintWriter writer = response.getWriter();
writer.write(jsonError);
writer.flush();
}
catch (IOException ex2) {
logger.error(FAILED_TO_SEND_ERROR_RESPONSE, ex2.getMessage());
response.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR, "Error processing message");
}
}
catch (Exception e) {
logger.error("Error processing message: {}", e.getMessage());
logger.error("Error processing message", e);
try {
McpError mcpError = new McpError(e.getMessage());
response.setContentType(APPLICATION_JSON);
Expand Down
43 changes: 25 additions & 18 deletions mcp/src/main/java/io/modelcontextprotocol/spec/McpSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@

package io.modelcontextprotocol.spec;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.Reader;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -140,32 +143,36 @@ public sealed interface Request
/**
* Deserializes a JSON string into a JSONRPCMessage object.
* @param objectMapper The ObjectMapper instance to use for deserialization
* @param jsonText The JSON string to deserialize
* @param inputStream The JSON string to deserialize
* @return A JSONRPCMessage instance using either the {@link JSONRPCRequest},
* {@link JSONRPCNotification}, or {@link JSONRPCResponse} classes.
* @throws IOException If there's an error during deserialization
* @throws IllegalArgumentException If the JSON structure doesn't match any known
* message type
*/
public static JSONRPCMessage deserializeJsonRpcMessage(ObjectMapper objectMapper, String jsonText)
throws IOException {

logger.debug("Received JSON message: {}", jsonText);

var map = objectMapper.readValue(jsonText, MAP_TYPE_REF);

// Determine message type based on specific JSON structure
if (map.containsKey("method") && map.containsKey("id")) {
return objectMapper.convertValue(map, JSONRPCRequest.class);
}
else if (map.containsKey("method") && !map.containsKey("id")) {
return objectMapper.convertValue(map, JSONRPCNotification.class);
public static JSONRPCMessage deserializeJsonRpcMessage(ObjectMapper objectMapper, BufferedReader inputStream) {
try {
var map = objectMapper.readValue(inputStream, MAP_TYPE_REF);
// Determine message type based on specific JSON structure
if (map.containsKey("method") && map.containsKey("id")) {
return objectMapper.convertValue(map, JSONRPCRequest.class);
}
else if (map.containsKey("method") && !map.containsKey("id")) {
return objectMapper.convertValue(map, JSONRPCNotification.class);
}
else if (map.containsKey("result") || map.containsKey("error")) {
return objectMapper.convertValue(map, JSONRPCResponse.class);
}
throw new IllegalArgumentException("Cannot deserialize JSONRPCMessage: " + map);
}
else if (map.containsKey("result") || map.containsKey("error")) {
return objectMapper.convertValue(map, JSONRPCResponse.class);
catch (IOException e) {
throw new java.io.UncheckedIOException(e);
}
}

throw new IllegalArgumentException("Cannot deserialize JSONRPCMessage: " + jsonText);
public static JSONRPCMessage deserializeJsonRpcMessage(ObjectMapper objectMapper, String input) {
Reader inputString = new StringReader(input);
BufferedReader reader = new BufferedReader(inputString);
return deserializeJsonRpcMessage(objectMapper, reader);
}

// ---------------------------
Expand Down
Loading