@@ -40,16 +40,25 @@ impl Response {
4040 pub ( crate ) fn new ( response : HyperResponseFuture , compression : Compression ) -> Self {
4141 Self :: Waiting ( Box :: pin ( async move {
4242 let response = response. await ?;
43+
4344 let status = response. status ( ) ;
44- let body = response. into_body ( ) ;
45+ let exception_code = response. headers ( ) . get ( "X-ClickHouse-Exception-Code" ) ;
4546
46- if status == StatusCode :: OK {
47+ if status == StatusCode :: OK && exception_code . is_none ( ) {
4748 // More likely to be successful, start streaming.
4849 // It still can fail, but we'll handle it in `DetectDbException`.
49- Ok ( Chunks :: new ( body , compression) )
50+ Ok ( Chunks :: new ( response . into_body ( ) , compression) )
5051 } else {
5152 // An instantly failed request.
52- Err ( collect_bad_response ( status, body, compression) . await )
53+ Err ( collect_bad_response (
54+ status,
55+ exception_code
56+ . and_then ( |value| value. to_str ( ) . ok ( ) )
57+ . map ( |code| format ! ( "Code: {code}" ) ) ,
58+ response. into_body ( ) ,
59+ compression,
60+ )
61+ . await )
5362 }
5463 } ) )
5564 }
@@ -78,6 +87,7 @@ impl Response {
7887#[ inline( never) ]
7988async fn collect_bad_response (
8089 status : StatusCode ,
90+ exception_code : Option < String > ,
8191 body : Incoming ,
8292 compression : Compression ,
8393) -> Error {
@@ -90,8 +100,11 @@ async fn collect_bad_response(
90100 let raw_bytes = match body. collect ( ) . await {
91101 Ok ( collected) => collected. to_bytes ( ) ,
92102 // If we can't collect the body, return standardised reason for the status code.
93- Err ( _) => return Error :: BadResponse ( stringify_status ( status) ) ,
103+ Err ( _) => return Error :: BadResponse ( reason ( status, exception_code ) ) ,
94104 } ;
105+ if raw_bytes. is_empty ( ) {
106+ return Error :: BadResponse ( reason ( status, exception_code) ) ;
107+ }
95108
96109 // Try to decompress the body, because CH uses compression even for errors.
97110 let stream = stream:: once ( future:: ready ( Result :: < _ > :: Ok ( raw_bytes. slice ( ..) ) ) ) ;
@@ -106,7 +119,7 @@ async fn collect_bad_response(
106119 let reason = String :: from_utf8 ( bytes. into ( ) )
107120 . map ( |reason| reason. trim ( ) . into ( ) )
108121 // If we have a unreadable response, return standardised reason for the status code.
109- . unwrap_or_else ( |_| stringify_status ( status) ) ;
122+ . unwrap_or_else ( |_| reason ( status, exception_code ) ) ;
110123
111124 Error :: BadResponse ( reason)
112125}
@@ -124,12 +137,14 @@ async fn collect_bytes(stream: impl Stream<Item = Result<Bytes>>) -> Result<Byte
124137 Ok ( bytes. into ( ) )
125138}
126139
127- fn stringify_status ( status : StatusCode ) -> String {
128- format ! (
129- "{} {}" ,
130- status. as_str( ) ,
131- status. canonical_reason( ) . unwrap_or( "<unknown>" ) ,
132- )
140+ fn reason ( status : StatusCode , exception_code : Option < String > ) -> String {
141+ exception_code. unwrap_or_else ( || {
142+ format ! (
143+ "{} {}" ,
144+ status. as_str( ) ,
145+ status. canonical_reason( ) . unwrap_or( "<unknown>" ) ,
146+ )
147+ } )
133148}
134149
135150// === Chunks ===
0 commit comments