Skip to content

Commit ef73d2e

Browse files
committed
Simplify test_batch_io_large_operations
1 parent d29340a commit ef73d2e

File tree

1 file changed

+47
-212
lines changed

1 file changed

+47
-212
lines changed

cuda_bindings/tests/test_cufile.py

Lines changed: 47 additions & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -1412,7 +1412,6 @@ def test_batch_io_cancel():
14121412
cufile.driver_close()
14131413
cuda.cuDevicePrimaryCtxRelease(device)
14141414

1415-
14161415
@pytest.mark.skipif(not isSupportedFilesystem(), reason="cuFile handle_register requires ext4 or xfs filesystem")
14171416
def test_batch_io_large_operations():
14181417
"""Test batch IO with large buffer operations."""
@@ -1458,32 +1457,28 @@ def test_batch_io_large_operations():
14581457
try:
14591458
# Create file with O_DIRECT
14601459
fd = os.open(file_path, os.O_CREAT | os.O_RDWR | os.O_DIRECT, 0o600)
1460+
14611461
# Register all buffers with cuFile
14621462
all_buffers = write_buffers + read_buffers
1463-
for i, buf in enumerate(all_buffers):
1463+
for buf in all_buffers:
14641464
buf_int = int(buf)
1465-
try:
1466-
cufile.buf_register(buf_int, buf_size, 0)
1467-
except Exception as e:
1468-
print(f"*** Buffer {i} registration FAILED: {e} ***")
1469-
raise
1465+
cufile.buf_register(buf_int, buf_size, 0)
14701466

14711467
# Create file descriptor
14721468
descr = cufile.Descr()
14731469
descr.type = cufile.FileHandleType.OPAQUE_FD
14741470
descr.handle.fd = fd
14751471
descr.fs_ops = 0
14761472

1477-
14781473
# Register file handle
14791474
handle = cufile.handle_register(descr.ptr)
14801475

14811476
# Set up batch IO
1482-
batch_handle = cufile.batch_io_set_up(num_operations * 2) # 2 writes + 2 reads
1477+
batch_handle = cufile.batch_io_set_up(num_operations) # Only for writes
14831478

14841479
# Create IOParams array for batch operations
1485-
io_params = cufile.IOParams(num_operations * 2)
1486-
io_events = cufile.IOEvents(num_operations * 2)
1480+
io_params = cufile.IOParams(num_operations)
1481+
io_events = cufile.IOEvents(num_operations)
14871482

14881483
# Prepare test data
14891484
test_strings = [
@@ -1513,58 +1508,35 @@ def test_batch_io_large_operations():
15131508
io_params[i].u.batch.dev_ptr_offset = 0
15141509
io_params[i].u.batch.size_ = buf_size
15151510

1516-
# Set up read operations
1517-
for i in range(num_operations):
1518-
idx = i + num_operations
1519-
io_params[idx].mode = cufile.BatchMode.BATCH # Batch mode
1520-
io_params[idx].fh = handle
1521-
io_params[idx].opcode = cufile.Opcode.READ # Read opcode
1522-
io_params[idx].cookie = i + 100
1523-
io_params[idx].u.batch.dev_ptr_base = int(read_buffers[i])
1524-
io_params[idx].u.batch.file_offset = i * buf_size
1525-
io_params[idx].u.batch.dev_ptr_offset = 0
1526-
io_params[idx].u.batch.size_ = buf_size
1527-
1528-
1529-
1530-
1531-
for i in range(num_operations):
1532-
print(f" Op {i}: cookie={io_params[i].cookie}, opcode={io_params[i].opcode}, offset={io_params[i].u.batch.file_offset}")
1533-
1534-
for i in range(num_operations):
1535-
idx = i + num_operations
1536-
print(f" Op {idx}: cookie={io_params[idx].cookie}, opcode={io_params[idx].opcode}, offset={io_params[idx].u.batch.file_offset}")
1537-
1538-
1539-
# Submit writes first
1540-
cufile.batch_io_submit(batch_handle, num_operations, io_params.ptr, 0) # Only writes
1511+
# Submit writes
1512+
cufile.batch_io_submit(batch_handle, num_operations, io_params.ptr, 0)
15411513

1542-
1514+
# Wait for writes to complete
15431515
nr_completed_writes = ctypes.c_uint(num_operations)
15441516
timeout = ctypes.c_int(10000)
15451517
cufile.batch_io_get_status(
15461518
batch_handle, num_operations, ctypes.addressof(nr_completed_writes),
15471519
io_events.ptr, ctypes.addressof(timeout)
15481520
)
15491521

1550-
15511522
# Verify writes succeeded
15521523
for i in range(nr_completed_writes.value):
1553-
if io_events[i].status != cufile.Status.COMPLETE:
1554-
raise RuntimeError(f"Write {i} failed: {io_events[i].status}")
1555-
print(f"Write {io_events[i].cookie}: {io_events[i].ret} bytes")
1524+
assert io_events[i].status == cufile.Status.COMPLETE, (
1525+
f"Write {i} failed with status {io_events[i].status}"
1526+
)
15561527

15571528
# Force file sync
15581529
os.fsync(fd)
1559-
print("File sync after writes completed")
1530+
1531+
# Clean up write batch
1532+
cufile.batch_io_destroy(batch_handle)
15601533

15611534
# Now submit reads separately
1562-
print("Submitting reads...")
15631535
read_batch_handle = cufile.batch_io_set_up(num_operations)
15641536
read_io_params = cufile.IOParams(num_operations)
15651537
read_io_events = cufile.IOEvents(num_operations)
15661538

1567-
# Set up read operations in separate array
1539+
# Set up read operations
15681540
for i in range(num_operations):
15691541
read_io_params[i].mode = cufile.BatchMode.BATCH
15701542
read_io_params[i].fh = handle
@@ -1579,156 +1551,44 @@ def test_batch_io_large_operations():
15791551
cufile.batch_io_submit(read_batch_handle, num_operations, read_io_params.ptr, 0)
15801552

15811553
# Wait for reads
1582-
nr_completed_reads = ctypes.c_uint(num_operations)
1554+
nr_completed = ctypes.c_uint(num_operations)
15831555
cufile.batch_io_get_status(
1584-
read_batch_handle, num_operations, ctypes.addressof(nr_completed_reads),
1556+
read_batch_handle, num_operations, ctypes.addressof(nr_completed),
15851557
read_io_events.ptr, ctypes.addressof(timeout)
15861558
)
15871559

1588-
1589-
# Check read results
1590-
for i in range(nr_completed_reads.value):
1591-
print(f"Read {read_io_events[i].cookie}: {read_io_events[i].ret} bytes")
1592-
1593-
# Use read_io_events for verification instead of io_events
1594-
io_events = read_io_events # Replace for rest of test
1595-
nr_completed = nr_completed_reads
1596-
1597-
# Clean up read batch
1598-
cufile.batch_io_destroy(read_batch_handle)
1599-
1600-
# Enhanced operation analysis
1601-
print("=== Detailed Operation Results ===")
1602-
# Check each operation's detailed status
1603-
write_ops = []
1604-
read_ops = []
1605-
1606-
for i in range(nr_completed.value):
1607-
event = io_events[i]
1608-
status_name = "UNKNOWN"
1609-
try:
1610-
status_name = cufile.Status(event.status).name
1611-
except:
1612-
pass
1613-
1614-
print(f"Operation {i}:")
1615-
print(f" Cookie: {event.cookie}")
1616-
print(f" Status: {event.status} ({status_name})")
1617-
print(f" Result: {event.ret}")
1618-
1619-
# Categorize operations by cookie
1620-
if event.cookie < 100: # Write operations (cookies 0, 1)
1621-
write_ops.append({
1622-
'index': i,
1623-
'cookie': event.cookie,
1624-
'result': event.ret,
1625-
'status': event.status
1626-
})
1627-
print(f" -> WRITE operation: {event.ret} bytes")
1628-
else: # Read operations (cookies 100, 101)
1629-
read_ops.append({
1630-
'index': i,
1631-
'cookie': event.cookie,
1632-
'result': event.ret,
1633-
'status': event.status
1634-
})
1635-
print(f" -> READ operation: {event.ret} bytes")
1636-
1637-
# Check if operation failed
1638-
if event.status != cufile.Status.COMPLETE:
1639-
print(f" *** OPERATION {i} FAILED ***")
1640-
if event.status == cufile.Status.FAILED:
1641-
print(f" Error code: {event.ret}")
1642-
1643-
print("=== Operation Analysis ===")
1644-
print(f"Write operations completed: {len(write_ops)}")
1645-
print(f"Read operations completed: {len(read_ops)}")
1646-
1647-
# Check if all writes succeeded before analyzing reads
1648-
all_writes_success = all(op['result'] > 0 for op in write_ops)
1649-
print(f"All writes successful: {all_writes_success}")
1650-
1651-
if all_writes_success:
1652-
print("Writes completed successfully, reads should now work")
1653-
else:
1654-
print("Some writes failed - this could explain read failures")
1655-
1656-
# Show operation completion order
1657-
print("=== Operation Completion Order ===")
1658-
for i, event in enumerate([(io_events[j].cookie, io_events[j].ret) for j in range(nr_completed.value)]):
1659-
cookie, result = event
1660-
op_type = "WRITE" if cookie < 100 else "READ"
1661-
print(f"Position {i}: {op_type} (cookie {cookie}) -> {result} bytes")
1662-
1663-
# Write completion check
1664-
print("=== Write Completion Check ===")
1665-
# Check if writes actually completed by reading file size
1666-
file_stat = os.fstat(fd)
1667-
print(f"File size after batch: {file_stat.st_size}")
1668-
1669-
# Try a small direct read to verify data is in file
1670-
try:
1671-
test_buf_size = 1024
1672-
err, test_buf = cuda.cuMemAlloc(test_buf_size)
1673-
cufile.buf_register(int(test_buf), test_buf_size, 0)
1674-
1675-
# Try reading first 1KB directly
1676-
cufile.read(handle, int(test_buf), test_buf_size, 0, 0)
1677-
1678-
# Copy back and check
1679-
test_host_buf = ctypes.create_string_buffer(test_buf_size)
1680-
cuda.cuMemcpyDtoH(test_host_buf, test_buf, test_buf_size)
1681-
test_data = test_host_buf.value
1682-
1683-
print(f"Direct read test: {len(test_data)} bytes")
1684-
print(f"First 50 bytes: {test_data[:50]!r}")
1685-
1686-
# Cleanup test buffer
1687-
cufile.buf_deregister(int(test_buf))
1688-
cuda.cuMemFree(test_buf)
1689-
1690-
except Exception as e:
1691-
print(f"Direct read test failed: {e}")
1692-
16931560
# Verify all operations completed successfully
16941561
assert nr_completed.value == num_operations, (
1695-
f"Expected {num_operations} read operations, got {nr_completed.value}"
1562+
f"Expected {num_operations} operations, got {nr_completed.value}"
16961563
)
16971564

16981565
# Collect all returned cookies
16991566
returned_cookies = set()
17001567
for i in range(num_operations):
1701-
if io_events[i].status != cufile.Status.COMPLETE:
1702-
print(f"*** Operation {i} with cookie {io_events[i].cookie} failed with status {io_events[i].status} ***")
1703-
assert io_events[i].status == cufile.Status.COMPLETE, (
1704-
f"Operation {i} failed with status {io_events[i].status}"
1568+
assert read_io_events[i].status == cufile.Status.COMPLETE, (
1569+
f"Operation {i} failed with status {read_io_events[i].status}"
17051570
)
1706-
returned_cookies.add(io_events[i].cookie)
1571+
returned_cookies.add(read_io_events[i].cookie)
17071572

17081573
# Verify all expected cookies are present
1709-
expected_cookies = set(range(100, 100 + num_operations)) # read cookies 100,101
1574+
expected_cookies = set(range(100, 100 + num_operations))
17101575
assert returned_cookies == expected_cookies, (
17111576
f"Cookie mismatch. Expected {expected_cookies}, got {returned_cookies}"
17121577
)
17131578

17141579
# Verify the read data matches the written data
17151580
for i in range(num_operations):
1716-
17171581
# Copy read data back to host
17181582
cuda.cuMemcpyDtoHAsync(host_buf, read_buffers[i], buf_size, 0)
17191583
cuda.cuStreamSynchronize(0)
17201584
read_data = host_buf.value
17211585

1722-
17231586
# Prepare expected data
17241587
test_string = test_strings[i]
17251588
test_string_len = len(test_string)
17261589
repetitions = buf_size // test_string_len
17271590
expected_data = (test_string * repetitions)[:buf_size]
17281591

1729-
1730-
1731-
17321592
if read_data != expected_data:
17331593
n = 100 # Show first n bytes
17341594
raise RuntimeError(
@@ -1738,58 +1598,33 @@ def test_batch_io_large_operations():
17381598
f"expected {expected_data[:n]!r}"
17391599
)
17401600

1741-
print("=== Test Completed Successfully ===")
1742-
1743-
finally:
1744-
# Cleanup
1745-
try:
1746-
if 'all_buffers' in locals():
1747-
for buf in all_buffers:
1748-
cufile.buf_deregister(int(buf))
1749-
cuda.cuMemFree(buf)
1750-
except Exception as e:
1751-
print(f"Cleanup error: {e}")
1752-
1753-
try:
1754-
if 'handle' in locals():
1755-
cufile.handle_deregister(handle)
1756-
except Exception as e:
1757-
print(f"Handle deregister error: {e}")
1758-
1759-
try:
1760-
if 'batch_handle' in locals():
1761-
cufile.batch_io_destroy(batch_handle)
1762-
except Exception as e:
1763-
print(f"Batch destroy error: {e}")
1764-
1765-
try:
1766-
if 'read_batch_handle' in locals():
1767-
cufile.batch_io_destroy(read_batch_handle)
1768-
except Exception as e:
1769-
print(f"Read batch destroy error: {e}")
1770-
1771-
try:
1772-
if 'fd' in locals():
1773-
os.close(fd)
1774-
except Exception as e:
1775-
print(f"File close error: {e}")
1601+
# Clean up batch IO
1602+
cufile.batch_io_destroy(read_batch_handle)
17761603

1777-
try:
1778-
if os.path.exists(file_path):
1779-
os.remove(file_path)
1780-
except Exception as e:
1781-
print(f"File remove error: {e}")
1604+
# Deregister file handle
1605+
cufile.handle_deregister(handle)
17821606

1783-
try:
1784-
cufile.driver_close()
1785-
except Exception as e:
1786-
print(f"Driver close error: {e}")
1607+
# Deregister buffers
1608+
for buf in all_buffers:
1609+
buf_int = int(buf)
1610+
cufile.buf_deregister(buf_int)
17871611

1612+
finally:
1613+
# Close file
1614+
os.close(fd)
1615+
# Free CUDA memory
1616+
for buf in all_buffers:
1617+
cuda.cuMemFree(buf)
1618+
# Clean up test file
17881619
try:
1789-
cuda.cuDevicePrimaryCtxRelease(device)
1790-
except Exception as e:
1791-
print(f"Context release error: {e}")
1792-
1620+
os.unlink(file_path)
1621+
except OSError as e:
1622+
if e.errno != errno.ENOENT:
1623+
raise
1624+
# Close cuFile driver
1625+
cufile.driver_close()
1626+
cuda.cuDevicePrimaryCtxRelease(device)
1627+
17931628
@pytest.mark.skipif(
17941629
cufileVersionLessThan(1140), reason="cuFile parameter APIs require cuFile library version 1.14.0 or later"
17951630
)
@@ -1984,7 +1819,7 @@ def test_set_get_parameter_bool():
19841819

19851820
finally:
19861821
cuda.cuDevicePrimaryCtxRelease(device)
1987-
1822+
19881823
def test_set_get_parameter_string():
19891824
"""Test setting and getting string parameters with cuFile validation."""
19901825

0 commit comments

Comments
 (0)