Skip to content

Commit 5d66f9d

Browse files
committed
Make future and stream element types optional
1 parent b87bcf4 commit 5d66f9d

File tree

5 files changed

+132
-18
lines changed

5 files changed

+132
-18
lines changed

design/mvp/Binary.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,8 +203,8 @@ defvaltype ::= pvt:<primvaltype> => pvt
203203
| 0x6a t?:<valtype>? u?:<valtype>? => (result t? (error u)?)
204204
| 0x69 i:<typeidx> => (own i)
205205
| 0x68 i:<typeidx> => (borrow i)
206-
| 0x66 i:<typeidx> => (stream i)
207-
| 0x65 i:<typeidx> => (future i)
206+
| 0x66 i?:<typeidx>? => (stream i?)
207+
| 0x65 i?:<typeidx>? => (future i?)
208208
labelvaltype ::= l:<label'> t:<valtype> => l t
209209
case ::= l:<label'> t?:<valtype>? 0x00 => (case l t?)
210210
label' ::= len:<u32> l:<label> => l (if len = |l|)

design/mvp/CanonicalABI.md

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -760,8 +760,9 @@ class BufferGuestImpl(Buffer):
760760

761761
def __init__(self, cx, t, ptr, length):
762762
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
763-
trap_if(ptr != align_to(ptr, alignment(t)))
764-
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
763+
if t:
764+
trap_if(ptr != align_to(ptr, alignment(t)))
765+
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
765766
self.cx = cx
766767
self.t = t
767768
self.ptr = ptr
@@ -774,18 +775,29 @@ class BufferGuestImpl(Buffer):
774775
class ReadableBufferGuestImpl(BufferGuestImpl):
775776
def lift(self, n):
776777
assert(n <= self.remain())
777-
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
778-
self.ptr += n * elem_size(self.t)
778+
if t:
779+
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
780+
self.ptr += n * elem_size(self.t)
781+
else:
782+
vs = n * [()]
779783
self.progress += n
780784
return vs
781785

782786
class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
783787
def lower(self, vs):
784788
assert(len(vs) <= self.remain())
785-
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
786-
self.ptr += len(vs) * elem_size(self.t)
789+
if t:
790+
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
791+
self.ptr += len(vs) * elem_size(self.t)
792+
else:
793+
assert(all(v == () for v in vs))
787794
self.progress += len(vs)
788795
```
796+
Note that when `t` is `None` (arising from `stream` and `future` with empty
797+
element types), the core-wasm-supplied `ptr` is entirely ignored, while the
798+
`length` and `progress` are still semantically meaningful. Source bindings may
799+
represent this case with a generic of [unit] type or a distinct type that
800+
conveys timing events without a value.
789801

790802
The `ReadableStreamGuestImpl` class implements `ReadableStream` for a stream
791803
created by wasm (via `canon stream.new`) and encapsulates the synchronization

design/mvp/Explainer.md

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -555,8 +555,8 @@ defvaltype ::= bool
555555
| (result <valtype>? (error <valtype>)?)
556556
| (own <typeidx>)
557557
| (borrow <typeidx>)
558-
| (stream <typeidx>)
559-
| (future <typeidx>)
558+
| (stream <typeidx>?)
559+
| (future <typeidx>?)
560560
valtype ::= <typeidx>
561561
| <defvaltype>
562562
resourcetype ::= (resource (rep i32) (dtor async? <funcidx> (callback <funcidx>)?)?)
@@ -733,6 +733,14 @@ futures are useful in more advanced scenarios where a parameter or result
733733
value may not be ready at the same time as the other synchronous parameters or
734734
results.
735735

736+
The `T` element type of `stream` and `future` is an optional `valtype`. As with
737+
variant-case payloads and function results, when `T` is absent, the "value(s)"
738+
being asynchronously passed can be thought of as [unit] values. In such cases,
739+
there is no representation of the value in Core WebAssembly (pointers into
740+
linear memory are ignored) however the *timing* of completed reads and writes
741+
is observable and meaningful. Thus, empty futures and streams can be useful for
742+
timing-related APIs.
743+
736744
Currently, validation rejects `(stream T)` and `(future T)` when `T`
737745
transitively contains a `borrow`. This restriction could be relaxed in the
738746
future by extending the call-scoping rules of `borrow` to streams and futures.
@@ -2672,6 +2680,7 @@ For some use-case-focused, worked examples, see:
26722680
[Subtyping]: https://en.wikipedia.org/wiki/Subtyping
26732681
[Universal Types]: https://en.wikipedia.org/wiki/System_F
26742682
[Existential Types]: https://en.wikipedia.org/wiki/System_F
2683+
[Unit]: https://en.wikipedia.org/wiki/Unit_type
26752684

26762685
[Generative]: https://www.researchgate.net/publication/2426300_A_Syntactic_Theory_of_Type_Generativity_and_Sharing
26772686
[Avoidance Problem]: https://counterexamples.org/avoidance.html

design/mvp/canonical-abi/definitions.py

Lines changed: 15 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -171,11 +171,11 @@ class BorrowType(ValType):
171171

172172
@dataclass
173173
class StreamType(ValType):
174-
t: ValType
174+
t: Optional[ValType]
175175

176176
@dataclass
177177
class FutureType(ValType):
178-
t: ValType
178+
t: Optional[ValType]
179179

180180
### Lifting and Lowering Context
181181

@@ -534,8 +534,9 @@ class BufferGuestImpl(Buffer):
534534

535535
def __init__(self, cx, t, ptr, length):
536536
trap_if(length == 0 or length > Buffer.MAX_LENGTH)
537-
trap_if(ptr != align_to(ptr, alignment(t)))
538-
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
537+
if t:
538+
trap_if(ptr != align_to(ptr, alignment(t)))
539+
trap_if(ptr + length * elem_size(t) > len(cx.opts.memory))
539540
self.cx = cx
540541
self.t = t
541542
self.ptr = ptr
@@ -548,16 +549,22 @@ def remain(self):
548549
class ReadableBufferGuestImpl(BufferGuestImpl):
549550
def lift(self, n):
550551
assert(n <= self.remain())
551-
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
552-
self.ptr += n * elem_size(self.t)
552+
if self.t:
553+
vs = load_list_from_valid_range(self.cx, self.ptr, n, self.t)
554+
self.ptr += n * elem_size(self.t)
555+
else:
556+
vs = n * [()]
553557
self.progress += n
554558
return vs
555559

556560
class WritableBufferGuestImpl(BufferGuestImpl, WritableBuffer):
557561
def lower(self, vs):
558562
assert(len(vs) <= self.remain())
559-
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
560-
self.ptr += len(vs) * elem_size(self.t)
563+
if self.t:
564+
store_list_into_valid_range(self.cx, vs, self.ptr, self.t)
565+
self.ptr += len(vs) * elem_size(self.t)
566+
else:
567+
assert(all(v == () for v in vs))
561568
self.progress += len(vs)
562569

563570
class ReadableStreamGuestImpl(ReadableStream):

design/mvp/canonical-abi/run_tests.py

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1402,6 +1402,91 @@ async def core_func2(task, args):
14021402
await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())
14031403

14041404

1405+
async def test_wasm_to_wasm_stream_empty():
1406+
fut1, fut2, fut3, fut4 = asyncio.Future(), asyncio.Future(), asyncio.Future(), asyncio.Future()
1407+
1408+
inst1 = ComponentInstance()
1409+
opts1 = mk_opts(memory=None, sync=False)
1410+
ft1 = FuncType([], [StreamType(None)])
1411+
async def core_func1(task, args):
1412+
assert(not args)
1413+
[wsi] = await canon_stream_new(None, task)
1414+
[] = await canon_task_return(task, [StreamType(None)], opts1, [wsi])
1415+
1416+
await task.on_block(fut1)
1417+
1418+
[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
1419+
assert(ret == 2)
1420+
[ret] = await canon_stream_write(None, opts1, task, wsi, 10000, 2)
1421+
assert(ret == 2)
1422+
1423+
await task.on_block(fut2)
1424+
1425+
[ret] = await canon_stream_write(None, opts1, task, wsi, 0, 8)
1426+
assert(ret == definitions.BLOCKED)
1427+
1428+
fut3.set_result(None)
1429+
1430+
event, p1, p2 = await task.wait(sync = False)
1431+
assert(event == EventCode.STREAM_WRITE)
1432+
assert(p1 == wsi)
1433+
assert(p2 == 4)
1434+
1435+
fut4.set_result(None)
1436+
1437+
[errctxi] = await canon_error_context_new(opts1, task, 0, 0)
1438+
[] = await canon_stream_close_writable(None, task, wsi, errctxi)
1439+
[] = await canon_error_context_drop(task, errctxi)
1440+
return []
1441+
1442+
func1 = partial(canon_lift, opts1, inst1, ft1, core_func1)
1443+
1444+
inst2 = ComponentInstance()
1445+
heap2 = Heap(10)
1446+
mem2 = heap2.memory
1447+
opts2 = mk_opts(memory=heap2.memory, realloc=heap2.realloc, sync=False)
1448+
ft2 = FuncType([], [])
1449+
async def core_func2(task, args):
1450+
assert(not args)
1451+
[] = await canon_task_return(task, [], opts2, [])
1452+
1453+
retp = 0
1454+
[ret] = await canon_lower(opts2, ft1, func1, task, [retp])
1455+
assert(ret == 0)
1456+
rsi = mem2[0]
1457+
assert(rsi == 1)
1458+
1459+
[ret] = await canon_stream_read(None, opts2, task, rsi, 0, 8)
1460+
assert(ret == definitions.BLOCKED)
1461+
1462+
fut1.set_result(None)
1463+
1464+
event, p1, p2 = await task.wait(sync = False)
1465+
assert(event == EventCode.STREAM_READ)
1466+
assert(p1 == rsi)
1467+
assert(p2 == 4)
1468+
1469+
fut2.set_result(None)
1470+
await task.on_block(fut3)
1471+
1472+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1473+
assert(ret == 2)
1474+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1475+
assert(ret == 2)
1476+
1477+
await task.on_block(fut4)
1478+
1479+
[ret] = await canon_stream_read(None, opts2, task, rsi, 1000000, 2)
1480+
errctxi = 1
1481+
assert(ret == (definitions.CLOSED | errctxi))
1482+
[] = await canon_stream_close_readable(None, task, rsi)
1483+
[] = await canon_error_context_debug_message(opts2, task, errctxi, 0)
1484+
[] = await canon_error_context_drop(task, errctxi)
1485+
return []
1486+
1487+
await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:())
1488+
1489+
14051490
async def test_cancel_copy():
14061491
inst = ComponentInstance()
14071492
mem = bytearray(10)
@@ -1612,6 +1697,7 @@ async def run_async_tests():
16121697
await test_host_partial_reads_writes()
16131698
await test_async_stream_ops()
16141699
await test_wasm_to_wasm_stream()
1700+
await test_wasm_to_wasm_stream_empty()
16151701
await test_cancel_copy()
16161702
await test_futures()
16171703

0 commit comments

Comments
 (0)