Skip to content

Feat: Add Optional Structured Session Metadata #1474

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 40 additions & 0 deletions docs/sessions.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,46 @@ result = await Runner.run(
)
```

### Structured metadata

By default, `SQLiteSession` stores all conversation events as JSON blobs in a single table. You can enable structured metadata to create additional tables for messages, tool calls, and per-response usage:

```python
from agents import SQLiteSession

# Enable structured metadata storage
session = SQLiteSession(
"user_123",
"conversations.db",
structured_metadata=True,
)

# This creates additional tables:
# - agent_conversation_messages: stores user, assistant, and system messages
# - agent_tool_calls: stores tool call requests and outputs
# - agent_usage: stores per-response usage (model name, token counts) with trace/span attribution
```

With structured metadata enabled, you can query conversations and usage using standard SQL:

```sql
-- Get all user messages in a session
SELECT content FROM agent_conversation_messages
WHERE session_id = 'user_123' AND role = 'user';

-- Get all tool calls and their results
SELECT tool_name, arguments, output, status
FROM agent_tool_calls
WHERE session_id = 'user_123';

-- Inspect usage records (model, token counts) and spans
SELECT response_id, model, requests, input_tokens, output_tokens, total_tokens,
trace_id, span_id, created_at
FROM agent_usage
WHERE session_id = 'user_123'
ORDER BY created_at DESC;
```

### Multiple sessions

```python
Expand Down
133 changes: 133 additions & 0 deletions examples/basic/structured_metadata_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
"""A script to test and demonstrate the structured metadata session storage feature."""

import asyncio
import os
import random
import sqlite3
import sys

# Add the parent directory to the path to import from the local package
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", ".."))

from agents import Agent, Runner, SQLiteSession, function_tool


async def main():
# Create a tool
@function_tool
def get_random_number(max_val: int) -> int:
"""Get a random number between 0 and max_val."""
return random.randint(0, max_val)

# Create an agent
agent = Agent(
name="Assistant",
instructions="Reply very concisely. When using tools, explain what you're doing.",
tools=[get_random_number],
)

# Create a session with structured storage enabled
db_path = "structured_conversation_demo.db"
session = SQLiteSession("demo_session", db_path, structured_metadata=True)

print("=== Structured Session Storage Demo ===")
print("This demo shows structured storage that makes conversations easy to query.\n")

# First turn
print("First turn:")
print("User: Pick 3 random numbers between 0 and 100")
result = await Runner.run(agent, "Pick 3 random numbers between 0 and 100", session=session)
print(f"Assistant: {result.final_output}")
print()

# Second turn - the agent will remember the previous conversation
print("Second turn:")
print("User: What number did you pick for me?")
result = await Runner.run(agent, "What number did you pick for me?", session=session)
print(f"Assistant: {result.final_output}")
print()

# Third turn - another tool call
print("Third turn:")
print("User: Now pick one more number between 0 and 50")
result = await Runner.run(agent, "Now pick one more number between 0 and 50", session=session)
print(f"Assistant: {result.final_output}")
print()

print("=== Conversation Complete ===")
print(f"Data stored in: {db_path}")
print()

# Now demonstrate the structured storage benefits
print("=== Structured Storage Analysis ===")
print("With structured storage, you can easily query the conversation:")
print()

conn = sqlite3.connect(db_path)

# Show all messages
print("1. All conversation messages:")
cursor = conn.execute("""
SELECT role, content FROM agent_conversation_messages
WHERE session_id = 'demo_session'
ORDER BY created_at
""")
for role, content in cursor.fetchall():
content_preview = content[:60] + "..." if len(content) > 60 else content
print(f" {role}: {content_preview}")
print()

# Show all tool calls
print("2. All tool calls and results:")
cursor = conn.execute("""
SELECT tool_name, arguments, output, status
FROM agent_tool_calls
WHERE session_id = 'demo_session'
ORDER BY created_at
""")
for tool_name, arguments, output, status in cursor.fetchall():
print(f" Tool: {tool_name}")
print(f" Args: {arguments}")
print(f" Result: {output}")
print(f" Status: {status}")
print()

# Show message count by role
print("3. Message count by role:")
cursor = conn.execute("""
SELECT role, COUNT(*) as count
FROM agent_conversation_messages
WHERE session_id = 'demo_session'
GROUP BY role
""")
for role, count in cursor.fetchall():
print(f" {role}: {count} messages")
print()

# Show usage rows with model and spans
print("4. Usage records (per model response):")
cursor = conn.execute(
"""
SELECT response_id, model, requests, input_tokens, output_tokens, total_tokens, trace_id, span_id, created_at
FROM agent_usage
WHERE session_id = 'demo_session'
ORDER BY created_at
"""
)
usage_rows = cursor.fetchall()
if not usage_rows:
print(" (no usage rows found — ensure your model/provider returns usage)")
for row in usage_rows:
response_id, model, requests, in_toks, out_toks, total, trace_id, span_id, created_at = row
print(
f" model={model} resp_id={response_id} reqs={requests} in={in_toks} out={out_toks} total={total}"
)
print(f" trace={trace_id} span={span_id} at={created_at}")
print()

conn.close()
session.close()


if __name__ == "__main__":
asyncio.run(main())
3 changes: 2 additions & 1 deletion examples/realtime/app/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@
import logging
import struct
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, assert_never
from typing import TYPE_CHECKING, Any

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles
from typing_extensions import assert_never

from agents.realtime import RealtimeRunner, RealtimeSession, RealtimeSessionEvent

Expand Down
8 changes: 8 additions & 0 deletions src/agents/_run_impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -548,6 +548,14 @@ async def run_single_tool(
func_tool: FunctionTool, tool_call: ResponseFunctionToolCall
) -> Any:
with function_span(func_tool.name) as span_fn:
# Register span info so session storage can attribute this tool call
try:
# noqa: WPS433 import inside to avoid circular dependency
from .memory.session import register_tool_call_span

register_tool_call_span(tool_call.call_id, span_fn.trace_id, span_fn.span_id)
except Exception:
pass # Non-critical
tool_context = ToolContext.from_agent_context(
context_wrapper,
tool_call.call_id,
Expand Down
Loading