@@ -95,7 +95,7 @@ impl Worker {
9595 Poll :: Ready ( Some ( request) ) => Poll :: Ready ( request) ,
9696 Poll :: Ready ( None ) => {
9797 // Channel was closed, explicitly or because the sender was dropped. Either way
98- // we should start a gracefull shutdown.
98+ // we should start a graceful shutdown.
9999 self . socket
100100 . write_buffer_mut ( )
101101 . put_slice ( & [ Terminate :: FORMAT as u8 , 0 , 0 , 0 , 4 ] ) ;
@@ -151,6 +151,8 @@ impl Worker {
151151 while let Poll :: Ready ( response) = self . poll_next_message ( cx) ? {
152152 match response. format {
153153 BackendMessageFormat :: ReadyForQuery => {
154+ // Cloning a `ReceivedMessage` here is cheap because it only clones the
155+ // underlying `Bytes`
154156 let rfq: ReadyForQuery = response. clone ( ) . decode ( ) ?;
155157 self . shared . set_transaction_status ( rfq. transaction_status ) ;
156158
@@ -187,6 +189,7 @@ impl Worker {
187189 }
188190
189191 if self . state != WorkerState :: Open && self . back_log . is_empty ( ) {
192+ // After the connection is closed and the backlog is empty we can close the socket.
190193 self . state = WorkerState :: Closed ;
191194 }
192195 Ok ( ( ) )
@@ -232,18 +235,15 @@ impl Worker {
232235 #[ inline( always) ]
233236 fn poll_shutdown ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
234237 if self . state == WorkerState :: Closed {
235- // The buffer is closed, a [Terminate] message has been sent, now try and close the socket.
238+ // The channel is closed, all requests are flushed and a [Terminate] message has been
239+ // sent, now try and close the socket
236240 self . socket . poll_close_unpin ( cx)
237241 } else {
238242 Poll :: Pending
239243 }
240244 }
241- }
242-
243- impl Future for Worker {
244- type Output = Result < ( ) > ;
245245
246- fn poll ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
246+ fn poll_worker ( mut self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
247247 // Try to receive responses from the database and handle them.
248248 self . poll_backlog ( cx) ?;
249249
@@ -260,6 +260,17 @@ impl Future for Worker {
260260 }
261261}
262262
263+ impl Future for Worker {
264+ type Output = Result < ( ) > ;
265+
266+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
267+ self . poll_worker ( cx) . map_err ( |e| {
268+ tracing:: error!( "Background worker stopped with error: {e:?}" ) ;
269+ e
270+ } )
271+ }
272+ }
273+
263274#[ derive( Clone ) ]
264275pub struct Shared ( Arc < Mutex < SharedInner > > ) ;
265276
0 commit comments