@@ -1402,6 +1402,91 @@ async def core_func2(task, args):
14021402 await canon_lift (opts2 , inst2 , ft2 , core_func2 , None , lambda :[], lambda _ :())
14031403
14041404
1405+ async def test_wasm_to_wasm_stream_empty ():
1406+ fut1 , fut2 , fut3 , fut4 = asyncio .Future (), asyncio .Future (), asyncio .Future (), asyncio .Future ()
1407+
1408+ inst1 = ComponentInstance ()
1409+ opts1 = mk_opts (memory = None , sync = False )
1410+ ft1 = FuncType ([], [StreamType (None )])
1411+ async def core_func1 (task , args ):
1412+ assert (not args )
1413+ [wsi ] = await canon_stream_new (None , task )
1414+ [] = await canon_task_return (task , [StreamType (None )], opts1 , [wsi ])
1415+
1416+ await task .on_block (fut1 )
1417+
1418+ [ret ] = await canon_stream_write (None , opts1 , task , wsi , 10000 , 2 )
1419+ assert (ret == 2 )
1420+ [ret ] = await canon_stream_write (None , opts1 , task , wsi , 10000 , 2 )
1421+ assert (ret == 2 )
1422+
1423+ await task .on_block (fut2 )
1424+
1425+ [ret ] = await canon_stream_write (None , opts1 , task , wsi , 0 , 8 )
1426+ assert (ret == definitions .BLOCKED )
1427+
1428+ fut3 .set_result (None )
1429+
1430+ event , p1 , p2 = await task .wait (sync = False )
1431+ assert (event == EventCode .STREAM_WRITE )
1432+ assert (p1 == wsi )
1433+ assert (p2 == 4 )
1434+
1435+ fut4 .set_result (None )
1436+
1437+ [errctxi ] = await canon_error_context_new (opts1 , task , 0 , 0 )
1438+ [] = await canon_stream_close_writable (None , task , wsi , errctxi )
1439+ [] = await canon_error_context_drop (task , errctxi )
1440+ return []
1441+
1442+ func1 = partial (canon_lift , opts1 , inst1 , ft1 , core_func1 )
1443+
1444+ inst2 = ComponentInstance ()
1445+ heap2 = Heap (10 )
1446+ mem2 = heap2 .memory
1447+ opts2 = mk_opts (memory = heap2 .memory , realloc = heap2 .realloc , sync = False )
1448+ ft2 = FuncType ([], [])
1449+ async def core_func2 (task , args ):
1450+ assert (not args )
1451+ [] = await canon_task_return (task , [], opts2 , [])
1452+
1453+ retp = 0
1454+ [ret ] = await canon_lower (opts2 , ft1 , func1 , task , [retp ])
1455+ assert (ret == 0 )
1456+ rsi = mem2 [0 ]
1457+ assert (rsi == 1 )
1458+
1459+ [ret ] = await canon_stream_read (None , opts2 , task , rsi , 0 , 8 )
1460+ assert (ret == definitions .BLOCKED )
1461+
1462+ fut1 .set_result (None )
1463+
1464+ event , p1 , p2 = await task .wait (sync = False )
1465+ assert (event == EventCode .STREAM_READ )
1466+ assert (p1 == rsi )
1467+ assert (p2 == 4 )
1468+
1469+ fut2 .set_result (None )
1470+ await task .on_block (fut3 )
1471+
1472+ [ret ] = await canon_stream_read (None , opts2 , task , rsi , 1000000 , 2 )
1473+ assert (ret == 2 )
1474+ [ret ] = await canon_stream_read (None , opts2 , task , rsi , 1000000 , 2 )
1475+ assert (ret == 2 )
1476+
1477+ await task .on_block (fut4 )
1478+
1479+ [ret ] = await canon_stream_read (None , opts2 , task , rsi , 1000000 , 2 )
1480+ errctxi = 1
1481+ assert (ret == (definitions .CLOSED | errctxi ))
1482+ [] = await canon_stream_close_readable (None , task , rsi )
1483+ [] = await canon_error_context_debug_message (opts2 , task , errctxi , 0 )
1484+ [] = await canon_error_context_drop (task , errctxi )
1485+ return []
1486+
1487+ await canon_lift (opts2 , inst2 , ft2 , core_func2 , None , lambda :[], lambda _ :())
1488+
1489+
14051490async def test_cancel_copy ():
14061491 inst = ComponentInstance ()
14071492 mem = bytearray (10 )
@@ -1612,6 +1697,7 @@ async def run_async_tests():
16121697 await test_host_partial_reads_writes ()
16131698 await test_async_stream_ops ()
16141699 await test_wasm_to_wasm_stream ()
1700+ await test_wasm_to_wasm_stream_empty ()
16151701 await test_cancel_copy ()
16161702 await test_futures ()
16171703
0 commit comments