Skip to content

Commit 8811361

Browse files
committed
CABI refactor: improve call_and_handle_blocking interface
1 parent 11fe9a6 commit 8811361

File tree

2 files changed

+61
-52
lines changed

2 files changed

+61
-52
lines changed

design/mvp/CanonicalABI.md

+40-34
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ class EventCode(IntEnum):
304304

305305
EventTuple = tuple[EventCode, int]
306306
EventCallback = Callable[[], EventTuple]
307-
OnBlockCallback = Callable[[Awaitable], any]
307+
OnBlockCallback = Callable[[Awaitable], Any]
308308
```
309309
The `CallState` enum describes the linear sequence of states that an async call
310310
necessarily transitions through: [`STARTING`](Async.md#backpressure), `STARTED`,
@@ -340,45 +340,51 @@ async def default_on_block(f):
340340
await current_task.acquire()
341341
return v
342342

343-
async def call_and_handle_blocking(callee):
344-
blocked = asyncio.Future()
343+
class Blocked: pass
344+
345+
async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
346+
blocked_or_result = asyncio.Future[Blocked|Any]()
345347
async def on_block(f):
346-
if not blocked.done():
347-
blocked.set_result(True)
348+
if not blocked_or_result.done():
349+
blocked_or_result.set_result(Blocked())
348350
else:
349351
current_task.release()
352+
assert(not f.done())
350353
v = await f
351354
await current_task.acquire()
352355
return v
353356
async def do_call():
354-
await callee(on_block)
355-
if not blocked.done():
356-
blocked.set_result(False)
357+
result = await callee(*args, on_block)
358+
if not blocked_or_result.done():
359+
blocked_or_result.set_result(result)
357360
else:
358361
current_task.release()
359362
asyncio.create_task(do_call())
360-
return await blocked
363+
return await blocked_or_result
361364
```
362365
Talking through this little Python pretzel of control flow:
363366
1. `call_and_handle_blocking` starts by running `do_call` in a fresh Python
364367
task and then immediately `await`ing a future that will be resolved by
365368
`do_call`. Since `current_task` isn't `release()`d or `acquire()`d as part
366369
of this process, the net effect is to directly transfer control flow from
367370
`call_and_handle_blocking` to `do_call` task without allowing other tasks to
368-
run (as if by `cont.new` + `resume` in [stack-switching]).
371+
run (as if by the `cont.new` + `resume` instructions of [stack-switching]).
369372
2. `do_call` passes the local `on_block` closure to `callee`, which the
370-
Canonical ABI ensures will be called whenever there is a need to block.
371-
3. If `on_block` is called, the first time it resolves `blocking`. Because
373+
Canonical ABI ensures will be called whenever there is a need to block on
374+
I/O (represented by the future `f`).
375+
3. If `on_block` is called, the first time it is called it will signal that
376+
the `callee` has `Blocked` before `await`ing the unresolved future. Because
372377
the `current_task` lock is not `release()`d or `acquire()`d as part of this
373-
process, the net effect is to directly transfer control flow from `do_call`
374-
back to `call_and_handle_blocking` without allowing other tasks to run (as
375-
if by `suspend` in [stack-switching]).
378+
process, the net effect is to transfer control flow directly from
379+
`on_block` to `call_and_handle_blocking` without allowing any other tasks
380+
to execute (as if by the `suspend` instruction of [stack-switching]).
376381
4. If `on_block` is called more than once, there is no longer a caller to
377382
directly switch to, so the `current_task` lock is `release()`d, just like
378383
in `default_on_block`, so that the Python async scheduler can pick another
379384
task to switch to.
380385
5. If `do_call` finishes without `on_block` ever having been called, it
381-
resolves `blocking` to `False` to communicate this fact to the caller.
386+
resolves `blocking` to the (not-`Blocking`) return value of `callee` to
387+
communicate this fact to the caller.
382388

383389
With these tricky primitives defined, the rest of the logic below can simply
384390
use `on_block` when there is a need to block and `call_and_handle_blocking`
@@ -616,7 +622,7 @@ tree.
616622
class Subtask(CallContext):
617623
ft: FuncType
618624
flat_args: CoreValueIter
619-
flat_results: Optional[list[any]]
625+
flat_results: Optional[list[Any]]
620626
state: CallState
621627
lenders: list[ResourceHandle]
622628
notify_supertask: bool
@@ -2147,25 +2153,25 @@ async def canon_lower(opts, ft, callee, task, flat_args):
21472153
async def do_call(on_block):
21482154
await callee(task, subtask.on_start, subtask.on_return, on_block)
21492155
[] = subtask.finish()
2150-
if await call_and_handle_blocking(do_call):
2151-
subtask.notify_supertask = True
2152-
task.need_to_drop += 1
2153-
i = task.inst.async_subtasks.add(subtask)
2154-
flat_results = [pack_async_result(i, subtask.state)]
2155-
else:
2156-
flat_results = [0]
2156+
match await call_and_handle_blocking(do_call):
2157+
case Blocked():
2158+
subtask.notify_supertask = True
2159+
task.need_to_drop += 1
2160+
i = task.inst.async_subtasks.add(subtask)
2161+
flat_results = [pack_async_result(i, subtask.state)]
2162+
case None:
2163+
flat_results = [0]
21572164
return flat_results
21582165
```
2159-
In the asynchronous case, `Task.call_and_handle_blocking` returns `True` if the
2160-
call to `do_call` blocks. In this blocking case, the `Subtask` is added to
2161-
stored in an instance-wide table and given an `i32` index that is later
2162-
returned by `task.wait` to indicate that the subtask made progress. The
2163-
`need_to_drop` increment is matched by a decrement in `canon_subtask_drop` and
2164-
ensures that all subtasks of a supertask are allowed to complete before the
2165-
supertask completes. The `notify_supertask` flag is set to tell `Subtask`
2166-
methods (below) to asynchronously notify the supertask of progress. Lastly,
2167-
the current state of the subtask is eagerly returned to the caller, packed
2168-
with the `i32` subtask index:
2166+
In the asynchronous case, if `do_call` blocks before `Subtask.finish`
2167+
(signalled by `callee` calling `on_block`), the `Subtask` is added to an
2168+
instance-wide table and given an `i32` index that is later returned by
2169+
`task.wait` to signal subtask's progress. The `need_to_drop` increment is
2170+
matched by a decrement in `canon_subtask_drop` and ensures that all subtasks
2171+
of a supertask are allowed to complete before the supertask completes. The
2172+
`notify_supertask` flag is set to tell `Subtask` methods (below) to
2173+
asynchronously notify the supertask of progress. Lastly, the current progress
2174+
of the subtask is returned to the caller, packed with the `i32` subtask index:
21692175
```python
21702176
def pack_async_result(i, state):
21712177
assert(0 < i < 2**30)

design/mvp/canonical-abi/definitions.py

+21-18
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
from __future__ import annotations
88
from dataclasses import dataclass
99
from functools import partial
10-
from typing import Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
10+
from typing import Any, Optional, Callable, Awaitable, Literal, MutableMapping, TypeVar, Generic
1111
from enum import IntEnum
1212
import math
1313
import struct
@@ -304,7 +304,7 @@ class EventCode(IntEnum):
304304

305305
EventTuple = tuple[EventCode, int]
306306
EventCallback = Callable[[], EventTuple]
307-
OnBlockCallback = Callable[[Awaitable], any]
307+
OnBlockCallback = Callable[[Awaitable], Any]
308308

309309
current_task = asyncio.Lock()
310310
asyncio.run(current_task.acquire())
@@ -315,24 +315,26 @@ async def default_on_block(f):
315315
await current_task.acquire()
316316
return v
317317

318-
async def call_and_handle_blocking(callee):
319-
blocked = asyncio.Future()
318+
class Blocked: pass
319+
320+
async def call_and_handle_blocking(callee, *args) -> Blocked|Any:
321+
blocked_or_result = asyncio.Future[Blocked|Any]()
320322
async def on_block(f):
321-
if not blocked.done():
322-
blocked.set_result(True)
323+
if not blocked_or_result.done():
324+
blocked_or_result.set_result(Blocked())
323325
else:
324326
current_task.release()
325327
v = await f
326328
await current_task.acquire()
327329
return v
328330
async def do_call():
329-
await callee(on_block)
330-
if not blocked.done():
331-
blocked.set_result(False)
331+
result = await callee(*args, on_block)
332+
if not blocked_or_result.done():
333+
blocked_or_result.set_result(result)
332334
else:
333335
current_task.release()
334336
asyncio.create_task(do_call())
335-
return await blocked
337+
return await blocked_or_result
336338

337339
class Task(CallContext):
338340
ft: FuncType
@@ -457,7 +459,7 @@ def exit(self):
457459
class Subtask(CallContext):
458460
ft: FuncType
459461
flat_args: CoreValueIter
460-
flat_results: Optional[list[any]]
462+
flat_results: Optional[list[Any]]
461463
state: CallState
462464
lenders: list[ResourceHandle]
463465
notify_supertask: bool
@@ -1454,13 +1456,14 @@ async def canon_lower(opts, ft, callee, task, flat_args):
14541456
async def do_call(on_block):
14551457
await callee(task, subtask.on_start, subtask.on_return, on_block)
14561458
[] = subtask.finish()
1457-
if await call_and_handle_blocking(do_call):
1458-
subtask.notify_supertask = True
1459-
task.need_to_drop += 1
1460-
i = task.inst.async_subtasks.add(subtask)
1461-
flat_results = [pack_async_result(i, subtask.state)]
1462-
else:
1463-
flat_results = [0]
1459+
match await call_and_handle_blocking(do_call):
1460+
case Blocked():
1461+
subtask.notify_supertask = True
1462+
task.need_to_drop += 1
1463+
i = task.inst.async_subtasks.add(subtask)
1464+
flat_results = [pack_async_result(i, subtask.state)]
1465+
case None:
1466+
flat_results = [0]
14641467
return flat_results
14651468

14661469
def pack_async_result(i, state):

0 commit comments

Comments
 (0)