@@ -103,29 +103,6 @@ def start(
103103
104104 llm_logger .info (f"start expert service { local_data_parallel_id } " )
105105
106- if self .cfg .scheduler_config .splitwise_role != "mixed" or self .cfg .cache_config .enable_prefix_caching :
107- if self .do_profile :
108- get_profile_block_num = np .zeros ([1 ], dtype = np .int32 )
109- while True :
110- try :
111- self .get_profile_block_num_signal = IPCSignal (
112- name = "get_profile_block_num" ,
113- array = get_profile_block_num ,
114- dtype = np .int32 ,
115- suffix = int (self .cfg .engine_worker_queue_port [0 ]),
116- create = False ,
117- )
118- break
119- except :
120- time .sleep (1 )
121- self .reset_kvcache_blocks ()
122- ipc_signal_suffix_cache = self .cfg .parallel_config .engine_worker_queue_port [local_data_parallel_id ]
123- self .cache_manager_processes = self .engine .start_cache_service (
124- self .cfg .local_device_ids , ipc_signal_suffix_cache
125- )
126- if self .cfg .scheduler_config .splitwise_role != "mixed" :
127- self .engine .split_mode_get_tasks ()
128-
129106 if self .cfg .scheduler_config .name == "splitwise" :
130107 self .cfg .init_cache_info ()
131108 role = self .cfg .scheduler_config .splitwise_role
@@ -155,6 +132,29 @@ def start(
155132 )
156133 self .launched_expert_service_signal .value [local_rank ] = 1
157134
135+ if self .cfg .scheduler_config .splitwise_role != "mixed" or self .cfg .cache_config .enable_prefix_caching :
136+ if self .do_profile :
137+ get_profile_block_num = np .zeros ([1 ], dtype = np .int32 )
138+ while True :
139+ try :
140+ self .get_profile_block_num_signal = IPCSignal (
141+ name = "get_profile_block_num" ,
142+ array = get_profile_block_num ,
143+ dtype = np .int32 ,
144+ suffix = int (self .cfg .parallel_config .engine_worker_queue_port [0 ]),
145+ create = False ,
146+ )
147+ break
148+ except :
149+ time .sleep (1 )
150+ self .reset_kvcache_blocks ()
151+ ipc_signal_suffix_cache = self .cfg .parallel_config .engine_worker_queue_port [local_data_parallel_id ]
152+ self .cache_manager_processes = self .engine .start_cache_service (
153+ self .cfg .local_device_ids ,
154+ ipc_signal_suffix_cache ,
155+ create_cache_tensor = (self .cfg .scheduler_config .splitwise_role != "mixed" ),
156+ )
157+
158158 console_logger .info (
159159 f"Worker processes(rank { local_rank } ) are launched with { time .time () - start_time } seconds."
160160 )
0 commit comments