
Conversation

@knlnguyen1802
Contributor

@knlnguyen1802 knlnguyen1802 commented Dec 18, 2025

Purpose

Fixes #342 and partially fixes #316.
This adds support for calling sleep, wait, and load_weights on the AR ModelRunner.
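A hypothetical usage sketch; the top-level entry point and the positional (method, timeout, args, kwargs) signature are assumptions inferred from the snippets in the reviews below:

# Hypothetical sketch; the OmniLLM entry point and call signature are assumed.
llm = OmniLLM(model="...")

# Offload model weights, e.g. between RL training steps.
llm.collective_rpc("sleep", 60.0, (), {})

# ...update weights externally, then reload them on the workers.
llm.collective_rpc("load_weights", 60.0, (), {})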


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft.


Signed-off-by: knlnguyen1802 <[email protected]>

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines 525 to 529
_logging.getLogger(__name__).debug(
    "[Stage-%s] Executing collective_rpc: method=%s", stage_id, method
)
result = stage_engine.collective_rpc(method, timeout, args, kwargs)
out_q.put({


P1: Worker RPC handler calls missing method

The collective_rpc path in the worker invokes stage_engine.collective_rpc(...), but the stage engine is an OmniStageLLM (created at line 500) and there is no collective_rpc implementation anywhere in that class or the rest of the repo. Every collective_rpc task will therefore raise an AttributeError and be returned as an error response, so the new API never actually executes on the worker.
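One way to unblock this would be a thin forwarder on the stage engine that delegates to vLLM's own collective_rpc. A minimal sketch, assuming OmniStageLLM wraps a vLLM engine as self.llm_engine (that attribute name is an assumption):

class OmniStageLLM:
    ...
    def collective_rpc(self, method, timeout=None, args=(), kwargs=None):
        # Delegate to the wrapped vLLM engine, which fans the call out to
        # every worker in the stage; vLLM exposes collective_rpc with a
        # (method, timeout, args, kwargs) signature.
        return self.llm_engine.collective_rpc(
            method, timeout=timeout, args=args, kwargs=kwargs or {}
        )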


Comment on lines 374 to 378
result = self.try_collect()
if result is not None and result.get("type") == "collective_rpc_result" and result.get("rpc_id") == rpc_id:
    if "error" in result:
        raise RuntimeError(f"collective_rpc failed: {result['error']}")
    return result["result"]


P1: collective_rpc drops unrelated out-queue messages

While waiting for a matching collective_rpc_result, this loop consumes any item from self._out_q via try_collect() and discards it if the type/rpc_id do not match. That permanently removes normal stage outputs that arrive while the RPC is pending, causing those requests to hang because their responses are lost. The code should buffer or requeue non-RPC messages instead of dropping them.
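A minimal sketch of the buffer-and-requeue approach, assuming a queue.Queue-style self._out_q and the message schema shown above (the helper name is hypothetical):

import queue
import time

def _wait_for_rpc_result(self, rpc_id, timeout=None):
    # Hypothetical helper for OmniStage.collective_rpc: wait for one specific
    # collective_rpc_result while preserving unrelated stage outputs.
    deadline = None if timeout is None else time.monotonic() + timeout
    pending = []  # unrelated messages observed while waiting
    try:
        while True:
            remaining = None if deadline is None else max(0.0, deadline - time.monotonic())
            try:
                msg = self._out_q.get(timeout=remaining)
            except queue.Empty:
                raise TimeoutError(f"collective_rpc {rpc_id} timed out")
            if (msg.get("type") == "collective_rpc_result"
                    and msg.get("rpc_id") == rpc_id):
                if "error" in msg:
                    raise RuntimeError(f"collective_rpc failed: {msg['error']}")
                return msg["result"]
            pending.append(msg)  # not ours: buffer instead of dropping
    finally:
        # Requeue intercepted messages so normal collection still sees them.
        for item in pending:
            self._out_q.put(item)

Note that requeuing reorders buffered messages relative to their arrival; if consumers depend on ordering, a separate side buffer drained by try_collect() would be safer.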


assert self._in_q is not None and self._out_q is not None, "Queues must be attached before collective_rpc"

# Submit collective_rpc task to worker
import uuid
Collaborator


Why do we use a lazy import here?

Contributor Author


Sorry, this should have been a draft [WIP], but thanks for the review. Will fix it.

@knlnguyen1802 knlnguyen1802 marked this pull request as draft December 18, 2025 03:40
Signed-off-by: knlnguyen1802 <[email protected]>
@knlnguyen1802 knlnguyen1802 marked this pull request as ready for review December 18, 2025 07:26
@knlnguyen1802
Contributor Author

@ZJY0516 Could you have a look again? Thanks.

@knlnguyen1802 knlnguyen1802 changed the title from "Try to do rpc" to "RPC support for OmniLLM" Dec 18, 2025

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.


Comment on lines +526 to +528
try:
    result = stage_engine.collective_rpc(method, timeout, args, kwargs)
    out_q.put({


P1: collective_rpc task calls missing engine method

The worker loop dispatches collective RPCs to stage_engine.collective_rpc(...), but OmniStageLLM (and the rest of the repo—checked with rg "collective_rpc") defines no such method. Any collective_rpc task will therefore raise an AttributeError in the worker, propagate back as an error reply, and cause OmniStage.collective_rpc to raise a RuntimeError, leaving the new API unusable.


Comment on lines +374 to +378
result = self.try_collect()
if result is not None:
    if result.get("type") == "collective_rpc_result":
        if result.get("rpc_id") == rpc_id:
            if "error" in result:


P1: RPC wait loop discards non-RPC outputs

While waiting for a matching collective_rpc_result, the new collective_rpc method pops any item from self._out_q via try_collect() and ignores everything that is not the target RPC result. That silently drops unrelated stage outputs (e.g., pending inference responses or status messages), so issuing a collective RPC while other work is in-flight will lose those messages and leave callers hanging without responses.


Signed-off-by: knlnguyen1802 <[email protected]>


Development

Successfully merging this pull request may close these issues:

  • [RFC]: Support more functionality other than generate
  • [Feature][RL]: Support Model weight offload, reload and sync model weight & Offload DIT cache
