5151from fastdeploy .splitwise .splitwise_connector import SplitwiseConnector
5252from fastdeploy .trace .constants import LoggingEventName
5353from fastdeploy .trace .trace_logger import print as trace_print
54- from fastdeploy .utils import (
55- EngineError ,
56- check_download_links ,
57- envs ,
58- get_logger ,
59- init_bos_client ,
60- llm_logger ,
61- )
54+ from fastdeploy .utils import EngineError , envs , get_logger , llm_logger
6255
6356try :
6457 TokenProcessor = load_token_processor_plugins ()
@@ -808,7 +801,7 @@ def _fetch_request():
808801 else :
809802 raise
810803 # 2. Schedule requests
811- tasks = self .resource_manager .schedule ()
804+ tasks , error_tasks = self .resource_manager .schedule ()
812805
813806 # 3. Send to engine
814807 if tasks :
@@ -833,7 +826,16 @@ def _fetch_request():
833826 trace_print (LoggingEventName .REQUEST_SCHEDULE_END , task .request_id , getattr (task , "user" , "" ))
834827 trace_print (LoggingEventName .INFERENCE_START , task .request_id , getattr (task , "user" , "" ))
835828 self .engine_worker_queue .put_tasks ((tasks , self .resource_manager .real_bsz ))
836- else :
829+
830+ # 4. Response error tasks
831+ if error_tasks :
832+ for request_id , failed in error_tasks :
833+ if failed is None :
834+ llm_logger .warning (f"Request { request_id } has no error, skip sending error response." )
835+ continue
836+ self ._send_error_response (request_id , failed )
837+
838+ if not tasks and not error_tasks :
837839 time .sleep (0.005 )
838840
839841 except RuntimeError as e :
@@ -909,24 +911,6 @@ def _insert_zmq_task_to_scheduler(self):
909911 self .llm_logger .error (f"Receive request error: { err_msg } " )
910912 results .append ((request .request_id , err_msg ))
911913
912- if self ._has_features_info (request ) and err_msg is None :
913- if self .bos_client is None :
914- self .bos_client = init_bos_client ()
915-
916- download_urls = []
917- inputs = request .multimodal_inputs
918- if inputs .get ("video_feature_urls" ) is not None :
919- download_urls .extend (inputs .get ("video_feature_urls" ))
920- if inputs .get ("image_feature_urls" ) is not None :
921- download_urls .extend (inputs .get ("image_feature_urls" ))
922- if inputs .get ("audio_feature_urls" ) is not None :
923- download_urls .extend (inputs .get ("audio_feature_urls" ))
924-
925- err_msg = check_download_links (self .bos_client , download_urls )
926- if err_msg :
927- llm_logger .error (f"Receive request { request .request_id } download error: { err_msg } " )
928- results .append ((request .request_id , err_msg ))
929-
930914 if err_msg is None :
931915 insert_task .append (request )
932916
@@ -948,21 +932,27 @@ def _insert_zmq_task_to_scheduler(self):
948932 main_process_metrics .num_requests_waiting .inc (1 )
949933 continue
950934
951- error_result = RequestOutput (
952- request_id = request_id ,
953- finished = True ,
954- error_code = 500 ,
955- error_msg = failed ,
956- )
957- # Since the request is not in scheduler
958- # Send result by zmq directly
959- self .send_response_server .send_response (request_id , [error_result ])
935+ self ._send_error_response (request_id , failed )
960936 except Exception as e :
961937 self .llm_logger .error (
962938 f"Error happened while receiving new request from zmq, details={ e } , "
963939 f"traceback={ traceback .format_exc ()} "
964940 )
965941
942+ def _send_error_response (self , request_id , error_msg , error_code : int = 500 ):
943+ llm_logger .error (
944+ f"Send error response to client, request_id: { request_id } , error_msg: { error_msg } , error_code: { error_code } "
945+ )
946+ error_result = RequestOutput (
947+ request_id = request_id ,
948+ finished = True ,
949+ error_code = error_code ,
950+ error_msg = error_msg ,
951+ )
952+ # Since the request is not in scheduler
953+ # Send result by zmq directly
954+ self .send_response_server .send_response (request_id , [error_result ])
955+
966956 def _decode_token (self , token_ids , req_id , is_end ):
967957 delta_text = ""
968958 if envs .FD_ENABLE_RETURN_TEXT :
@@ -977,19 +967,6 @@ def _decode_token(self, token_ids, req_id, is_end):
977967 del self .data_processor .decode_status [req_id ]
978968 return delta_text , token_ids
979969
980- def _has_features_info (self , task ):
981- inputs = task .multimodal_inputs
982- if inputs is None or len (inputs ) == 0 :
983- return False
984-
985- if (
986- (inputs .get ("video_feature_urls" ) is not None and len (inputs ["video_feature_urls" ]) > 0 )
987- or (inputs .get ("image_feature_urls" ) is not None and len (inputs ["image_feature_urls" ]) > 0 )
988- or (inputs .get ("audio_feature_urls" ) is not None and len (inputs ["audio_feature_urls" ]) > 0 )
989- ):
990- return True
991- return False
992-
993970 def _zmq_send_generated_tokens (self ):
994971 """
995972 Recieve output for zmq
0 commit comments