Skip to content
20 changes: 18 additions & 2 deletions integrations/adk-middleware/python/src/ag_ui_adk/adk_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ def __init__(

# Session cleanup configuration
cleanup_interval_seconds: int = 300, # 5 minutes default
max_sessions_per_user: Optional[int] = None, # No limit by default
delete_session_on_cleanup: bool = True,
save_session_to_memory_on_cleanup: bool = True,

# Predictive state configuration
predict_state: Optional[Iterable[PredictStateMapping]] = None,
Expand All @@ -102,6 +105,9 @@ def __init__(
tool_timeout_seconds: Timeout for individual tool calls
max_concurrent_executions: Maximum concurrent background executions
cleanup_interval_seconds: Interval for session cleanup
max_sessions_per_user: Maximum concurrent sessions per user (None = unlimited)
delete_session_on_cleanup: Whether to delete sessions from the adk SessionService on session cache cleanup
save_session_to_memory_on_cleanup: Whether to save sessions to the adk MemoryService on session cache cleanup
predict_state: Configuration for predictive state updates. When provided,
the agent will emit PredictState CustomEvents for matching tool calls,
enabling the UI to show state changes in real-time as tool arguments
Expand All @@ -113,6 +119,9 @@ def __init__(
full message history (e.g., for client-side persistence or AG-UI
protocol compliance). Note: Clients using CopilotKit can use the
/agents/state endpoint instead for on-demand history retrieval.

Note:
If delete_session_on_cleanup=False but save_session_to_memory_on_cleanup=True, sessions will accumulate in SessionService but still be saved to memory on cleanup.
"""
if app_name and app_name_extractor:
raise ValueError("Cannot specify both 'app_name' and 'app_name_extractor'")
Expand Down Expand Up @@ -151,8 +160,9 @@ def __init__(
memory_service=self._memory_service, # Pass memory service for automatic session memory
session_timeout_seconds=session_timeout_seconds, # 20 minutes default
cleanup_interval_seconds=cleanup_interval_seconds,
max_sessions_per_user=None, # No limit by default
auto_cleanup=True # Enable by default
max_sessions_per_user=max_sessions_per_user,
delete_session_on_cleanup=delete_session_on_cleanup,
save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup
)

# Tool execution tracking
Expand Down Expand Up @@ -204,6 +214,9 @@ def from_app(
# Session management
session_timeout_seconds: Optional[int] = 1200,
cleanup_interval_seconds: int = 300,
max_sessions_per_user: Optional[int] = None, # No limit by default
delete_session_on_cleanup: bool = True,
save_session_to_memory_on_cleanup: bool = True,
# AG-UI specific
predict_state: Optional[Iterable[PredictStateMapping]] = None,
emit_messages_snapshot: bool = False,
Expand Down Expand Up @@ -275,6 +288,9 @@ def from_app(
max_concurrent_executions=max_concurrent_executions,
session_timeout_seconds=session_timeout_seconds,
cleanup_interval_seconds=cleanup_interval_seconds,
max_sessions_per_user=max_sessions_per_user,
delete_session_on_cleanup=delete_session_on_cleanup,
save_session_to_memory_on_cleanup=save_session_to_memory_on_cleanup,
predict_state=predict_state,
emit_messages_snapshot=emit_messages_snapshot,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ def __init__(
session_timeout_seconds: int = 1200, # 20 minutes default
cleanup_interval_seconds: int = 300, # 5 minutes
max_sessions_per_user: Optional[int] = None,
auto_cleanup: bool = True
delete_session_on_cleanup: bool = True,
save_session_to_memory_on_cleanup: bool = True,
):
"""Initialize the session manager.

Expand All @@ -53,7 +54,8 @@ def __init__(
session_timeout_seconds: Time before a session is considered expired
cleanup_interval_seconds: Interval between cleanup cycles
max_sessions_per_user: Maximum concurrent sessions per user (None = unlimited)
auto_cleanup: Enable automatic session cleanup task
delete_session_on_cleanup: Whether to delete sessions on cleanup
save_session_to_memory_on_cleanup: Whether to save sessions to memory on cleanup
"""
if self._initialized:
return
Expand All @@ -67,7 +69,8 @@ def __init__(
self._timeout = session_timeout_seconds
self._cleanup_interval = cleanup_interval_seconds
self._max_per_user = max_sessions_per_user
self._auto_cleanup = auto_cleanup
self._delete_session_on_cleanup = delete_session_on_cleanup
self._save_session_to_memory_on_cleanup = save_session_to_memory_on_cleanup

# Minimal tracking: just keys and user counts
self._session_keys: Set[str] = set() # "app_name:session_id" keys
Expand Down Expand Up @@ -137,8 +140,8 @@ async def get_or_create_session(
self._track_session(session_key, user_id)
logger.debug(f"Retrieved existing session for thread {thread_id}: {session.id}")

# Start cleanup if needed
if self._auto_cleanup and not self._cleanup_task:
# Start cleanup
if not self._cleanup_task:
self._start_cleanup_task()

return session, session.id
Expand All @@ -163,8 +166,8 @@ async def get_or_create_session(
self._track_session(session_key, user_id)
logger.info(f"Created new session for thread {thread_id}: {session.id}")

# Start cleanup if needed
if self._auto_cleanup and not self._cleanup_task:
# Start cleanup
if not self._cleanup_task:
self._start_cleanup_task()

return session, session.id
Expand Down Expand Up @@ -671,22 +674,23 @@ async def _delete_session(self, session):

# If memory service is available, add session to memory before deletion
logger.debug(f"Deleting session {session_key}, memory_service: {self._memory_service is not None}")
if self._memory_service:
if self._memory_service and self._save_session_to_memory_on_cleanup:
try:
await self._memory_service.add_session_to_memory(session)
logger.debug(f"Added session {session_key} to memory before deletion")
except Exception as e:
logger.error(f"Failed to add session {session_key} to memory: {e}")

try:
await self._session_service.delete_session(
session_id=session.id,
app_name=session.app_name,
user_id=session.user_id
)
logger.debug(f"Deleted session: {session_key}")
except Exception as e:
logger.error(f"Failed to delete session {session_key}: {e}")
if self._delete_session_on_cleanup:
try:
await self._session_service.delete_session(
session_id=session.id,
app_name=session.app_name,
user_id=session.user_id
)
logger.debug(f"Deleted session: {session_key}")
except Exception as e:
logger.error(f"Failed to delete session {session_key}: {e}")

self._untrack_session(session_key, session.user_id)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,48 @@ async def test_from_app_preserves_app_name(sample_app):
adk_agent = ADKAgent.from_app(sample_app, user_id="test_user")
assert adk_agent._static_app_name == "test_app"

@pytest.mark.asyncio
async def test_from_app_preserves_cleanup_options(sample_app):
"""Test that cleanup options are preserved."""
adk_agent = ADKAgent.from_app(
sample_app,
user_id="test_user",
delete_session_on_cleanup=False,
save_session_to_memory_on_cleanup=False,
)
assert adk_agent._session_manager._delete_session_on_cleanup is False
assert adk_agent._session_manager._save_session_to_memory_on_cleanup is False
SessionManager.reset_instance()

adk_agent = ADKAgent.from_app(
sample_app,
user_id="test_user",
delete_session_on_cleanup=True,
save_session_to_memory_on_cleanup=True,
)
assert adk_agent._session_manager._delete_session_on_cleanup is True
assert adk_agent._session_manager._save_session_to_memory_on_cleanup is True
SessionManager.reset_instance()

adk_agent = ADKAgent.from_app(
sample_app,
user_id="test_user",
delete_session_on_cleanup=False,
save_session_to_memory_on_cleanup=True,
)
assert adk_agent._session_manager._delete_session_on_cleanup is False
assert adk_agent._session_manager._save_session_to_memory_on_cleanup is True
SessionManager.reset_instance()

adk_agent = ADKAgent.from_app(
sample_app,
user_id="test_user",
delete_session_on_cleanup=True,
save_session_to_memory_on_cleanup=False,
)
assert adk_agent._session_manager._delete_session_on_cleanup is True
assert adk_agent._session_manager._save_session_to_memory_on_cleanup is False
SessionManager.reset_instance()

@pytest.mark.asyncio
async def test_from_app_stores_app_reference(sample_app):
Expand Down
Loading
Loading