diff --git a/src/mcp/shared/session.py b/src/mcp/shared/session.py index ddfa909..e21bcbc 100644 --- a/src/mcp/shared/session.py +++ b/src/mcp/shared/session.py @@ -1,3 +1,4 @@ +import logging from contextlib import AbstractAsyncContextManager from datetime import timedelta from typing import Any, Callable, Generic, TypeVar @@ -273,19 +274,28 @@ async def _receive_loop(self) -> None: await self._incoming_message_stream_writer.send(responder) elif isinstance(message.root, JSONRPCNotification): - notification = self._receive_notification_type.model_validate( - message.root.model_dump( - by_alias=True, mode="json", exclude_none=True + try: + notification = self._receive_notification_type.model_validate( + message.root.model_dump( + by_alias=True, mode="json", exclude_none=True + ) + ) + # Handle cancellation notifications + if isinstance(notification.root, CancelledNotification): + cancelled_id = notification.root.params.requestId + if cancelled_id in self._in_flight: + await self._in_flight[cancelled_id].cancel() + else: + await self._received_notification(notification) + await self._incoming_message_stream_writer.send( + notification + ) + except Exception as e: + # For other validation errors, log and continue + logging.warning( + f"Failed to validate notification: {e}. " + f"Message was: {message.root}" ) - ) - # Handle cancellation notifications - if isinstance(notification.root, CancelledNotification): - cancelled_id = notification.root.params.requestId - if cancelled_id in self._in_flight: - await self._in_flight[cancelled_id].cancel() - else: - await self._received_notification(notification) - await self._incoming_message_stream_writer.send(notification) else: # Response or error stream = self._response_streams.pop(message.root.id, None) if stream: