Skip to content

Commit 9363910

Browse files
committed
Fix in-process kernel
1 parent 7f1c67e commit 9363910

18 files changed

+301
-226
lines changed
+4-36
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
11
"""An in-process terminal example."""
22
import os
3-
import sys
43

5-
import tornado
4+
from anyio import run
65
from jupyter_console.ptshell import ZMQTerminalInteractiveShell
76

87
from ipykernel.inprocess.manager import InProcessKernelManager
@@ -13,46 +12,15 @@ def print_process_id():
1312
print("Process ID is:", os.getpid())
1413

1514

16-
def init_asyncio_patch():
17-
"""set default asyncio policy to be compatible with tornado
18-
Tornado 6 (at least) is not compatible with the default
19-
asyncio implementation on Windows
20-
Pick the older SelectorEventLoopPolicy on Windows
21-
if the known-incompatible default policy is in use.
22-
do this as early as possible to make it a low priority and overrideable
23-
ref: https://github.com/tornadoweb/tornado/issues/2608
24-
FIXME: if/when tornado supports the defaults in asyncio,
25-
remove and bump tornado requirement for py38
26-
"""
27-
if (
28-
sys.platform.startswith("win")
29-
and sys.version_info >= (3, 8)
30-
and tornado.version_info < (6, 1)
31-
):
32-
import asyncio
33-
34-
try:
35-
from asyncio import WindowsProactorEventLoopPolicy, WindowsSelectorEventLoopPolicy
36-
except ImportError:
37-
pass
38-
# not affected
39-
else:
40-
if type(asyncio.get_event_loop_policy()) is WindowsProactorEventLoopPolicy:
41-
# WindowsProactorEventLoopPolicy is not compatible with tornado 6
42-
# fallback to the pre-3.8 default of Selector
43-
asyncio.set_event_loop_policy(WindowsSelectorEventLoopPolicy())
44-
45-
46-
def main():
15+
async def main():
4716
"""The main function."""
4817
print_process_id()
4918

5019
# Create an in-process kernel
5120
# >>> print_process_id()
5221
# will print the same process ID as the main process
53-
init_asyncio_patch()
5422
kernel_manager = InProcessKernelManager()
55-
kernel_manager.start_kernel()
23+
await kernel_manager.start_kernel()
5624
kernel = kernel_manager.kernel
5725
kernel.gui = "qt4"
5826
kernel.shell.push({"foo": 43, "print_process_id": print_process_id})
@@ -64,4 +32,4 @@ def main():
6432

6533

6634
if __name__ == "__main__":
67-
main()
35+
run(main)

ipykernel/inprocess/blocking.py

+2-3
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@ class BlockingInProcessKernelClient(InProcessKernelClient):
8080
iopub_channel_class = Type(BlockingInProcessChannel)
8181
stdin_channel_class = Type(BlockingInProcessStdInChannel)
8282

83-
def wait_for_ready(self):
83+
async def wait_for_ready(self):
8484
"""Wait for kernel info reply on shell channel."""
8585
while True:
86-
self.kernel_info()
86+
await self.kernel_info()
8787
try:
8888
msg = self.shell_channel.get_msg(block=True, timeout=1)
8989
except Empty:
@@ -103,6 +103,5 @@ def wait_for_ready(self):
103103
while True:
104104
try:
105105
msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
106-
print(msg["msg_type"])
107106
except Empty:
108107
break

ipykernel/inprocess/client.py

+19-29
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,9 @@
1111
# Imports
1212
# -----------------------------------------------------------------------------
1313

14-
import asyncio
1514

1615
from jupyter_client.client import KernelClient
1716
from jupyter_client.clientabc import KernelClientABC
18-
from jupyter_core.utils import run_sync
1917

2018
# IPython imports
2119
from traitlets import Instance, Type, default
@@ -101,7 +99,7 @@ def hb_channel(self):
10199
# Methods for sending specific messages
102100
# -------------------------------------
103101

104-
def execute(
102+
async def execute(
105103
self, code, silent=False, store_history=True, user_expressions=None, allow_stdin=None
106104
):
107105
"""Execute code on the client."""
@@ -115,19 +113,19 @@ def execute(
115113
allow_stdin=allow_stdin,
116114
)
117115
msg = self.session.msg("execute_request", content)
118-
self._dispatch_to_kernel(msg)
116+
await self._dispatch_to_kernel(msg)
119117
return msg["header"]["msg_id"]
120118

121-
def complete(self, code, cursor_pos=None):
119+
async def complete(self, code, cursor_pos=None):
122120
"""Get code completion."""
123121
if cursor_pos is None:
124122
cursor_pos = len(code)
125123
content = dict(code=code, cursor_pos=cursor_pos)
126124
msg = self.session.msg("complete_request", content)
127-
self._dispatch_to_kernel(msg)
125+
await self._dispatch_to_kernel(msg)
128126
return msg["header"]["msg_id"]
129127

130-
def inspect(self, code, cursor_pos=None, detail_level=0):
128+
async def inspect(self, code, cursor_pos=None, detail_level=0):
131129
"""Get code inspection."""
132130
if cursor_pos is None:
133131
cursor_pos = len(code)
@@ -137,14 +135,14 @@ def inspect(self, code, cursor_pos=None, detail_level=0):
137135
detail_level=detail_level,
138136
)
139137
msg = self.session.msg("inspect_request", content)
140-
self._dispatch_to_kernel(msg)
138+
await self._dispatch_to_kernel(msg)
141139
return msg["header"]["msg_id"]
142140

143-
def history(self, raw=True, output=False, hist_access_type="range", **kwds):
141+
async def history(self, raw=True, output=False, hist_access_type="range", **kwds):
144142
"""Get code history."""
145143
content = dict(raw=raw, output=output, hist_access_type=hist_access_type, **kwds)
146144
msg = self.session.msg("history_request", content)
147-
self._dispatch_to_kernel(msg)
145+
await self._dispatch_to_kernel(msg)
148146
return msg["header"]["msg_id"]
149147

150148
def shutdown(self, restart=False):
@@ -153,17 +151,17 @@ def shutdown(self, restart=False):
153151
msg = "Cannot shutdown in-process kernel"
154152
raise NotImplementedError(msg)
155153

156-
def kernel_info(self):
154+
async def kernel_info(self):
157155
"""Request kernel info."""
158156
msg = self.session.msg("kernel_info_request")
159-
self._dispatch_to_kernel(msg)
157+
await self._dispatch_to_kernel(msg)
160158
return msg["header"]["msg_id"]
161159

162-
def comm_info(self, target_name=None):
160+
async def comm_info(self, target_name=None):
163161
"""Request a dictionary of valid comms and their targets."""
164162
content = {} if target_name is None else dict(target_name=target_name)
165163
msg = self.session.msg("comm_info_request", content)
166-
self._dispatch_to_kernel(msg)
164+
await self._dispatch_to_kernel(msg)
167165
return msg["header"]["msg_id"]
168166

169167
def input(self, string):
@@ -173,29 +171,21 @@ def input(self, string):
173171
raise RuntimeError(msg)
174172
self.kernel.raw_input_str = string
175173

176-
def is_complete(self, code):
174+
async def is_complete(self, code):
177175
"""Handle an is_complete request."""
178176
msg = self.session.msg("is_complete_request", {"code": code})
179-
self._dispatch_to_kernel(msg)
177+
await self._dispatch_to_kernel(msg)
180178
return msg["header"]["msg_id"]
181179

182-
def _dispatch_to_kernel(self, msg):
180+
async def _dispatch_to_kernel(self, msg):
183181
"""Send a message to the kernel and handle a reply."""
184182
kernel = self.kernel
185183
if kernel is None:
186-
msg = "Cannot send request. No kernel exists."
187-
raise RuntimeError(msg)
184+
error_message = "Cannot send request. No kernel exists."
185+
raise RuntimeError(error_message)
188186

189-
stream = kernel.shell_stream
190-
self.session.send(stream, msg)
191-
msg_parts = stream.recv_multipart()
192-
if run_sync is not None:
193-
dispatch_shell = run_sync(kernel.dispatch_shell)
194-
dispatch_shell(msg_parts)
195-
else:
196-
loop = asyncio.get_event_loop()
197-
loop.run_until_complete(kernel.dispatch_shell(msg_parts))
198-
idents, reply_msg = self.session.recv(stream, copy=False)
187+
kernel.shell_socket.put(msg)
188+
reply_msg = await kernel.shell_socket.get()
199189
self.shell_channel.call_handlers_later(reply_msg)
200190

201191
def get_shell_msg(self, block=True, timeout=None):

ipykernel/inprocess/ipkernel.py

+19-10
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@
77
import sys
88
from contextlib import contextmanager
99

10+
from anyio import TASK_STATUS_IGNORED
11+
from anyio.abc import TaskStatus
1012
from IPython.core.interactiveshell import InteractiveShellABC
1113
from traitlets import Any, Enum, Instance, List, Type, default
1214

@@ -48,10 +50,10 @@ class InProcessKernel(IPythonKernel):
4850
# -------------------------------------------------------------------------
4951

5052
shell_class = Type(allow_none=True)
51-
_underlying_iopub_socket = Instance(DummySocket, ())
53+
_underlying_iopub_socket = Instance(DummySocket, (False,))
5254
iopub_thread: IOPubThread = Instance(IOPubThread) # type:ignore[assignment]
5355

54-
shell_stream = Instance(DummySocket, ())
56+
shell_socket = Instance(DummySocket, (True,))
5557

5658
@default("iopub_thread")
5759
def _default_iopub_thread(self):
@@ -65,23 +67,27 @@ def _default_iopub_thread(self):
6567
def _default_iopub_socket(self):
6668
return self.iopub_thread.background_socket
6769

68-
stdin_socket = Instance(DummySocket, ()) # type:ignore[assignment]
70+
stdin_socket = Instance(DummySocket, (False,)) # type:ignore[assignment]
6971

7072
def __init__(self, **traits):
7173
"""Initialize the kernel."""
7274
super().__init__(**traits)
7375

74-
self._underlying_iopub_socket.observe(self._io_dispatch, names=["message_sent"])
76+
self._io_dispatch()
7577
self.shell.kernel = self
7678

7779
async def execute_request(self, stream, ident, parent):
7880
"""Override for temporary IO redirection."""
7981
with self._redirected_io():
8082
await super().execute_request(stream, ident, parent)
8183

82-
def start(self):
84+
async def start(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED) -> None:
8385
"""Override registration of dispatchers for streams."""
8486
self.shell.exit_now = False
87+
await super().start(task_status=task_status)
88+
89+
def stop(self):
90+
super().stop()
8591

8692
def _abort_queues(self):
8793
"""The in-process kernel doesn't abort requests."""
@@ -127,12 +133,15 @@ def _redirected_io(self):
127133

128134
# ------ Trait change handlers --------------------------------------------
129135

130-
def _io_dispatch(self, change):
136+
def _io_dispatch(self):
131137
"""Called when a message is sent to the IO socket."""
132138
assert self.iopub_socket.io_thread is not None
133-
ident, msg = self.session.recv(self.iopub_socket.io_thread.socket, copy=False)
134-
for frontend in self.frontends:
135-
frontend.iopub_channel.call_handlers(msg)
139+
140+
def callback(msg):
141+
for frontend in self.frontends:
142+
frontend.iopub_channel.call_handlers(msg)
143+
144+
self.iopub_thread.socket.on_recv = callback
136145

137146
# ------ Trait initializers -----------------------------------------------
138147

@@ -142,7 +151,7 @@ def _default_log(self):
142151

143152
@default("session")
144153
def _default_session(self):
145-
from jupyter_client.session import Session
154+
from .session import Session
146155

147156
return Session(parent=self, key=INPROCESS_KEY)
148157

ipykernel/inprocess/manager.py

+10-4
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,14 @@
33
# Copyright (c) IPython Development Team.
44
# Distributed under the terms of the Modified BSD License.
55

6+
from anyio import TASK_STATUS_IGNORED
7+
from anyio.abc import TaskStatus
68
from jupyter_client.manager import KernelManager
79
from jupyter_client.managerabc import KernelManagerABC
8-
from jupyter_client.session import Session
910
from traitlets import DottedObjectName, Instance, default
1011

1112
from .constants import INPROCESS_KEY
13+
from .session import Session
1214

1315

1416
class InProcessKernelManager(KernelManager):
@@ -41,27 +43,31 @@ def _default_session(self):
4143
# Kernel management methods
4244
# --------------------------------------------------------------------------
4345

44-
def start_kernel(self, **kwds):
46+
async def start_kernel(self, *, task_status: TaskStatus = TASK_STATUS_IGNORED, **kwds) -> None:
4547
"""Start the kernel."""
4648
from ipykernel.inprocess.ipkernel import InProcessKernel
4749

4850
self.kernel = InProcessKernel(parent=self, session=self.session)
51+
await self.kernel.start(task_status=task_status)
4952

5053
def shutdown_kernel(self):
5154
"""Shutdown the kernel."""
5255
self.kernel.iopub_thread.stop()
5356
self._kill_kernel()
5457

55-
def restart_kernel(self, now=False, **kwds):
58+
async def restart_kernel(
59+
self, now=False, *, task_status: TaskStatus = TASK_STATUS_IGNORED, **kwds
60+
) -> None:
5661
"""Restart the kernel."""
5762
self.shutdown_kernel()
58-
self.start_kernel(**kwds)
63+
await self.start_kernel(task_status=task_status, **kwds)
5964

6065
@property
6166
def has_kernel(self):
6267
return self.kernel is not None
6368

6469
def _kill_kernel(self):
70+
self.kernel.stop()
6571
self.kernel = None
6672

6773
def interrupt_kernel(self):

ipykernel/inprocess/session.py

+41
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
from jupyter_client.session import Session as _Session
2+
3+
4+
class Session(_Session):
5+
async def recv(self, socket, copy=True):
6+
return await socket.recv_multipart()
7+
8+
def send(
9+
self,
10+
socket,
11+
msg_or_type,
12+
content=None,
13+
parent=None,
14+
ident=None,
15+
buffers=None,
16+
track=False,
17+
header=None,
18+
metadata=None,
19+
):
20+
if isinstance(msg_or_type, str):
21+
msg = self.msg(
22+
msg_or_type,
23+
content=content,
24+
parent=parent,
25+
header=header,
26+
metadata=metadata,
27+
)
28+
else:
29+
# We got a Message or message dict, not a msg_type so don't
30+
# build a new Message.
31+
msg = msg_or_type
32+
buffers = buffers or msg.get("buffers", [])
33+
34+
socket.send_multipart(msg)
35+
return msg
36+
37+
def feed_identities(self, msg, copy=True):
38+
return "", msg
39+
40+
def deserialize(self, msg, content=True, copy=True):
41+
return msg

0 commit comments

Comments
 (0)