@@ -27,7 +27,6 @@ use databend_common_base::runtime::ParentMemStat;
2727use databend_common_base:: runtime:: ThreadTracker ;
2828use databend_common_base:: runtime:: GLOBAL_MEM_STAT ;
2929use databend_common_config:: GlobalConfig ;
30- use databend_common_exception:: ErrorCode ;
3130use databend_common_expression:: DataSchemaRef ;
3231use databend_common_management:: WorkloadGroupResourceManager ;
3332use databend_common_metrics:: http:: metrics_incr_http_response_errors_count;
@@ -56,11 +55,11 @@ use serde::Deserialize;
5655use serde:: Serialize ;
5756use uuid:: Uuid ;
5857
58+ use super :: query:: CloseReason ;
5959use super :: query:: ExecuteStateKind ;
6060use super :: query:: HttpQuery ;
6161use super :: query:: HttpQueryRequest ;
6262use super :: query:: HttpQueryResponseInternal ;
63- use super :: query:: StopReason ;
6463use crate :: clusters:: ClusterDiscovery ;
6564use crate :: servers:: http:: error:: HttpErrorCode ;
6665use crate :: servers:: http:: error:: QueryError ;
@@ -166,10 +165,10 @@ pub struct QueryResponse {
166165}
167166
168167impl QueryResponse {
169- pub ( crate ) fn removed ( query_id : & str , remove_reason : StopReason ) -> impl IntoResponse {
168+ pub ( crate ) fn closed ( query_id : & str , close_reason : CloseReason ) -> impl IntoResponse {
170169 let id = query_id. to_string ( ) ;
171- let state = match remove_reason {
172- StopReason :: Finished => ExecuteStateKind :: Succeeded ,
170+ let state = match close_reason {
171+ CloseReason :: Finalized => ExecuteStateKind :: Succeeded ,
173172 _ => ExecuteStateKind :: Failed ,
174173 } ;
175174 Json ( QueryResponse {
@@ -198,7 +197,7 @@ impl QueryResponse {
198197 id : String ,
199198 r : HttpQueryResponseInternal ,
200199 is_final : bool ,
201- ) -> ( impl IntoResponse , bool ) {
200+ ) -> impl IntoResponse {
202201 let state = r. state . clone ( ) ;
203202 let ( data, next_uri) = if is_final {
204203 ( Arc :: new ( BlocksSerializer :: empty ( ) ) , None )
@@ -248,12 +247,7 @@ impl QueryResponse {
248247 } ;
249248 let rows = data. num_rows ( ) ;
250249
251- let next_is_final = next_uri
252- . as_ref ( )
253- . map ( |u| u. ends_with ( "final" ) )
254- . unwrap_or ( false ) ;
255-
256- let resp = Json ( QueryResponse {
250+ Json ( QueryResponse {
257251 data,
258252 state : state. state ,
259253 schema : state. schema . clone ( ) ,
@@ -274,8 +268,7 @@ impl QueryResponse {
274268 } )
275269 . with_header ( HEADER_QUERY_ID , id. clone ( ) )
276270 . with_header ( HEADER_QUERY_STATE , state. state . to_string ( ) )
277- . with_header ( HEADER_QUERY_PAGE_ROWS , rows) ;
278- ( resp, next_is_final)
271+ . with_header ( HEADER_QUERY_PAGE_ROWS , rows)
279272 }
280273}
281274
@@ -305,11 +298,11 @@ async fn query_final_handler(
305298 ) ;
306299 let http_query_manager = HttpQueryManager :: instance ( ) ;
307300 match http_query_manager
308- . stop_query (
301+ . close_query (
309302 & query_id,
303+ CloseReason :: Finalized ,
310304 & ctx. client_session_id ,
311- StopReason :: Finished ,
312- ErrorCode :: ClosedQuery ( "Query closed by client" ) ,
305+ true ,
313306 )
314307 . await ?
315308 {
@@ -320,7 +313,7 @@ async fn query_final_handler(
320313 // it is safe to set these 2 fields to None, because client now check for null/None first.
321314 response. session = None ;
322315 response. state . affect = None ;
323- Ok ( QueryResponse :: from_internal ( query_id, response, true ) . 0 )
316+ Ok ( QueryResponse :: from_internal ( query_id, response, true ) )
324317 }
325318 None => Err ( query_id_not_found ( & query_id, & ctx. node_id ) ) ,
326319 }
@@ -346,11 +339,11 @@ async fn query_cancel_handler(
346339 ) ;
347340 let http_query_manager = HttpQueryManager :: instance ( ) ;
348341 match http_query_manager
349- . stop_query (
342+ . close_query (
350343 & query_id,
344+ CloseReason :: Canceled ,
351345 & ctx. client_session_id ,
352- StopReason :: Canceled ,
353- ErrorCode :: AbortedQuery ( "canceled by client" ) ,
346+ true ,
354347 )
355348 . await ?
356349 {
@@ -374,15 +367,13 @@ async fn query_state_handler(
374367 let http_query_manager = HttpQueryManager :: instance ( ) ;
375368 match http_query_manager. get_query ( & query_id) {
376369 Some ( query) => {
377- if let Some ( reason) = query. check_removed ( ) {
378- Ok ( QueryResponse :: removed ( & query_id, reason) . into_response ( ) )
370+ if let Some ( reason) = query. check_closed ( ) {
371+ Ok ( QueryResponse :: closed ( & query_id, reason . reason ) . into_response ( ) )
379372 } else {
380373 let response = query
381374 . get_response_state_only ( )
382375 . map_err ( HttpErrorCode :: server_error) ?;
383- Ok ( QueryResponse :: from_internal ( query_id, response, false )
384- . 0
385- . into_response ( ) )
376+ Ok ( QueryResponse :: from_internal ( query_id, response, false ) . into_response ( ) )
386377 }
387378 }
388379 None => Err ( query_id_not_found ( & query_id, & ctx. node_id ) ) ,
@@ -423,18 +414,16 @@ async fn query_page_handler(
423414 }
424415
425416 query. check_client_session_id ( & ctx. client_session_id ) ?;
426- if let Some ( reason) = query. check_removed ( ) {
427- log:: info!(
428- "[HTTP-QUERY] /query/{}/page/{} - query is removed (reason: {})" ,
429- query_id,
430- page_no,
431- reason
417+ if let Some ( st) = query. check_closed ( ) {
418+ info ! (
419+ "[HTTP-QUERY] /query/{}/page/{} - query is close (reason: {:?})" ,
420+ query_id, page_no, st
432421 ) ;
433- Err ( query_id_removed ( & query_id, reason) )
422+ Err ( query_id_closed ( & query_id, st . reason ) )
434423 } else {
435- query. update_expire_time ( true ) . await ;
424+ query. update_expire_time ( true , false ) . await ;
436425 let resp = query. get_response_page ( page_no) . await . map_err ( |err| {
437- log :: info!(
426+ info ! (
438427 "[HTTP-QUERY] /query/{}/page/{} - get response page error (reason: {})" ,
439428 query_id,
440429 page_no,
@@ -445,12 +434,10 @@ async fn query_page_handler(
445434 StatusCode :: NOT_FOUND ,
446435 )
447436 } ) ?;
448- query. update_expire_time ( false ) . await ;
449- let ( resp, next_is_final) = QueryResponse :: from_internal ( query_id, resp, false ) ;
450- if next_is_final {
451- query. set_data_drained ( )
452- }
453- Ok ( resp)
437+ query
438+ . update_expire_time ( false , resp. is_data_drained ( ) )
439+ . await ;
440+ Ok ( QueryResponse :: from_internal ( query_id, resp, false ) )
454441 }
455442 }
456443 } ;
@@ -519,7 +506,6 @@ pub(crate) async fn query_handler(
519506 let http_query_manager = HttpQueryManager :: instance ( ) ;
520507 let query = http_query_manager. add_query ( query) . await ;
521508
522- // tmp workaround to tolerant old clients
523509 let resp = query
524510 . get_response_page ( 0 )
525511 . await
@@ -542,13 +528,10 @@ pub(crate) async fn query_handler(
542528 info ! ( "[HTTP-QUERY] Initial response for query_id={}, state={:?}, rows={}, next_page={:?}, sql='{}'" ,
543529 & query. id, & resp. state, rows, next_page, mask_connection_info( & sql)
544530 ) ;
545- query. update_expire_time ( false ) . await ;
546- let ( resp, next_is_final) =
547- QueryResponse :: from_internal ( query. id . to_string ( ) , resp, false ) ;
548- if next_is_final {
549- query. set_data_drained ( )
550- }
551- Ok ( resp. into_response ( ) )
531+ query
532+ . update_expire_time ( false , resp. is_data_drained ( ) )
533+ . await ;
534+ Ok ( QueryResponse :: from_internal ( query. id . to_string ( ) , resp, false ) . into_response ( ) )
552535 }
553536 }
554537 } ;
@@ -838,9 +821,12 @@ pub fn query_route() -> Route {
838821 route
839822}
840823
841- fn query_id_removed ( query_id : & str , remove_reason : StopReason ) -> PoemError {
824+ fn query_id_closed ( query_id : & str , closed_reason : CloseReason ) -> PoemError {
842825 PoemError :: from_string (
843- format ! ( "[HTTP-QUERY] Query ID {query_id} {}" , remove_reason) ,
826+ format ! (
827+ "[HTTP-QUERY] Query {query_id} is closed for {}" ,
828+ closed_reason
829+ ) ,
844830 StatusCode :: BAD_REQUEST ,
845831 )
846832}
0 commit comments