Skip to content

Commit 514ff92

Browse files
committed
Make future and stream element types optional
1 parent b87bcf4 commit 514ff92

File tree

7 files changed

+155
-18
lines changed

7 files changed

+155
-18
lines changed

design/mvp/Async.md

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,17 @@ readable and writable ends of streams and futures each have a well-defined
296296
parent `Task` that will receive "progress" events on all child streams/futures
297297
that have previously blocked.
298298

299+
The `T` element type of streams and futures is optional, such that `future` and
300+
`stream` can be written in WIT without a trailing `<T>`. In this case, the
301+
asynchronous "values(s)" being delivered are effectively meaningless [unit]
302+
values. However, the *timing* of delivery is meaningful and thus `future` and
303+
`stream` can used to convey timing-related information. Note that, since
304+
functions are asynchronous by default, a plain `f: func()` conveys completion
305+
without requiring an explicit `future` return type. Thus, a function like
306+
`f2: func() -> future` would convey *two* events: first, the return of `f2`, at
307+
which point the caller receives the readable end of a `future` that, when
308+
successfully read, conveys the completion of a second event.
309+
299310
From a [structured-concurrency](#structured-concurrency) perspective, the
300311
readable and writable ends of streams and futures are leaves of the async call
301312
tree. Unlike subtasks, the parent of the readable ends of streams and future
@@ -606,6 +617,7 @@ comes after:
606617
[CPS Transform]: https://en.wikipedia.org/wiki/Continuation-passing_style
607618
[Event Loop]: https://en.wikipedia.org/wiki/Event_loop
608619
[Structured Concurrency]: https://en.wikipedia.org/wiki/Structured_concurrency
620+
[Unit]: https://en.wikipedia.org/wiki/Unit_type
609621

610622
[AST Explainer]: Explainer.md
611623
[Lift and Lower Definitions]: Explainer.md#canonical-definitions

design/mvp/Binary.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
203203
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
204204
| 0x69 i:<typeidx> => (own i)
205205
| 0x68 i:<typeidx> => (borrow i)
206-
| 0x66 i:<typeidx> => (stream i)
207-
| 0x65 i:<typeidx> => (future i)
206+
| 0x66 i?:<typeidx>? => (stream i?)
207+
| 0x65 i?:<typeidx>? => (future i?)
208208
labelvaltype ::= l:<label'> t:<valtype> => l t
209209
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
210210
label' ::= len:<u32> l:<label> => l (if len = |l|)

design/mvp/CanonicalABI.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -760,8 +760,9 @@ class BufferGuestImpl(Buffer):
760760

761761
def __init__(self, cx, t, ptr, length):
762762
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
763-
trap_if(ptr != align_to(ptr, alignment(t)))
764-
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
763+
if t:
764+
trap_if(ptr != align_to(ptr, alignment(t)))
765+
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
765766
self.cx = cx
766767
self.t = t
767768
self.ptr = ptr
@@ -774,18 +775,29 @@ class BufferGuestImpl(Buffer):
774775
class ReadableBufferGuestImpl(BufferGuestImpl):
775776
def lift(self, n):
776777
assert(n <= self.remain())
777-
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
778-
self.ptr += n * elem_size(self.t)
778+
if t:
779+
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
780+
self.ptr += n * elem_size(self.t)
781+
else:
782+
vs = n * [()]
779783
self.progress += n
780784
return vs
781785

782786
class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
783787
def lower(self, vs):
784788
assert(len(vs) <= self.remain())
785-
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
786-
self.ptr += len(vs) * elem_size(self.t)
789+
if t:
790+
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
791+
self.ptr += len(vs) * elem_size(self.t)
792+
else:
793+
assert(all(v == () for v in vs))
787794
self.progress += len(vs)
788795
```
796+
Note that when `t` is `None` (arising from `stream` and `future` with empty
797+
element types), the core-wasm-supplied `ptr` is entirely ignored, while the
798+
`length` and `progress` are still semantically meaningful. Source bindings may
799+
represent this case with a generic stream/future of [unit] type or a distinct
800+
type that conveys events without values.
789801

790802
The `ReadableStreamGuestImpl` class implements `ReadableStream` for a stream
791803
created by wasm (via `canon stream.new`) and encapsulates the synchronization

design/mvp/Explainer.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,8 @@ defvaltype ::= bool
555555
| (result <valtype>? (error <valtype>)?)
556556
| (own <typeidx>)
557557
| (borrow <typeidx>)
558-
| (stream <typeidx>)
559-
| (future <typeidx>)
558+
| (stream <typeidx>?)
559+
| (future <typeidx>?)
560560
valtype ::= <typeidx>
561561
| <defvaltype>
562562
resourcetype ::= (resource (rep i32) (dtor async? <funcidx> (callback <funcidx>)?)?)
@@ -733,6 +733,14 @@ futures are useful in more advanced scenarios where a parameter or result
733733
value may not be ready at the same time as the other synchronous parameters or
734734
results.
735735

736+
The `T` element type of `stream` and `future` is an optional `valtype`. As with
737+
variant-case payloads and function results, when `T` is absent, the "value(s)"
738+
being asynchronously passed can be thought of as [unit] values. In such cases,
739+
there is no representation of the value in Core WebAssembly (pointers into
740+
linear memory are ignored) however the *timing* of completed reads and writes
741+
is observable and meaningful. Thus, empty futures and streams can be useful for
742+
timing-related APIs.
743+
736744
Currently, validation rejects `(stream T)` and `(future T)` when `T`
737745
transitively contains a `borrow`. This restriction could be relaxed in the
738746
future by extending the call-scoping rules of `borrow` to streams and futures.
@@ -2672,6 +2680,7 @@ For some use-case-focused, worked examples, see:
26722680
[Subtyping]: https://en.wikipedia.org/wiki/Subtyping
26732681
[Universal Types]: https://en.wikipedia.org/wiki/System_F
26742682
[Existential Types]: https://en.wikipedia.org/wiki/System_F
2683+
[Unit]: https://en.wikipedia.org/wiki/Unit_type
26752684

26762685
[Generative]: https://www.researchgate.net/publication/2426300_A_Syntactic_Theory_of_Type_Generativity_and_Sharing
26772686
[Avoidance Problem]: https://counterexamples.org/avoidance.html

design/mvp/WIT.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1557,6 +1557,8 @@ ty ::= 'u8' | 'u16' | 'u32' | 'u64'
15571557
| option
15581558
| result
15591559
| handle
1560+
| future
1561+
| stream
15601562
| id
15611563
15621564
tuple ::= 'tuple' '<' tuple-list '>'
@@ -1574,6 +1576,12 @@ result ::= 'result' '<' ty ',' ty '>'
15741576
| 'result' '<' '_' ',' ty '>'
15751577
| 'result' '<' ty '>'
15761578
| 'result'
1579+
1580+
future ::= 'future' '<' ty '>'
1581+
| 'future'
1582+
1583+
stream ::= 'stream' '<' ty '>'
1584+
| 'stream'
15771585
```
15781586

15791587
The `tuple` type is semantically equivalent to a `record` with numerical fields,
@@ -1608,6 +1616,9 @@ variant result {
16081616
These types are so frequently used and frequently have language-specific
16091617
meanings though so they're also provided as first-class types.
16101618

1619+
The `future` and `stream` types are described as part of the [async
1620+
explainer](Async.md#streams-and-futures).
1621+
16111622
Finally the last case of a `ty` is simply an `id` which is intended to refer to
16121623
another type or resource defined in the document. Note that definitions can come
16131624
through a `use` statement or they can be defined locally.

design/mvp/canonical-abi/definitions.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,11 @@ class BorrowType(ValType):
171171

172172
@dataclass
173173
class StreamType(ValType):
174-
t: ValType
174+
t: Optional[ValType]
175175

176176
@dataclass
177177
class FutureType(ValType):
178-
t: ValType
178+
t: Optional[ValType]
179179

180180
### Lifting and Lowering Context
181181

@@ -534,8 +534,9 @@ class BufferGuestImpl(Buffer):
534534

535535
def __init__(self, cx, t, ptr, length):
536536
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
537-
trap_if(ptr != align_to(ptr, alignment(t)))
538-
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
537+
if t:
538+
trap_if(ptr != align_to(ptr, alignment(t)))
539+
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
539540
self.cx = cx
540541
self.t = t
541542
self.ptr = ptr
@@ -548,16 +549,22 @@ def remain(self):
548549
class ReadableBufferGuestImpl(BufferGuestImpl):
549550
def lift(self, n):
550551
assert(n <= self.remain())
551-
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
552-
self.ptr += n * elem_size(self.t)
552+
if self.t:
553+
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
554+
self.ptr += n * elem_size(self.t)
555+
else:
556+
vs = n * [()]
553557
self.progress += n
554558
return vs
555559

556560
class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
557561
def lower(self, vs):
558562
assert(len(vs) <= self.remain())
559-
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
560-
self.ptr += len(vs) * elem_size(self.t)
563+
if self.t:
564+
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
565+
self.ptr += len(vs) * elem_size(self.t)
566+
else:
567+
assert(all(v == () for v in vs))
561568
self.progress += len(vs)
562569

563570
class ReadableStreamGuestImpl(ReadableStream):

design/mvp/canonical-abi/run_tests.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,91 @@ async def core_func2(task, args):
14021402
await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())
14031403

14041404

1405+
async def test_wasm_to_wasm_stream_empty():
1406+
fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future()
1407+
1408+
inst1 = ComponentInstance()
1409+
opts1 = mk_opts(memory=None, sync=False)
1410+
ft1 = FuncType([], [StreamType(None)])
1411+
async def core_func1(task, args):
1412+
assert(not args)
1413+
[wsi] = await canon_stream_new(None, task)
1414+
[] = await canon_task_return(task, [StreamType(None)], opts1, [wsi])
1415+
1416+
await task.on_block(fut1)
1417+
1418+
[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
1419+
assert(ret == 2)
1420+
[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
1421+
assert(ret == 2)
1422+
1423+
await task.on_block(fut2)
1424+
1425+
[ret] = await canon_stream_write(None, opts1, task, wsi, 0, 8)
1426+
assert(ret == definitions.BLOCKED)
1427+
1428+
fut3.set_result(None)
1429+
1430+
event, p1, p2 = await task.wait(sync = False)
1431+
assert(event == EventCode.STREAM_WRITE)
1432+
assert(p1 == wsi)
1433+
assert(p2 == 4)
1434+
1435+
fut4.set_result(None)
1436+
1437+
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
1438+
[] = await canon_stream_close_writable(None, task, wsi, errctxi)
1439+
[] = await canon_error_context_drop(task, errctxi)
1440+
return []
1441+
1442+
func1 = partial(canon_lift, opts1, inst1, ft1, core_func1)
1443+
1444+
inst2 = ComponentInstance()
1445+
heap2 = Heap(10)
1446+
mem2 = heap2.memory
1447+
opts2 = mk_opts(memory=heap2.memory, realloc=heap2.realloc, sync=False)
1448+
ft2 = FuncType([], [])
1449+
async def core_func2(task, args):
1450+
assert(not args)
1451+
[] = await canon_task_return(task, [], opts2, [])
1452+
1453+
retp = 0
1454+
[ret] = await canon_lower(opts2, ft1, func1, task, [retp])
1455+
assert(ret == 0)
1456+
rsi = mem2[0]
1457+
assert(rsi == 1)
1458+
1459+
[ret] = await canon_stream_read(None, opts2, task, rsi, 0, 8)
1460+
assert(ret == definitions.BLOCKED)
1461+
1462+
fut1.set_result(None)
1463+
1464+
event, p1, p2 = await task.wait(sync = False)
1465+
assert(event == EventCode.STREAM_READ)
1466+
assert(p1 == rsi)
1467+
assert(p2 == 4)
1468+
1469+
fut2.set_result(None)
1470+
await task.on_block(fut3)
1471+
1472+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1473+
assert(ret == 2)
1474+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1475+
assert(ret == 2)
1476+
1477+
await task.on_block(fut4)
1478+
1479+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1480+
errctxi = 1
1481+
assert(ret == (definitions.CLOSED | errctxi))
1482+
[] = await canon_stream_close_readable(None, task, rsi)
1483+
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
1484+
[] = await canon_error_context_drop(task, errctxi)
1485+
return []
1486+
1487+
await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())
1488+
1489+
14051490
async def test_cancel_copy():
14061491
inst = ComponentInstance()
14071492
mem = bytearray(10)
@@ -1612,6 +1697,7 @@ async def run_async_tests():
16121697
await test_host_partial_reads_writes()
16131698
await test_async_stream_ops()
16141699
await test_wasm_to_wasm_stream()
1700+
await test_wasm_to_wasm_stream_empty()
16151701
await test_cancel_copy()
16161702
await test_futures()
16171703

0 commit comments

Comments
 (0)