Skip to content
Draft
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion rerun_py/tests/e2e_redap_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
)


@pytest.fixture(scope="module")
@pytest.fixture(scope="function")
def table_filepath() -> Generator[pathlib.Path, None, None]:
"""
Copies test data to a temp directory.
Expand Down
58 changes: 58 additions & 0 deletions rerun_py/tests/e2e_redap_tests/test_table_write.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
from __future__ import annotations

import threading
from typing import TYPE_CHECKING

import pyarrow as pa
import pytest
from datafusion import DataFrameWriteOptions, InsertOp, SessionContext, col
from rerun.catalog import TableInsertMode

Expand Down Expand Up @@ -90,3 +92,59 @@ def test_client_append_to_table(server_instance: ServerInstance) -> None:
table_name, id=[3, 4, 5], bool_col=[False, True, None], double_col=[2.0, None, 1.0]
)
assert ctx.table(table_name).count() == original_rows + 4


@pytest.mark.parameterize("is_append", [True, False])
def test_concurrent_write_tables(server_instance: ServerInstance, is_append: bool) -> None:
num_writes = 100

table_name = "simple_datatypes"
ctx: SessionContext = server_instance.client.ctx

df_prior = ctx.table(table_name)
prior_count = df_prior.count()

df_low = ctx.table(table_name).filter(col("id") < 3).cache()
low_count = df_low.count()

df_high = ctx.table(table_name).filter(col("id") >= 3).cache()
high_count = df_high.count()

# Track any exceptions from threads
exceptions = []

def write_low() -> None:
for _ in range(num_writes):
try:
df_low.write_table(table_name)
except Exception as e:
exceptions.append(e)
return

def write_high() -> None:
for _ in range(num_writes):
try:
df_high.write_table(table_name)
except Exception as e:
exceptions.append(e)
return

thread1 = threading.Thread(target=write_low)
thread2 = threading.Thread(target=write_high)

thread1.start()
thread2.start()

thread1.join()
thread2.join()

if exceptions:
raise exceptions[0]

final_count = ctx.table(table_name).count()

expected = (
[prior_count + (num_writes * low_count) + (num_writes * high_count)] if is_append else [low_count, high_count]
)

assert final_count in [low_count, high_count], f"Expected rows in {expected} rows, got {final_count}"
Loading