refactor(experimental): reuse HTTP clients, add response models, and parallelize ops #1253
Conversation
…parallelize ops in inference service

Replace per-request httpx/requests client creation with shared long-lived clients across the inference service stack (controller, gateway, router, data proxy, InfBridge). This eliminates repeated TCP connection setup and TLS handshake overhead on every API call.

Key changes:
- Controller: shared httpx.Client/AsyncClient, idempotent destroy()
- Gateway: shared AsyncClient via lifespan, _use_client() helper in streaming
- Router: shared AsyncClient, parallel health checks via asyncio.gather
- Data proxy: Pydantic response models, shared client, parallel callbacks
- InfBridge: shared AsyncClient with proper aclose() lifecycle
- Parallelize: proxy registration, set_version, pause/continue broadcasts
- Add Pydantic BaseModel response types across all services for type safety

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
4215028 to e5e9e94
Code Review
This pull request refactors the inference service components to use shared httpx clients instead of creating new ones per request or using the requests library. It introduces Pydantic response models for better API documentation and validation across the controller, data proxy, gateway, and router. Additionally, several operations have been parallelized using asyncio.gather or ThreadPoolExecutor to improve performance, including worker health checks, callback deliveries, and version updates. Feedback includes a recommendation to avoid catching BaseException to prevent interfering with task cancellation, and suggestions to reuse shared httpx clients in contexts where they were incorrectly assumed to be thread-unsafe or incompatible with streaming.
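The parallel health checks mentioned above follow a standard fan-out pattern. A minimal sketch, assuming one coroutine per worker (the worker names and the `asyncio.sleep` standing in for an HTTP call are illustrative, not the PR's actual code):

```python
import asyncio

async def check_health(worker_addr: str) -> tuple[str, bool]:
    # Stand-in for an HTTP GET to the worker's /health endpoint.
    await asyncio.sleep(0.01)
    return worker_addr, True

async def check_all(workers: list[str]) -> dict[str, bool]:
    # Fan out one coroutine per worker instead of awaiting them in a loop;
    # total latency is roughly the slowest check, not the sum of all checks.
    results = await asyncio.gather(*(check_health(w) for w in workers))
    return dict(results)

statuses = asyncio.run(check_all([f"worker-{i}" for i in range(3)]))
print(statuses)  # → {'worker-0': True, 'worker-1': True, 'worker-2': True}
```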
```python
            config.request_timeout,
            client=http_client,
        )
    except BaseException as exc:
```
Catching BaseException is generally discouraged as it includes system-level exceptions like KeyboardInterrupt and SystemExit. More importantly, in an async context, it will catch asyncio.CancelledError, which can prevent tasks from being cancelled properly and lead to unexpected behavior or hangs during shutdown. Use Exception instead.
```diff
-    except BaseException as exc:
+    except Exception as exc:
```
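A minimal repro of the hazard (the `stubborn` coroutine is illustrative): a task that catches `BaseException` swallows the `CancelledError` injected by `task.cancel()`, so the task completes normally instead of being cancelled.

```python
import asyncio

async def stubborn():
    try:
        await asyncio.sleep(10)
    except BaseException:
        # Swallows asyncio.CancelledError too, so cancellation never propagates.
        pass
    return "finished anyway"

async def main():
    task = asyncio.create_task(stubborn())
    await asyncio.sleep(0)  # let the task reach its await point
    task.cancel()
    # With `except Exception` inside stubborn(), this await would raise
    # CancelledError; with BaseException, the task completes normally.
    return await task

result = asyncio.run(main())
print(result)  # → finished anyway
```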
```python
# Each thread gets its own httpx.Client because httpx.Client
# is not thread-safe and must not be shared across threads.
with httpx.Client() as client:
    resp = client.post(
        f"{router_addr}/register",
        json={"worker_addr": data_proxy_addr},
        headers={"Authorization": f"Bearer {admin_key}"},
        timeout=5,
    )
```
The comment stating that httpx.Client is not thread-safe is incorrect. According to the httpx documentation, both Client and AsyncClient are thread-safe and intended to be shared across threads and tasks. Creating a new client per thread is unnecessary and inefficient. You should use the existing self._sync_client instance instead.
```python
# httpx.Client is thread-safe and can be shared across threads.
resp = self._sync_client.post(
    f"{router_addr}/register",
    json={"worker_addr": data_proxy_addr},
    headers={"Authorization": f"Bearer {admin_key}"},
    timeout=5,
)
```

```python
try:
    async with httpx.AsyncClient(timeout=httpx.Timeout(timeout)) as client:
        async with client.stream(
```

```python
        async with httpx.AsyncClient(timeout=httpx.Timeout(timeout)) as c:
```
The note above (lines 452-454) claiming that streaming requires a per-request client is incorrect. httpx.AsyncClient supports multiple concurrent streams using the same client instance. Reusing the shared client via the client parameter (using the _use_client helper already defined in this file) would be more efficient and consistent with the PR's objectives.
```diff
-        async with httpx.AsyncClient(timeout=httpx.Timeout(timeout)) as c:
+        async with _use_client(client, timeout) as c:
```
Description
Refactor the inference service HTTP layer to reuse long-lived httpx clients instead of creating per-request clients. Adds Pydantic response models across all services for type safety, and parallelizes sequential operations (health checks, proxy registrations, broadcasts) for better throughput.
Related Issue
Fixes #1217
Type of Change
Checklist
- `pre-commit run --all-files`
- `./docs/build_all.sh`
- `main`
- `/review-pr` command
- `/create-pr`

Breaking Change Details (if applicable):
N/A
Additional Context
Key changes:

- Controller: shared `httpx.Client`/`AsyncClient`, idempotent `destroy()`, parallel proxy registration via `ThreadPoolExecutor`, parallel `set_version`/pause/continue via `asyncio.gather`
- Gateway: shared `AsyncClient` via lifespan, `_use_client()` context manager in streaming module, parallel data proxy registration
- Router: shared `AsyncClient`, parallel health checks via `asyncio.gather`, proper lifespan cleanup with `try/finally`
- `InfBridge` cleanup on shutdown and backend reconfiguration
- InfBridge: shared `AsyncClient` with `aclose()` lifecycle method

Files changed:
- `areal/experimental/inference_service/controller/controller.py`
- `areal/experimental/inference_service/data_proxy/app.py`
- `areal/experimental/inference_service/data_proxy/pause.py`
- `areal/experimental/inference_service/gateway/app.py`
- `areal/experimental/inference_service/gateway/streaming.py`
- `areal/experimental/inference_service/inf_bridge.py`
- `areal/experimental/inference_service/router/app.py`
- `tests/experimental/inference_service/test_data_proxy_chat.py`