diff --git a/ipykernel/iostream.py b/ipykernel/iostream.py index cb4fc0525..08f3e9fe7 100644 --- a/ipykernel/iostream.py +++ b/ipykernel/iostream.py @@ -7,6 +7,7 @@ import atexit import contextvars +import functools import io import os import sys @@ -620,10 +621,19 @@ def flush(self): else: self._flush() + @property def _flush(self): + """Prepare _flush_impl partial to be scheduled on the IO thread. + + This indirection is necessary to ensure _flush_impl calls hooks + registered from the current thread (as they are thread-local). + """ + return functools.partial(self._flush_impl, self._hooks) + + def _flush_impl(self, hooks=()): """This is where the actual send happens. - _flush should generally be called in the IO thread, + _flush_impl should generally be called in the IO thread, unless the thread has been destroyed (e.g. forked subprocess). """ self._flush_pending = False @@ -648,7 +658,7 @@ def _flush(self): # Each transform either returns a new # message or None. If None is returned, # the message has been 'used' and we return. - for hook in self._hooks: + for hook in hooks: msg = hook(msg) if msg is None: return diff --git a/tests/test_io.py b/tests/test_io.py index e3ff28159..e5cbb8c4e 100644 --- a/tests/test_io.py +++ b/tests/test_io.py @@ -118,6 +118,25 @@ async def test_outstream(anyio_backend, iopub_thread): assert stream.writable() +async def test_outstream_hooks(anyio_backend, iopub_thread): + session = Session() + + stream = OutStream(session, iopub_thread, "stdout") + + with stream: + hook_called = False + + def hook(msg): + nonlocal hook_called + hook_called = True + return msg + + stream.register_hook(hook) + stream.write("hi") + stream.flush() + assert hook_called + + @pytest.mark.anyio() async def test_event_pipe_gc(iopub_thread): session = Session(key=b"abc")