diff --git a/paddle/fluid/pybind/tensor.cc b/paddle/fluid/pybind/tensor.cc index 9f838e7350975c..a54c1c6ad686b2 100644 --- a/paddle/fluid/pybind/tensor.cc +++ b/paddle/fluid/pybind/tensor.cc @@ -989,7 +989,7 @@ void BindTensor(pybind11::module &m) { // NOLINT tensor dims, lod information, device index. )DOC") - .def("_share_device_ipc", + .def("_share_cuda", [](phi::DenseTensor self) { if (!self.IsInitialized() || self.numel() == 0) throw std::runtime_error( @@ -1053,9 +1053,9 @@ void BindTensor(pybind11::module &m) { // NOLINT >>> import paddle >>> tensor = paddle.ones([3,3]) - >>> metainfo = tensor.value().get_tensor()._share_device_ipc() + >>> metainfo = tensor.value().get_tensor()._share_cuda() )DOC") - .def("_new_from_ipc", + .def("_new_shared_cuda", [](py::tuple t) { if (FLAGS_use_virtual_memory_auto_growth && t.size() == 5) { return RebuildTensorFromVmmMeta(t); @@ -1102,8 +1102,8 @@ void BindTensor(pybind11::module &m) { // NOLINT >>> import paddle >>> tensor = paddle.ones([3,3]) - >>> metainfo = tensor.value().get_tensor()._share_device_ipc() - >>> tensor_from_shared = paddle.to_tensor(paddle.base.core.DenseTensor._new_from_ipc(metainfo)) + >>> metainfo = tensor.value().get_tensor()._share_cuda() + >>> tensor_from_shared = paddle.to_tensor(paddle.base.core.DenseTensor._new_shared_cuda(metainfo)) )DOC") #endif #ifdef PADDLE_WITH_XPU @@ -1157,7 +1157,7 @@ void BindTensor(pybind11::module &m) { // NOLINT tuple: contains data size, data type, tensor dims, lod information, device index. )DOC") - .def("_share_device_ipc", + .def("_share_xpu", [](phi::DenseTensor &self) { if (!self.IsInitialized() || self.numel() == 0) throw std::runtime_error( @@ -1167,7 +1167,7 @@ void BindTensor(pybind11::module &m) { // NOLINT // Get the current device ID. int dev_id = platform::GetXPUCurrentDeviceId(); paddle::platform::SetXPUDeviceId(dev_id); - VLOG(6) << "[DEBUG XPU] _share_device_ipc: current XPU device = " + VLOG(6) << "[DEBUG XPU] _share_xpu: current XPU device = " << dev_id; auto *holder = dynamic_cast( @@ -1180,18 +1180,18 @@ void BindTensor(pybind11::module &m) { // NOLINT void *base_ptr = holder->base_ptr(); ptrdiff_t offset_bytes = reinterpret_cast(holder->ptr()) - reinterpret_cast(base_ptr); - VLOG(6) << "[DEBUG XPU] _share_device_ipc: base_ptr = " << base_ptr + VLOG(6) << "[DEBUG XPU] _share_xpu: base_ptr = " << base_ptr << ", offset_bytes = " << offset_bytes; cudaIpcMemHandle_t handle; int ret = cudaIpcGetMemHandle(&handle, base_ptr); - VLOG(6) << "[DEBUG XPU] _share_device_ipc: " - << "cudaIpcGetMemHandle returned: " << ret; + VLOG(6) << "[DEBUG XPU] _share_xpu: cudaIpcGetMemHandle returned: " + << ret; PADDLE_ENFORCE_XPU_SUCCESS(ret); // Use the correct size for the IPC handle. auto _handle = py::bytes( reinterpret_cast(&handle), (py::ssize_t)sizeof(cudaIpcMemHandle_t)); - VLOG(6) << "[DEBUG XPU] _share_device_ipc: IPC handle (bytes) = " + VLOG(6) << "[DEBUG XPU] _share_xpu: IPC handle (bytes) = " << _handle; const auto &device_id = paddle::platform::GetXPUCurrentDeviceId(); @@ -1201,8 +1201,7 @@ void BindTensor(pybind11::module &m) { // NOLINT size_t data_size = self.numel() * framework::SizeOfType( framework::TransToProtoVarType(self.type())); - VLOG(6) << "[DEBUG XPU] _share_device_ipc: data_size = " - << data_size; + VLOG(6) << "[DEBUG XPU] _share_xpu: data_size = " << data_size; return py::make_tuple(_handle, (py::size_t)offset_bytes, data_size, @@ -1218,7 +1217,7 @@ void BindTensor(pybind11::module &m) { // NOLINT tuple: contains handle, offset, data size, data type, tensor dims, lod information, and device id. )DOC") - .def("_new_from_ipc", + .def("_new_shared_xpu", [](py::tuple t) { if (t.size() != 7) throw std::runtime_error( @@ -1227,14 +1226,14 @@ void BindTensor(pybind11::module &m) { // NOLINT // Get the current device ID. int dev_id = platform::GetXPUCurrentDeviceId(); paddle::platform::SetXPUDeviceId(dev_id); - VLOG(6) << "[DEBUG XPU] _new_from_ipc: current XPU device = " + VLOG(6) << "[DEBUG XPU] _new_shared_xpu: current XPU device = " << dev_id; phi::DenseTensor tensor; const std::string &handle = t[0].cast(); ptrdiff_t offset_bytes = (ptrdiff_t)t[1].cast(); auto device_id = t[6].cast(); - VLOG(6) << "[DEBUG XPU] _new_from_ipc: handle = " << handle + VLOG(6) << "[DEBUG XPU] _new_shared_xpu: handle = " << handle << ", offset_bytes = " << offset_bytes; auto base_ptr = memory::allocation::GetIpcBasePtr(handle); size_t size = t[2].cast(); @@ -1248,7 +1247,7 @@ void BindTensor(pybind11::module &m) { // NOLINT static_cast(t[3].cast())); tensor.Resize(common::make_ddim( t[4].cast>())); - VLOG(6) << "[DEBUG XPU] _new_from_ipc: Reshape tensor dims: " + VLOG(6) << "[DEBUG XPU] _new_shared_xpu: Reshape tensor dims: " << tensor.dims(); return tensor; }, diff --git a/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py b/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py index 38ecf1061249e5..0d53f9ed5a4d72 100644 --- a/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py +++ b/python/paddle/distributed/fleet/utils/tensor_fusion_helper.py @@ -34,10 +34,8 @@ def _share_tensor_ipc_meta(tensor): if tensor is None: return None - if ( - core.is_compiled_with_cuda() or core.is_compiled_with_xpu() - ) and not core.is_compiled_with_rocm(): - return tensor.value().get_tensor()._share_device_ipc() + if core.is_compiled_with_cuda() and not core.is_compiled_with_rocm(): + return tensor.value().get_tensor()._share_cuda() return None diff --git a/python/paddle/incubate/multiprocessing/reductions.py b/python/paddle/incubate/multiprocessing/reductions.py index 42d30a7c1fa86c..c6fde04319ba2a 100644 --- a/python/paddle/incubate/multiprocessing/reductions.py +++ b/python/paddle/incubate/multiprocessing/reductions.py @@ -95,7 +95,7 @@ def _rebuild_tensor(cls, lodtensor, metadata): def _rebuild_vmm_tensor( cls, blob: bytes, dtype_idx: int, dims: list[int], lod, device: int ): - lodtensor = cls._new_from_ipc((blob, dtype_idx, dims, lod, device)) + lodtensor = cls._new_shared_cuda((blob, dtype_idx, dims, lod, device)) return lodtensor @@ -176,12 +176,35 @@ def _rebuild_lodtensor_filedescriptor( return lodtensor -def _rebuild_device_tensor( +def _rebuild_cuda_tensor( cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx ): cache_tensor = _cuda_from_cache((handle, offset_bytes)) if cache_tensor is None: - lodtensor = cls._new_from_ipc( + lodtensor = cls._new_shared_cuda( + (handle, offset_bytes, size, type_idx, dims, lod, device_idx) + ) + # We only cache cuda shared tensor here. + # The opening cost of cudaIpcMemoryHandle is very high. + # Since we cache the received tensor directly, + # The sender may reallocate the tensor space, + # you should manually maintain the lifecycle of ipc tensor + shared_cache[(handle, offset_bytes)] = lodtensor + else: + lodtensor = paddle.base.core.DenseTensor() + lodtensor._share_buffer_with( + cache_tensor, (size, type_idx, dims, lod, device_idx) + ) + + return lodtensor + + +def _rebuild_xpu_tensor( + cls, handle, offset_bytes, size, type_idx, dims, lod, device_idx +): + cache_tensor = _cuda_from_cache((handle, offset_bytes)) + if cache_tensor is None: + lodtensor = cls._new_shared_xpu( (handle, offset_bytes, size, type_idx, dims, lod, device_idx) ) # We only cache cuda shared tensor here. @@ -237,17 +260,17 @@ def _reduce_lodtensor(lodtensor): if prev_id != cur_id: paddle.base.core.set_cuda_current_device_id(cur_id) try: - metadata = lodtensor._share_device_ipc() + metadata = lodtensor._share_cuda() if len(metadata) == 5: rebuild = _rebuild_vmm_tensor else: - rebuild = _rebuild_device_tensor + rebuild = _rebuild_cuda_tensor finally: if prev_id != cur_id: paddle.base.core.set_cuda_current_device_id(prev_id) elif lodtensor._place().is_xpu_place(): - metadata = lodtensor._share_device_ipc() - rebuild = _rebuild_device_tensor + metadata = lodtensor._share_xpu() + rebuild = _rebuild_xpu_tensor else: raise RuntimeError( "We only support pass cpu/gpu/xpu lodtensor for now!" diff --git a/python/paddle/optimizer/fusion_utils.py b/python/paddle/optimizer/fusion_utils.py index 9aff9e7996d5ae..c5017ea92ea5e4 100644 --- a/python/paddle/optimizer/fusion_utils.py +++ b/python/paddle/optimizer/fusion_utils.py @@ -42,11 +42,8 @@ def _share_tensor_ipc_meta(tensor): if tensor is None: return None - if ( - paddle.core.is_compiled_with_cuda() - or paddle.core.is_compiled_with_xpu() - ) and not paddle.core.is_compiled_with_rocm(): - return tensor.value().get_tensor()._share_device_ipc() + if paddle.is_compiled_with_cuda() and not paddle.is_compiled_with_rocm(): + return tensor.value().get_tensor()._share_cuda() return None @@ -245,7 +242,9 @@ def reset_meta( assert len(buffer_ipc_meta) in (5, 7), ( "buffer_ipc_meta must be a tuple with length 5 when FLAGS_use_virtual_memory_auto_growth is True or 7 when FLAGS_use_virtual_memory_auto_growth is False." ) - new_tensor = paddle.base.core.DenseTensor._new_from_ipc(buffer_ipc_meta) + new_tensor = paddle.base.core.DenseTensor._new_shared_cuda( + buffer_ipc_meta + ) self.buffer = paddle.to_tensor(new_tensor) self.cpu_buffer = self.buffer.pin_memory() diff --git a/test/legacy_test/test_cuda_vmm_memory.py b/test/legacy_test/test_cuda_vmm_memory.py index 83106f5bcaca7f..20604c805f6f97 100644 --- a/test/legacy_test/test_cuda_vmm_memory.py +++ b/test/legacy_test/test_cuda_vmm_memory.py @@ -46,8 +46,8 @@ def _vmm_runtime_available() -> bool: return False try: tensor = paddle.randn([32], dtype="float32") - meta = tensor.get_tensor()._share_device_ipc() - rebuilt = paddle.base.core.DenseTensor._new_from_ipc(meta) + meta = tensor.get_tensor()._share_cuda() + rebuilt = paddle.base.core.DenseTensor._new_shared_cuda(meta) _ = paddle.to_tensor(rebuilt) _VMM_RUNTIME_AVAILABLE = True except Exception: @@ -138,7 +138,7 @@ def test_reduce_scatter_buffer_uses_vmm(self): values = paddle.arange(param_storage.numel(), dtype=param_storage.dtype) values_md5sum = values._md5sum() param_storage.set_value(values) - imported = paddle.base.core.DenseTensor._new_from_ipc(refreshed_meta) + imported = paddle.base.core.DenseTensor._new_shared_cuda(refreshed_meta) imported_tensor = paddle.to_tensor(imported) np.testing.assert_allclose(imported_tensor.numpy(), values.numpy()) del imported_tensor @@ -167,7 +167,7 @@ def test_reduce_scatter_meta_refresh_after_tensor_swap(self): fused_comm_buffer._param_buffer_meta_tensor = param_storage meta_a = fused_comm_buffer.param_buffer_ipc_meta imported_a = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(meta_a) + paddle.base.core.DenseTensor._new_shared_cuda(meta_a) ) np.testing.assert_allclose( imported_a.numpy(), param_storage.numpy(), rtol=0, atol=0 @@ -177,7 +177,7 @@ def test_reduce_scatter_meta_refresh_after_tensor_swap(self): fused_comm_buffer._param_buffer_meta_tensor = new_storage meta_b = fused_comm_buffer.param_buffer_ipc_meta imported_b = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(meta_b) + paddle.base.core.DenseTensor._new_shared_cuda(meta_b) ) np.testing.assert_allclose( imported_b.numpy(), new_storage.numpy(), rtol=0, atol=0 diff --git a/test/legacy_test/test_paddle_multiprocessing.py b/test/legacy_test/test_paddle_multiprocessing.py index 2ffa5e04489709..4fa28d05b30a94 100644 --- a/test/legacy_test/test_paddle_multiprocessing.py +++ b/test/legacy_test/test_paddle_multiprocessing.py @@ -56,7 +56,7 @@ def check_ipc_tensor(event, ipc_metas): ground_truth1 = paddle.to_tensor([1, 2, 3]) ground_truth2 = paddle.to_tensor([3, 4, 5]) shared_ipc_tensor = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(ipc_metas) + paddle.base.core.DenseTensor._new_shared_cuda(ipc_metas) ) paddle.cuda.ipc_collect() @@ -235,7 +235,7 @@ def test_ipc_tensor(self): paddle.device.set_device(get_device()) initial_tensor = paddle.to_tensor([1, 2, 3]) bonus = paddle.to_tensor([2]) - ipc_metas = initial_tensor.value().get_tensor()._share_device_ipc() + ipc_metas = initial_tensor.value().get_tensor()._share_cuda() ctx = mp.get_context("spawn") event = ctx.Event() process = ctx.Process(target=check_ipc_tensor, args=(event, ipc_metas)) diff --git a/test/legacy_test/test_paddle_multiprocessing_v2.py b/test/legacy_test/test_paddle_multiprocessing_v2.py deleted file mode 100644 index 82722da012021f..00000000000000 --- a/test/legacy_test/test_paddle_multiprocessing_v2.py +++ /dev/null @@ -1,298 +0,0 @@ -# Copyright (c) 2022 PaddlePaddle Authors. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import gc -import os -import platform -import time -import unittest - -from op_test import get_device - -import paddle -import paddle.incubate.multiprocessing as mp -from paddle.incubate.multiprocessing import reductions -from paddle.optimizer.fusion_utils import FusionStorage - -REPEAT = 20 -HAS_SHM_FILES = os.path.isdir('/dev/shm') - - -def fill_tensor(queue, event): - data = queue.get() - with paddle.no_grad(): - data[0][:] = 5 - data[1][:] = 5 - - event.set() - - -def send_tensor(queue, event, device, dtype): - tensor = paddle.ones([5, 5], dtype=dtype) - queue.put(tensor) - queue.put(tensor) - event.wait() - - -def send_parambase(queue, event, device, dtype): - tensor = paddle.nn.Layer().create_parameter( - [5, 5], - dtype=dtype, - default_initializer=paddle.nn.initializer.Constant(value=1.0), - ) - queue.put(tensor) - queue.put(tensor) - event.wait() - - -def check_ipc_tensor(event, ipc_metas): - ground_truth1 = paddle.to_tensor([1, 2, 3]) - ground_truth2 = paddle.to_tensor([3, 4, 5]) - shared_ipc_tensor = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(ipc_metas) - ) - paddle.cuda.ipc_collect() - - def tensor_equal(t1, t2): - return (t1 == t2).all().item() - - # Step1: Check initial value of ipc tensor - while not tensor_equal(ground_truth1, shared_ipc_tensor): - time.sleep(0.1) - event.set() - - # Step2: Check ipc tensor after update - while not tensor_equal(ground_truth2, shared_ipc_tensor): - time.sleep(0.1) - event.set() - - -def check_ipc_reduce(event, rebuild, meta): - rebuilt_tensor = paddle.to_tensor(rebuild(*meta)) - event.set() - - -def check_fusion_storage(event, storage): - # helper = FusionStorageHelper( - # storage.accumulators_meta, - # storage.master_weights_meta, - # storage.merged_model_params_meta, - # storage.buffer_ipc_meta, - # ) - event.set() - - -class leak_checker: - def __init__(self, test_case): - self.checked_pids = [os.getpid()] - self.test_case = test_case - - def __enter__(self): - self.next_fds = self._get_next_fds(10) - return self - - def __exit__(self, *args): - if args[0] is None: - self.test_case.assertFalse(self.has_shm_files()) - return False - - def check_pid(self, pid): - self.checked_pids.append(pid) - - def _get_next_fds(self, n=1): - fds = [os.dup(0) for i in range(n)] - for fd in fds: - os.close(fd) - return fds - - def has_shm_files(self, wait=True): - if not HAS_SHM_FILES: - return False - result = self._has_shm_files() - if result and wait: - time.sleep(0.5) - return self._has_shm_files() - return result - - def _has_shm_files(self): - gc.collect() - names = ['paddle_' + str(pid) for pid in self.checked_pids] - for filename in os.listdir('/dev/shm'): - for name in names: - if filename.startswith(name): - print("have", filename) - return True - return False - - -class TestMultiprocessingBase(unittest.TestCase): - def get_tensor(self, device="cpu"): - self.device = device.lower() - place = None - tensor = paddle.zeros([5, 5], dtype="float32") - return tensor - - def get_parameter(self): - w = paddle.nn.Layer().create_parameter( - [10, 10], - default_initializer=paddle.nn.initializer.Constant(value=0.0), - ) - return w - - def _test_empty(self, dtype="float32"): - q = mp.Queue() - empty = paddle.to_tensor([], dtype=dtype) - q.put(empty) - out = q.get(timeout=1) - self.assertEqual(str(out), str(empty)) - - def _test_sharing( - self, ctx=mp, device='cpu', dtype="float32", repeat=1, param=False - ): - def test_fill(): - if param: - x = self.get_parameter() - y = (x[:, 1]).detach() - else: - x = self.get_tensor() - y = x[:, 1] - - data = [x, y] - - queue = ctx.Queue() - event = ctx.Event() - queue.put(data) - - process = ctx.Process(target=fill_tensor, args=(queue, event)) - process.daemon = True - lc.check_pid(process.pid) - process.start() - - event.wait(30) - - self.assertTrue(event.is_set()) - self.assertTrue(data[0].equal(5).all()) - self.assertTrue(data[1].equal(5).all()) - - process.join(1 if device != get_device() else 10) - self.assertFalse(process.is_alive()) - - def test_receive(): - queue = ctx.Queue() - event = ctx.Event() - - process = ctx.Process( - target=send_parambase if param else send_tensor, - args=(queue, event, device, dtype), - ) - process.daemon = True - lc.check_pid(process.pid) - process.start() - - t1 = queue.get() - t2 = queue.get() - self.assertTrue(t1.equal(1).all()) - del t1, t2 - - event.set() - process.join(1 if device != get_device() else 10) - self.assertFalse(process.is_alive()) - - with leak_checker(self) as lc: - for _ in range(repeat): - test_fill() - test_receive() - - -@unittest.skipIf( - ( - not ( - paddle.is_compiled_with_cuda() - and not paddle.is_compiled_with_rocm() - ) - and not paddle.is_compiled_with_xpu() - ) - or platform.system().lower() == "windows", - "Require compiled with CUDA or XPU. Skip: ipc function on Windows is not supported.", -) -class TestMultiprocessingGpu(TestMultiprocessingBase): - def func_test_pass_tensor(self): - paddle.set_device(get_device()) - self._test_sharing(mp.get_context("spawn"), get_device()) - - def test_pass_tensor(self): - self.func_test_pass_tensor() - - def test_ipc_tensor(self): - paddle.device.set_device(get_device()) - initial_tensor = paddle.to_tensor([1, 2, 3]) - bonus = paddle.to_tensor([2]) - ipc_metas = initial_tensor.value().get_tensor()._share_device_ipc() - ctx = mp.get_context("spawn") - event = ctx.Event() - process = ctx.Process(target=check_ipc_tensor, args=(event, ipc_metas)) - process.daemon = True - process.start() - - # Step1: Check initial value of ipc tensor - event.wait(30) - self.assertTrue(event.is_set()) - - # Step2: Check ipc tensor after update - event.clear() - initial_tensor.add_(bonus) - event.wait(30) - self.assertTrue(event.is_set()) - - process.join(10) - self.assertFalse(process.is_alive()) - - def test_ipc_reduce(self): - tensor = paddle.arange(0, 64, dtype="float32").reshape([8, 8]) - dense = tensor.value().get_tensor() - rebuild, meta = reductions._reduce_lodtensor(dense) - ctx = mp.get_context("spawn") - event = ctx.Event() - process = ctx.Process( - target=check_ipc_reduce, args=(event, rebuild, meta) - ) - process.daemon = True - process.start() - event.wait(30) - self.assertTrue(event.is_set()) - process.join(10) - - def test_fusion_storage(self): - tensor_a = paddle.zeros([16], dtype="float32") - tensor_b = paddle.zeros([16], dtype="float32") - accumulators = {"momentum": {"param_a": tensor_a}} - master_weights = {"param_b": tensor_b} - storage = FusionStorage( - accumulators=accumulators, master_weights=master_weights - ) - self.assertIsNotNone(storage.buffer_ipc_meta) - - ctx = mp.get_context("spawn") - event = ctx.Event() - process = ctx.Process( - target=check_fusion_storage, args=(event, storage) - ) - process.daemon = True - process.start() - event.wait(30) - self.assertTrue(event.is_set()) - process.join(10) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/xpu/test_xpu_share_memory.py b/test/xpu/test_xpu_share_memory.py index 8fa3f0cadfc340..7538cb8070d016 100644 --- a/test/xpu/test_xpu_share_memory.py +++ b/test/xpu/test_xpu_share_memory.py @@ -15,18 +15,17 @@ # limitations under the License. """ -A minimal unit test for testing XPU IPC sharing (_share_device_ipc and _new_from_ipc). +A minimal unit test for testing XPU IPC sharing (_share_xpu and _new_shared_xpu). This test uses the spawn start method to create two child processes before the parent creates an XPU tensor. The parent then creates an XPU -tensor, calls _share_device_ipc to get IPC metadata, and sends that metadata to +tensor, calls _share_xpu to get IPC metadata, and sends that metadata to the children via a multiprocessing.Queue. Each child sets its XPU device, -reconstructs the shared tensor using _new_from_ipc, and verifies that its +reconstructs the shared tensor using _new_shared_xpu, and verifies that its content matches the expected value. """ import multiprocessing -import time import unittest # IMPORTANT: Use the spawn method before any CUDA/XPU initialization. @@ -44,7 +43,7 @@ def child_reader(queue): Child process function: - Initializes the XPU device. - Reads the IPC metadata from the queue. - - Reconstructs the shared tensor via _new_from_ipc. + - Reconstructs the shared tensor via _new_shared_xpu. - Verifies that its content equals TEST_VALUE. """ try: @@ -64,9 +63,9 @@ def child_reader(queue): try: # Reconstruct the shared tensor. - # (Note: _new_from_ipc is a private API; adjust accordingly for your version.) + # (Note: _new_shared_xpu is a private API; adjust accordingly for your version.) shared_tensor = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(ipc_meta) + paddle.base.core.DenseTensor._new_shared_xpu(ipc_meta) ) # print( # "[Child] Reconstructed tensor on", @@ -113,8 +112,8 @@ def test_ipc_share_read(self): # tensor.cpu().numpy(), # ) - # Get the IPC metadata by calling _share_device_ipc on the tensor. - ipc_meta = tensor.value().get_tensor()._share_device_ipc() + # Get the IPC metadata by calling _share_xpu on the tensor. + ipc_meta = tensor.value().get_tensor()._share_xpu() # print("[Parent] IPC metadata:", ipc_meta) # Put the same metadata into the queue for each child. @@ -128,96 +127,5 @@ def test_ipc_share_read(self): self.assertFalse(p2.is_alive()) -def check_ipc_tensor(event, ipc_metas): - ground_truth1 = paddle.to_tensor([1, 2, 3]) - ground_truth2 = paddle.to_tensor([3, 4, 5]) - shared_ipc_tensor = paddle.to_tensor( - paddle.base.core.DenseTensor._new_from_ipc(ipc_metas) - ) - paddle.cuda.ipc_collect() - - def tensor_equal(t1, t2): - return (t1 == t2).all().item() - - # Step1: Check initial value of ipc tensor - while not tensor_equal(ground_truth1, shared_ipc_tensor): - time.sleep(0.1) - event.set() - - # Step2: Check ipc tensor after update - while not tensor_equal(ground_truth2, shared_ipc_tensor): - time.sleep(0.1) - event.set() - - -def check_ipc_reduce(event, rebuild, meta): - rebuilt_tensor = paddle.to_tensor(rebuild(*meta)) - event.set() - - -class TestMultiprocessingBase(unittest.TestCase): - def get_tensor(self, device="cpu"): - self.device = device.lower() - place = None - tensor = paddle.zeros([5, 5], dtype="float32") - return tensor - - def get_parameter(self): - w = paddle.nn.Layer().create_parameter( - [10, 10], - default_initializer=paddle.nn.initializer.Constant(value=0.0), - ) - return w - - def _test_empty(self, dtype="float32"): - q = mp.Queue() - empty = paddle.to_tensor([], dtype=dtype) - q.put(empty) - out = q.get(timeout=1) - self.assertEqual(str(out), str(empty)) - - -class TestMultiprocessingXpu(TestMultiprocessingBase): - def test_ipc_tensor(self): - initial_tensor = paddle.to_tensor([1, 2, 3]) - bonus = paddle.to_tensor([2]) - ipc_metas = initial_tensor.value().get_tensor()._share_device_ipc() - ctx = mp.get_context("spawn") - event = ctx.Event() - process = ctx.Process(target=check_ipc_tensor, args=(event, ipc_metas)) - process.daemon = True - process.start() - - # Step1: Check initial value of ipc tensor - event.wait(30) - self.assertTrue(event.is_set()) - - # Step2: Check ipc tensor after update - event.clear() - initial_tensor.add_(bonus) - event.wait(30) - self.assertTrue(event.is_set()) - - process.join(10) - self.assertFalse(process.is_alive()) - - def test_reductions_use_vmm(self): - from paddle.incubate.multiprocessing import reductions - - tensor = paddle.arange(0, 64, dtype="float32").reshape([8, 8]) - dense = tensor.value().get_tensor() - rebuild, meta = reductions._reduce_lodtensor(dense) - ctx = mp.get_context("spawn") - event = ctx.Event() - process = ctx.Process( - target=check_ipc_reduce, args=(event, rebuild, meta) - ) - process.daemon = True - process.start() - event.wait(30) - self.assertTrue(event.is_set()) - process.join(10) - - if __name__ == "__main__": unittest.main()