Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@ openai = ["openai>=1.102.0"]
[dependency-groups]
dev = [
"dirty-equals>=0.9.0",
# Dependency to support Docket memory:// brokers - only necessary until next fakeredis release
"fakeredis[lua] @ git+https://github.com/cunla/fakeredis-py.git@ad50a0de8d6dce554fb629ec284bc4ccbc6a7f12",
"fastmcp[openai]",
# add optional dependencies for fastmcp dev
"fastapi>=0.115.12",
Expand Down
4 changes: 3 additions & 1 deletion src/fastmcp/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@

from docket import Depends

__all__ = ["Depends"]
from fastmcp.server.dependencies import CurrentContext, CurrentDocket, CurrentWorker

__all__ = ["CurrentContext", "CurrentDocket", "CurrentWorker", "Depends"]
151 changes: 149 additions & 2 deletions src/fastmcp/server/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,11 @@
import inspect
from collections.abc import AsyncGenerator, Callable
from contextlib import AsyncExitStack, asynccontextmanager
from contextvars import ContextVar
from functools import lru_cache
from typing import TYPE_CHECKING, Any, get_type_hints
from typing import TYPE_CHECKING, Any, cast, get_type_hints

from docket.dependencies import _Depends, get_dependency_parameters
from docket.dependencies import Dependency, _Depends, get_dependency_parameters
from mcp.server.auth.middleware.auth_context import (
get_access_token as _sdk_get_access_token,
)
Expand All @@ -20,10 +21,20 @@
from fastmcp.utilities.types import is_class_member_of_type

if TYPE_CHECKING:
from docket import Docket
from docket.worker import Worker

from fastmcp.server.context import Context

# ContextVars for tracking Docket infrastructure
_current_docket: ContextVar[Docket | None] = ContextVar("docket", default=None) # type: ignore[assignment]
_current_worker: ContextVar[Worker | None] = ContextVar("worker", default=None) # type: ignore[assignment]

__all__ = [
"AccessToken",
"CurrentContext",
"CurrentDocket",
"CurrentWorker",
"get_access_token",
"get_context",
"get_http_headers",
Expand Down Expand Up @@ -126,6 +137,9 @@ async def _resolve_fastmcp_dependencies(
- A cache for resolved dependencies
- An AsyncExitStack for managing context manager lifetimes

The Docket instance (for CurrentDocket dependency) is managed separately
by the server's lifespan and made available via ContextVar.

Note: This does NOT set up Docket's Execution context. If user code needs
Docket-specific dependencies like TaskArgument(), TaskKey(), etc., those
will fail with clear errors about missing context.
Expand Down Expand Up @@ -235,6 +249,139 @@ def get_context() -> Context:
return context


class _CurrentContext(Dependency):
"""Internal dependency class for CurrentContext."""

async def __aenter__(self) -> Context:
return get_context()


def CurrentContext() -> Context:
"""Get the current FastMCP Context instance.

This dependency provides access to the active FastMCP Context for the
current MCP operation (tool/resource/prompt call).

Returns:
A dependency that resolves to the active Context instance

Raises:
RuntimeError: If no active context found (during resolution)

Example:
```python
from fastmcp.dependencies import CurrentContext

@mcp.tool()
async def log_progress(ctx: Context = CurrentContext()) -> str:
ctx.report_progress(50, 100, "Halfway done")
return "Working"
```
"""
return cast("Context", _CurrentContext())


class _CurrentDocket(Dependency):
"""Internal dependency class for CurrentDocket."""

async def __aenter__(self) -> Docket:
import fastmcp

# Check if experimental flag is enabled
if not fastmcp.settings.experimental.enable_docket:
raise RuntimeError(
"Docket support is not enabled. "
"Set FASTMCP_EXPERIMENTAL_ENABLE_DOCKET=true to enable experimental Docket support."
)

# Get Docket from ContextVar (set by _docket_lifespan)
docket = _current_docket.get()
if docket is None:
raise RuntimeError(
"No Docket instance found. This should not happen when "
"FASTMCP_EXPERIMENTAL_ENABLE_DOCKET is enabled."
)

return docket


def CurrentDocket() -> Docket:
"""Get the current Docket instance managed by FastMCP.

This dependency provides access to the Docket instance that FastMCP
automatically creates when experimental Docket support is enabled.

Requires:
- FASTMCP_EXPERIMENTAL_ENABLE_DOCKET=true

Returns:
A dependency that resolves to the active Docket instance

Raises:
RuntimeError: If experimental flag not enabled (during resolution)

Example:
```python
from fastmcp.dependencies import CurrentDocket

@mcp.tool()
async def schedule_task(docket: Docket = CurrentDocket()) -> str:
await docket.add(some_function)(arg1, arg2)
return "Scheduled"
```
"""
return cast("Docket", _CurrentDocket())


class _CurrentWorker(Dependency):
"""Internal dependency class for CurrentWorker."""

async def __aenter__(self) -> Worker:
import fastmcp

if not fastmcp.settings.experimental.enable_docket:
raise RuntimeError(
"Docket support is not enabled. "
"Set FASTMCP_EXPERIMENTAL_ENABLE_DOCKET=true to enable experimental Docket support."
)

worker = _current_worker.get()
if worker is None:
raise RuntimeError(
"No Worker instance found. This should not happen when "
"FASTMCP_EXPERIMENTAL_ENABLE_DOCKET is enabled."
)

return worker


def CurrentWorker() -> Worker:
"""Get the current Docket Worker instance managed by FastMCP.

This dependency provides access to the Worker instance that FastMCP
automatically creates when experimental Docket support is enabled.

Requires:
- FASTMCP_EXPERIMENTAL_ENABLE_DOCKET=true

Returns:
A dependency that resolves to the active Worker instance

Raises:
RuntimeError: If experimental flag not enabled (during resolution)

Example:
```python
from fastmcp.dependencies import CurrentWorker

@mcp.tool()
async def check_worker_status(worker: Worker = CurrentWorker()) -> str:
return f"Worker: {worker.name}"
```
"""
return cast("Worker", _CurrentWorker())


def get_http_request() -> Request:
from mcp.server.lowlevel.server import request_ctx

Expand Down
55 changes: 54 additions & 1 deletion src/fastmcp/server/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,13 +382,66 @@ def icons(self) -> list[mcp.types.Icon]:
else:
return list(self._mcp_server.icons)

@asynccontextmanager
async def _docket_lifespan(
self, user_lifespan_result: LifespanResultT
) -> AsyncIterator[LifespanResultT]:
"""Manage Docket instance and Worker when experimental support is enabled.

Args:
user_lifespan_result: The result from the user's lifespan function

Yields:
User's lifespan result (Docket is managed via ContextVar, not lifespan result)
"""
from fastmcp import settings
from fastmcp.server.dependencies import _current_docket, _current_worker

if not settings.experimental.enable_docket:
# Docket support not enabled, pass through user lifespan result
yield user_lifespan_result
return

# Import Docket components
from docket import Docket, Worker

# Create Docket instance with memory:// URL
async with Docket(url="memory://") as docket:
# Set Docket in ContextVar so CurrentDocket can access it
docket_token = _current_docket.set(docket)
try:
# Create and start Worker, then task group for run_forever()
async with (
Worker(docket) as worker,
anyio.create_task_group() as tg,
):
# Set Worker in ContextVar so CurrentWorker can access it
worker_token = _current_worker.set(worker)
try:
# Start worker as background task
tg.start_soon(worker.run_forever)

try:
yield user_lifespan_result
finally:
# Cancel task group when exiting (cancels worker)
tg.cancel_scope.cancel()
finally:
_current_worker.reset(worker_token)
finally:
# Reset ContextVar
_current_docket.reset(docket_token)

@asynccontextmanager
async def _lifespan_manager(self) -> AsyncIterator[None]:
if self._lifespan_result_set:
yield
return

async with self._lifespan(self) as lifespan_result:
async with (
self._lifespan(self) as user_lifespan_result,
self._docket_lifespan(user_lifespan_result) as lifespan_result,
):
self._lifespan_result = lifespan_result
self._lifespan_result_set = True

Expand Down
13 changes: 13 additions & 0 deletions src/fastmcp/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ class ExperimentalSettings(BaseSettings):
),
] = False

enable_docket: Annotated[
bool,
Field(
description=inspect.cleandoc(
"""
Enable experimental Docket support for background task execution.
When enabled, FastMCP will create a Docket instance with a Worker
to process background tasks.
"""
),
),
] = False


class Settings(BaseSettings):
"""FastMCP settings."""
Expand Down
35 changes: 34 additions & 1 deletion tests/server/test_dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@

from fastmcp import FastMCP
from fastmcp.client import Client
from fastmcp.dependencies import Depends
from fastmcp.dependencies import CurrentContext, Depends
from fastmcp.server.context import Context

HUZZAH = "huzzah!"


class Connection:
"""Test connection that tracks whether it's currently open."""
Expand Down Expand Up @@ -147,6 +149,37 @@ async def my_tool(
assert len(tool.inputSchema["properties"]) == 2


async def test_current_context_dependency(mcp: FastMCP):
"""Test that CurrentContext dependency provides access to FastMCP Context."""

@mcp.tool()
def use_context(ctx: Context = CurrentContext()) -> str:
assert isinstance(ctx, Context)
return HUZZAH

async with Client(mcp) as client:
result = await client.call_tool("use_context", {})
assert HUZZAH in str(result)


async def test_current_context_and_legacy_context_coexist(mcp: FastMCP):
"""Test that CurrentContext dependency and legacy Context injection work together."""

@mcp.tool()
def use_both_contexts(
legacy_ctx: Context,
dep_ctx: Context = CurrentContext(),
) -> str:
assert isinstance(legacy_ctx, Context)
assert isinstance(dep_ctx, Context)
assert legacy_ctx is dep_ctx
return HUZZAH

async with Client(mcp) as client:
result = await client.call_tool("use_both_contexts", {})
assert HUZZAH in str(result)


async def test_backward_compat_context_still_works(mcp: FastMCP):
"""Test that existing Context injection via type annotation still works."""

Expand Down
Loading
Loading