77from contextvars import ContextVar
88from typing import Dict , Any , List , Awaitable
99
10- from anyio import Event , create_memory_object_stream , create_task_group , from_thread , get_cancelled_exc_class , run , sleep , to_thread
10+ from anyio import (
11+ Event ,
12+ create_memory_object_stream ,
13+ create_task_group ,
14+ from_thread ,
15+ get_cancelled_exc_class ,
16+ run ,
17+ sleep ,
18+ to_thread ,
19+ )
1120import comm # type: ignore
1221from akernel .comm .manager import CommManager
1322from akernel .display import display
@@ -103,6 +112,7 @@ def __init__(
103112 self .stop_event = Event ()
104113 if execute_in_thread :
105114 import threading
115+
106116 self ._stop_event = threading .Event ()
107117 self .key = "0"
108118
@@ -191,7 +201,9 @@ async def thread_execute(self):
191201 return
192202 except Exception :
193203 exc_type , exception , traceback = sys .exc_info ()
194- from_thread .run_sync (self .from_thread_send_stream .send_nowait , (result , exception , traceback ))
204+ from_thread .run_sync (
205+ self .from_thread_send_stream .send_nowait , (result , exception , traceback )
206+ )
195207
196208 async def thread_main (self ) -> None :
197209 async with create_task_group () as tg :
@@ -206,8 +218,11 @@ async def start(self) -> None:
206218 async with create_task_group () as self .task_group :
207219 if self .execute_in_thread :
208220 from queue import Queue
221+
209222 self .to_thread_queue = Queue ()
210- self .from_thread_send_stream , self .from_thread_receive_stream = create_memory_object_stream (max_buffer_size = 1 )
223+ self .from_thread_send_stream , self .from_thread_receive_stream = (
224+ create_memory_object_stream (max_buffer_size = 1 )
225+ )
211226 self .task_group .start_soon (to_thread .run_sync , self .thread )
212227 msg = self .create_message ("status" , content = {"execution_state" : self .execution_state })
213228 to_send = serialize (msg , self .key )
@@ -250,7 +265,10 @@ async def listen_shell(self) -> None:
250265 # if there was a blocking cell execution, and it was interrupted,
251266 # let's ignore all the following execution requests until the pipe
252267 # is empty
253- if self .interrupted and self .to_shell_receive_stream .statistics ().tasks_waiting_send == 0 :
268+ if (
269+ self .interrupted
270+ and self .to_shell_receive_stream .statistics ().tasks_waiting_send == 0
271+ ):
254272 self .interrupted = False
255273 msg_list = await self .to_shell_receive_stream .receive ()
256274 idents , msg_list = feed_identities (msg_list )
@@ -421,7 +439,9 @@ async def execute_and_finish(
421439 namespace = self .get_namespace (parent_header )
422440 try :
423441 if self .execute_in_thread :
424- self .to_thread_queue .put ((parent , idents , self .locals [namespace ][f"__async_cell{ task_i } __" ]))
442+ self .to_thread_queue .put (
443+ (parent , idents , self .locals [namespace ][f"__async_cell{ task_i } __" ])
444+ )
425445 result , exception , traceback = await self .from_thread_receive_stream .receive ()
426446 else :
427447 PARENT_VAR .set (parent )
0 commit comments