[EPD] Allow to deallocate ec cache #18
Conversation
Signed-off-by: knlnguyen1802 <[email protected]>
cc: @fake0fan @khuonglmhw @herotai214 for review
Pull request overview
This PR introduces encoder cache deallocation capabilities for the EC (Encoder Cache) transfer system. The main purpose is to enable automatic cleanup of remote encoder cache files when they are no longer needed, based on read/write count tracking.
Key changes include:
- Added metadata tracking (read_count, write_count) to monitor cache usage and trigger deallocation when appropriate
- Extended the scheduler and connector interfaces to pass cache hit information (local_hit, remote_hit) through the allocation flow
- Implemented file-based locking mechanism to prevent race conditions during concurrent metadata operations
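To illustrate the read/write-count idea described above, here is a minimal, self-contained sketch of per-file metadata counters that trigger deallocation once every write has been matched by a read. All names here are hypothetical; the real connector additionally guards these files with locks to handle concurrent access.

```python
import json
import os


def update_meta(meta_path: str, field: str) -> dict:
    """Increment a counter field (read_count or write_count) in a JSON metadata file."""
    if os.path.exists(meta_path):
        with open(meta_path, "r") as f:
            data = json.load(f)
    else:
        data = {"read_count": 0, "write_count": 0}
    data[field] = data.get(field, 0) + 1
    with open(meta_path, "w") as f:
        json.dump(data, f)
    return data


def maybe_deallocate(meta_path: str, cache_path: str) -> bool:
    """Delete the cache file once reads have caught up with writes."""
    with open(meta_path, "r") as f:
        data = json.load(f)
    # Deallocate only after at least one write and an equal number of reads.
    if data.get("read_count", 0) >= data.get("write_count", 0) > 0:
        os.remove(cache_path)
        os.remove(meta_path)
        return True
    return False
```

With this scheme, a producer bumps `write_count` when it stores an embedding, a consumer bumps `read_count` when it fetches one, and cleanup happens automatically at the crossover point.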
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| vllm/v1/worker/ec_connector_model_runner_mixin.py | Removed debug log and added call to update remote cache state |
| vllm/v1/core/sched/scheduler.py | Updated scheduler to track and pass local/remote cache hit information through allocation flow |
| vllm/distributed/ec_transfer/ec_connector/example_connector.py | Implemented deallocation logic with metadata tracking, file locking, and cache cleanup |
| vllm/distributed/ec_transfer/ec_connector/base.py | Added abstract methods for cache state updates with hit information parameters |
Comments suppressed due to low confidence (2)
vllm/distributed/ec_transfer/ec_connector/example_connector.py:1
- The variable name 'external_update_encoder_input' is ambiguous. Consider renaming it to 'encoder_cache_hit_tracking' or 'encoder_input_hit_states' to better reflect that it tracks cache hit status (local_hit, remote_hit) rather than just updates.
# SPDX-License-Identifier: Apache-2.0
vllm/distributed/ec_transfer/ec_connector/example_connector.py:1
- Corrected 'Store to option update' to 'Store to optionally update' for grammatical correctness.
# SPDX-License-Identifier: Apache-2.0
```python
# Iterate over a snapshot so we can modify the original safely.
for mm_hash, num_token in list(self._mm_datas_need_saves.items()):
```
Copilot AI commented on Dec 16, 2025:
The _mm_datas_need_saves dictionary is accessed without acquiring the _mm_state_lock, but it's modified later under the lock (line 318). This creates a potential race condition. The lock should be acquired before creating the snapshot with list() to ensure thread-safe iteration.
Suggested change:

```python
# Take a snapshot under lock to ensure thread safety.
with self._mm_state_lock:
    mm_datas_need_saves_snapshot = list(self._mm_datas_need_saves.items())
for mm_hash, num_token in mm_datas_need_saves_snapshot:
```
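The snapshot-under-lock pattern in this suggestion is worth spelling out. Below is a runnable toy stand-in (class and attribute names are hypothetical, chosen to mirror `_mm_state_lock` and `_mm_datas_need_saves`) showing why the snapshot must be taken while holding the lock: concurrent writers can then mutate the dict freely while the flusher iterates its private copy.

```python
import threading


class SaveTracker:
    """Toy stand-in for the connector's pending-save bookkeeping."""

    def __init__(self) -> None:
        self._lock = threading.Lock()          # plays the role of _mm_state_lock
        self._need_saves: dict[str, int] = {}  # plays the role of _mm_datas_need_saves

    def add(self, mm_hash: str, num_tokens: int) -> None:
        with self._lock:
            self._need_saves[mm_hash] = num_tokens

    def flush(self) -> list[str]:
        # Snapshot under the lock so concurrent add() calls cannot
        # mutate the dict while we iterate.
        with self._lock:
            snapshot = list(self._need_saves.items())
        saved = []
        for mm_hash, _num_tokens in snapshot:
            saved.append(mm_hash)  # real code would write the cache file here
        # Remove the entries we processed, again under the lock.
        with self._lock:
            for mm_hash in saved:
                self._need_saves.pop(mm_hash, None)
        return saved
```

Without the lock around `list(...)`, a writer could resize the dict mid-iteration, which is exactly the race the review comment flags.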
```python
# are a consumer/non-producer we increment the read_count
# (update_mm_meta_read) so deallocation can be observed.
processed_updates: list[str] = []
for mm_hash in list(self._mm_datas_need_update_meta):
```
Copilot AI commented on Dec 16, 2025:
The _mm_datas_need_update_meta list is accessed without acquiring the _mm_state_lock, but it's modified later under the lock (line 320). This creates a potential race condition. The lock should be acquired before creating the snapshot with list() to ensure thread-safe iteration.
Suggested change:

```python
with self._mm_state_lock:
    mm_datas_need_update_meta_snapshot = list(self._mm_datas_need_update_meta)
for mm_hash in mm_datas_need_update_meta_snapshot:
```
Co-authored-by: Copilot <[email protected]>
fake0fan left a comment:
I've left a few suggestions for your consideration.
Please also consider what happens if such an atomic count needs to be maintained by an EC connector.
Would this be easily scalable to other external storage backends?
```python
logger = init_logger(__name__)

# Cache FileLock objects per path to avoid reallocating locks repeatedly.
```
So we need one file lock per mm-encoded embedding?
Right
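The per-path lock cache that this comment thread discusses can be sketched as follows. The PR uses `FileLock` objects for cross-process safety; this sketch substitutes `threading.Lock` so it runs without third-party dependencies, but the caching idea (one lazily created lock per metadata path, reused on every access) is the same. Names are hypothetical.

```python
import threading

# One lock per metadata path, created lazily and reused thereafter.
_path_locks: dict[str, threading.Lock] = {}
# Guard for the dict itself, so two threads cannot create
# different locks for the same path at the same time.
_path_locks_guard = threading.Lock()


def get_path_lock(path: str) -> threading.Lock:
    """Return the cached lock for this path, creating it on first use."""
    with _path_locks_guard:
        lock = _path_locks.get(path)
        if lock is None:
            lock = _path_locks[path] = threading.Lock()
        return lock
```

Callers then wrap every read-modify-write of a metadata file in `with get_path_lock(meta_path):`, so contention is per embedding rather than global.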
```python
external_update_encoder_input: list[tuple[int, bool, bool]] = []

# Check remote cache first
if self.ec_connector is not None:
```
I remember a previous PR needed to avoid repeatedly checking all the encoder caches on every access?
It's here: knlnguyen1802#7
```python
with open(meta_filename, "r+") as f:
    data = json.load(f)
    data["write_count"] = data.get("write_count", 0) + 1
    data["mm_hash"] = mm_meta.mm_hash
```
Why is there still an update here?
To keep the Encoder and PD sides consistent: every access to the local encoder cache also needs to update the remote cache metadata.
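Mirroring the write-side snippet quoted above, the read side would bump `read_count` the same way. A hedged sketch (the real method is `update_mm_meta_read`; this standalone function is an approximation and omits the file lock the connector takes around it):

```python
import json


def update_mm_meta_read_sketch(meta_filename: str) -> int:
    """Increment read_count in the JSON metadata file and return the new value."""
    with open(meta_filename, "r+") as f:
        data = json.load(f)
        data["read_count"] = data.get("read_count", 0) + 1
        # Rewrite the file in place: rewind, dump, and truncate any
        # leftover bytes from the previous (possibly longer) contents.
        f.seek(0)
        json.dump(data, f)
        f.truncate()
    return data["read_count"]
```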
Signed-off-by: Khuong Le <[email protected]>
Purpose
We can enable a "deallocate_cache" flag in the start script to allow the EC cache to be deallocated after the PD has read it.
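One way the flag described above could gate the behavior is sketched below. This is a hypothetical illustration, not the PR's actual code: the config class, field placement, and function names are invented; only the flag name "deallocate_cache" and the read/write-count condition come from the PR.

```python
from dataclasses import dataclass


@dataclass
class ECConnectorConfig:
    # Flag name taken from the PR description; default keeps old behavior.
    deallocate_cache: bool = False


def on_remote_read(config: ECConnectorConfig, meta: dict) -> bool:
    """Record a remote read and report whether the cache entry may be freed."""
    meta["read_count"] = meta.get("read_count", 0) + 1
    should_free = (
        config.deallocate_cache
        and meta["read_count"] >= meta.get("write_count", 0) > 0
    )
    return should_free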
Design
Test Result