11use std:: error:: Error as StdError ;
22use std:: marker:: Unpin ;
33
4+ use pin_project:: { pin_project, project} ;
45use h2:: Reason ;
56use h2:: server:: { Builder , Connection , Handshake , SendResponse } ;
67use tokio_io:: { AsyncRead , AsyncWrite } ;
@@ -199,19 +200,22 @@ where
199200}
200201
201202#[ allow( missing_debug_implementations) ]
203+ #[ pin_project]
202204pub struct H2Stream < F , B >
203205where
204206 B : Payload ,
205207{
206208 reply : SendResponse < SendBuf < B :: Data > > ,
209+ #[ pin]
207210 state : H2StreamState < F , B > ,
208211}
209212
213+ #[ pin_project]
210214enum H2StreamState < F , B >
211215where
212216 B : Payload ,
213217{
214- Service ( F ) ,
218+ Service ( # [ pin ] F ) ,
215219 Body ( PipeToSendStream < B > ) ,
216220}
217221
@@ -229,20 +233,34 @@ where
229233 }
230234}
231235
236+ macro_rules! reply {
237+ ( $me: expr, $res: expr, $eos: expr) => ( {
238+ match $me. reply. send_response( $res, $eos) {
239+ Ok ( tx) => tx,
240+ Err ( e) => {
241+ debug!( "send response error: {}" , e) ;
242+ $me. reply. send_reset( Reason :: INTERNAL_ERROR ) ;
243+ return Poll :: Ready ( Err ( crate :: Error :: new_h2( e) ) ) ;
244+ }
245+ }
246+ } )
247+ }
248+
232249impl < F , B , E > H2Stream < F , B >
233250where
234251 F : Future < Output = Result < Response < B > , E > > ,
235252 B : Payload + Unpin ,
236253 B :: Data : Unpin ,
237254 E : Into < Box < dyn StdError + Send + Sync > > ,
238255{
239- fn poll2 ( self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> Poll < crate :: Result < ( ) > > {
240- // Safety: State::{Service, Body} futures are never moved
241- let me = unsafe { self . get_unchecked_mut ( ) } ;
256+ #[ project]
257+ fn poll2 ( mut self : Pin < & mut Self > , cx : & mut task:: Context < ' _ > ) -> Poll < crate :: Result < ( ) > > {
242258 loop {
243- let next = match me. state {
244- H2StreamState :: Service ( ref mut h) => {
245- let res = match unsafe { Pin :: new_unchecked ( h) } . poll ( cx) {
259+ let mut me = self . project ( ) ;
260+ #[ project]
261+ let next = match me. state . project ( ) {
262+ H2StreamState :: Service ( h) => {
263+ let res = match h. poll ( cx) {
246264 Poll :: Ready ( Ok ( r) ) => r,
247265 Poll :: Pending => {
248266 // Response is not yet ready, so we want to check if the client has sent a
@@ -274,37 +292,26 @@ where
274292 . expect ( "DATE is a valid HeaderName" )
275293 . or_insert_with ( crate :: proto:: h1:: date:: update_and_header_value) ;
276294
277- macro_rules! reply {
278- ( $eos: expr) => ( {
279- match me. reply. send_response( res, $eos) {
280- Ok ( tx) => tx,
281- Err ( e) => {
282- debug!( "send response error: {}" , e) ;
283- me. reply. send_reset( Reason :: INTERNAL_ERROR ) ;
284- return Poll :: Ready ( Err ( crate :: Error :: new_h2( e) ) ) ;
285- }
286- }
287- } )
288- }
295+
289296
290297 // automatically set Content-Length from body...
291298 if let Some ( len) = body. size_hint ( ) . exact ( ) {
292299 headers:: set_content_length_if_missing ( res. headers_mut ( ) , len) ;
293300 }
294301
295302 if !body. is_end_stream ( ) {
296- let body_tx = reply ! ( false ) ;
303+ let body_tx = reply ! ( me , res , false ) ;
297304 H2StreamState :: Body ( PipeToSendStream :: new ( body, body_tx) )
298305 } else {
299- reply ! ( true ) ;
306+ reply ! ( me , res , true ) ;
300307 return Poll :: Ready ( Ok ( ( ) ) ) ;
301308 }
302309 } ,
303310 H2StreamState :: Body ( ref mut pipe) => {
304311 return Pin :: new ( pipe) . poll ( cx) ;
305312 }
306313 } ;
307- me. state = next;
314+ me. state . set ( next) ;
308315 }
309316 }
310317}
0 commit comments