From 3070c9efe2aa05d08366d9796656226cdc8150e6 Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Fri, 19 Dec 2025 12:00:07 +0800
Subject: [PATCH 1/6] Add sleep mode support to the diffusion GPU worker

Signed-off-by: knlnguyen1802
---
 vllm_omni/diffusion/data.py              |  3 ++
 vllm_omni/diffusion/worker/gpu_worker.py | 54 ++++++++++++++++++++++++
 2 files changed, 57 insertions(+)

diff --git a/vllm_omni/diffusion/data.py b/vllm_omni/diffusion/data.py
index 8210bedab..5958fb6d3 100644
--- a/vllm_omni/diffusion/data.py
+++ b/vllm_omni/diffusion/data.py
@@ -244,6 +244,9 @@ class OmniDiffusionConfig:
     # Compilation
     enable_torch_compile: bool = False
 
+    # Enable sleep mode
+    enable_sleep_mode: bool = False
+
     disable_autocast: bool = False
 
     # VSA parameters
diff --git a/vllm_omni/diffusion/worker/gpu_worker.py b/vllm_omni/diffusion/worker/gpu_worker.py
index 868611841..cfe2f3aca 100644
--- a/vllm_omni/diffusion/worker/gpu_worker.py
+++ b/vllm_omni/diffusion/worker/gpu_worker.py
@@ -3,6 +3,8 @@
 import multiprocessing as mp
 import os
 import time
+from collections.abc import Iterable
+from contextlib import AbstractContextManager, nullcontext
 
 import torch
 import zmq
@@ -107,6 +109,58 @@ def execute_model(self, reqs: list[OmniDiffusionRequest], od_config: OmniDiffusi
         output = self.pipeline.forward(req)
         return output
 
+    def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
+        return self.pipeline.loaded_weights(weights)
+
+    def sleep(self, level: int = 1) -> bool:
+        from vllm.device_allocator.cumem import CuMemAllocator
+
+        free_bytes_before_sleep = torch.cuda.mem_get_info()[0]
+
+        # Save the buffers before level 2 sleep
+        if level == 2:
+            model = self.pipeline
+            self._sleep_saved_buffers = {name: buffer.cpu().clone() for name, buffer in model.named_buffers()}
+
+        allocator = CuMemAllocator.get_instance()
+        allocator.sleep(offload_tags=("weights",) if level == 1 else tuple())
+        free_bytes_after_sleep, total = torch.cuda.mem_get_info()
+        freed_bytes = free_bytes_after_sleep - free_bytes_before_sleep
+        used_bytes = total - free_bytes_after_sleep
+        assert freed_bytes >= 0, "Memory usage increased after sleeping."
+        logger.info(
+            "Sleep mode freed %.2f GiB memory, %.2f GiB memory is still in use.",
+            freed_bytes / GiB_bytes,
+            used_bytes / GiB_bytes,
+        )
+        return True
+
+    def wake_up(self, tags: list[str] | None = None) -> bool:
+        from vllm.device_allocator.cumem import CuMemAllocator
+
+        allocator = CuMemAllocator.get_instance()
+        allocator.wake_up(tags)
+
+        # Restore the buffers after level 2 sleep
+        if len(self._sleep_saved_buffers):
+            model = self.pipeline
+            for name, buffer in model.named_buffers():
+                if name in self._sleep_saved_buffers:
+                    buffer.data.copy_(self._sleep_saved_buffers[name].data)
+            self._sleep_saved_buffers = {}
+        return True
+
+    def _maybe_get_memory_pool_context(self, tag: str) -> AbstractContextManager:
+        if self.od_config.enable_sleep_mode:
+            from vllm.device_allocator.cumem import CuMemAllocator
+
+            allocator = CuMemAllocator.get_instance()
+            if tag == "weights":
+                assert allocator.get_current_usage() == 0, "Sleep mode can only be used for one instance per process."
+            return allocator.use_memory_pool(tag=tag)
+        else:
+            return nullcontext()
+
     def shutdown(self) -> None:
         if torch.distributed.is_initialized():
             try:

From db68abfca0fc61012a89939245325a82272a7cec Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Fri, 19 Dec 2025 13:31:47 +0800
Subject: [PATCH 2/6] Wrap model loading in the sleep-mode memory pool

Signed-off-by: knlnguyen1802
---
 vllm_omni/diffusion/worker/gpu_worker.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/vllm_omni/diffusion/worker/gpu_worker.py b/vllm_omni/diffusion/worker/gpu_worker.py
index cfe2f3aca..6726a0fa1 100644
--- a/vllm_omni/diffusion/worker/gpu_worker.py
+++ b/vllm_omni/diffusion/worker/gpu_worker.py
@@ -44,7 +44,7 @@ def __init__(
         self.rank = rank
         self.od_config = od_config
         self.pipeline = None
-
+        self._sleep_saved_buffers: dict[str, torch.Tensor] = {}
         self.init_device_and_model()
 
     def init_device_and_model(self) -> None:
@@ -73,11 +73,12 @@ def init_device_and_model(self) -> None:
         load_config = LoadConfig()
         model_loader = DiffusersPipelineLoader(load_config)
         time_before_load = time.perf_counter()
-        with DeviceMemoryProfiler() as m:
-            self.pipeline = model_loader.load_model(
-                od_config=self.od_config,
-                load_device=f"cuda:{rank}",
-            )
+        with self._maybe_get_memory_pool_context(tag="weights"):
+            with DeviceMemoryProfiler() as m:
+                self.pipeline = model_loader.load_model(
+                    od_config=self.od_config,
+                    load_device=f"cuda:{rank}",
+                )
         time_after_load = time.perf_counter()
 
         logger.info(

From 28b71954cc10c485003e3a7b55901399f01cf965 Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Mon, 22 Dec 2025 11:02:39 +0800
Subject: [PATCH 3/6] Add GPUWorker tests and fix load_weights call

Signed-off-by: knlnguyen1802
---
 tests/diffusion/test_gpu_worker.py       | 255 +++++++++++++++++++++++
 vllm_omni/diffusion/worker/gpu_worker.py |   2 +-
 2 files changed, 256 insertions(+), 1 deletion(-)
 create mode 100644 tests/diffusion/test_gpu_worker.py

diff --git a/tests/diffusion/test_gpu_worker.py b/tests/diffusion/test_gpu_worker.py
new file mode 100644
index 000000000..00972d5d1
--- /dev/null
+++ b/tests/diffusion/test_gpu_worker.py
@@ -0,0 +1,255 @@
+# SPDX-License-Identifier: Apache-2.0
+# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
+
+"""
+Unit tests for GPUWorker class.
+
+This module tests the GPUWorker implementation:
+- load_weights: Loading model weights
+- sleep: Putting worker into sleep mode (levels 1 and 2)
+- wake_up: Waking worker from sleep mode
+"""
+
+from unittest.mock import Mock, MagicMock, patch
+
+import pytest
+import torch
+
+from vllm_omni.diffusion.worker.gpu_worker import GPUWorker
+
+
+@pytest.fixture
+def mock_od_config():
+    """Create a mock OmniDiffusionConfig."""
+    config = Mock()
+    config.num_gpus = 1
+    config.master_port = 12345
+    config.enable_sleep_mode = False
+    config.cache_backend = None
+    config.cache_config = None
+    config.model = "test-model"
+    return config
+
+
+@pytest.fixture
+def mock_gpu_worker(mock_od_config):
+    """Create a GPUWorker with mocked initialization."""
+    with patch.object(GPUWorker, 'init_device_and_model'):
+        worker = GPUWorker(local_rank=0, rank=0, od_config=mock_od_config)
+        # Mock the pipeline
+        worker.pipeline = Mock()
+        worker.cache_backend = None
+        return worker
+
+
+class TestGPUWorkerLoadWeights:
+    """Test GPUWorker.load_weights method."""
+
+    def test_load_weights_calls_pipeline(self, mock_gpu_worker):
+        """Test that load_weights delegates to pipeline.load_weights."""
+        # Setup mock weights
+        mock_weights = [
+            ("layer1.weight", torch.randn(10, 10)),
+            ("layer2.weight", torch.randn(20, 20)),
+        ]
+        expected_loaded = {"layer1.weight", "layer2.weight"}
+        
+        # Configure pipeline mock
+        mock_gpu_worker.pipeline.load_weights = Mock(return_value=expected_loaded)
+        
+        # Call load_weights
+        result = mock_gpu_worker.load_weights(mock_weights)
+        
+        # Verify pipeline.load_weights was called with the weights
+        mock_gpu_worker.pipeline.load_weights.assert_called_once_with(mock_weights)
+        assert result == expected_loaded
+
+    def test_load_weights_empty_iterable(self, mock_gpu_worker):
+        """Test load_weights with empty weights iterable."""
+        mock_gpu_worker.pipeline.load_weights = Mock(return_value=set())
+        
+        result = mock_gpu_worker.load_weights([])
+        
+        mock_gpu_worker.pipeline.load_weights.assert_called_once_with([])
+        assert result == set()
+
+
+class TestGPUWorkerSleep:
+    """Test GPUWorker.sleep method."""
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
+        """Test sleep mode level 1 (offload weights only)."""
+        # Setup memory info mocks
+        # Before sleep: 1GB free, 8GB total
+        # After sleep: 3GB free, 8GB total (freed 2GB)
+        mock_mem_info.side_effect = [
+            (1 * 1024**3, 8 * 1024**3),  # Before sleep
+            (3 * 1024**3, 8 * 1024**3),  # After sleep
+        ]
+        
+        # Setup allocator mock
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.sleep = Mock()
+        
+        # Call sleep with level 1
+        result = mock_gpu_worker.sleep(level=1)
+        
+        # Verify sleep was called with correct tags
+        mock_allocator.sleep.assert_called_once_with(offload_tags=("weights",))
+        assert result is True
+        # Verify buffers were NOT saved (level 1 doesn't save buffers)
+        assert len(mock_gpu_worker._sleep_saved_buffers) == 0
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
+        """Test sleep mode level 2 (offload all, save buffers)."""
+        # Setup memory info mocks
+        mock_mem_info.side_effect = [
+            (1 * 1024**3, 8 * 1024**3),  # Before sleep
+            (5 * 1024**3, 8 * 1024**3),  # After sleep (freed 4GB)
+        ]
+        
+        # Setup allocator mock
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.sleep = Mock()
+        
+        # Mock pipeline buffers
+        mock_buffer1 = torch.randn(10, 10)
+        mock_buffer2 = torch.randn(20, 20)
+        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
+            ("buffer1", mock_buffer1),
+            ("buffer2", mock_buffer2),
+        ])
+        
+        # Call sleep with level 2
+        result = mock_gpu_worker.sleep(level=2)
+        
+        # Verify sleep was called with empty tags (offload all)
+        mock_allocator.sleep.assert_called_once_with(offload_tags=tuple())
+        assert result is True
+        
+        # Verify buffers were saved
+        assert len(mock_gpu_worker._sleep_saved_buffers) == 2
+        assert "buffer1" in mock_gpu_worker._sleep_saved_buffers
+        assert "buffer2" in mock_gpu_worker._sleep_saved_buffers
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
+        """Test that sleep validates memory was actually freed."""
+        # Simulate memory increase (should trigger assertion error)
+        mock_mem_info.side_effect = [
+            (3 * 1024**3, 8 * 1024**3),  # Before sleep: 3GB free
+            (1 * 1024**3, 8 * 1024**3),  # After sleep: 1GB free (negative freed!)
+        ]
+        
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.sleep = Mock()
+        
+        # This should raise an assertion error
+        with pytest.raises(AssertionError, match="Memory usage increased after sleeping"):
+            mock_gpu_worker.sleep(level=1)
+
+
+class TestGPUWorkerWakeUp:
+    """Test GPUWorker.wake_up method."""
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker):
+        """Test wake_up without saved buffers (level 1 sleep)."""
+        # Setup allocator mock
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.wake_up = Mock()
+        
+        # Ensure no saved buffers
+        mock_gpu_worker._sleep_saved_buffers = {}
+        
+        # Call wake_up
+        result = mock_gpu_worker.wake_up(tags=["weights"])
+        
+        # Verify allocator.wake_up was called
+        mock_allocator.wake_up.assert_called_once_with(["weights"])
+        assert result is True
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
+        """Test wake_up with saved buffers (level 2 sleep)."""
+        # Setup allocator mock
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.wake_up = Mock()
+        
+        # Create saved buffers
+        saved_buffer1 = torch.randn(10, 10)
+        saved_buffer2 = torch.randn(20, 20)
+        mock_gpu_worker._sleep_saved_buffers = {
+            "buffer1": saved_buffer1,
+            "buffer2": saved_buffer2,
+        }
+        
+        # Mock pipeline buffers (these will be restored)
+        mock_buffer1 = Mock()
+        mock_buffer1.data = Mock()
+        mock_buffer2 = Mock()
+        mock_buffer2.data = Mock()
+        
+        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
+            ("buffer1", mock_buffer1),
+            ("buffer2", mock_buffer2),
+        ])
+        
+        # Call wake_up
+        result = mock_gpu_worker.wake_up(tags=None)
+        
+        # Verify allocator.wake_up was called
+        mock_allocator.wake_up.assert_called_once_with(None)
+        
+        # Verify buffers were restored
+        mock_buffer1.data.copy_.assert_called_once()
+        mock_buffer2.data.copy_.assert_called_once()
+        
+        # Verify saved buffers were cleared
+        assert len(mock_gpu_worker._sleep_saved_buffers) == 0
+        assert result is True
+
+    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    def test_wake_up_partial_buffer_restore(self, mock_allocator_class, mock_gpu_worker):
+        """Test wake_up only restores buffers that were saved."""
+        # Setup allocator mock
+        mock_allocator = Mock()
+        mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
+        mock_allocator.wake_up = Mock()
+        
+        # Only save buffer1, not buffer2
+        saved_buffer1 = torch.randn(10, 10)
+        mock_gpu_worker._sleep_saved_buffers = {
+            "buffer1": saved_buffer1,
+        }
+        
+        # Mock pipeline has both buffers
+        mock_buffer1 = Mock()
+        mock_buffer1.data = Mock()
+        mock_buffer2 = Mock()
+        mock_buffer2.data = Mock()
+        
+        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
+            ("buffer1", mock_buffer1),
+            ("buffer2", mock_buffer2),
+        ])
+        
+        # Call wake_up
+        result = mock_gpu_worker.wake_up()
+        
+        # Verify only buffer1 was restored
+        mock_buffer1.data.copy_.assert_called_once()
+        # buffer2 should NOT be restored since it wasn't saved
+        mock_buffer2.data.copy_.assert_not_called()
+        
+        assert result is True
diff --git a/vllm_omni/diffusion/worker/gpu_worker.py b/vllm_omni/diffusion/worker/gpu_worker.py
index 6726a0fa1..ee78f4357 100644
--- a/vllm_omni/diffusion/worker/gpu_worker.py
+++ b/vllm_omni/diffusion/worker/gpu_worker.py
@@ -111,7 +111,7 @@ def execute_model(self, reqs: list[OmniDiffusionRequest], od_config: OmniDiffusi
         return output
 
     def load_weights(self, weights: Iterable[tuple[str, torch.Tensor]]) -> set[str]:
-        return self.pipeline.loaded_weights(weights)
+        return self.pipeline.load_weights(weights)
 
     def sleep(self, level: int = 1) -> bool:
         from vllm.device_allocator.cumem import CuMemAllocator

From 72a15d3b6025a2c71d06456241ad547b39042280 Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Mon, 22 Dec 2025 11:10:34 +0800
Subject: [PATCH 4/6] Fix CuMemAllocator patch targets in tests

Signed-off-by: knlnguyen1802
---
 tests/diffusion/test_gpu_worker.py | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/tests/diffusion/test_gpu_worker.py b/tests/diffusion/test_gpu_worker.py
index 00972d5d1..ae0b3cf86 100644
--- a/tests/diffusion/test_gpu_worker.py
+++ b/tests/diffusion/test_gpu_worker.py
@@ -78,7 +78,6 @@ class TestGPUWorkerSleep:
     """Test GPUWorker.sleep method."""
 
     @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
    def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test sleep mode level 1 (offload weights only)."""
         # Setup memory info mocks
@@ -104,7 +103,7 @@ def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
         assert len(mock_gpu_worker._sleep_saved_buffers) == 0
 
     @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    @patch('vllm.device_allocator.cumem.CuMemAllocator')
     def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test sleep mode level 2 (offload all, save buffers)."""
         # Setup memory info mocks
@@ -139,7 +138,7 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
         assert "buffer2" in mock_gpu_worker._sleep_saved_buffers
 
     @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    @patch('vllm.device_allocator.cumem.CuMemAllocator')
     def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test that sleep validates memory was actually freed."""
         # Simulate memory increase (should trigger assertion error)
@@ -160,7 +159,7 @@ def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info
 class TestGPUWorkerWakeUp:
     """Test GPUWorker.wake_up method."""
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    @patch('vllm.device_allocator.cumem.CuMemAllocator')
     def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up without saved buffers (level 1 sleep)."""
         # Setup allocator mock
@@ -178,7 +177,7 @@ def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker):
         mock_allocator.wake_up.assert_called_once_with(["weights"])
         assert result is True
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    @patch('vllm.device_allocator.cumem.CuMemAllocator')
     def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up with saved buffers (level 2 sleep)."""
         # Setup allocator mock
@@ -219,7 +218,7 @@ def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
         assert len(mock_gpu_worker._sleep_saved_buffers) == 0
         assert result is True
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.CuMemAllocator')
+    @patch('vllm.device_allocator.cumem.CuMemAllocator')
     def test_wake_up_partial_buffer_restore(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up only restores buffers that were saved."""
         # Setup allocator mock

From 728162be34bf42463470577995b31c3126c39b0a Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Mon, 22 Dec 2025 11:11:50 +0800
Subject: [PATCH 5/6] Fix pre-commit

Signed-off-by: knlnguyen1802
---
 tests/diffusion/test_gpu_worker.py | 112 +++++++++++++++---------
 1 file changed, 59 insertions(+), 53 deletions(-)

diff --git a/tests/diffusion/test_gpu_worker.py b/tests/diffusion/test_gpu_worker.py
index ae0b3cf86..1e9571813 100644
--- a/tests/diffusion/test_gpu_worker.py
+++ b/tests/diffusion/test_gpu_worker.py
@@ -10,7 +10,7 @@
 - wake_up: Waking worker from sleep mode
 """
 
-from unittest.mock import Mock, MagicMock, patch
+from unittest.mock import Mock, patch
 
 import pytest
 import torch
@@ -34,7 +34,7 @@ def mock_od_config():
 @pytest.fixture
 def mock_gpu_worker(mock_od_config):
     """Create a GPUWorker with mocked initialization."""
-    with patch.object(GPUWorker, 'init_device_and_model'):
+    with patch.object(GPUWorker, "init_device_and_model"):
         worker = GPUWorker(local_rank=0, rank=0, od_config=mock_od_config)
         # Mock the pipeline
         worker.pipeline = Mock()
@@ -53,13 +53,13 @@ def test_load_weights_calls_pipeline(self, mock_gpu_worker):
             ("layer2.weight", torch.randn(20, 20)),
         ]
         expected_loaded = {"layer1.weight", "layer2.weight"}
-        
+
         # Configure pipeline mock
         mock_gpu_worker.pipeline.load_weights = Mock(return_value=expected_loaded)
-        
+
         # Call load_weights
         result = mock_gpu_worker.load_weights(mock_weights)
-        
+
         # Verify pipeline.load_weights was called with the weights
         mock_gpu_worker.pipeline.load_weights.assert_called_once_with(mock_weights)
         assert result == expected_loaded
@@ -67,9 +67,9 @@ def test_load_weights_empty_iterable(self, mock_gpu_worker):
         """Test load_weights with empty weights iterable."""
         mock_gpu_worker.pipeline.load_weights = Mock(return_value=set())
-        
+
         result = mock_gpu_worker.load_weights([])
-        
+
         mock_gpu_worker.pipeline.load_weights.assert_called_once_with([])
         assert result == set()
 
 
@@ -77,7 +77,7 @@
 class TestGPUWorkerSleep:
     """Test GPUWorker.sleep method."""
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
+    @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
     def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test sleep mode level 1 (offload weights only)."""
         # Setup memory info mocks
@@ -87,23 +87,23 @@ def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
             (1 * 1024**3, 8 * 1024**3),  # Before sleep
             (3 * 1024**3, 8 * 1024**3),  # After sleep
         ]
-        
+
         # Setup allocator mock
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.sleep = Mock()
-        
+
         # Call sleep with level 1
         result = mock_gpu_worker.sleep(level=1)
-        
+
         # Verify sleep was called with correct tags
         mock_allocator.sleep.assert_called_once_with(offload_tags=("weights",))
         assert result is True
         # Verify buffers were NOT saved (level 1 doesn't save buffers)
         assert len(mock_gpu_worker._sleep_saved_buffers) == 0
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
-    @patch('vllm.device_allocator.cumem.CuMemAllocator')
+    @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
@@ -111,34 +111,36 @@ def test_sleep_level_2(self, mock_allocator_class, mock_mem_info, mock_gpu_worke
             (1 * 1024**3, 8 * 1024**3),  # Before sleep
             (5 * 1024**3, 8 * 1024**3),  # After sleep (freed 4GB)
         ]
-        
+
         # Setup allocator mock
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.sleep = Mock()
-        
+
         # Mock pipeline buffers
         mock_buffer1 = torch.randn(10, 10)
         mock_buffer2 = torch.randn(20, 20)
-        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
-            ("buffer1", mock_buffer1),
-            ("buffer2", mock_buffer2),
-        ])
-        
+        mock_gpu_worker.pipeline.named_buffers = Mock(
+            return_value=[
+                ("buffer1", mock_buffer1),
+                ("buffer2", mock_buffer2),
+            ]
+        )
+
         # Call sleep with level 2
         result = mock_gpu_worker.sleep(level=2)
-        
+
         # Verify sleep was called with empty tags (offload all)
         mock_allocator.sleep.assert_called_once_with(offload_tags=tuple())
         assert result is True
-        
+
         # Verify buffers were saved
         assert len(mock_gpu_worker._sleep_saved_buffers) == 2
         assert "buffer1" in mock_gpu_worker._sleep_saved_buffers
         assert "buffer2" in mock_gpu_worker._sleep_saved_buffers
 
-    @patch('vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info')
-    @patch('vllm.device_allocator.cumem.CuMemAllocator')
+    @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test that sleep validates memory was actually freed."""
         # Simulate memory increase (should trigger assertion error)
@@ -146,11 +148,11 @@ def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info
             (3 * 1024**3, 8 * 1024**3),  # Before sleep: 3GB free
             (1 * 1024**3, 8 * 1024**3),  # After sleep: 1GB free (negative freed!)
         ]
-        
+
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.sleep = Mock()
-        
+
         # This should raise an assertion error
         with pytest.raises(AssertionError, match="Memory usage increased after sleeping"):
             mock_gpu_worker.sleep(level=1)
@@ -159,32 +161,32 @@ def test_sleep_memory_freed_validation(self, mock_allocator_class, mock_mem_info
 class TestGPUWorkerWakeUp:
     """Test GPUWorker.wake_up method."""
 
-    @patch('vllm.device_allocator.cumem.CuMemAllocator')
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_wake_up_without_buffers(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up without saved buffers (level 1 sleep)."""
         # Setup allocator mock
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.wake_up = Mock()
-        
+
         # Ensure no saved buffers
         mock_gpu_worker._sleep_saved_buffers = {}
-        
+
         # Call wake_up
         result = mock_gpu_worker.wake_up(tags=["weights"])
-        
+
         # Verify allocator.wake_up was called
         mock_allocator.wake_up.assert_called_once_with(["weights"])
         assert result is True
 
-    @patch('vllm.device_allocator.cumem.CuMemAllocator')
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up with saved buffers (level 2 sleep)."""
         # Setup allocator mock
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.wake_up = Mock()
-        
+
         # Create saved buffers
         saved_buffer1 = torch.randn(10, 10)
         saved_buffer2 = torch.randn(20, 20)
@@ -192,63 +194,67 @@ def test_wake_up_with_buffers(self, mock_allocator_class, mock_gpu_worker):
             "buffer1": saved_buffer1,
             "buffer2": saved_buffer2,
         }
-        
+
         # Mock pipeline buffers (these will be restored)
         mock_buffer1 = Mock()
         mock_buffer1.data = Mock()
         mock_buffer2 = Mock()
         mock_buffer2.data = Mock()
-        
-        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
-            ("buffer1", mock_buffer1),
-            ("buffer2", mock_buffer2),
-        ])
-        
+
+        mock_gpu_worker.pipeline.named_buffers = Mock(
+            return_value=[
+                ("buffer1", mock_buffer1),
+                ("buffer2", mock_buffer2),
+            ]
+        )
+
         # Call wake_up
         result = mock_gpu_worker.wake_up(tags=None)
-        
+
         # Verify allocator.wake_up was called
         mock_allocator.wake_up.assert_called_once_with(None)
-        
+
         # Verify buffers were restored
         mock_buffer1.data.copy_.assert_called_once()
         mock_buffer2.data.copy_.assert_called_once()
-        
+
         # Verify saved buffers were cleared
         assert len(mock_gpu_worker._sleep_saved_buffers) == 0
         assert result is True
 
-    @patch('vllm.device_allocator.cumem.CuMemAllocator')
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_wake_up_partial_buffer_restore(self, mock_allocator_class, mock_gpu_worker):
         """Test wake_up only restores buffers that were saved."""
         # Setup allocator mock
         mock_allocator = Mock()
         mock_allocator_class.get_instance = Mock(return_value=mock_allocator)
         mock_allocator.wake_up = Mock()
-        
+
         # Only save buffer1, not buffer2
         saved_buffer1 = torch.randn(10, 10)
         mock_gpu_worker._sleep_saved_buffers = {
             "buffer1": saved_buffer1,
         }
-        
+
         # Mock pipeline has both buffers
         mock_buffer1 = Mock()
         mock_buffer1.data = Mock()
         mock_buffer2 = Mock()
         mock_buffer2.data = Mock()
-        
-        mock_gpu_worker.pipeline.named_buffers = Mock(return_value=[
-            ("buffer1", mock_buffer1),
-            ("buffer2", mock_buffer2),
-        ])
-        
+
+        mock_gpu_worker.pipeline.named_buffers = Mock(
+            return_value=[
+                ("buffer1", mock_buffer1),
+                ("buffer2", mock_buffer2),
+            ]
+        )
+
         # Call wake_up
         result = mock_gpu_worker.wake_up()
-        
+
         # Verify only buffer1 was restored
         mock_buffer1.data.copy_.assert_called_once()
         # buffer2 should NOT be restored since it wasn't saved
         mock_buffer2.data.copy_.assert_not_called()
-        
+
         assert result is True

From 03321f07d0f57c9413a326f47b031ca2ccd9e1fe Mon Sep 17 00:00:00 2001
From: knlnguyen1802
Date: Mon, 22 Dec 2025 11:21:20 +0800
Subject: [PATCH 6/6] Restore CuMemAllocator patch in test_sleep_level_1

Signed-off-by: knlnguyen1802
---
 tests/diffusion/test_gpu_worker.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/tests/diffusion/test_gpu_worker.py b/tests/diffusion/test_gpu_worker.py
index 1e9571813..defeffe5b 100644
--- a/tests/diffusion/test_gpu_worker.py
+++ b/tests/diffusion/test_gpu_worker.py
@@ -78,6 +78,7 @@ class TestGPUWorkerSleep:
     """Test GPUWorker.sleep method."""
 
     @patch("vllm_omni.diffusion.worker.gpu_worker.torch.cuda.mem_get_info")
+    @patch("vllm.device_allocator.cumem.CuMemAllocator")
     def test_sleep_level_1(self, mock_allocator_class, mock_mem_info, mock_gpu_worker):
         """Test sleep mode level 1 (offload weights only)."""
         # Setup memory info mocks
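
For reference, a minimal sketch of how the sleep/wake cycle added by this series might be driven once the patches are applied. GPUWorker, OmniDiffusionConfig, enable_sleep_mode, sleep(), and wake_up() come from the patches above; the constructor arguments mirror the test fixtures, and a real OmniDiffusionConfig may need additional fields set, so treat this as an assumption-laden illustration rather than a supported entry point:

    from vllm_omni.diffusion.data import OmniDiffusionConfig
    from vllm_omni.diffusion.worker.gpu_worker import GPUWorker

    # enable_sleep_mode (patch 1) routes model loading through the
    # CuMemAllocator "weights" memory pool (patch 2), which is what
    # sleep()/wake_up() offload and restore. Assumption: the remaining
    # OmniDiffusionConfig fields have usable defaults here.
    od_config = OmniDiffusionConfig(enable_sleep_mode=True)
    worker = GPUWorker(local_rank=0, rank=0, od_config=od_config)

    worker.sleep(level=1)    # level 1: offload only the "weights" pool to CPU
    # worker.sleep(level=2)  # level 2: discard everything; buffers are snapshotted first

    worker.wake_up()                    # restore all pools (and any saved buffers)
    # worker.wake_up(tags=["weights"])  # or restore just the weights pool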