From 59c6de63a6d9901627dd7cce5208551928f57e56 Mon Sep 17 00:00:00 2001 From: Jiang-Jia-Jun Date: Tue, 18 Nov 2025 13:40:54 +0800 Subject: [PATCH 1/3] [Optimize] Reduce comm overhead of engine-worker by obtaining requests asynchronously --- fastdeploy/envs.py | 1 + fastdeploy/worker/worker_process.py | 30 +++++++++++++++++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 9d6b597b166..5ebacf0b402 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -160,6 +160,7 @@ "FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")), "FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")), "FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")), + "DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM": lambda: int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", 0)) == 1, } diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 5016c176e93..d2372f15f2b 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -159,6 +159,27 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 + def _exist_tasks_from_engine(self): + """ + Check if there exists new tasks sent from engine process + """ + if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM: + return self.task_queue.num_tasks() > 0 + else: + return self.local_synced_tasks is not None + + def _get_tasks_from_engine(self): + """ + Get new tasks that sent from engine process + """ + if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM: + return self.task_queue.get_tasks() + else: + new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced + self.local_synced_tasks = None + self.all_local_tp_synced = False + return new_tasks, read_finished + def init_health_status(self) -> None: """ Initialize the health status of the worker. @@ -406,7 +427,7 @@ def event_loop_normal(self) -> None: # The first worker detects whether there are tasks in the task queue if tp_rank == 0: - if self.task_queue.num_tasks() > 0: + if self._exist_tasks_from_engine(): if envs.ENABLE_V1_KVCACHE_SCHEDULER or not ( self.fd_config.model_config.enable_mm and self.worker.exist_prefill() ): @@ -439,7 +460,7 @@ def event_loop_normal(self) -> None: self.worker.model_runner, self.parallel_config.engine_worker_queue_port, ) - logger.info(f"current task queue data: {self.task_queue.num_tasks()}") + logger.info(f"current task queue data: {self.local_synced_tasks}") self.task_queue.clear_data() self.model_weights_signal[0] = ModelWeightsStatus.NORMAL logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.") @@ -448,11 +469,12 @@ def event_loop_normal(self) -> None: logger.info(f"Rank: {self.local_rank} Detected new requests.") self.insert_step = True - tasks, read_finish = self.task_queue.get_tasks() + tasks, read_finish = self.task_queue._get_tasks_from_engine() if read_finish: # Ensure that every worker get the task self.exist_task_signal.value[0] = ExistTaskStatus.EMPTY - self.task_queue.read_finish_flag.set(0) + if self.nnode > 1 and tp_size > self.max_chips_per_node: + self.task_queue.read_finish_flag.set(0) req_dicts = [] for req_dict, bsz in tasks: From e7f11b0051f069224e3e0eda86657ce08d77238d Mon Sep 17 00:00:00 2001 From: Jiang-Jia-Jun Date: Tue, 18 Nov 2025 13:53:35 +0800 Subject: [PATCH 2/3] [Optimize] Reduce comm overhead of engine-worker by obtaining requests asynchronously --- fastdeploy/worker/worker_process.py | 41 +++++++++++++++++++++++------ 1 file changed, 33 insertions(+), 8 deletions(-) diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index d2372f15f2b..883add58ffb 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -17,6 +17,7 @@ import argparse import json import os +import threading import time from multiprocessing import shared_memory from typing import Tuple @@ -159,26 +160,48 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) -> self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8 - def _exist_tasks_from_engine(self): + # synced requests from engine + self.local_synced_requests = None + # flag to determin if all tp process synced + self.all_local_tp_synced = False + + def _exist_requests_from_engine(self): """ - Check if there exists new tasks sent from engine process + Check if there exists new requests sent from engine process """ if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM: return self.task_queue.num_tasks() > 0 else: - return self.local_synced_tasks is not None + return self.local_synced_requests is not None - def _get_tasks_from_engine(self): + def _get_requests_from_engine(self): """ - Get new tasks that sent from engine process + Get new requests that sent from engine process """ if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM: return self.task_queue.get_tasks() else: - new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced - self.local_synced_tasks = None + new_requests, read_finished = self.local_synced_requests, self.all_local_tp_synced + self.local_synced_requests = None self.all_local_tp_synced = False - return new_tasks, read_finished + return new_requests, read_finished + + def _sync_requests_from_engine_loop(self): + """ + A thread that keeps sync all the new requests from engine process to worker process. + This function must be called in `event_loop_normal` since the `task_queue` is available + in that function. + """ + try: + while True: + if self.local_synced_requests is None and self.task_queue.num_tasks() > 0: + self.local_synced_requests, self.all_local_tp_synced = self.task_queue.get_tasks() + except Exception as e: + logger.error( + "There's unexcepted issue happend to get tasks from engine, this will cause the worker process cannot insert any new request! error={}".format( + e + ) + ) def init_health_status(self) -> None: """ @@ -304,6 +327,8 @@ def event_loop_normal(self) -> None: """Main event loop for Paddle Distributed Workers. TODO(gongshaotian): support remote calling of functions that control worker. """ + sync_request_thread = threading.Thread(target=self._sync_requests_from_engine_loop, daemon=True) + sync_request_thread.start() if self.eplb_config.enable_redundant_experts: self.last_dump_expert_workload_ts = 0 self.experts_manager = RedundantExpertManager( From ef057a86f335b0380b2cdf4b78f0cef4f0b9d79a Mon Sep 17 00:00:00 2001 From: Jiang-Jia-Jun <163579578+Jiang-Jia-Jun@users.noreply.github.com> Date: Tue, 18 Nov 2025 13:54:01 +0800 Subject: [PATCH 3/3] Update fastdeploy/envs.py Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- fastdeploy/envs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fastdeploy/envs.py b/fastdeploy/envs.py index 5ebacf0b402..0437c003fb4 100644 --- a/fastdeploy/envs.py +++ b/fastdeploy/envs.py @@ -160,7 +160,7 @@ "FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")), "FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")), "FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")), - "DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM": lambda: int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", 0)) == 1, + "DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM": lambda: int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", "0")) == 1, }