Skip to content

Commit ca3ade0

Browse files
authored
Fix concurrency update (#24)
1 parent ee38fa1 commit ca3ade0

File tree

3 files changed

+92
-3
lines changed

3 files changed

+92
-3
lines changed

iceberg_rust_ffi/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iceberg_rust_ffi/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "iceberg_rust_ffi"
3-
version = "0.4.0"
3+
version = "0.4.1"
44
edition = "2021"
55

66
[lib]
@@ -12,7 +12,7 @@ default = ["julia"]
1212
julia = []
1313

1414
[dependencies]
15-
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "cd1daca8d45eb2a78bc90f1eec18435502c6bc04" }
15+
iceberg = { git = "https://github.com/RelationalAI/iceberg-rust.git", rev = "feeb34438f3803395c24b9ffeab95402fd9b2ddd" }
1616
object_store_ffi = { git = "https://github.com/RelationalAI/object_store_ffi", rev = "79b08071c7a1642532b5891253280861eca9e44e", default-features = false }
1717
tokio = { version = "1.0", features = ["full"] }
1818
futures = "0.3"

test/runtests.jl

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,95 @@ end
390390
end
391391
end
392392

393+
@testset "Incremental Scan with nothing for both snapshot IDs" begin
394+
# Test scanning from root (nothing) to current (nothing) - full history
395+
scan3 = new_incremental_scan(table, nothing, nothing)
396+
@test scan3 isa RustyIceberg.IncrementalScan
397+
@test scan3.ptr != C_NULL
398+
println("✅ Incremental scan created with nothing for both snapshot IDs")
399+
400+
inserts_stream3, deletes_stream3 = RustyIceberg.scan!(scan3)
401+
@test inserts_stream3 != C_NULL
402+
@test deletes_stream3 != C_NULL
403+
println("✅ Streams obtained successfully for full history scan")
404+
405+
try
406+
# Read and validate from DELETES stream FIRST (to test the deadlock fix)
407+
# This is the scenario that previously caused indefinite blocking
408+
deletes_values = Tuple{String, Int64}[]
409+
deletes_batches = 0
410+
411+
while true
412+
batch_ptr = RustyIceberg.next_batch(deletes_stream3)
413+
if batch_ptr == C_NULL
414+
break
415+
end
416+
deletes_batches += 1
417+
batch = unsafe_load(batch_ptr)
418+
arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length))
419+
@test arrow_table isa Arrow.Table
420+
421+
# Convert to DataFrame and extract position delete metadata
422+
df = DataFrame(arrow_table)
423+
for row in eachrow(df)
424+
push!(deletes_values, (row.file_path, row.pos))
425+
end
426+
427+
RustyIceberg.free_batch(batch_ptr)
428+
end
429+
430+
# Verify we have no delete records (since this is a full history scan)
431+
@test isempty(deletes_values)
432+
433+
println("✅ Full history deletes stream validated successfully")
434+
println(" - Total deletes batches: $deletes_batches")
435+
println(" - Total delete records: $(length(deletes_values))")
436+
437+
# Now read and validate from inserts stream AFTER deletes
438+
inserts_values = Int64[]
439+
inserts_batches = 0
440+
441+
while true
442+
batch_ptr = RustyIceberg.next_batch(inserts_stream3)
443+
if batch_ptr == C_NULL
444+
break
445+
end
446+
inserts_batches += 1
447+
batch = unsafe_load(batch_ptr)
448+
arrow_table = Arrow.Table(unsafe_wrap(Array, batch.data, batch.length))
449+
450+
# Convert to DataFrame and collect values from column "n"
451+
df = DataFrame(arrow_table)
452+
@test "n" in names(df)
453+
append!(inserts_values, df.n)
454+
455+
RustyIceberg.free_batch(batch_ptr)
456+
end
457+
458+
# When scanning full history (nothing to nothing), we should get rows from all transactions
459+
# Verify we have some rows
460+
@test length(inserts_values) > 0
461+
sort!(inserts_values)
462+
463+
# Verify we have expected values from the test table
464+
# The test table was created with: range(1, 11), range(101, 200), range(201, 300)
465+
# And row 150 and 250 were deleted
466+
@test 1 in inserts_values # From first insert
467+
@test 101 in inserts_values # From second insert
468+
@test 201 in inserts_values # From third insert
469+
@test 150 inserts_values # Was deleted
470+
@test 250 inserts_values # Was deleted
471+
472+
println("✅ Full history inserts stream validated successfully")
473+
println(" - Total inserts batches: $inserts_batches")
474+
println(" - Total inserts rows: $(length(inserts_values))")
475+
finally
476+
RustyIceberg.free_stream(inserts_stream3)
477+
RustyIceberg.free_stream(deletes_stream3)
478+
RustyIceberg.free_incremental_scan!(scan3)
479+
end
480+
end
481+
393482
# Clean up table
394483
RustyIceberg.free_table(table)
395484
println("✅ Incremental scan test completed successfully!")

0 commit comments

Comments
 (0)