diff --git a/cuda_bindings/cuda/bindings/driver.pyx.in b/cuda_bindings/cuda/bindings/driver.pyx.in index 6be529571..9ef2c4162 100644 --- a/cuda_bindings/cuda/bindings/driver.pyx.in +++ b/cuda_bindings/cuda/bindings/driver.pyx.in @@ -34305,7 +34305,7 @@ def cuMemPrefetchAsync_v2(devPtr, size_t count, location not None : CUmemLocatio to the host NUMA node closest to the current thread's CPU by specifying :py:obj:`~.CU_MEM_LOCATION_TYPE_HOST_NUMA_CURRENT` for :py:obj:`~.CUmemLocation.type`. Note when - :py:obj:`~.CUmemLocation.type` is etiher + :py:obj:`~.type` is either :py:obj:`~.CU_MEM_LOCATION_TYPE_HOST` OR :py:obj:`~.CU_MEM_LOCATION_TYPE_HOST_NUMA_CURRENT`, :py:obj:`~.CUmemLocation.id` will be ignored. diff --git a/cuda_core/cuda/core/experimental/__init__.py b/cuda_core/cuda/core/experimental/__init__.py index 6e289d49b..6bd5acf5b 100644 --- a/cuda_core/cuda/core/experimental/__init__.py +++ b/cuda_core/cuda/core/experimental/__init__.py @@ -7,6 +7,7 @@ from cuda.core.experimental._event import EventOptions from cuda.core.experimental._launcher import LaunchConfig, launch from cuda.core.experimental._linker import Linker, LinkerOptions +from cuda.core.experimental._memory import AsyncMempool from cuda.core.experimental._module import ObjectCode from cuda.core.experimental._program import Program, ProgramOptions from cuda.core.experimental._stream import Stream, StreamOptions diff --git a/cuda_core/cuda/core/experimental/_device.py b/cuda_core/cuda/core/experimental/_device.py index 6be8077b8..00e9af64d 100644 --- a/cuda_core/cuda/core/experimental/_device.py +++ b/cuda_core/cuda/core/experimental/_device.py @@ -6,8 +6,9 @@ from typing import Optional, Union from cuda.core.experimental._context import Context, ContextOptions +from cuda.core.experimental._memory import AsyncMempool, Buffer, MemoryResource, _SynchronousMemoryResource from cuda.core.experimental._event import Event, EventOptions -from cuda.core.experimental._memory import Buffer, MemoryResource, _DefaultAsyncMempool, _SynchronousMemoryResource + from cuda.core.experimental._stream import Stream, StreamOptions, default_stream from cuda.core.experimental._utils.clear_error_support import assert_type from cuda.core.experimental._utils.cuda_utils import ( @@ -979,7 +980,7 @@ def __new__(cls, device_id=None): runtime.cudaDeviceGetAttribute(runtime.cudaDeviceAttr.cudaDevAttrMemoryPoolsSupported, 0) ) ) == 1: - dev._mr = _DefaultAsyncMempool(dev_id) + dev._mr = AsyncMempool._from_device(dev_id) else: dev._mr = _SynchronousMemoryResource(dev_id) diff --git a/cuda_core/cuda/core/experimental/_memory.py b/cuda_core/cuda/core/experimental/_memory.py index 6a0c611d3..be391f841 100644 --- a/cuda_core/cuda/core/experimental/_memory.py +++ b/cuda_core/cuda/core/experimental/_memory.py @@ -5,6 +5,9 @@ from __future__ import annotations import abc + +# platform is used to select the IPC shareable-handle type for the current OS +import platform import weakref from typing import Optional, Tuple, TypeVar @@ -78,6 +81,11 @@ def close(self, stream=None): the default stream. """ + if self._mnff.mr is None: + raise RuntimeError( + "Cannot close a buffer that was not allocated from a memory resource; buffer: ", + self, + ) self._mnff.close(stream) @property @@ -211,8 +219,31 @@ def __release_buffer__(self, buffer: memoryview, /): raise NotImplementedError("WIP: Buffer.__release_buffer__ hasn't been implemented yet.") +class IPCBufferDescriptor: + """Describes an exported buffer so that it can be shared across processes.
+ It is not a valid buffer containing data, but rather a description used by the importing + process to construct a valid buffer. Its primary use is to provide a serialization + mechanism for passing exported buffers between processes.""" + + def __init__(self, reserved: bytes, size: int): + self.reserved = reserved + self._size = size + + def __reduce__(self): + # This is subject to change if the CUmemPoolPtrExportData struct/object changes. + return (self._reconstruct, (self.reserved, self._size)) + + @classmethod + def _reconstruct(cls, reserved, size): + instance = cls(reserved, size) + return instance + + class MemoryResource(abc.ABC): - __slots__ = ("_handle",) + """Base class for memory resources. + + Subclasses implement allocation, deallocation, and accessibility queries for a specific kind of memory. + """ @abc.abstractmethod def __init__(self, *args, **kwargs): ... @@ -245,36 +276,313 @@ def device_id(self) -> int: ... -class _DefaultAsyncMempool(MemoryResource): - __slots__ = ("_dev_id",) +def _get_platform_handle_type() -> int: + """Returns the appropriate handle type for the current platform.""" + system = platform.system() + if system == "Linux": + return driver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_POSIX_FILE_DESCRIPTOR + elif system == "Windows": + raise RuntimeError("IPC support is not yet available on Windows") + else: + raise RuntimeError(f"Unsupported platform: {system}") - def __init__(self, dev_id): - self._handle = handle_return(driver.cuDeviceGetMemPool(dev_id)) + +class AsyncMempool(MemoryResource): + """A CUDA memory pool for efficient memory allocation. + + This class creates a CUDA memory pool that provides better allocation and + deallocation performance compared to individual allocations. The pool can + optionally be configured to support sharing across process boundaries. + + Use the static methods create() or from_shared_handle() to instantiate. + Direct instantiation is not supported. + + Notes + ----- + The _from_device() method is for internal use by the Device class only and + should not be called directly by users. + """ + + class _MembersNeededForFinalize: + __slots__ = ("handle", "need_close") + + def __init__(self, mr_obj, handle, need_close): + self.handle = handle + self.need_close = need_close + weakref.finalize(mr_obj, self.close) + + def close(self): + if self.handle and self.need_close: + handle_return(driver.cuMemPoolDestroy(self.handle)) + self.handle = None + self.need_close = False + + __slots__ = ("_mnff", "_dev_id", "_ipc_enabled") + + def __init__(self): + """Direct instantiation is not supported. + + Use the static methods create() or from_shared_handle() instead. + """ + raise NotImplementedError( + "directly creating an AsyncMempool object is not supported. Please use either " + "AsyncMempool.create() or from_shared_handle()" + ) + + @staticmethod + def _init(dev_id: int, handle: int, ipc_enabled: bool = False, need_close: bool = False) -> AsyncMempool: + """Internal constructor for AsyncMempool objects.
+ + Parameters + ---------- + dev_id : int + The ID of the GPU device where the memory pool will be created + handle : int + The handle to the CUDA memory pool + ipc_enabled : bool + Whether the pool supports inter-process sharing capabilities + + Returns + ------- + AsyncMempool + A new memory pool instance + """ + self = AsyncMempool.__new__(AsyncMempool) self._dev_id = dev_id + self._ipc_enabled = ipc_enabled + self._mnff = AsyncMempool._MembersNeededForFinalize(self, handle, need_close) + return self - def allocate(self, size, stream=None) -> Buffer: + @staticmethod + def _from_device(dev_id: int) -> AsyncMempool: + """Internal method to create an AsyncMempool for a device's default memory pool. + + This method is intended for internal use by the Device class only. + Users should not call this method directly. + + Parameters + ---------- + dev_id : int + The ID of the GPU device to get the default memory pool from + + Returns + ------- + AsyncMempool + A memory pool instance connected to the device's default pool + """ + handle = handle_return(driver.cuDeviceGetMemPool(dev_id)) + return AsyncMempool._init(dev_id, handle, ipc_enabled=False, need_close=False) + + @staticmethod + def create(dev_id: int, max_size: int, ipc_enabled: bool = False) -> AsyncMempool: + """Create a new memory pool. + + Parameters + ---------- + dev_id : int + The ID of the GPU device where the memory pool will be created + max_size : int + Maximum size in bytes that the memory pool can grow to + ipc_enabled : bool, optional + Whether to enable inter-process sharing capabilities. Default is False. + Note: IPC support is not yet available on Windows. + + Returns + ------- + AsyncMempool + A new memory pool instance + + Raises + ------ + ValueError + If max_size is None + RuntimeError + If ipc_enabled is True on Windows (not yet supported) + CUDAError + If pool creation fails + """ + if max_size is None: + raise ValueError("max_size must be provided when creating a new memory pool") + + if platform.system() == "Windows" and ipc_enabled: + raise RuntimeError("IPC support is not yet available on Windows") + + properties = driver.CUmemPoolProps() + properties.allocType = driver.CUmemAllocationType.CU_MEM_ALLOCATION_TYPE_PINNED + properties.handleTypes = ( + _get_platform_handle_type() if ipc_enabled else driver.CUmemAllocationHandleType.CU_MEM_HANDLE_TYPE_NONE + ) + properties.location = driver.CUmemLocation() + properties.location.id = dev_id + properties.location.type = driver.CUmemLocationType.CU_MEM_LOCATION_TYPE_DEVICE + properties.maxSize = max_size + properties.win32SecurityAttributes = 0 + properties.usage = 0 + + handle = handle_return(driver.cuMemPoolCreate(properties)) + return AsyncMempool._init(dev_id, handle, ipc_enabled=ipc_enabled, need_close=True) + + @staticmethod + def from_shared_handle(dev_id: int, shared_handle: int) -> AsyncMempool: + """Create an AsyncMempool from an existing handle. 
+ + Parameters + ---------- + dev_id : int + The ID of the GPU device where the memory pool will be created + shared_handle : int + A platform-specific handle to import an existing memory pool + + Returns + ------- + AsyncMempool + A memory pool instance connected to the existing pool + """ + handle = handle_return(driver.cuMemPoolImportFromShareableHandle(shared_handle, _get_platform_handle_type(), 0)) + return AsyncMempool._init( + dev_id, handle, ipc_enabled=True, need_close=True + ) # Imported pools are always IPC-enabled + + def get_shareable_handle(self) -> int: + """Get a platform-specific handle that can be shared with other processes.""" + if not self._ipc_enabled: + raise RuntimeError("This memory pool was not created with IPC support enabled") + return handle_return(driver.cuMemPoolExportToShareableHandle(self._mnff.handle, _get_platform_handle_type(), 0)) + + def export_buffer(self, buffer: Buffer) -> IPCBufferDescriptor: + """Export a buffer allocated from this pool for sharing between processes.""" + if not self._ipc_enabled: + raise RuntimeError("This memory pool was not created with IPC support enabled") + return IPCBufferDescriptor( + handle_return(driver.cuMemPoolExportPointer(buffer.handle)).reserved, buffer._mnff.size + ) + + def import_buffer(self, ipc_buffer: IPCBufferDescriptor) -> Buffer: + """Import a buffer that was exported from another process.""" + if not self._ipc_enabled: + raise RuntimeError("This memory pool was not created with IPC support enabled") + share_data = driver.CUmemPoolPtrExportData() + share_data.reserved = ipc_buffer.reserved + return Buffer( + handle_return(driver.cuMemPoolImportPointer(self._mnff.handle, share_data)), ipc_buffer._size, self + ) + + def allocate(self, size: int, stream=None) -> Buffer: + """Allocate memory from the pool.""" if stream is None: stream = default_stream() - ptr = handle_return(driver.cuMemAllocFromPoolAsync(size, self._handle, stream.handle)) + ptr = handle_return(driver.cuMemAllocFromPoolAsync(size, self._mnff.handle, stream.handle)) return Buffer(ptr, size, self) - def deallocate(self, ptr, size, stream=None): + def deallocate(self, ptr: int, size: int, stream=None) -> None: + """Deallocate memory back to the pool.""" if stream is None: stream = default_stream() handle_return(driver.cuMemFreeAsync(ptr, stream.handle)) @property def is_device_accessible(self) -> bool: + """Whether memory from this pool is accessible from device code.""" return True @property def is_host_accessible(self) -> bool: + """Whether memory from this pool is accessible from host code.""" return False @property def device_id(self) -> int: + """The ID of the GPU device this memory pool is associated with.""" return self._dev_id + @property + def reuse_follow_event_dependencies(self) -> bool: + """Allow memory to be reused when there are event dependencies between streams.""" + return bool( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_FOLLOW_EVENT_DEPENDENCIES + ) + ) + ) + + @property + def reuse_allow_opportunistic(self) -> bool: + """Allow reuse of completed frees without dependencies.""" + return bool( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_OPPORTUNISTIC + ) + ) + ) + + @property + def reuse_allow_internal_dependencies(self) -> bool: + """Allow insertion of new stream dependencies for memory reuse.""" + return bool( + handle_return( + driver.cuMemPoolGetAttribute( + 
self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_REUSE_ALLOW_INTERNAL_DEPENDENCIES + ) + ) + ) + + @property + def release_threshold(self) -> int: + """Amount of reserved memory to hold before OS release.""" + return int( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RELEASE_THRESHOLD + ) + ) + ) + + @property + def reserved_mem_current(self) -> int: + """Current amount of backing memory allocated.""" + return int( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_CURRENT + ) + ) + ) + + @property + def reserved_mem_high(self) -> int: + """High watermark of backing memory allocated.""" + return int( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_RESERVED_MEM_HIGH + ) + ) + ) + + @property + def used_mem_current(self) -> int: + """Current amount of memory in use.""" + return int( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_CURRENT + ) + ) + ) + + @property + def used_mem_high(self) -> int: + """High watermark of memory in use.""" + return int( + handle_return( + driver.cuMemPoolGetAttribute( + self._mnff.handle, driver.CUmemPool_attribute.CU_MEMPOOL_ATTR_USED_MEM_HIGH + ) + ) + ) + class _DefaultPinnedMemorySource(MemoryResource): def __init__(self): diff --git a/cuda_core/tests/test_memory.py b/cuda_core/tests/test_memory.py index 1ff728c64..449d6e115 100644 --- a/cuda_core/tests/test_memory.py +++ b/cuda_core/tests/test_memory.py @@ -6,18 +6,30 @@ # this software and related documentation outside the terms of the EULA # is strictly prohibited. +import platform +import traceback + +import pytest + try: from cuda.bindings import driver except ImportError: from cuda import cuda as driver +import array import ctypes +import multiprocessing + +if platform.system() == "Linux": + from socket import AF_UNIX, CMSG_LEN, SCM_RIGHTS, SOCK_DGRAM, SOL_SOCKET, socketpair + + pass import pytest from cuda.core.experimental import Device -from cuda.core.experimental._memory import Buffer, DLDeviceType, MemoryResource -from cuda.core.experimental._utils.cuda_utils import handle_return +from cuda.core.experimental._memory import Buffer, DLDeviceType, MemoryResource, AsyncMempool +from cuda.core.experimental._utils.cuda_utils import get_binding_version, handle_return class DummyDeviceMemoryResource(MemoryResource): @@ -49,12 +61,10 @@ def __init__(self): pass def allocate(self, size, stream=None) -> Buffer: - # Allocate a ctypes buffer of size `size` ptr = (ctypes.c_byte * size)() return Buffer(ptr=ptr, size=size, mr=self) def deallocate(self, ptr, size, stream=None): - # the memory is deallocated per the ctypes deallocation at garbage collection time pass @property @@ -123,11 +133,10 @@ class NullMemoryResource(DummyHostMemoryResource): def is_host_accessible(self) -> bool: return False - def buffer_initialization(dummy_mr: MemoryResource): - buffer = dummy_mr.allocate(size=1024) + buffer = dummy_mr.allocate(size=64) assert buffer.handle != 0 - assert buffer.size == 1024 + assert buffer.size == 64 assert buffer.memory_resource == dummy_mr assert buffer.is_device_accessible == dummy_mr.is_device_accessible assert buffer.is_host_accessible == dummy_mr.is_host_accessible @@ -144,13 +153,13 @@ def test_buffer_initialization(): def buffer_copy_to(dummy_mr: MemoryResource, device: Device, check=False): - src_buffer = 
dummy_mr.allocate(size=1024) - dst_buffer = dummy_mr.allocate(size=1024) + src_buffer = dummy_mr.allocate(size=64) + dst_buffer = dummy_mr.allocate(size=64) stream = device.create_stream() if check: src_ptr = ctypes.cast(src_buffer.handle, ctypes.POINTER(ctypes.c_byte)) - for i in range(1024): + for i in range(64): src_ptr[i] = ctypes.c_byte(i) src_buffer.copy_to(dst_buffer, stream=stream) @@ -158,7 +167,6 @@ def buffer_copy_to(dummy_mr: MemoryResource, device: Device, check=False): if check: dst_ptr = ctypes.cast(dst_buffer.handle, ctypes.POINTER(ctypes.c_byte)) - for i in range(10): assert dst_ptr[i] == src_ptr[i] @@ -175,13 +183,13 @@ def test_buffer_copy_to(): def buffer_copy_from(dummy_mr: MemoryResource, device, check=False): - src_buffer = dummy_mr.allocate(size=1024) - dst_buffer = dummy_mr.allocate(size=1024) + src_buffer = dummy_mr.allocate(size=64) + dst_buffer = dummy_mr.allocate(size=64) stream = device.create_stream() if check: src_ptr = ctypes.cast(src_buffer.handle, ctypes.POINTER(ctypes.c_byte)) - for i in range(1024): + for i in range(64): src_ptr[i] = ctypes.c_byte(i) dst_buffer.copy_from(src_buffer, stream=stream) @@ -189,7 +197,6 @@ def buffer_copy_from(dummy_mr: MemoryResource, device, check=False): if check: dst_ptr = ctypes.cast(dst_buffer.handle, ctypes.POINTER(ctypes.c_byte)) - for i in range(10): assert dst_ptr[i] == src_ptr[i] @@ -206,7 +213,7 @@ def test_buffer_copy_from(): def buffer_close(dummy_mr: MemoryResource): - buffer = dummy_mr.allocate(size=1024) + buffer = dummy_mr.allocate(size=64) buffer.close() assert buffer.handle == 0 assert buffer.memory_resource is None @@ -221,6 +228,308 @@ def test_buffer_close(): buffer_close(DummyPinnedMemoryResource(device)) +def test_mempool(): + if get_binding_version() < (12, 0): + pytest.skip("Test requires CUDA 12 or higher") + device = Device() + device.set_current() + + if not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + + pool_size = 2097152 # 2MB size + + # Test basic pool creation + mr = AsyncMempool.create(device.device_id, pool_size, ipc_enabled=False) + assert mr.device_id == device.device_id + assert mr.is_device_accessible + assert not mr.is_host_accessible + + # Test allocation and deallocation + buffer1 = mr.allocate(1024) + assert buffer1.handle != 0 + assert buffer1.size == 1024 + assert buffer1.memory_resource == mr + buffer1.close() + + # Test multiple allocations + buffer1 = mr.allocate(1024) + buffer2 = mr.allocate(2048) + assert buffer1.handle != buffer2.handle + assert buffer1.size == 1024 + assert buffer2.size == 2048 + buffer1.close() + buffer2.close() + + # Test stream-based allocation + stream = device.create_stream() + buffer = mr.allocate(1024, stream=stream) + assert buffer.handle != 0 + buffer.close() + + # Test memory copying between buffers from same pool + src_buffer = mr.allocate(64) + dst_buffer = mr.allocate(64) + stream = device.create_stream() + src_buffer.copy_to(dst_buffer, stream=stream) + device.sync() + dst_buffer.close() + src_buffer.close() + + # Test error cases + with pytest.raises(NotImplementedError, match="directly creating an AsyncMempool object is not supported"): + AsyncMempool() + + with pytest.raises(ValueError, match="max_size must be provided when creating a new memory pool"): + AsyncMempool.create(device.device_id, None) + + # Test IPC operations are disabled + buffer = mr.allocate(64) + + with pytest.raises(RuntimeError, match="This memory pool was not created with IPC support enabled"): + 
mr.get_shareable_handle() + + with pytest.raises(RuntimeError, match="This memory pool was not created with IPC support enabled"): + mr.export_buffer(buffer) + + with pytest.raises(RuntimeError, match="This memory pool was not created with IPC support enabled"): + mr.import_buffer(None) + + buffer.close() + + +@pytest.mark.parametrize( + "property_name,expected_type", + [ + ("reuse_follow_event_dependencies", bool), + ("reuse_allow_opportunistic", bool), + ("reuse_allow_internal_dependencies", bool), + ("release_threshold", int), + ("reserved_mem_current", int), + ("reserved_mem_high", int), + ("used_mem_current", int), + ("used_mem_high", int), + ], +) +def test_mempool_properties(property_name, expected_type): + """Test all properties of the AsyncMempool class.""" + # skip test if cuda version is less than 12 + if get_binding_version() < (12, 0): + pytest.skip("Test requires CUDA 12 or higher") + + device = Device() + device.set_current() + + if not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + + pool_size = 2097152 # 2MB size + mr = AsyncMempool.create(device.device_id, pool_size, ipc_enabled=False) + + try: + # Get the property value + value = getattr(mr, property_name) + + # Test type + assert isinstance(value, expected_type), f"{property_name} should return {expected_type}, got {type(value)}" + + # Test value constraints + if expected_type is int: + assert value >= 0, f"{property_name} should be non-negative" + + # Test memory usage properties with actual allocations + if property_name in ["reserved_mem_current", "used_mem_current"]: + # Allocate some memory and check if values increase + initial_value = value + buffer = None + try: + buffer = mr.allocate(1024) + new_value = getattr(mr, property_name) + assert new_value >= initial_value, f"{property_name} should increase or stay same after allocation" + finally: + if buffer is not None: + buffer.close() + + # Test high watermark properties + if property_name in ["reserved_mem_high", "used_mem_high"]: + # High watermark should never be less than current + current_prop = property_name.replace("_high", "_current") + current_value = getattr(mr, current_prop) + assert value >= current_value, f"{property_name} should be >= {current_prop}" + + finally: + # Ensure we allocate and deallocate a small buffer to flush any pending operations + flush_buffer = mr.allocate(64) + flush_buffer.close() + + +def mempool_child_process(importer, queue): + try: + device = Device() + device.set_current() + stream = device.create_stream() + + # Get the shared handle differently based on platform + if platform.system() == "Windows": + shared_handle = queue.get() # On Windows, we pass the handle through the queue + else: + # Unix socket handle transfer + fds = array.array("i") + _, ancdata, _, _ = importer.recvmsg(0, CMSG_LEN(fds.itemsize)) + assert len(ancdata) == 1 + cmsg_level, cmsg_type, cmsg_data = ancdata[0] + assert cmsg_level == SOL_SOCKET and cmsg_type == SCM_RIGHTS + fds.frombytes(cmsg_data[: len(cmsg_data) - (len(cmsg_data) % fds.itemsize)]) + shared_handle = int(fds[0]) + + mr = AsyncMempool.from_shared_handle(device.device_id, shared_handle) + ipc_buffer = queue.get() # Get exported buffer data + buffer = mr.import_buffer(ipc_buffer) + + # Create a new buffer to verify data using unified memory + unified_mr = DummyUnifiedMemoryResource(device) + verify_buffer = unified_mr.allocate(64) + + # Copy data from imported buffer to verify contents + verify_buffer.copy_from(buffer, stream=stream) + 
device.sync() + + # Verify the data matches expected pattern + verify_ptr = ctypes.cast(int(verify_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + for i in range(64): + assert ctypes.c_byte(verify_ptr[i]).value == ctypes.c_byte(i).value, f"Data mismatch at index {i}" + + # Write new pattern to the buffer using unified memory + src_buffer = unified_mr.allocate(64) + src_ptr = ctypes.cast(int(src_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + for i in range(64): + src_ptr[i] = ctypes.c_byte(255 - i) # Write inverted pattern + + # Copy new pattern to the IPC buffer + buffer.copy_from(src_buffer, stream=stream) + device.sync() + + verify_buffer.close() + src_buffer.close() + buffer.close() + + queue.put(True) + except Exception as e: + # Capture the full traceback + tb_str = "".join(traceback.format_exception(type(e), e, e.__traceback__)) + queue.put((e, tb_str)) + + +def test_ipc_mempool(): + if get_binding_version() < (12, 0): + pytest.skip("Test requires CUDA 12 or higher") + + # Check if IPC is supported on this platform/device + device = Device() + device.set_current() + if not device.properties.memory_pools_supported: + pytest.skip("Device does not support mempool operations") + + # Set multiprocessing start method before creating any multiprocessing objects + multiprocessing.set_start_method("spawn", force=True) + + stream = device.create_stream() + pool_size = 2097152 # 2MB size + mr = AsyncMempool.create(device.device_id, pool_size, ipc_enabled=True) + + # Create socket pair for handle transfer (only on Unix systems) + exporter = None + importer = None + if platform.system() == "Linux": + exporter, importer = socketpair(AF_UNIX, SOCK_DGRAM) + + queue = multiprocessing.Queue() + process = None + + try: + shareable_handle = mr.get_shareable_handle() + + # Allocate and export memory + buffer = mr.allocate(64) + + try: + # Fill buffer with test pattern using unified memory + unified_mr = DummyUnifiedMemoryResource(device) + src_buffer = unified_mr.allocate(64) + try: + src_ptr = ctypes.cast(int(src_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + for i in range(64): + src_ptr[i] = ctypes.c_byte(i) + + buffer.copy_from(src_buffer, stream=stream) + device.sync() + finally: + src_buffer.close() + + # Export buffer for IPC + ipc_buffer = mr.export_buffer(buffer) + + # Start child process + process = multiprocessing.Process( + target=mempool_child_process, args=(importer if platform.system() == "Linux" else None, queue) + ) + process.start() + + # Send handles to child process + if platform.system() == "Windows": + queue.put(shareable_handle) # Send handle through queue on Windows + else: + # Use Unix socket for handle transfer + exporter.sendmsg([], [(SOL_SOCKET, SCM_RIGHTS, array.array("i", [shareable_handle]))]) + + queue.put(ipc_buffer) + + # Wait for child process + process.join(timeout=10) + assert process.exitcode == 0 + + # Check for exceptions + if not queue.empty(): + result = queue.get() + if isinstance(result, tuple): + exception, traceback_str = result + print("\nException in child process:") + print(traceback_str) + raise exception + assert result is True + + # Verify child process wrote the inverted pattern using unified memory + verify_buffer = unified_mr.allocate(64) + try: + verify_buffer.copy_from(buffer, stream=stream) + device.sync() + + verify_ptr = ctypes.cast(int(verify_buffer.handle), ctypes.POINTER(ctypes.c_byte)) + for i in range(64): + assert ( + ctypes.c_byte(verify_ptr[i]).value == ctypes.c_byte(255 - i).value + ), f"Child process data not reflected in parent 
at index {i}" + finally: + verify_buffer.close() + + finally: + buffer.close() + + finally: + # Clean up all resources + if process is not None and process.is_alive(): + process.terminate() + process.join(timeout=1) + queue.close() + queue.join_thread() + if exporter is not None: + exporter.close() + if importer is not None: + importer.close() + # Flush any pending operations + flush_buffer = mr.allocate(64) + flush_buffer.close() + def test_buffer_dunder_dlpack(): device = Device() device.set_current()