@@ -251,6 +251,7 @@ def __init__(self) -> None:
251251 # New style request/response
252252 self ._resp_map : Dict [str , asyncio .Future ] = {}
253253 self ._resp_sub_prefix : Optional [bytearray ] = None
254+ self ._sub_prefix_subscription : Optional [Subscription ] = None
254255 self ._nuid = NUID ()
255256 self ._inbox_prefix = bytearray (DEFAULT_INBOX_PREFIX )
256257 self ._auth_configured : bool = False
@@ -680,11 +681,17 @@ async def _close(self, status: int, do_cbs: bool = True) -> None:
680681 if self .is_closed :
681682 self ._status = status
682683 return
683- self ._status = Client .CLOSED
684+
685+ if self ._sub_prefix_subscription is not None :
686+ subscription = self ._sub_prefix_subscription
687+ self ._sub_prefix_subscription = None
688+ await subscription .unsubscribe ()
684689
685690 # Kick the flusher once again so that Task breaks and avoid pending futures.
686691 await self ._flush_pending ()
687692
693+ self ._status = Client .CLOSED
694+
688695 if self ._reading_task is not None and not self ._reading_task .cancelled (
689696 ):
690697 self ._reading_task .cancel ()
@@ -726,11 +733,7 @@ async def _close(self, status: int, do_cbs: bool = True) -> None:
726733 # Cleanup subscriptions since not reconnecting so no need
727734 # to replay the subscriptions anymore.
728735 for sub in self ._subs .values ():
729- # Async subs use join when draining already so just cancel here.
730- if sub ._wait_for_msgs_task and not sub ._wait_for_msgs_task .done ():
731- sub ._wait_for_msgs_task .cancel ()
732- if sub ._message_iterator :
733- sub ._message_iterator ._cancel ()
736+ sub ._stop_processing ()
734737 # Sync subs may have some inflight next_msg calls that could be blocking
735738 # so cancel them here to unblock them.
736739 if sub ._pending_next_msgs_calls :
@@ -985,7 +988,7 @@ async def _init_request_sub(self) -> None:
985988 self ._resp_sub_prefix .extend (b"." )
986989 resp_mux_subject = self ._resp_sub_prefix [:]
987990 resp_mux_subject .extend (b"*" )
988- await self .subscribe (
991+ self . _sub_prefix_subscription = await self .subscribe (
989992 resp_mux_subject .decode (), cb = self ._request_sub_callback
990993 )
991994
@@ -2068,23 +2071,28 @@ async def _flusher(self) -> None:
20682071 if not self .is_connected or self .is_connecting :
20692072 break
20702073
2071- future : asyncio .Future = await self ._flush_queue .get ()
2072-
20732074 try :
2074- if self ._pending_data_size > 0 :
2075- self ._transport .writelines (self ._pending [:])
2076- self ._pending = []
2077- self ._pending_data_size = 0
2078- await self ._transport .drain ()
2079- except OSError as e :
2080- await self ._error_cb (e )
2081- await self ._process_op_err (e )
2082- break
2083- except (asyncio .CancelledError , RuntimeError , AttributeError ):
2084- # RuntimeError in case the event loop is closed
2085- break
2086- finally :
2087- future .set_result (None )
2075+ future : asyncio .Future = await self ._flush_queue .get ()
2076+ try :
2077+ if self ._pending_data_size > 0 :
2078+ self ._transport .writelines (self ._pending [:])
2079+ self ._pending = []
2080+ self ._pending_data_size = 0
2081+ await self ._transport .drain ()
2082+ except OSError as e :
2083+ await self ._error_cb (e )
2084+ await self ._process_op_err (e )
2085+ break
2086+ except (RuntimeError , AttributeError ):
2087+ # RuntimeError in case the event loop is closed
2088+ break
2089+ finally :
2090+ future .set_result (None )
2091+ except asyncio .CancelledError :
2092+ if self ._status == Client .CLOSED :
2093+ break
2094+ else :
2095+ continue
20882096
20892097 async def _ping_interval (self ) -> None :
20902098 while True :
@@ -2098,8 +2106,13 @@ async def _ping_interval(self) -> None:
20982106 await self ._process_op_err (ErrStaleConnection ())
20992107 return
21002108 await self ._send_ping ()
2101- except (asyncio . CancelledError , RuntimeError , AttributeError ):
2109+ except (RuntimeError , AttributeError ):
21022110 break
2111+ except asyncio .CancelledError :
2112+ if self ._status == Client .CLOSED :
2113+ break
2114+ else :
2115+ continue
21032116 # except asyncio.InvalidStateError:
21042117 # pass
21052118
@@ -2130,7 +2143,10 @@ async def _read_loop(self) -> None:
21302143 await self ._process_op_err (e )
21312144 break
21322145 except asyncio .CancelledError :
2133- break
2146+ if self ._status == Client .CLOSED :
2147+ break
2148+ else :
2149+ continue
21342150 except Exception as ex :
21352151 _logger .error ("nats: encountered error" , exc_info = ex )
21362152 break
0 commit comments