22 commits
cc89958
[Doc] Update installation instructions for vLLM and vLLM-Omni to vers…
tzhouam Dec 19, 2025
a726f76
[Refactor] Update tokenizer references in output processor and entry …
tzhouam Dec 19, 2025
6f55cd3
rename the --init-sleep-time to --stage-init-timeout to control the s…
tzhouam Dec 20, 2025
3b8787c
fix the bug that online timeout is not passed in
tzhouam Dec 20, 2025
5cb4759
correct the log
tzhouam Dec 20, 2025
dfaebd6
follow pre-commit
tzhouam Dec 20, 2025
90ac08d
Update examples/offline_inference/qwen3_omni/run_single_prompt_tp.sh
tzhouam Dec 20, 2025
d2fe522
Update vllm_omni/entrypoints/async_omni.py
tzhouam Dec 20, 2025
27c6782
Update vllm_omni/entrypoints/omni_llm.py
tzhouam Dec 20, 2025
5003d5a
unify the default value for stage init timeout
tzhouam Dec 20, 2025
54d008b
Merge branch 'dev/control-init-timeout' of https://github.com/tzhouam…
tzhouam Dec 20, 2025
154858e
remove out of block reference for wait_start
tzhouam Dec 20, 2025
eb92908
update the unit test _FakeStage as it accepts the stage_init_timeout …
tzhouam Dec 20, 2025
3cfa121
unify the name to stage_init_timeout
tzhouam Dec 22, 2025
9edda8c
Unify to the stage init timeout for the run_single_prompt_tp.sh to th…
tzhouam Dec 22, 2025
0d39365
Merge branch 'main' of https://github.com/tzhouam/vllm-omni into main
tzhouam Dec 23, 2025
0dd2912
Merge branch 'main' into dev/control-init-timeout
tzhouam Dec 23, 2025
6bd0d5b
Merge branch 'vllm-project:main' into main
tzhouam Dec 23, 2025
c0770f1
correct installation info
tzhouam Dec 23, 2025
751fe42
Merge branch 'vllm-project:main' into main
tzhouam Dec 27, 2025
603b6d4
Merge branch 'main' into dev/control-init-timeout
tzhouam Dec 27, 2025
b47702f
Add stage initialization timeout attribute to OmniStage class
tzhouam Dec 28, 2025
8 changes: 4 additions & 4 deletions examples/offline_inference/qwen2_5_omni/end2end.py
@@ -323,7 +323,7 @@ def main(args):
 model=model_name,
 log_stats=args.enable_stats,
 log_file=("omni_llm_pipeline.log" if args.enable_stats else None),
-init_sleep_seconds=args.init_sleep_seconds,
+stage_init_timeout=args.stage_init_timeout,
 batch_timeout=args.batch_timeout,
 init_timeout=args.init_timeout,
 shm_threshold_bytes=args.shm_threshold_bytes,
@@ -427,10 +427,10 @@ def parse_args():
 help="Enable writing detailed statistics (default: disabled)",
 )
 parser.add_argument(
-"--init-sleep-seconds",
+"--stage-init-timeout",
 type=int,
-default=20,
-help="Sleep seconds after starting each stage process to allow initialization (default: 20)",
+default=300,
+help="Timeout for initializing a single stage in seconds (default: 300)",
 )
 parser.add_argument(
 "--batch-timeout",
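For context, a minimal sketch of the plumbing these two hunks establish, with an illustrative parse_args call (the real script parses sys.argv):

import argparse

parser = argparse.ArgumentParser()
parser.add_argument(
    "--stage-init-timeout",
    type=int,
    default=300,
    help="Timeout for initializing a single stage in seconds (default: 300)",
)
args = parser.parse_args(["--stage-init-timeout", "600"])  # illustrative argv
assert args.stage_init_timeout == 600
# main() then forwards the value verbatim, per the first hunk:
#   Omni(model=model_name, stage_init_timeout=args.stage_init_timeout, ...)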
7 changes: 4 additions & 3 deletions examples/offline_inference/qwen3_omni/end2end.py
@@ -180,6 +180,7 @@ def main(args):
 stage_configs_path=args.stage_configs_path,
 log_file=log_file,
 log_stats=args.enable_stats,
+stage_init_timeout=args.stage_init_timeout,
 )

 thinker_sampling_params = SamplingParams(
@@ -293,10 +294,10 @@ def parse_args():
 help="Enable writing detailed statistics (default: disabled)",
 )
 parser.add_argument(
-"--init-sleep-seconds",
+"--stage-init-timeout",
 type=int,
-default=20,
-help="Sleep seconds after starting each stage process to allow initialization (default: 20)",
+default=300,
+help="Timeout for initializing a single stage in seconds (default: 300)",
 )
 parser.add_argument(
 "--batch-timeout",
4 changes: 2 additions & 2 deletions examples/offline_inference/qwen3_omni/run_single_prompt_tp.sh
@@ -1,5 +1,5 @@
 python end2end.py --output-wav output_audio \
 --query-type use_audio \
---init-sleep-seconds 90
+--stage-init-timeout 300

-# init-sleep-seconds works to avoid two vLLM stages initialized at the same time within a card.
+# stage-init-timeout sets the maximum wait to avoid two vLLM stages initializing at the same time on the same card.
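The renamed flag changes semantics, not just the name: instead of sleeping a fixed interval between stage launches, the orchestrator waits on each stage's readiness signal, bounded by stage-init-timeout (see the async_omni.py diff below). A minimal sketch of that pattern, assuming a per-stage readiness event; the names here are illustrative, not the actual vllm-omni API:

import multiprocessing as mp

def wait_for_stage_ready(ready_event, stage_id, stage_init_timeout):
    # Clamp to non-negative, mirroring the max(0, int(...)) guard in the
    # diff, then block until the stage signals readiness or time runs out.
    timeout = max(0, int(stage_init_timeout))
    if not ready_event.wait(timeout=timeout):
        raise TimeoutError(
            f"Stage-{stage_id} not ready within {timeout}s; "
            "consider raising --stage-init-timeout"
        )

ready = mp.Event()  # the stage process would call ready.set() once initialized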
6 changes: 3 additions & 3 deletions tests/e2e/offline_inference/conftest.py
@@ -26,7 +26,7 @@ def __init__(
 self,
 model_name: str,
 seed: int = 42,
-init_sleep_seconds: int = 20,
+stage_init_timeout: int = 300,
 batch_timeout: int = 10,
 init_timeout: int = 300,
 shm_threshold_bytes: int = 65536,
@@ -40,7 +40,7 @@ def __init__(
 Args:
 model_name: The model name or path
 seed: Random seed for reproducibility
-init_sleep_seconds: Sleep time after starting each stage
+stage_init_timeout: Timeout for initializing a single stage in seconds
 batch_timeout: Timeout for batching in seconds
 init_timeout: Timeout for initializing stages in seconds
 shm_threshold_bytes: Threshold for using shared memory
@@ -54,7 +54,7 @@ def __init__(
 self.omni = Omni(
 model=model_name,
 log_stats=log_stats,
-init_sleep_seconds=init_sleep_seconds,
+stage_init_timeout=stage_init_timeout,
 batch_timeout=batch_timeout,
 init_timeout=init_timeout,
 shm_threshold_bytes=shm_threshold_bytes,
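A hedged usage sketch of the updated fixture (the model path is a placeholder; real values come from the test_config fixture):

# Placeholder model path; stage_init_timeout replaces init_sleep_seconds.
with OmniRunner(
    "path/to/model",
    seed=42,
    stage_init_timeout=300,
    batch_timeout=10,
    init_timeout=300,
) as runner:
    ...  # drive requests through runner.omni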
2 changes: 1 addition & 1 deletion tests/e2e/offline_inference/test_qwen3_omni.py
@@ -27,7 +27,7 @@
 def test_video_to_audio(omni_runner: type[OmniRunner], test_config) -> None:
 """Test processing video, generating audio output."""
 model, stage_config_path = test_config
-with omni_runner(model, seed=42, stage_configs_path=stage_config_path, init_sleep_seconds=90) as runner:
+with omni_runner(model, seed=42, stage_configs_path=stage_config_path, stage_init_timeout=300) as runner:
 # Prepare inputs
 question = "Describe the video briefly."
 video = VideoAsset(name="baby_reading", num_frames=4).np_ndarrays
2 changes: 1 addition & 1 deletion tests/e2e/online_serving/test_qwen3_omni.py
@@ -130,7 +130,7 @@ def omni_server(request):
 Multi-stage initialization can take 10-20+ minutes.
 """
 model, stage_config_path = request.param
-with OmniServer(model, ["--stage-configs-path", stage_config_path, "--init-sleep-seconds", "90"]) as server:
+with OmniServer(model, ["--stage-configs-path", stage_config_path, "--stage-init-timeout", "90"]) as server:
 yield server


41 changes: 18 additions & 23 deletions tests/entrypoints/test_omni_llm.py
@@ -5,9 +5,6 @@
 from unittest.mock import MagicMock

 import pytest
-from vllm.sampling_params import SamplingParams
-
-from vllm_omni.entrypoints.stage_utils import _to_dict

 # Suppress noisy DeprecationWarnings from optional Swig bindings imported by vLLM dependencies.
 warnings.filterwarnings(
@@ -71,7 +68,7 @@ def empty(self):
 class _FakeStage:
 """Lightweight Stage stub for multi-process pipeline version with queue support."""

-def __init__(self, config):
+def __init__(self, config, stage_init_timeout: int = 300):
 # Handle both dict and object configs
 if isinstance(config, dict):
 config = _FakeStageConfig(config)
@@ -95,9 +92,7 @@ def __init__(self, config):
 self._in_q = None
 self._out_q = None
 self._proc = None  # Mock process reference
-
-default_sampling_params = getattr(config, "default_sampling_params", {})
-self.default_sampling_params = SamplingParams(**_to_dict(default_sampling_params))
+self._stage_init_timeout = max(0, int(stage_init_timeout))

 def attach_queues(self, in_q, out_q):
 """Attach input and output queues."""
@@ -502,7 +497,7 @@ def _fake_loader(model: str):
 # Replace OmniStage
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

@@ -511,7 +506,7 @@ def _fake_loader(model: str):

 # Patch the imported function and class in the module
 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 from vllm_omni.entrypoints.omni import Omni

@@ -571,14 +566,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 from vllm_omni.entrypoints.omni import Omni

@@ -630,14 +625,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 # Mock uuid.uuid4() to return a predictable value for request ID generation
 test_uuid = uuid.UUID("00000000-0000-0000-0000-000000000000")
@@ -730,14 +725,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 # Mock uuid.uuid4() to return a predictable value for request ID generation
 test_uuid = uuid.UUID("00000000-0000-0000-0000-000000000000")
@@ -811,14 +806,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 # Mock uuid.uuid4() to return a predictable value for request ID generation
 test_uuid = uuid.UUID("00000000-0000-0000-0000-000000000000")
@@ -897,14 +892,14 @@ def init_stage_worker(self, *args, **kwargs):

 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStageNoReady(cfg),
+lambda cfg, **kwargs: _FakeStageNoReady(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStageNoReady(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStageNoReady(cfg, **kwargs))

 from vllm_omni.entrypoints.omni import Omni

@@ -949,14 +944,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 # Mock uuid.uuid4() to return a predictable value for request ID generation
 test_uuid = uuid.UUID("00000000-0000-0000-0000-000000000000")
@@ -1037,14 +1032,14 @@ def _fake_loader(model: str):
 )
 monkeypatch.setattr(
 "vllm_omni.entrypoints.omni_stage.OmniStage",
-lambda cfg: _FakeStage(cfg),
+lambda cfg, **kwargs: _FakeStage(cfg, **kwargs),
 raising=False,
 )

 import vllm_omni.entrypoints.omni as omni_module

 monkeypatch.setattr(omni_module, "load_stage_configs_from_model", _fake_loader)
-monkeypatch.setattr(omni_module, "OmniStage", lambda cfg: _FakeStage(cfg))
+monkeypatch.setattr(omni_module, "OmniStage", lambda cfg, **kwargs: _FakeStage(cfg, **kwargs))

 from vllm_omni.entrypoints.omni import Omni

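The recurring lambda change above exists because the orchestrator now instantiates stages as OmniStage(cfg, stage_init_timeout=...); a stub factory accepting only cfg would raise TypeError. A self-contained illustration (with a local stand-in for _FakeStage):

class _StubStage:
    # Stand-in for _FakeStage, for illustration only.
    def __init__(self, config, stage_init_timeout: int = 300):
        self.config = config
        self._stage_init_timeout = max(0, int(stage_init_timeout))

# Old factory shape breaks once the new keyword is passed:
#   (lambda cfg: _StubStage(cfg))({}, stage_init_timeout=120)  # TypeError
# Forwarding **kwargs keeps the stub signature-compatible:
factory = lambda cfg, **kwargs: _StubStage(cfg, **kwargs)
stage = factory({"stage_id": 0}, stage_init_timeout=120)
assert stage._stage_init_timeout == 120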
12 changes: 5 additions & 7 deletions vllm_omni/entrypoints/async_omni.py
@@ -84,8 +84,7 @@ class AsyncOmni:
 configurations. If None, configurations are loaded from the model.
 - log_stats: Whether to enable statistics logging
 be written to files with stage-specific suffixes.
-- init_sleep_seconds: Number of seconds to sleep between starting
-each stage process during initialization
+- stage_init_timeout: Timeout for initializing a single stage in seconds.
 - shm_threshold_bytes: Threshold in bytes for using shared memory
 for IPC. Objects larger than this threshold will use shared memory.
 - batch_timeout: Timeout in seconds for batching requests within a stage
@@ -642,7 +641,7 @@ async def _monitor_stage_outputs_async(self, stage_id: int, stage: OmniStage) ->
 def _initialize_stages(
 self,
 model: str,
-init_sleep_seconds: int,
+stage_init_timeout: int,
 shm_threshold_bytes: int,
 init_timeout: int,
 ) -> None:
@@ -651,7 +650,7 @@ def _initialize_stages(
 # Build OmniStage instances in parallel, preserve original order
 def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]:
 idx, cfg = idx_cfg
-return idx, OmniStage(cfg)
+return idx, OmniStage(cfg, stage_init_timeout=stage_init_timeout)

 with ThreadPoolExecutor(max_workers=min(len(self.stage_configs), max(1, os.cpu_count() or 1))) as executor:
 futures = [executor.submit(_build_stage, (idx, cfg)) for idx, cfg in enumerate(self.stage_configs)]
@@ -672,7 +671,7 @@ def _build_stage(idx_cfg: tuple[int, Any]) -> tuple[int, OmniStage]:

 self._stage_in_queues: list[mp.Queue] = []
 self._stage_out_queues: list[mp.Queue] = []
-self._init_sleep_seconds = max(0, int(init_sleep_seconds))
+self._stage_init_timeout = max(0, int(stage_init_timeout))
 self._shm_threshold_bytes = max(0, int(shm_threshold_bytes))
 self._start_stages(model)
 # Wait for all stages to report readiness before seeding
@@ -714,7 +713,6 @@ def _start_stages(self, model: str) -> None:
 )

 logger.debug("[Orchestrator] Stage-%s process started", stage_id)
-time.sleep(self._init_sleep_seconds)

 def close(self) -> None:
 """Close all stage processes and clean up resources.
@@ -1041,7 +1039,7 @@ def _wait_for_stages_ready(self, timeout: int = 120) -> None:
 correct.",
 "Check GPU/host memory availability; reduce model or batch size if needed.",  # noqa: E501
 "Check model weights path and network reachability (if loading remotely).",  # noqa: E501
-"Increase initialization wait time (init_sleep_seconds or \
+"Increase initialization wait time (stage_init_timeout or \
 call-site timeout).",
 ]
 logger.error(
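A condensed, self-contained sketch of the construction pattern _initialize_stages uses above; make_stage stands in for OmniStage so the snippet runs on its own:

import os
from concurrent.futures import ThreadPoolExecutor

def make_stage(cfg, stage_init_timeout):
    # Stand-in for OmniStage(cfg, stage_init_timeout=...).
    return {"cfg": cfg, "stage_init_timeout": stage_init_timeout}

def build_stages(stage_configs, stage_init_timeout):
    def _build(pair):
        idx, cfg = pair  # carry the index so pipeline order can be restored
        return idx, make_stage(cfg, stage_init_timeout=stage_init_timeout)

    workers = min(len(stage_configs), max(1, os.cpu_count() or 1))
    with ThreadPoolExecutor(max_workers=workers) as executor:
        futures = [executor.submit(_build, p) for p in enumerate(stage_configs)]
        results = [f.result() for f in futures]
    results.sort(key=lambda r: r[0])  # threads may finish out of order
    return [stage for _, stage in results]

stages = build_stages([{"stage": "thinker"}, {"stage": "talker"}], 300)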
6 changes: 3 additions & 3 deletions vllm_omni/entrypoints/cli/serve.py
@@ -82,10 +82,10 @@ def subparser_init(self, subparsers: argparse._SubParsersAction) -> FlexibleArgu
 help="Path to the stage configs file. If not specified, the stage configs will be loaded from the model.",
 )
 serve_parser.add_argument(
-"--init-sleep-seconds",
+"--stage-init-timeout",
 type=int,
-default=30,
-help="The number of seconds to sleep before initializing the stages.",
+default=300,
+help="The timeout for initializing a single stage in seconds (default: 300)",
 )
 serve_parser.add_argument(
 "--init-timeout",