From 4df4a2b2ed677f9b886862b8efcf90e1a7e96da9 Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Fri, 28 Nov 2025 10:34:49 +0800
Subject: [PATCH 1/6] [Maca] fix memory sync

---
 .../pytorch/backends/dlinfer/maca/op_backend.py | 15 +++++++++++----
 1 file changed, 11 insertions(+), 4 deletions(-)

diff --git a/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py b/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
index b454b995b4..0be4d85632 100644
--- a/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
+++ b/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
@@ -59,13 +59,17 @@ def get_total_slots():
             is_unpaged_prefill = \
                 all((step_context.q_seqlens ==
                      step_context.kv_seqlens).tolist())
-        q_start_loc = torch.cat((torch.tensor([0], device=device), step_context.q_seqlens.cumsum(0))).int()
+        q_start_loc = step_context.q_start_loc
+        cu_seqlens = torch.cat((q_start_loc, step_context.q_seqlens.sum().unsqueeze(0))).int()
+
         q_seqlens = step_context.q_seqlens.int()
         kv_seqlens = step_context.kv_seqlens.int()
-        max_q_seq_len = torch.max(q_seqlens).item()
-        max_kv_seq_len = torch.max(kv_seqlens).item()
 
         if step_context.is_decoding:
+            # max_q_seq_len, max_kv_seq_len is not used in decoding stage
+            max_q_seq_len = -1
+            max_kv_seq_len = -1
+
             # collect kv_start_indices without using a for-loop,
             # (fill kv-cache for just ONE token during the decoding phase)
             idx = (step_context.kv_seqlens - 1) % block_size
@@ -73,6 +77,9 @@ def get_total_slots():
             last_block = step_context.block_offsets.gather(1, b_num.view(-1, 1)).view(-1)
             kv_start_indices = (last_block * block_size + idx).reshape((-1, 1))
         else:
+            max_q_seq_len = torch.max(q_seqlens).cpu().item()
+            max_kv_seq_len = torch.max(kv_seqlens).cpu().item()
+
             for i in range(step_context.q_start_loc.size(0)):
                 q_seq_len = int(step_context.q_seqlens[i])
                 kv_seq_len = int(step_context.kv_seqlens[i])
@@ -88,7 +95,7 @@ def get_total_slots():
         attn_metadata = attn_meta_cls(
             step_context.is_decoding,
             step_context.block_offsets.int(),
-            q_start_loc=q_start_loc,
+            q_start_loc=cu_seqlens,
             q_seqlens=q_seqlens,
             kv_seqlens=kv_seqlens,
             kv_start_indices=kv_start_indices,

From a3f88b603eb64e0795f95ae81bdf59b4d7fc60ab Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Fri, 28 Nov 2025 10:36:55 +0800
Subject: [PATCH 2/6] [Maca] fix ray blocking

---
 lmdeploy/pytorch/engine/executor/ray_executor.py | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py
index c128f28587..8f4a653ca9 100644
--- a/lmdeploy/pytorch/engine/executor/ray_executor.py
+++ b/lmdeploy/pytorch/engine/executor/ray_executor.py
@@ -561,13 +561,15 @@ def _init_workers_ray(self, placement_group: PlacementGroup, worker_kwargs: dict
     def _init_distributed_environment_by_device(self, device_str: str):
         """Init distributed environment."""
         driver_ip = _get_master_addr()
-        if device_str in ['cuda', 'maca']:
+        if device_str in ['cuda',]:
             self.workers = self._sort_workers(driver_ip, self.workers)
 
         elif device_str == 'ascend':
             self._init_ascend_distributed_environment(driver_ip)
         elif device_str == 'camb':
             self._init_camb_distributed_environment(driver_ip)
+        elif device_str == 'maca':
+            self._init_maca_distributed_environment(driver_ip)
         else:
             raise ValueError(f'Unsupported device type: {device_str}')
 
@@ -594,6 +596,11 @@ def _init_camb_distributed_environment(self, driver_ip):
         self.workers = self._sort_workers(driver_ip, self.workers)
         ray.get([worker.set_device.remote(idx) for idx, worker in enumerate(self.workers)])
 
+    def _init_maca_distributed_environment(self, driver_ip):
+        """Init maca distributed environment."""
+        self.workers = self._sort_workers(driver_ip, self.workers)
+        ray.get([worker.set_device.remote(idx) for idx, worker in enumerate(self.workers)])
+
     """ PD Disaggregation API Begin """
 
     def p2p_initialize(self, init_request: DistServeInitRequest):

From 6e84e464290f18259025ec90be2ad5a0d88a4281 Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Fri, 28 Nov 2025 14:04:04 +0800
Subject: [PATCH 3/6] fix: resolve lint issues

---
 lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py | 7 +++----
 1 file changed, 3 insertions(+), 4 deletions(-)

diff --git a/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py b/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
index 0be4d85632..3be4ab6f24 100644
--- a/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
+++ b/lmdeploy/pytorch/backends/dlinfer/maca/op_backend.py
@@ -52,7 +52,6 @@ def get_total_slots():
 
         kv_start_indices, attention_mask = [], []
         block_num, block_size, _, _ = step_context.kv_caches[0][1].shape
-        device = step_context.block_offsets.device
 
         is_unpaged_prefill = False
         if not step_context.is_decoding:
@@ -61,7 +60,7 @@ def get_total_slots():
                      step_context.kv_seqlens).tolist())
         q_start_loc = step_context.q_start_loc
         cu_seqlens = torch.cat((q_start_loc, step_context.q_seqlens.sum().unsqueeze(0))).int()
-
+
         q_seqlens = step_context.q_seqlens.int()
         kv_seqlens = step_context.kv_seqlens.int()
 
@@ -69,7 +68,7 @@ def get_total_slots():
             # max_q_seq_len, max_kv_seq_len is not used in decoding stage
             max_q_seq_len = -1
             max_kv_seq_len = -1
-
+
             # collect kv_start_indices without using a for-loop,
             # (fill kv-cache for just ONE token during the decoding phase)
             idx = (step_context.kv_seqlens - 1) % block_size
@@ -79,7 +78,7 @@ def get_total_slots():
         else:
             max_q_seq_len = torch.max(q_seqlens).cpu().item()
             max_kv_seq_len = torch.max(kv_seqlens).cpu().item()
-
+
             for i in range(step_context.q_start_loc.size(0)):
                 q_seq_len = int(step_context.q_seqlens[i])
                 kv_seq_len = int(step_context.kv_seqlens[i])

From 971bcfb671c50f0ebcc761386c63050735878636 Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Fri, 28 Nov 2025 14:23:07 +0800
Subject: [PATCH 4/6] remove comma

---
 lmdeploy/pytorch/engine/executor/ray_executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py
index 8f4a653ca9..fb6fcc69d6 100644
--- a/lmdeploy/pytorch/engine/executor/ray_executor.py
+++ b/lmdeploy/pytorch/engine/executor/ray_executor.py
@@ -561,7 +561,7 @@ def _init_workers_ray(self, placement_group: PlacementGroup, worker_kwargs: dict
     def _init_distributed_environment_by_device(self, device_str: str):
         """Init distributed environment."""
         driver_ip = _get_master_addr()
-        if device_str in ['cuda',]:
+        if device_str in ['cuda']:
             self.workers = self._sort_workers(driver_ip, self.workers)
 
         elif device_str == 'ascend':

From 3e1d3da9c6cc6ad7015fc659fedd8159a6782049 Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Thu, 4 Dec 2025 16:41:41 +0800
Subject: [PATCH 5/6] fix code

---
 lmdeploy/pytorch/engine/executor/ray_executor.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py
index fb6fcc69d6..8c419a997b 100644
--- a/lmdeploy/pytorch/engine/executor/ray_executor.py
+++ b/lmdeploy/pytorch/engine/executor/ray_executor.py
@@ -561,7 +561,7 @@ def _init_workers_ray(self, placement_group: PlacementGroup, worker_kwargs: dict
     def _init_distributed_environment_by_device(self, device_str: str):
         """Init distributed environment."""
         driver_ip = _get_master_addr()
-        if device_str in ['cuda']:
+        if device_str == 'cuda':
             self.workers = self._sort_workers(driver_ip, self.workers)
 
         elif device_str == 'ascend':

From 208b06f2f7c0096fdfd865c8842108b145830a9b Mon Sep 17 00:00:00 2001
From: WangQing <2917021186@qq.com>
Date: Fri, 5 Dec 2025 17:01:14 +0800
Subject: [PATCH 6/6] fix code

---
 lmdeploy/pytorch/engine/executor/ray_executor.py | 16 +++-------------
 1 file changed, 3 insertions(+), 13 deletions(-)

diff --git a/lmdeploy/pytorch/engine/executor/ray_executor.py b/lmdeploy/pytorch/engine/executor/ray_executor.py
index 8c419a997b..4ae018a46e 100644
--- a/lmdeploy/pytorch/engine/executor/ray_executor.py
+++ b/lmdeploy/pytorch/engine/executor/ray_executor.py
@@ -566,10 +565,9 @@ def _init_distributed_environment_by_device(self, device_str: str):
 
         elif device_str == 'ascend':
             self._init_ascend_distributed_environment(driver_ip)
-        elif device_str == 'camb':
-            self._init_camb_distributed_environment(driver_ip)
-        elif device_str == 'maca':
-            self._init_maca_distributed_environment(driver_ip)
+        elif device_str in ['camb', 'maca']:
+            self.workers = self._sort_workers(driver_ip, self.workers)
+            ray.get([worker.set_device.remote(idx) for idx, worker in enumerate(self.workers)])
         else:
             raise ValueError(f'Unsupported device type: {device_str}')
 
@@ -592,15 +591,6 @@ def _init_ascend_distributed_environment(self, driver_ip):
         else:
             self.workers = self._sort_workers(driver_ip, self.workers)
 
-    def _init_camb_distributed_environment(self, driver_ip):
-        self.workers = self._sort_workers(driver_ip, self.workers)
-        ray.get([worker.set_device.remote(idx) for idx, worker in enumerate(self.workers)])
-
-    def _init_maca_distributed_environment(self, driver_ip):
-        """Init maca distributed environment."""
-        self.workers = self._sort_workers(driver_ip, self.workers)
-        ray.get([worker.set_device.remote(idx) for idx, worker in enumerate(self.workers)])
-
     """ PD Disaggregation API Begin """
 
     def p2p_initialize(self, init_request: DistServeInitRequest):