@@ -27,12 +27,13 @@ use super::{request::IoRequest, tls::MaybeUpgradeTls};
2727
2828#[ derive( PartialEq , Debug ) ]
2929enum WorkerState {
30- // The connection is open and ready for business .
30+ // The connection is open and ready for requests .
3131 Open ,
32- // Sent/sending a [Terminate] message but did not close the socket. Responding to the last
33- // messages but not receiving new ones .
32+ // Responding to the last messages but not receiving new ones. After handling the last message
33+ // a [Terminate] message is issued .
3434 Closing ,
35- // The connection is terminated, this step closes the socket and stops the background task.
35+ // Last messages are handled, [Terminate] message is sent and the session is closed. Nog try
36+ // and close the socket.
3637 Closed ,
3738}
3839
@@ -86,29 +87,21 @@ impl Worker {
8687 // Tries to receive the next message from the channel. Also handles termination if needed.
8788 #[ inline( always) ]
8889 fn poll_next_request ( & mut self , cx : & mut Context < ' _ > ) -> Poll < IoRequest > {
89- if self . state != WorkerState :: Open {
90- return Poll :: Pending ;
91- }
92-
9390 match self . chan . poll_next_unpin ( cx) {
9491 Poll :: Pending => Poll :: Pending ,
9592 Poll :: Ready ( Some ( request) ) => Poll :: Ready ( request) ,
9693 Poll :: Ready ( None ) => {
9794 // Channel was closed, explicitly or because the sender was dropped. Either way
9895 // we should start a graceful shutdown.
99- self . socket
100- . write_buffer_mut ( )
101- . put_slice ( & [ Terminate :: FORMAT as u8 , 0 , 0 , 0 , 4 ] ) ;
102-
10396 self . state = WorkerState :: Closing ;
104- self . should_flush = true ;
10597 Poll :: Pending
10698 }
10799 }
108100 }
109101
110102 #[ inline( always) ]
111103 fn poll_receiver ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
104+ // Only try and receive io requests if we're open.
112105 if self . state != WorkerState :: Open {
113106 return Poll :: Ready ( Ok ( ( ) ) ) ;
114107 }
@@ -187,16 +180,17 @@ impl Worker {
187180 _ => self . send_back ( response) ?,
188181 }
189182 }
190-
191- if self . state != WorkerState :: Open && self . back_log . is_empty ( ) {
192- // After the connection is closed and the backlog is empty we can close the socket.
193- self . state = WorkerState :: Closed ;
194- }
195183 Ok ( ( ) )
196184 }
197185
198186 #[ inline( always) ]
199187 fn poll_next_message ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ReceivedMessage > > {
188+ if self . state == WorkerState :: Closed {
189+ // We're still responsing to the last messages, only after clearing the backlog we
190+ // should stop reading.
191+ return Poll :: Pending ;
192+ }
193+
200194 self . socket . poll_try_read ( cx, |buf| {
201195 // all packets in postgres start with a 5-byte header
202196 // this header contains the message type and the total length of the message
@@ -234,12 +228,21 @@ impl Worker {
234228
235229 #[ inline( always) ]
236230 fn poll_shutdown ( & mut self , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) > > {
237- if self . state == WorkerState :: Closed {
231+ match self . state {
232+ // After responding to the last messages we can issue a [Terminate] request and
233+ // close the connection.
234+ WorkerState :: Closing if self . back_log . is_empty ( ) => {
235+ let terminate = [ Terminate :: FORMAT as u8 , 0 , 0 , 0 , 4 ] ;
236+ self . socket . write_buffer_mut ( ) . put_slice ( & terminate) ;
237+ self . state = WorkerState :: Closed ;
238+
239+ // Closing the socket also flushes the buffer.
240+ self . socket . poll_close_unpin ( cx)
241+ }
238242 // The channel is closed, all requests are flushed and a [Terminate] message has been
239243 // sent, now try and close the socket
240- self . socket . poll_close_unpin ( cx)
241- } else {
242- Poll :: Pending
244+ WorkerState :: Closed => self . socket . poll_close_unpin ( cx) ,
245+ WorkerState :: Open | WorkerState :: Closing => Poll :: Pending ,
243246 }
244247 }
245248
0 commit comments