Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ ___
ddgs api # Start in foreground
ddgs api -d # Start in detached mode
ddgs api -s # Stop detached server
ddgs api --host 127.0.0.1 --port 9000 --proxy socks5h://127.0.0.1:9150 # Custom host / post / proxy
ddgs api --transport http # Start with Streamable HTTP transport at /mcp
ddgs api --host 127.0.0.1 --port 9000 --proxy socks5h://127.0.0.1:9150 # Custom host / port / proxy
```

- **Docker compose**
Expand All @@ -58,10 +59,15 @@ chmod +x start_api.sh

#### Available Endpoints
- MCP Endpoints (for AI assistance):
- `http://localhost:8000/sse` - SSE transport
- `http://localhost:8000/mcp` - [Streamable HTTP transport](https://modelcontextprotocol.io/specification/2025-11-25/basic/transports#streamable-http) (`ddgs api --transport http`)
- `http://localhost:8000/sse` - [SSE transport](https://modelcontextprotocol.io/specification/2024-11-05/basic/transports#http-with-sse) (`ddgs api --transport sse`, default)
- API Docs: `http://localhost:8000/docs`
- Health Check: `http://localhost:8000/health`

Use `--endpoint` to replace either default path with your own, for example `ddgs api --transport http --endpoint /search`.

SSE remains the default transport for backward compatibility, but Streamable HTTP is recommended for new MCP clients.

#### Available MCP Tools
- `search_text` - Web text searches
- `search_images` - Image searches
Expand Down
62 changes: 53 additions & 9 deletions ddgs/api_server/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,60 @@
This module consolidates the FastAPI application and MCP server.
"""

import logging
import importlib
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Any

# Import FastAPI app and MCP server
from ddgs.api_server.api import app as fastapi_app
from ddgs.api_server.mcp import mcp
from fastapi import FastAPI
from mcp.server.fastmcp import FastMCP

logger = logging.getLogger(__name__)
from ddgs.mcp_transport import normalize_mcp_endpoint, normalize_mcp_transport

# Mount MCP SSE endpoint to FastAPI app
fastapi_app.mount("/", mcp.sse_app())
logger.info("MCP server enabled at /sse")

__all__ = ["fastapi_app", "mcp"]
def _mount_http_transport(app: FastAPI, mcp_server: FastMCP[Any], endpoint: str) -> None:
"""Mount the HTTP MCP transport and bridge its lifespan into FastAPI."""
mcp_server.settings.streamable_http_path = endpoint
streamable_http_app = mcp_server.streamable_http_app()

# Mounted sub-app lifespans are not entered by FastAPI/Starlette, so the parent
# app needs to drive the streamable session manager explicitly.
original_lifespan = app.router.lifespan_context

@asynccontextmanager
async def lifespan(instance: FastAPI) -> AsyncIterator[Any]:
async with original_lifespan(instance) as lifespan_state, mcp_server.session_manager.run():
yield lifespan_state

app.router.lifespan_context = lifespan
app.mount("/", streamable_http_app)


def _create_fastapi_app_and_mcp(
transport: str | None = None, endpoint: str | None = None
) -> tuple[FastAPI, FastMCP[Any]]:
api_module = importlib.reload(importlib.import_module("ddgs.api_server.api"))
mcp_module = importlib.reload(importlib.import_module("ddgs.api_server.mcp"))

Comment on lines +38 to +40
fastapi_app = api_module.app
mcp_server = mcp_module.mcp
normalized_transport = normalize_mcp_transport(transport)
normalized_endpoint = normalize_mcp_endpoint(endpoint, normalized_transport)

if normalized_transport == "http":
_mount_http_transport(fastapi_app, mcp_server, normalized_endpoint)
else:
mcp_server.settings.sse_path = normalized_endpoint
fastapi_app.mount("/", mcp_server.sse_app())

return fastapi_app, mcp_server


def create_fastapi_app(transport: str | None = None, endpoint: str | None = None) -> FastAPI:
"""Create a FastAPI app with the selected MCP transport and endpoint."""
fastapi_app, _ = _create_fastapi_app_and_mcp(transport=transport, endpoint=endpoint)
return fastapi_app

fastapi_app, mcp = _create_fastapi_app_and_mcp()

__all__ = ["create_fastapi_app", "fastapi_app", "mcp"]
139 changes: 104 additions & 35 deletions ddgs/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,26 @@
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timezone
from pathlib import Path
from typing import TYPE_CHECKING
from urllib.parse import unquote

import click
import primp

from . import __version__
from .ddgs import DDGS
from .mcp_transport import (
DEFAULT_MCP_TRANSPORT,
MCP_TRANSPORT_CHOICES,
get_mcp_transport_endpoint,
normalize_mcp_endpoint,
normalize_mcp_transport,
)
from .utils import _expand_proxy_tb_alias

if TYPE_CHECKING:
from fastapi import FastAPI

# Use a consistent PID file location in user's home directory
_PID_FILE = Path.home() / ".cache" / "ddgs" / "api.pid"

Expand All @@ -42,6 +53,39 @@
}


def _create_api_server_app(transport: str, endpoint: str) -> "FastAPI":
"""Create a configured FastAPI app for the DDGS API server."""
from .api_server import create_fastapi_app # noqa: PLC0415

return create_fastapi_app(transport=transport, endpoint=endpoint)


def _uses_default_api_server_config(transport: str, endpoint: str) -> bool:
"""Return whether the requested API server config matches the default import path."""
return transport == DEFAULT_MCP_TRANSPORT and endpoint == get_mcp_transport_endpoint(transport)


def _get_detached_api_command(host: str, port: int, transport: str, endpoint: str, proxy: str | None) -> list[str]:
"""Build the detached child process command for the API server."""
cmd = [
sys.executable,
"-m",
"ddgs.cli",
"api",
"--host",
host,
"--port",
str(port),
"--transport",
transport,
]
if endpoint != get_mcp_transport_endpoint(transport):
cmd.extend(["--endpoint", endpoint])
if proxy:
cmd.extend(["--proxy", proxy])
return cmd


def _convert_tuple_to_csv(_ctx: click.Context, _param: click.Parameter, value: tuple[str] | None) -> str:
if value is not None and isinstance(value, tuple):
return ",".join(value)
Expand Down Expand Up @@ -531,17 +575,37 @@ def books(
@click.option("--host", default="0.0.0.0", help="Host to bind the server to") # noqa: S104
@click.option("--port", default=8000, type=int, help="Port to bind the server to")
@click.option("--reload", is_flag=True, help="Enable auto-reload on code changes")
@click.option(
"--transport",
default=DEFAULT_MCP_TRANSPORT,
type=click.Choice(MCP_TRANSPORT_CHOICES, case_sensitive=False),
show_default=True,
help="MCP transport to expose",
)
@click.option("--endpoint", help="Override the MCP endpoint path")
@click.option("-pr", "--proxy", help="the proxy to send requests, example: socks5h://127.0.0.1:9150")
def api(detach: bool, stop: bool, host: str, port: int, reload: bool, proxy: str | None) -> None: # noqa: FBT001, PLR0912, C901
def api( # noqa: C901, PLR0912, PLR0915
*,
detach: bool,
stop: bool,
host: str,
port: int,
reload: bool,
transport: str,
endpoint: str | None,
proxy: str | None,
) -> None:
"""Start/stop the DDGS MCP API server.

Starts a FastAPI server with MCP (Model Context Protocol) support for search tools.
The server exposes SSE endpoint at /sse and supports text, image, news, video, and book search.
The server exposes the selected MCP endpoint and supports text, image, news, video, and book search.

Examples:
ddgs api # Start server in foreground
ddgs api -d # Start server in detached mode
ddgs api -s # Stop the detached server
ddgs api --transport http # Expose HTTP transport at /mcp
ddgs api --transport http --endpoint /search # Expose HTTP transport at /search
ddgs api --host 127.0.0.1 --port 9000 # Bind to specific host/port
ddgs api -pr socks5h://127.0.0.1:9150 # Use proxy

Expand All @@ -566,36 +630,34 @@ def api(detach: bool, stop: bool, host: str, port: int, reload: bool, proxy: str
return

try:
import subprocess # noqa: PLC0415

import uvicorn # noqa: PLC0415
except ImportError:
click.echo("Error: API dependencies not installed. Run: pip install 'ddgs[api]'", err=True)
return

# Prepare proxy environment variable
proxy_env = os.environ.copy()
if proxy:
proxy_env["DDGS_PROXY"] = _expand_proxy_tb_alias(proxy) or proxy
normalized_transport = normalize_mcp_transport(transport)
normalized_endpoint = normalize_mcp_endpoint(endpoint, normalized_transport)
uses_default_api_server_config = _uses_default_api_server_config(normalized_transport, normalized_endpoint)
expanded_proxy = (_expand_proxy_tb_alias(proxy) or proxy) if proxy else None

Comment on lines +638 to +642
if reload and not uses_default_api_server_config:
click.echo("Error: --reload is only supported with the default SSE endpoint /sse", err=True)
return

if detach:
try:
import subprocess # noqa: PLC0415
except ImportError:
click.echo("Error: API dependencies not installed. Run: pip install 'ddgs[api]'", err=True)
return

import time # noqa: PLC0415

cmd = [
sys.executable,
"-m",
"uvicorn",
"ddgs.api_server:fastapi_app",
"--host",
host,
"--port",
str(port),
]
cmd = _get_detached_api_command(host, port, normalized_transport, normalized_endpoint, proxy)
process = subprocess.Popen( # noqa: S603
cmd,
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
env=proxy_env,
)

# Wait briefly and verify process started successfully
Expand All @@ -606,25 +668,32 @@ def api(detach: bool, stop: bool, host: str, port: int, reload: bool, proxy: str

_PID_FILE.write_text(str(process.pid))
click.echo(f"DDGS API server started in detached mode on http://{host}:{port} (PID: {process.pid})")
if proxy:
click.echo(f"Using proxy: {proxy_env['DDGS_PROXY']}")
click.echo("MCP server enabled at /sse")
if expanded_proxy:
click.echo(f"Using proxy: {expanded_proxy}")
click.echo(f"MCP server enabled with {normalized_transport} transport at {normalized_endpoint}")
else:
click.echo(f"Starting DDGS API server on http://{host}:{port}")
if proxy:
click.echo(f"Using proxy: {proxy_env['DDGS_PROXY']}")
click.echo("MCP server enabled at /sse")
if expanded_proxy:
os.environ["DDGS_PROXY"] = expanded_proxy
click.echo(f"Using proxy: {expanded_proxy}")
click.echo(f"MCP server enabled with {normalized_transport} transport at {normalized_endpoint}")
click.echo("Press Ctrl+C to stop")
# Set environment variable for the current process
if proxy:
os.environ["DDGS_PROXY"] = proxy_env["DDGS_PROXY"]
uvicorn.run(
"ddgs.api_server:fastapi_app",
host=host,
port=port,
log_level="info",
reload=reload,
)
if uses_default_api_server_config:
uvicorn.run(
"ddgs.api_server:fastapi_app",
host=host,
port=port,
log_level="info",
reload=reload,
)
else:
uvicorn.run(
_create_api_server_app(normalized_transport, normalized_endpoint),
host=host,
port=port,
log_level="info",
reload=False,
)


if __name__ == "__main__":
Expand Down
72 changes: 72 additions & 0 deletions ddgs/mcp_transport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
"""Helpers for configuring DDGS MCP transports."""

from typing import Final, Literal, TypeAlias

MCPTransport: TypeAlias = Literal["sse", "http"]

DEFAULT_MCP_TRANSPORT: Final[MCPTransport] = "sse"
MCP_TRANSPORT_CHOICES: Final[tuple[MCPTransport, ...]] = ("sse", "http")

_MCP_TRANSPORT_ENDPOINTS: Final[dict[MCPTransport, str]] = {
"sse": "/sse",
"http": "/mcp",
}


class InvalidMCPTransportError(ValueError):
"""Raised when DDGS receives an unsupported MCP transport value."""

def __init__(self, transport: str) -> None:
self.transport = transport
super().__init__(transport)

def __str__(self) -> str:
"""Return a descriptive error for unsupported transport values."""
choices = ", ".join(MCP_TRANSPORT_CHOICES)
return f"Unsupported MCP transport {self.transport!r}. Expected one of: {choices}."


def normalize_mcp_transport(transport: str | None) -> MCPTransport:
"""Normalize an MCP transport string to a supported DDGS transport."""
if transport is None:
return DEFAULT_MCP_TRANSPORT

normalized_transport = transport.strip().lower()
if normalized_transport == "sse":
return "sse"
if normalized_transport == "http":
return "http"
raise InvalidMCPTransportError(transport)


def get_mcp_transport_endpoint(transport: str | None) -> str:
"""Return the public endpoint path for the configured MCP transport."""
return _MCP_TRANSPORT_ENDPOINTS[normalize_mcp_transport(transport)]


class InvalidMCPEndpointError(ValueError):
"""Raised when DDGS receives an invalid MCP endpoint value."""

def __init__(self, endpoint: str) -> None:
self.endpoint = endpoint
super().__init__(endpoint)

def __str__(self) -> str:
"""Return a descriptive error for unsupported endpoint values."""
return f"Invalid MCP endpoint {self.endpoint!r}. Expected a non-empty path."


def normalize_mcp_endpoint(endpoint: str | None, transport: str | None = None) -> str:
"""Normalize an MCP endpoint path.

If no endpoint is provided, the default endpoint for the given transport is used.
"""
if endpoint is None:
return get_mcp_transport_endpoint(transport)

normalized_endpoint = endpoint.strip()
if not normalized_endpoint:
raise InvalidMCPEndpointError(endpoint)

normalized_endpoint = f"/{normalized_endpoint.lstrip('/')}"
return normalized_endpoint.rstrip("/") or "/"
Loading
Loading