1 change: 1 addition & 0 deletions fastdeploy/envs.py
@@ -160,6 +160,7 @@
"FD_ENABLE_E2W_TENSOR_CONVERT": lambda: int(os.getenv("FD_ENABLE_E2W_TENSOR_CONVERT", "0")),
"FD_ENGINE_TASK_QUEUE_WITH_SHM": lambda: int(os.getenv("FD_ENGINE_TASK_QUEUE_WITH_SHM", "0")),
"FD_ENABLE_PDL": lambda: int(os.getenv("FD_ENABLE_PDL", "1")),
"DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM": lambda: int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", 0)) == 1,
}
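
Note on the new flag: unlike the neighboring entries, which expose raw ints, this entry compares the value against 1 and therefore evaluates to a bool. A minimal standalone sketch of the expected behavior (an illustration only; it exercises nothing beyond the expression added above and os.getenv). When the flag is set, the worker is expected to fall back to the shared task-queue path (see the helpers added in worker_process.py below); unset, async engine-worker task communication stays on:

import os

# Flag set: evaluates to True -> synchronous shared task-queue path.
os.environ["DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM"] = "1"
print(int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", 0)) == 1)  # True

# Flag unset: evaluates to False -> async task communication (the default).
os.environ.pop("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM")
print(int(os.getenv("DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM", 0)) == 1)  # False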


30 changes: 26 additions & 4 deletions fastdeploy/worker/worker_process.py
@@ -159,6 +159,27 @@ def __init__(self, fd_config: FDConfig, ranks: int = 1, local_rank: int = 0) ->

self.max_chips_per_node = 16 if current_platform.is_iluvatar() else 8

def _exist_tasks_from_engine(self):
"""
Check whether there are new tasks sent from the engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.num_tasks() > 0
else:
return self.local_synced_tasks is not None

def _get_tasks_from_engine(self):
"""
Get new tasks sent from the engine process
"""
if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
return self.task_queue.get_tasks()
else:
new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced
self.local_synced_tasks = None
self.all_local_tp_synced = False
return new_tasks, read_finished
Copilot AI Nov 18, 2025

The instance variables self.local_synced_tasks and self.all_local_tp_synced are referenced in this method but are never initialized in the __init__ method or elsewhere in the class. When DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM is False, this will cause an AttributeError at runtime.

These variables should be initialized in the __init__ method, for example:

self.local_synced_tasks = None
self.all_local_tp_synced = False
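
To make the suggestion concrete, here is a small self-contained sketch (a toy class, not the real worker class) that pairs the suggested defaults with the consume-and-reset behavior of the async path shown in this hunk; the (req_dict, bsz) payload below is a placeholder:

class _ToyWorker:
    """Illustrative stand-in; mirrors only the async-path state handling."""

    def __init__(self):
        # Suggested defaults: nothing synced locally yet, and not all local
        # TP ranks have read a batch. Without these two lines, the first poll
        # on the async path raises AttributeError.
        self.local_synced_tasks = None
        self.all_local_tp_synced = False

    def _exist_tasks_from_engine(self):
        return self.local_synced_tasks is not None

    def _get_tasks_from_engine(self):
        new_tasks, read_finished = self.local_synced_tasks, self.all_local_tp_synced
        self.local_synced_tasks = None
        self.all_local_tp_synced = False
        return new_tasks, read_finished


w = _ToyWorker()
assert w._exist_tasks_from_engine() is False  # nothing pending right after init
w.local_synced_tasks, w.all_local_tp_synced = [({"req_id": 0}, 1)], True  # placeholder (req_dict, bsz) payload
tasks, read_finished = w._get_tasks_from_engine()
assert read_finished and w._exist_tasks_from_engine() is False  # consumed and reset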

def init_health_status(self) -> None:
"""
Initialize the health status of the worker.
@@ -406,7 +427,7 @@ def event_loop_normal(self) -> None:

# The first worker detects whether there are tasks in the task queue
if tp_rank == 0:
if self.task_queue.num_tasks() > 0:
if self._exist_tasks_from_engine():
Copilot AI Nov 18, 2025

Method name inconsistency: The method is defined as _exist_requests_from_engine (line 168), but it's called as _exist_tasks_from_engine() here. This will cause an AttributeError at runtime.

Suggested change
if self._exist_tasks_from_engine():
if self._exist_requests_from_engine():

if envs.ENABLE_V1_KVCACHE_SCHEDULER or not (
self.fd_config.model_config.enable_mm and self.worker.exist_prefill()
):
@@ -439,7 +460,7 @@ def event_loop_normal(self) -> None:
self.worker.model_runner,
self.parallel_config.engine_worker_queue_port,
)
logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
logger.info(f"current task queue data: {self.local_synced_tasks}")
Copilot AI Nov 18, 2025

[nitpick] Logging inconsistency: When async mode is enabled, this log message will output the raw object self.local_synced_tasks which could be None or a tuple/list. This differs from the original behavior which logged the number of tasks. Consider:

if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM:
    logger.info(f"current task queue data: {self.task_queue.num_tasks()}")
else:
    logger.info(f"current task queue data: {self.local_synced_tasks}")

Or provide more descriptive logging:

task_info = self.task_queue.num_tasks() if envs.DISABLE_ENGINE_WORKER_ASYNC_TASK_COMM else len(self.local_synced_tasks) if self.local_synced_tasks else 0
logger.info(f"current task queue data: {task_info}")
Suggested change
logger.info(f"current task queue data: {self.local_synced_tasks}")
logger.info(f"current task queue data: {len(self.local_synced_tasks) if self.local_synced_tasks else 0}")

Copilot AI Nov 18, 2025

Variable name inconsistency: The variable is defined as self.local_synced_requests (line 164), but it's referenced as self.local_synced_tasks here. This will cause an AttributeError at runtime.

Suggested change
logger.info(f"current task queue data: {self.local_synced_tasks}")
logger.info(f"current task queue data: {self.local_synced_requests}")

self.task_queue.clear_data()
self.model_weights_signal[0] = ModelWeightsStatus.NORMAL
logger.info(f"Rank: {self.local_rank} has updated or cleared parameters.")
@@ -448,11 +469,12 @@
logger.info(f"Rank: {self.local_rank} Detected new requests.")
self.insert_step = True

tasks, read_finish = self.task_queue.get_tasks()
tasks, read_finish = self.task_queue._get_tasks_from_engine()
Copilot AI Nov 18, 2025

Method name inconsistency: The method is defined as _get_requests_from_engine (line 177), but it's called as self.task_queue._get_tasks_from_engine() here. This will cause an AttributeError at runtime. Should be self._get_requests_from_engine() instead.

Suggested change
tasks, read_finish = self.task_queue._get_tasks_from_engine()
tasks, read_finish = self._get_requests_from_engine()

if read_finish:
# Ensure that every worker gets the task
self.exist_task_signal.value[0] = ExistTaskStatus.EMPTY
self.task_queue.read_finish_flag.set(0)
if self.nnode > 1 and tp_size > self.max_chips_per_node:
self.task_queue.read_finish_flag.set(0)

req_dicts = []
for req_dict, bsz in tasks: