diff --git a/integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py b/integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py index 3734769cf..bb0408292 100644 --- a/integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py +++ b/integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py @@ -77,6 +77,9 @@ def __init__( # Session cleanup configuration cleanup_interval_seconds: int = 300, # 5 minutes default + max_sessions_per_user: Optional[int] = None, # No limit by default + delete_session_on_cleanup: bool = True, + save_session_to_memory_on_cleanup: bool = True, # Predictive state configuration predict_state: Optional[Iterable[PredictStateMapping]] = None, @@ -102,6 +105,9 @@ def __init__( tool_timeout_seconds: Timeout for individual tool calls max_concurrent_executions: Maximum concurrent background executions cleanup_interval_seconds: Interval for session cleanup + max_sessions_per_user: Maximum concurrent sessions per user (None = unlimited) + delete_session_on_cleanup: Whether to delete sessions from the adk SessionService on session cache cleanup + save_session_to_memory_on_cleanup: Whether to save sessions to the adk MemoryService on session cache cleanup predict_state: Configuration for predictive state updates. When provided, the agent will emit PredictState CustomEvents for matching tool calls, enabling the UI to show state changes in real-time as tool arguments @@ -113,6 +119,9 @@ def __init__( full message history (e.g., for client-side persistence or AG-UI protocol compliance). Note: Clients using CopilotKit can use the /agents/state endpoint instead for on-demand history retrieval. + + Note: + If delete_session_on_cleanup=False but save_session_to_memory_on_cleanup=True, sessions will accumulate in SessionService but still be saved to memory on cleanup. """ if app_name and app_name_extractor: raise ValueError("Cannot specify both 'app_name' and 'app_name_extractor'") @@ -151,8 +160,9 @@ def __init__( memory_service=self._memory_service, # Pass memory service for automatic session memory session_timeout_seconds=session_timeout_seconds, # 20 minutes default cleanup_interval_seconds=cleanup_interval_seconds, - max_sessions_per_user=None, # No limit by default - auto_cleanup=True # Enable by default + max_sessions_per_user=max_sessions_per_user, + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup ) # Tool execution tracking @@ -204,6 +214,9 @@ def from_app( # Session management session_timeout_seconds: Optional[int] = 1200, cleanup_interval_seconds: int = 300, + max_sessions_per_user: Optional[int] = None, # No limit by default + delete_session_on_cleanup: bool = True, + save_session_to_memory_on_cleanup: bool = True, # AG-UI specific predict_state: Optional[Iterable[PredictStateMapping]] = None, emit_messages_snapshot: bool = False, @@ -275,6 +288,9 @@ def from_app( max_concurrent_executions=max_concurrent_executions, session_timeout_seconds=session_timeout_seconds, cleanup_interval_seconds=cleanup_interval_seconds, + max_sessions_per_user=max_sessions_per_user, + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup, predict_state=predict_state, emit_messages_snapshot=emit_messages_snapshot, ) diff --git a/integrations/adk-middleware/python/src/ag_ui_adk/session_manager.py b/integrations/adk-middleware/python/src/ag_ui_adk/session_manager.py index 388c9d0c9..c9c4da967 100644 --- a/integrations/adk-middleware/python/src/ag_ui_adk/session_manager.py +++ b/integrations/adk-middleware/python/src/ag_ui_adk/session_manager.py @@ -43,7 +43,8 @@ def __init__( session_timeout_seconds: int = 1200, # 20 minutes default cleanup_interval_seconds: int = 300, # 5 minutes max_sessions_per_user: Optional[int] = None, - auto_cleanup: bool = True + delete_session_on_cleanup: bool = True, + save_session_to_memory_on_cleanup: bool = True, ): """Initialize the session manager. @@ -53,7 +54,8 @@ def __init__( session_timeout_seconds: Time before a session is considered expired cleanup_interval_seconds: Interval between cleanup cycles max_sessions_per_user: Maximum concurrent sessions per user (None = unlimited) - auto_cleanup: Enable automatic session cleanup task + delete_session_on_cleanup: Whether to delete sessions on cleanup + save_session_to_memory_on_cleanup: Whether to save sessions to memory on cleanup """ if self._initialized: return @@ -67,7 +69,8 @@ def __init__( self._timeout = session_timeout_seconds self._cleanup_interval = cleanup_interval_seconds self._max_per_user = max_sessions_per_user - self._auto_cleanup = auto_cleanup + self._delete_session_on_cleanup = delete_session_on_cleanup + self._save_session_to_memory_on_cleanup = save_session_to_memory_on_cleanup # Minimal tracking: just keys and user counts self._session_keys: Set[str] = set() # "app_name:session_id" keys @@ -137,8 +140,8 @@ async def get_or_create_session( self._track_session(session_key, user_id) logger.debug(f"Retrieved existing session for thread {thread_id}: {session.id}") - # Start cleanup if needed - if self._auto_cleanup and not self._cleanup_task: + # Start cleanup + if not self._cleanup_task: self._start_cleanup_task() return session, session.id @@ -163,8 +166,8 @@ async def get_or_create_session( self._track_session(session_key, user_id) logger.info(f"Created new session for thread {thread_id}: {session.id}") - # Start cleanup if needed - if self._auto_cleanup and not self._cleanup_task: + # Start cleanup + if not self._cleanup_task: self._start_cleanup_task() return session, session.id @@ -671,22 +674,23 @@ async def _delete_session(self, session): # If memory service is available, add session to memory before deletion logger.debug(f"Deleting session {session_key}, memory_service: {self._memory_service is not None}") - if self._memory_service: + if self._memory_service and self._save_session_to_memory_on_cleanup: try: await self._memory_service.add_session_to_memory(session) logger.debug(f"Added session {session_key} to memory before deletion") except Exception as e: logger.error(f"Failed to add session {session_key} to memory: {e}") - try: - await self._session_service.delete_session( - session_id=session.id, - app_name=session.app_name, - user_id=session.user_id - ) - logger.debug(f"Deleted session: {session_key}") - except Exception as e: - logger.error(f"Failed to delete session {session_key}: {e}") + if self._delete_session_on_cleanup: + try: + await self._session_service.delete_session( + session_id=session.id, + app_name=session.app_name, + user_id=session.user_id + ) + logger.debug(f"Deleted session: {session_key}") + except Exception as e: + logger.error(f"Failed to delete session {session_key}: {e}") self._untrack_session(session_key, session.user_id) diff --git a/integrations/adk-middleware/python/tests/test_from_app_integration.py b/integrations/adk-middleware/python/tests/test_from_app_integration.py index 3ed8c4f40..d097ef735 100644 --- a/integrations/adk-middleware/python/tests/test_from_app_integration.py +++ b/integrations/adk-middleware/python/tests/test_from_app_integration.py @@ -67,6 +67,48 @@ async def test_from_app_preserves_app_name(sample_app): adk_agent = ADKAgent.from_app(sample_app, user_id="test_user") assert adk_agent._static_app_name == "test_app" +@pytest.mark.asyncio +async def test_from_app_preserves_cleanup_options(sample_app): + """Test that cleanup options are preserved.""" + adk_agent = ADKAgent.from_app( + sample_app, + user_id="test_user", + delete_session_on_cleanup=False, + save_session_to_memory_on_cleanup=False, + ) + assert adk_agent._session_manager._delete_session_on_cleanup is False + assert adk_agent._session_manager._save_session_to_memory_on_cleanup is False + SessionManager.reset_instance() + + adk_agent = ADKAgent.from_app( + sample_app, + user_id="test_user", + delete_session_on_cleanup=True, + save_session_to_memory_on_cleanup=True, + ) + assert adk_agent._session_manager._delete_session_on_cleanup is True + assert adk_agent._session_manager._save_session_to_memory_on_cleanup is True + SessionManager.reset_instance() + + adk_agent = ADKAgent.from_app( + sample_app, + user_id="test_user", + delete_session_on_cleanup=False, + save_session_to_memory_on_cleanup=True, + ) + assert adk_agent._session_manager._delete_session_on_cleanup is False + assert adk_agent._session_manager._save_session_to_memory_on_cleanup is True + SessionManager.reset_instance() + + adk_agent = ADKAgent.from_app( + sample_app, + user_id="test_user", + delete_session_on_cleanup=True, + save_session_to_memory_on_cleanup=False, + ) + assert adk_agent._session_manager._delete_session_on_cleanup is True + assert adk_agent._session_manager._save_session_to_memory_on_cleanup is False + SessionManager.reset_instance() @pytest.mark.asyncio async def test_from_app_stores_app_reference(sample_app): diff --git a/integrations/adk-middleware/python/tests/test_session_deletion.py b/integrations/adk-middleware/python/tests/test_session_deletion.py index 24d6a3df0..04839279e 100644 --- a/integrations/adk-middleware/python/tests/test_session_deletion.py +++ b/integrations/adk-middleware/python/tests/test_session_deletion.py @@ -1,5 +1,6 @@ #!/usr/bin/env python """Test session deletion functionality with minimal session manager.""" +import pytest import asyncio from unittest.mock import AsyncMock, MagicMock @@ -7,233 +8,252 @@ from ag_ui_adk import SessionManager -async def test_session_deletion(): - """Test that session deletion calls delete_session with correct parameters.""" - print("๐Ÿงช Testing session deletion...") - - # Reset singleton for clean test - SessionManager.reset_instance() - - # Create mock session and service - test_thread_id = "test_thread_123" - test_backend_session_id = "backend_session_123" # Backend generates this - test_app_name = "test_app" - test_user_id = "test_user" - - # Mock session with state containing thread_id - created_session = MagicMock() - created_session.id = test_backend_session_id - created_session.state = {"_ag_ui_thread_id": test_thread_id, "test": "data"} - - mock_session_service = AsyncMock() - mock_session_service.list_sessions = AsyncMock(return_value=[]) # No existing sessions - mock_session_service.create_session = AsyncMock(return_value=created_session) - mock_session_service.delete_session = AsyncMock() - - # Create session manager with mock service - session_manager = SessionManager.get_instance( - session_service=mock_session_service, - auto_cleanup=False +class TestSessionDeletion: + + @pytest.fixture( + params=[True, False], ) + def save_session_to_memory_on_cleanup(self, request): + return request.param - # Create a session using thread_id (backend generates session_id) - session, backend_session_id = await session_manager.get_or_create_session( - thread_id=test_thread_id, - app_name=test_app_name, - user_id=test_user_id, - initial_state={"test": "data"} + @pytest.fixture( + params=[True, False], ) + def mock_memory_service(self, request): + """Create a mock memory service.""" + if request.param is False: + return None + service = AsyncMock() + service.add_session_to_memory = AsyncMock() + return service + + """Test session deletion functionality with minimal session manager.""" + async def test_session_deletion(self, mock_memory_service, save_session_to_memory_on_cleanup): + """Test that session deletion calls delete_session with correct parameters.""" + print("๐Ÿงช Testing session deletion...") + + # Reset singleton for clean test + SessionManager.reset_instance() + + # Create mock session and service + test_thread_id = "test_thread_123" + test_backend_session_id = "backend_session_123" # Backend generates this + test_app_name = "test_app" + test_user_id = "test_user" + + # Mock session with state containing thread_id + created_session = MagicMock() + created_session.id = test_backend_session_id + created_session.state = {"_ag_ui_thread_id": test_thread_id, "test": "data"} + + mock_session_service = AsyncMock() + mock_session_service.list_sessions = AsyncMock(return_value=[]) # No existing sessions + mock_session_service.create_session = AsyncMock(return_value=created_session) + mock_session_service.delete_session = AsyncMock() + + # Create session manager with mock service + session_manager = SessionManager.get_instance( + session_service=mock_session_service, + memory_service=mock_memory_service, + delete_session_on_cleanup=True, + save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup + ) - print(f"โœ… Created session with thread_id: {test_thread_id}, backend_id: {backend_session_id}") + # Create a session using thread_id (backend generates session_id) + session, backend_session_id = await session_manager.get_or_create_session( + thread_id=test_thread_id, + app_name=test_app_name, + user_id=test_user_id, + initial_state={"test": "data"} + ) - # Verify session exists in tracking (uses backend session_id) - session_key = f"{test_app_name}:{test_backend_session_id}" - assert session_key in session_manager._session_keys - print(f"โœ… Session tracked: {session_key}") + print(f"โœ… Created session with thread_id: {test_thread_id}, backend_id: {backend_session_id}") - # Create a mock session object for deletion - mock_session = MagicMock() - mock_session.id = test_backend_session_id - mock_session.app_name = test_app_name - mock_session.user_id = test_user_id + # Verify session exists in tracking (uses backend session_id) + session_key = f"{test_app_name}:{test_backend_session_id}" + assert session_key in session_manager._session_keys + print(f"โœ… Session tracked: {session_key}") - # Manually delete the session (internal method) - await session_manager._delete_session(mock_session) + # Create a mock session object for deletion + mock_session = MagicMock() + mock_session.id = test_backend_session_id + mock_session.app_name = test_app_name + mock_session.user_id = test_user_id - # Verify session is no longer tracked - assert session_key not in session_manager._session_keys - print("โœ… Session no longer in tracking") + # Manually delete the session (internal method) + await session_manager._delete_session(mock_session) - # Verify delete_session was called with correct parameters - mock_session_service.delete_session.assert_called_once_with( - session_id=test_backend_session_id, - app_name=test_app_name, - user_id=test_user_id - ) - print("โœ… delete_session called with correct parameters:") - print(f" session_id: {test_backend_session_id}") - print(f" app_name: {test_app_name}") - print(f" user_id: {test_user_id}") + # Verify session is no longer tracked + assert session_key not in session_manager._session_keys + print("โœ… Session no longer in tracking") - return True + # Verify delete_session was called with correct parameters + mock_session_service.delete_session.assert_called_once_with( + session_id=test_backend_session_id, + app_name=test_app_name, + user_id=test_user_id + ) + print("โœ… delete_session called with correct parameters:") + print(f" session_id: {test_backend_session_id}") + print(f" app_name: {test_app_name}") + print(f" user_id: {test_user_id}") + + if mock_memory_service is not None: + # Memory service add_session_to_memory should be called based on save_session_to_memory_on_cleanup flag + if save_session_to_memory_on_cleanup: + mock_memory_service.add_session_to_memory.assert_called_once() + else: + mock_memory_service.add_session_to_memory.assert_not_called() + return True -async def test_session_deletion_error_handling(): - """Test session deletion error handling.""" - print("\n๐Ÿงช Testing session deletion error handling...") + async def test_session_deletion_error_handling(self, mock_memory_service, save_session_to_memory_on_cleanup): + """Test session deletion error handling.""" + print("\n๐Ÿงช Testing session deletion error handling...") - # Reset singleton for clean test - SessionManager.reset_instance() + # Reset singleton for clean test + SessionManager.reset_instance() - # Create mock session and service - test_thread_id = "test_thread_456" - test_backend_session_id = "backend_session_456" - test_app_name = "test_app" - test_user_id = "test_user" + # Create mock session and service + test_thread_id = "test_thread_456" + test_backend_session_id = "backend_session_456" + test_app_name = "test_app" + test_user_id = "test_user" - created_session = MagicMock() - created_session.id = test_backend_session_id - created_session.state = {"_ag_ui_thread_id": test_thread_id} + created_session = MagicMock() + created_session.id = test_backend_session_id + created_session.state = {"_ag_ui_thread_id": test_thread_id} - mock_session_service = AsyncMock() - mock_session_service.list_sessions = AsyncMock(return_value=[]) - mock_session_service.create_session = AsyncMock(return_value=created_session) - mock_session_service.delete_session = AsyncMock(side_effect=Exception("Delete failed")) + mock_session_service = AsyncMock() + mock_session_service.list_sessions = AsyncMock(return_value=[]) + mock_session_service.create_session = AsyncMock(return_value=created_session) + mock_session_service.delete_session = AsyncMock(side_effect=Exception("Delete failed")) - # Create session manager with mock service - session_manager = SessionManager.get_instance( - session_service=mock_session_service, - auto_cleanup=False - ) + # Create session manager with mock service + session_manager = SessionManager.get_instance( + session_service=mock_session_service, + memory_service=mock_memory_service, + delete_session_on_cleanup=False, + save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup + ) - # Create a session - await session_manager.get_or_create_session( - thread_id=test_thread_id, - app_name=test_app_name, - user_id=test_user_id - ) + # Create a session + await session_manager.get_or_create_session( + thread_id=test_thread_id, + app_name=test_app_name, + user_id=test_user_id + ) - session_key = f"{test_app_name}:{test_backend_session_id}" - assert session_key in session_manager._session_keys + session_key = f"{test_app_name}:{test_backend_session_id}" + assert session_key in session_manager._session_keys - # Create mock session object for deletion - mock_session = MagicMock() - mock_session.id = test_backend_session_id - mock_session.app_name = test_app_name - mock_session.user_id = test_user_id + # Create mock session object for deletion + mock_session = MagicMock() + mock_session.id = test_backend_session_id + mock_session.app_name = test_app_name + mock_session.user_id = test_user_id - # Try to delete - should handle the error gracefully - try: + # Try to delete - should handle the error gracefully await session_manager._delete_session(mock_session) # Even if deletion failed, session should be untracked assert session_key not in session_manager._session_keys print("โœ… Session untracked even after deletion error") - return True - except Exception as e: - print(f"โŒ Unexpected exception: {e}") - return False - - -async def test_user_session_limits(): - """Test per-user session limits.""" - print("\n๐Ÿงช Testing per-user session limits...") - - # Reset singleton for clean test - SessionManager.reset_instance() - - import time - import uuid - - # Create mock session service - mock_session_service = AsyncMock() - - # Mock session objects with last_update_time and required attributes - class MockSession: - def __init__(self, update_time, session_id=None, app_name=None, user_id=None, state=None): - self.last_update_time = update_time - self.id = session_id - self.app_name = app_name - self.user_id = user_id - self.state = state or {} - - created_sessions = {} - - async def mock_list_sessions(app_name, user_id): - # Return sessions that match app_name/user_id - return [s for s in created_sessions.values() - if s.app_name == app_name and s.user_id == user_id] - - async def mock_get_session(session_id, app_name, user_id): - key = f"{app_name}:{session_id}" - return created_sessions.get(key) - - async def mock_create_session(app_name, user_id, state): - # Backend generates session_id - session_id = str(uuid.uuid4()) - session = MockSession(time.time(), session_id, app_name, user_id, state) - key = f"{app_name}:{session_id}" - created_sessions[key] = session - return session - - mock_session_service.list_sessions = mock_list_sessions - mock_session_service.get_session = mock_get_session - mock_session_service.create_session = mock_create_session - mock_session_service.delete_session = AsyncMock() - - # Create session manager with limit of 2 sessions per user - session_manager = SessionManager.get_instance( - session_service=mock_session_service, - max_sessions_per_user=2, - auto_cleanup=False - ) - - test_user = "limited_user" - test_app = "test_app" - - # Create 3 sessions for the same user (using different thread_ids) - for i in range(3): - await session_manager.get_or_create_session( - thread_id=f"thread_{i}", - app_name=test_app, - user_id=test_user + if mock_memory_service is not None: + # Memory service add_session_to_memory should be called based on save_session_to_memory_on_cleanup flag + if save_session_to_memory_on_cleanup: + mock_memory_service.add_session_to_memory.assert_called_once() + else: + mock_memory_service.add_session_to_memory.assert_not_called() + + + + + async def test_user_session_limits(self, mock_memory_service, save_session_to_memory_on_cleanup): + """Test per-user session limits.""" + print("\n๐Ÿงช Testing per-user session limits...") + + # Reset singleton for clean test + SessionManager.reset_instance() + + import time + import uuid + + # Create mock session service + mock_session_service = AsyncMock() + + # Mock session objects with last_update_time and required attributes + class MockSession: + def __init__(self, update_time, session_id=None, app_name=None, user_id=None, state=None): + self.last_update_time = update_time + self.id = session_id + self.app_name = app_name + self.user_id = user_id + self.state = state or {} + + created_sessions = {} + + async def mock_list_sessions(app_name, user_id): + # Return sessions that match app_name/user_id + return [s for s in created_sessions.values() + if s.app_name == app_name and s.user_id == user_id] + + async def mock_get_session(session_id, app_name, user_id): + key = f"{app_name}:{session_id}" + return created_sessions.get(key) + + async def mock_create_session(app_name, user_id, state): + # Backend generates session_id + session_id = str(uuid.uuid4()) + session = MockSession(time.time(), session_id, app_name, user_id, state) + key = f"{app_name}:{session_id}" + created_sessions[key] = session + return session + + mock_session_service.list_sessions = mock_list_sessions + mock_session_service.get_session = mock_get_session + mock_session_service.create_session = mock_create_session + mock_session_service.delete_session = AsyncMock() + + # Create session manager with limit of 2 sessions per user + session_manager = SessionManager.get_instance( + session_service=mock_session_service, + memory_service=mock_memory_service, + max_sessions_per_user=2, + delete_session_on_cleanup=False, + save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup ) - # Small delay to ensure different timestamps - await asyncio.sleep(0.1) - - # Should only have 2 sessions for this user - user_count = session_manager.get_user_session_count(test_user) - assert user_count == 2, f"Expected 2 sessions, got {user_count}" - print(f"โœ… User session limit enforced: {user_count} sessions") - # Verify we have exactly 2 session keys (session IDs are now UUIDs) - app_session_keys = [k for k in session_manager._session_keys if k.startswith(f"{test_app}:")] - assert len(app_session_keys) == 2, f"Expected 2 session keys, got {len(app_session_keys)}" - print("โœ… Oldest session was removed") - - return True - - -async def main(): - """Run all tests.""" - try: - success = await test_session_deletion() - success = success and await test_session_deletion_error_handling() - success = success and await test_user_session_limits() - - if success: - print("\nโœ… All session deletion tests passed!") - else: - print("\nโŒ Some tests failed!") - exit(1) - - except Exception as e: - print(f"\nโŒ Unexpected error: {e}") - import traceback - traceback.print_exc() - exit(1) + test_user = "limited_user" + test_app = "test_app" + + # Create 3 sessions for the same user (using different thread_ids) + for i in range(3): + await session_manager.get_or_create_session( + thread_id=f"thread_{i}", + app_name=test_app, + user_id=test_user + ) + # Small delay to ensure different timestamps + await asyncio.sleep(0.1) + + # Should only have 2 sessions for this user + user_count = session_manager.get_user_session_count(test_user) + assert user_count == 2, f"Expected 2 sessions, got {user_count}" + print(f"โœ… User session limit enforced: {user_count} sessions") + + # Verify we have exactly 2 session keys (session IDs are now UUIDs) + app_session_keys = [k for k in session_manager._session_keys if k.startswith(f"{test_app}:")] + assert len(app_session_keys) == 2, f"Expected 2 session keys, got {len(app_session_keys)}" + print("โœ… Oldest session was removed") + + if mock_memory_service is not None: + # Memory service add_session_to_memory should be called based on save_session_to_memory_on_cleanup flag + if save_session_to_memory_on_cleanup: + mock_memory_service.add_session_to_memory.assert_called_once() + else: + mock_memory_service.add_session_to_memory.assert_not_called() + return True -if __name__ == "__main__": - asyncio.run(main()) \ No newline at end of file diff --git a/integrations/adk-middleware/python/tests/test_session_memory.py b/integrations/adk-middleware/python/tests/test_session_memory.py index 7b7359f74..69a1d6e02 100644 --- a/integrations/adk-middleware/python/tests/test_session_memory.py +++ b/integrations/adk-middleware/python/tests/test_session_memory.py @@ -13,6 +13,12 @@ class TestSessionMemory: """Test cases for automatic session memory functionality.""" + @pytest.fixture( + params=[True, False], + ) + def delete_session_on_cleanup(self, request): + return request.param + @pytest.fixture(autouse=True) def reset_session_manager(self): """Reset session manager before each test.""" @@ -56,11 +62,12 @@ def to_dict(self): # ===== EXISTING MEMORY TESTS ===== @pytest.mark.asyncio - async def test_memory_service_disabled_by_default(self, mock_session_service, mock_session): + async def test_memory_service_disabled_by_default(self, mock_session_service, mock_session, delete_session_on_cleanup): """Test that memory service is disabled when not provided.""" manager = SessionManager.get_instance( session_service=mock_session_service, - auto_cleanup=False + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=True ) # Verify memory service is None @@ -73,16 +80,20 @@ async def test_memory_service_disabled_by_default(self, mock_session_service, mo await manager.get_or_create_session("test_session", "test_app", "test_user") await manager._delete_session(mock_session) - # Only session service delete should be called - mock_session_service.delete_session.assert_called_once() + # Session service delete should only be called based on delete_session_on_cleanup flag + if delete_session_on_cleanup: + mock_session_service.delete_session.assert_called_once() + else: + mock_session_service.delete_session.assert_not_called() @pytest.mark.asyncio - async def test_memory_service_enabled_with_service(self, mock_session_service, mock_memory_service, mock_session): + async def test_memory_service_enabled_with_service(self, mock_session_service, mock_memory_service, mock_session, delete_session_on_cleanup): """Test that memory service is called when provided.""" manager = SessionManager.get_instance( session_service=mock_session_service, memory_service=mock_memory_service, - auto_cleanup=False + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=True ) # Verify memory service is set @@ -94,20 +105,25 @@ async def test_memory_service_enabled_with_service(self, mock_session_service, m # Verify memory service was called with correct parameters mock_memory_service.add_session_to_memory.assert_called_once_with(mock_session) - # Verify session was also deleted from session service - mock_session_service.delete_session.assert_called_once_with( + # Session service delete should only be called based on delete_session_on_cleanup flag + if delete_session_on_cleanup: + mock_session_service.delete_session.assert_called_once_with( session_id="test_session", app_name="test_app", user_id="test_user" - ) + ) + else: + mock_session_service.delete_session.assert_not_called() + @pytest.mark.asyncio - async def test_memory_service_error_handling(self, mock_session_service, mock_memory_service, mock_session): + async def test_memory_service_error_handling(self, mock_session_service, mock_memory_service, mock_session, delete_session_on_cleanup): """Test that memory service errors don't prevent session deletion.""" manager = SessionManager.get_instance( session_service=mock_session_service, memory_service=mock_memory_service, - auto_cleanup=False + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=True ) # Make memory service fail @@ -116,17 +132,23 @@ async def test_memory_service_error_handling(self, mock_session_service, mock_me # Delete should still succeed despite memory service error await manager._delete_session(mock_session) - # Verify both were called despite memory service error + # Verify memory service was called mock_memory_service.add_session_to_memory.assert_called_once() - mock_session_service.delete_session.assert_called_once() + + # Session service delete should only be called based on delete_session_on_cleanup flag + if delete_session_on_cleanup: + mock_session_service.delete_session.assert_called_once() + else: + mock_session_service.delete_session.assert_not_called() @pytest.mark.asyncio - async def test_memory_service_with_missing_session(self, mock_session_service, mock_memory_service): + async def test_memory_service_with_missing_session(self, mock_session_service, mock_memory_service, delete_session_on_cleanup): """Test memory service behavior when session doesn't exist.""" manager = SessionManager.get_instance( session_service=mock_session_service, memory_service=mock_memory_service, - auto_cleanup=False + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=False ) # Delete a None session (simulates session not found) @@ -139,13 +161,14 @@ async def test_memory_service_with_missing_session(self, mock_session_service, m mock_session_service.delete_session.assert_not_called() @pytest.mark.asyncio - async def test_memory_service_during_cleanup(self, mock_session_service, mock_memory_service): + async def test_memory_service_during_cleanup(self, mock_session_service, mock_memory_service, delete_session_on_cleanup): """Test that memory service is used during automatic cleanup.""" manager = SessionManager.get_instance( session_service=mock_session_service, memory_service=mock_memory_service, session_timeout_seconds=1, # 1 second timeout - auto_cleanup=False # We'll trigger cleanup manually + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=True ) # Create an expired session @@ -165,14 +188,21 @@ async def test_memory_service_during_cleanup(self, mock_session_service, mock_me # Verify memory service was called during cleanup mock_memory_service.add_session_to_memory.assert_called_once_with(old_session) + # Session service delete should only be called based on delete_session_on_cleanup flag + if delete_session_on_cleanup: + mock_session_service.delete_session.assert_called_once() + else: + mock_session_service.delete_session.assert_not_called() + @pytest.mark.asyncio - async def test_memory_service_during_user_limit_enforcement(self, mock_session_service, mock_memory_service): + async def test_memory_service_during_user_limit_enforcement(self, mock_session_service, mock_memory_service, delete_session_on_cleanup): """Test that memory service is used when removing oldest sessions due to user limits.""" manager = SessionManager.get_instance( session_service=mock_session_service, memory_service=mock_memory_service, max_sessions_per_user=1, # Limit to 1 session per user - auto_cleanup=False + delete_session_on_cleanup=delete_session_on_cleanup, + save_session_to_memory_on_cleanup=True ) # Create an old session that will be removed @@ -208,14 +238,21 @@ async def test_memory_service_during_user_limit_enforcement(self, mock_session_s # Verify memory service was called for the removed session mock_memory_service.add_session_to_memory.assert_called_once_with(old_session) + # Session service delete should only be called based on delete_session_on_cleanup flag + if delete_session_on_cleanup: + mock_session_service.delete_session.assert_called_once() + else: + mock_session_service.delete_session.assert_not_called() + @pytest.mark.asyncio - async def test_memory_service_configuration(self, mock_session_service, mock_memory_service): + async def test_memory_service_configuration(self, mock_session_service, mock_memory_service, delete_session_on_cleanup): """Test that memory service configuration is properly stored.""" # Test with memory service enabled SessionManager.reset_instance() manager = SessionManager.get_instance( session_service=mock_session_service, - memory_service=mock_memory_service + memory_service=mock_memory_service, + delete_session_on_cleanup=delete_session_on_cleanup ) assert manager._memory_service is mock_memory_service @@ -224,7 +261,8 @@ async def test_memory_service_configuration(self, mock_session_service, mock_mem SessionManager.reset_instance() manager = SessionManager.get_instance( session_service=mock_session_service, - memory_service=None + memory_service=None, + delete_session_on_cleanup=delete_session_on_cleanup ) assert manager._memory_service is None @@ -277,7 +315,8 @@ def manager(self, mock_session_service): """Create a session manager instance.""" return SessionManager.get_instance( session_service=mock_session_service, - auto_cleanup=False + delete_session_on_cleanup=False, + save_session_to_memory_on_cleanup=False ) # ===== UPDATE SESSION STATE TESTS ===== diff --git a/integrations/adk-middleware/python/tests/test_tool_tracking_hitl.py b/integrations/adk-middleware/python/tests/test_tool_tracking_hitl.py index 148ffabed..70501d178 100644 --- a/integrations/adk-middleware/python/tests/test_tool_tracking_hitl.py +++ b/integrations/adk-middleware/python/tests/test_tool_tracking_hitl.py @@ -181,4 +181,115 @@ async def mock_run_adk_in_background(*args, **kwargs): # Execution should NOT be cleaned up due to pending tool call assert "test_thread" in adk_middleware._active_executions execution = adk_middleware._active_executions["test_thread"] - assert execution.is_complete \ No newline at end of file + assert execution.is_complete + + @pytest.mark.asyncio + async def test_session_not_cleaned_up_with_pending_tools(self, mock_adk_agent, sample_tool): + """Test that executions with pending tool calls are not cleaned up.""" + # Create input + input_data = RunAgentInput( + thread_id="test_thread", + run_id="run_1", + messages=[UserMessage(id="1", role="user", content="Test")], + tools=[sample_tool], + context=[], + state={}, + forwarded_props={} + ) + + adk_middleware = ADKAgent( + adk_agent=mock_adk_agent, + app_name="test_app", + user_id="test_user", + delete_session_on_cleanup=True, + session_timeout_seconds=0 # all sessions expire immediately for test + ) + + # Ensure session exists first (returns tuple: session, backend_session_id) + session, backend_session_id = await adk_middleware._ensure_session_exists( + app_name="test_app", + user_id="test_user", + thread_id="test_thread", + initial_state={} + ) + + # Mock background execution to emit tool events + async def mock_run_adk_in_background(*args, **kwargs): + event_queue = kwargs['event_queue'] + + # Emit tool call events + tool_call_id = "test_tool_call_456" + await event_queue.put(ToolCallEndEvent( + type=EventType.TOOL_CALL_END, + tool_call_id=tool_call_id + )) + + # Signal completion + await event_queue.put(None) + + # Use the mock + with patch.object(adk_middleware, '_run_adk_in_background', side_effect=mock_run_adk_in_background): + events = [] + async for event in adk_middleware._start_new_execution(input_data): + events.append(event) + + # Execution should NOT be cleaned up due to pending tool call + assert "test_thread" in adk_middleware._active_executions + execution = adk_middleware._active_executions["test_thread"] + assert execution.is_complete + + await adk_middleware._session_manager._cleanup_expired_sessions() + # Session should still exist due to pending tool call + assert adk_middleware._session_manager.get_session_count() == 1 + + @pytest.mark.asyncio + async def test_session_cleaned_up_with_no_pending_tools(self, mock_adk_agent, sample_tool): + """Test that executions with no pending tool calls are cleaned up.""" + # Create input + input_data = RunAgentInput( + thread_id="test_thread", + run_id="run_1", + messages=[UserMessage(id="1", role="user", content="Test")], + tools=[sample_tool], + context=[], + state={}, + forwarded_props={} + ) + + adk_middleware = ADKAgent( + adk_agent=mock_adk_agent, + app_name="test_app", + user_id="test_user", + delete_session_on_cleanup=True, + session_timeout_seconds=0 # all sessions expire immediately for test + ) + + # Ensure session exists first (returns tuple: session, backend_session_id) + session, backend_session_id = await adk_middleware._ensure_session_exists( + app_name="test_app", + user_id="test_user", + thread_id="test_thread", + initial_state={} + ) + + # Mock background execution to emit tool events + async def mock_run_adk_in_background(*args, **kwargs): + event_queue = kwargs['event_queue'] + + # Emit NO tool call events + + # Signal completion + await event_queue.put(None) + + # Use the mock + with patch.object(adk_middleware, '_run_adk_in_background', side_effect=mock_run_adk_in_background): + events = [] + async for event in adk_middleware._start_new_execution(input_data): + events.append(event) + + # Execution should be cleaned up due to NO pending tool call + assert "test_thread" not in adk_middleware._active_executions + + await adk_middleware._session_manager._cleanup_expired_sessions() + # Session should not exist due cleanup + assert adk_middleware._session_manager.get_session_count() == 0 \ No newline at end of file