Epd mooncake engine #19
Conversation
Pull request overview
This PR adds Mooncake engine support for disaggregated encoder cache (EC) transfer in encoder-prefill-decode (EPD) architectures. The implementation enables remote encoder cache transfer between encoder workers and prefill/decode workers using the Mooncake transfer engine with RDMA support, improving performance in disaggregated serving scenarios.
Key Changes:
- Implemented MooncakeECConnector with scheduler and worker components for EC transfer over RDMA
- Extended TensorMemoryPool to support both CPU (pinned) and CUDA device memory with auto-eviction capability (a toy sketch of the eviction behavior follows this list)
- Added ec_transfer_params propagation throughout the request/response pipeline (similar to kv_transfer_params)
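The extended pool's eviction behavior can be illustrated with a self-contained toy. This is a sketch, not the PR's implementation: only `auto_evict`, `allocated_blocks`, `free()`, and `_allocate()` mirror names that appear in the diff below; `ToyMemoryPool`, its constructor parameters, and the FIFO bookkeeping are illustrative assumptions.

```python
from collections import OrderedDict


class ToyMemoryPool:
    """Toy stand-in for the extended TensorMemoryPool's eviction behavior.

    Only auto_evict, allocated_blocks, free(), and _allocate() mirror names
    from the PR; everything else here is an illustrative assumption.
    """

    def __init__(self, capacity: int, auto_evict: bool = True) -> None:
        self.capacity = capacity
        self.auto_evict = auto_evict
        self.used = 0
        self._next_addr = 0
        # addr -> size, kept in allocation (FIFO) order.
        self.allocated_blocks: OrderedDict[int, int] = OrderedDict()

    def _allocate(self, required_size: int) -> int:
        if self.used + required_size > self.capacity:
            raise ValueError("pool exhausted")
        addr = self._next_addr
        self._next_addr += required_size
        self.allocated_blocks[addr] = required_size
        self.used += required_size
        return addr

    def free(self) -> None:
        # Evict the oldest block, as the PR's auto-evict path does.
        _, size = self.allocated_blocks.popitem(last=False)
        self.used -= size

    def allocate(self, required_size: int) -> int:
        # Bounded retry: at most one eviction per currently allocated block
        # (the unbounded variant is what the review below flags).
        for _ in range(len(self.allocated_blocks) + 1):
            try:
                return self._allocate(required_size)
            except ValueError:
                if not self.auto_evict or not self.allocated_blocks:
                    raise
                self.free()
        raise ValueError("allocation cannot be satisfied")


pool = ToyMemoryPool(capacity=4)
a = pool.allocate(2)
b = pool.allocate(2)
c = pool.allocate(2)  # auto-evicts the oldest block (a) and succeeds
```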
Reviewed changes
Copilot reviewed 19 out of 19 changed files in this pull request and generated 11 comments.
| File | Description |
|---|---|
| vllm/distributed/ec_transfer/ec_connector/mooncake_connector.py | New Mooncake connector implementation for EC transfer with scheduler/worker split and ZMQ-based coordination |
| vllm/distributed/kv_transfer/kv_connector/v1/p2p/tensor_memory_pool.py | Enhanced memory pool to support CUDA device memory and auto-eviction for flexible buffer management |
| vllm/v1/worker/gpu_model_runner.py | Integrated transfer pool initialization and wait_for_ec_load call before MM embedding gathering |
| vllm/v1/worker/ec_connector_model_runner_mixin.py | Added wait_for_load method to synchronize EC transfers before use |
| vllm/v1/request.py | Added ec_transfer_params field to Request for propagating EC transfer metadata |
| vllm/v1/engine/__init__.py | Added ec_transfer_params to EngineCoreOutput for returning EC metadata |
| vllm/v1/engine/output_processor.py | Plumbed ec_transfer_params through output processing pipeline |
| vllm/v1/core/sched/scheduler.py | Added _ec_connector_finished method and ec_transfer_params handling in request lifecycle |
| vllm/outputs.py | Added ec_transfer_params field to RequestOutput |
| vllm/envs.py | Added VLLM_EC_MOONCAKE_BOOTSTRAP_PORT configuration for Mooncake handshake |
| vllm/entrypoints/openai/serving_completion.py | Propagated ec_transfer_params in completion response generation |
| vllm/entrypoints/openai/serving_chat.py | Propagated ec_transfer_params in chat completion responses |
| vllm/entrypoints/openai/protocol.py | Added ec_transfer_params fields to request/response protocol models |
| vllm/distributed/ec_transfer/ec_connector/factory.py | Registered MooncakeECConnector in connector factory |
| vllm/distributed/ec_transfer/ec_connector/example_connector.py | Added wait_for_load stub to example connector for interface consistency |
| vllm/distributed/ec_transfer/ec_connector/base.py | Renamed register_caches to register_encoder_cache and added wait_for_load abstract method (see the interface sketch after this table) |
| examples/online_serving/disaggregated_encoder/mooncake_connector/disagg_1e1pd_example.sh | Shell script for 1 encoder + 1 prefill-decode worker setup with Mooncake |
| examples/online_serving/disaggregated_encoder/mooncake_connector/disagg_1e1p1d_example.sh | Shell script for 1 encoder + 1 prefill + 1 decode worker setup with Mooncake |
| examples/online_serving/disaggregated_encoder/disagg_epd_proxy.py | Enhanced proxy to aggregate ec_transfer_params from encoder responses and forward to prefill |
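To make the base.py change concrete, here is a plausible sketch of the connector interface after this PR. Only the method names (`register_encoder_cache`, `wait_for_load`) come from the summary above; the signatures, parameter names, and docstrings are illustrative assumptions.

```python
from abc import ABC, abstractmethod

import torch


class ECConnectorBase(ABC):
    """Sketch of the EC connector base class implied by the file summary."""

    @abstractmethod
    def register_encoder_cache(
        self, encoder_cache: dict[str, torch.Tensor]
    ) -> None:
        """Register the worker's encoder cache buffers with the connector
        (renamed from register_caches in this PR)."""

    @abstractmethod
    def wait_for_load(self) -> None:
        """Block until pending EC transfers have landed, so the model runner
        can safely gather multimodal embeddings (cf. the wait_for_ec_load
        call added in gpu_model_runner.py)."""
```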
```python
        None if decoder-only.
    num_cached_tokens: The number of tokens with prefix cache hit.
    kv_transfer_params: The params for remote K/V transfer.
    ec_tranfer_params: The params for remote EC transfer.
```
Copilot AI commented on Jan 2, 2026:
Typo in docstring: "ec_tranfer_params" should be "ec_transfer_params" (missing 's').
Suggested change:

```diff
-    ec_tranfer_params: The params for remote EC transfer.
+    ec_transfer_params: The params for remote EC transfer.
```
```python
if not self.is_producer:
    self._sender_executor.shutdown(wait=False)
    if self._mooncake_sender_t:
        self._mooncake_sender_t.join()
elif self.receiver_loop.is_running():
    self.receiver_loop.call_soon_threadsafe(self.receiver_loop.stop)
    self._mooncake_receiver_t.join()
```
Copilot AI commented on Jan 2, 2026:
The shutdown logic appears to have inverted conditions. When `is_producer` is True, a producer should shut down `_sender_executor` (which is created for producers). When `is_producer` is False (consumer), it should stop the receiver loop. Currently, the conditions are reversed.
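Under that reading, a corrected version of the quoted block would simply flip the guard (a sketch only; the attribute names are taken from the quoted code):

```python
if self.is_producer:
    # Producers created the sender executor/thread, so they tear it down.
    self._sender_executor.shutdown(wait=False)
    if self._mooncake_sender_t:
        self._mooncake_sender_t.join()
elif self.receiver_loop.is_running():
    # Consumers own the receiver event loop and its thread.
    self.receiver_loop.call_soon_threadsafe(self.receiver_loop.stop)
    self._mooncake_receiver_t.join()
```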
```python
    Raises:
        ValueError: If address is invalid or not allocated
    """
    if not addr:
```
Copilot AI commented on Jan 2, 2026:
The condition `if not addr:` will be True when `addr` is 0, which is a valid memory address. This should be `if addr is None:` to properly check for None.
Suggested change:

```diff
-    if not addr:
+    if addr is None:
```
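The underlying pitfall is plain Python truthiness, since address 0 is falsy:

```python
addr = 0                # a valid base address into the pool
print(not addr)         # True  -> `if not addr:` wrongly rejects address 0
print(addr is None)     # False -> `if addr is None:` accepts it
```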
Resolved review threads (outdated):
- vllm/distributed/kv_transfer/kv_connector/v1/p2p/tensor_memory_pool.py
- vllm/distributed/ec_transfer/ec_connector/mooncake_connector.py
```python
while True:
    try:
        return self._allocate(required_size)
    except ValueError:
        if self.auto_evict:
            self.free()
        else:
            raise
```
Copilot AI commented on Jan 2, 2026:
The `auto_evict` loop could run indefinitely if the pool cannot satisfy the allocation even after freeing the oldest block. There should be a check to prevent infinite looping, such as tracking whether any block was freed or limiting the number of eviction attempts.
Suggested change (replacing the loop above):

```python
# Bound the number of eviction attempts to avoid potential infinite loops.
# We cannot free more blocks than currently allocated.
max_evictions = len(self.allocated_blocks)
evictions = 0
while True:
    try:
        return self._allocate(required_size)
    except ValueError as e:
        if not self.auto_evict:
            # Auto-eviction disabled: propagate the allocation failure.
            raise
        if evictions >= max_evictions:
            # All currently allocated blocks have already been considered
            # for eviction; further attempts are unlikely to succeed.
            raise e
        prev_len = len(self.allocated_blocks)
        try:
            # Free the oldest allocated block.
            self.free()
        except ValueError:
            # No block could be freed; propagate the original allocation error.
            raise e
        # Ensure that eviction made progress; if not, avoid looping endlessly.
        if len(self.allocated_blocks) >= prev_len:
            raise e
        evictions += 1
```
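Bounding retries at `len(self.allocated_blocks)` guarantees termination: each eviction frees exactly one block, so after `max_evictions` iterations eviction can free nothing more, and the remaining `ValueError` reflects a genuinely unsatisfiable allocation.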
Resolved review thread (outdated):
- examples/online_serving/disaggregated_encoder/disagg_epd_proxy.py
Purpose
Workflow
Test Plan
Potential issue: the transfer pool currently auto-evicts tensors when it is full, so a tensor might be evicted before it is transferred. I'm not sure whether this could happen in production. The transfer pool size is 1 GB and it has worked fine so far.
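One way to surface this failure mode in testing is to disable auto-eviction, so exhaustion fails loudly instead of silently dropping a pending tensor. A sketch, reusing the hypothetical ToyMemoryPool from the overview section rather than the PR's actual TensorMemoryPool:

```python
pool = ToyMemoryPool(capacity=4, auto_evict=False)
pool.allocate(3)
try:
    pool.allocate(3)  # would have to evict a block that may still be in flight
except ValueError:
    print("pool full; wait for pending transfers to drain, then retry")
```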
Test Result
100 requests / 400 text tokens / 3 images (560,560,1)
Mooncake
Disk
Correctness:
Essential Elements of an Effective PR Description Checklist
`supported_models.md` and `examples` for a new model.