Skip to content

Configurable blocksize mode for streaming executor in unit tests #19146

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: branch-25.08
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ci/run_cudf_polars_pytests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ python -m pytest --cache-clear "$@" tests
# Test the "streaming" executor
python -m pytest --cache-clear "$@" tests --executor streaming

# Test the "streaming" executor with small blocksize
python -m pytest --cache-clear "$@" tests --executor streaming --blocksize-mode small

# Run experimental tests with Distributed cluster
python -m pytest --cache-clear "$@" "tests/experimental" \
--executor streaming \
Expand Down
27 changes: 23 additions & 4 deletions python/cudf_polars/cudf_polars/testing/asserts.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any, Literal

import polars as pl
from polars import GPUEngine
from polars.testing.asserts import assert_frame_equal

from cudf_polars.dsl.translate import Translator
from cudf_polars.utils.config import StreamingFallbackMode

if TYPE_CHECKING:
from cudf_polars.typing import OptimizationArgs
Expand All @@ -29,6 +30,7 @@
# and `--scheduler` command-line arguments
DEFAULT_EXECUTOR = "in-memory"
DEFAULT_SCHEDULER = "synchronous"
DEFAULT_BLOCKSIZE_MODE: Literal["small", "default"] = "default"


def assert_gpu_result_equal(
Expand All @@ -46,6 +48,7 @@ def assert_gpu_result_equal(
atol: float = 1e-08,
categorical_as_str: bool = False,
executor: str | None = None,
blocksize_mode: Literal["small", "default"] | None = None,
) -> None:
"""
Assert that collection of a lazyframe on GPU produces correct results.
Expand Down Expand Up @@ -86,6 +89,12 @@ def assert_gpu_result_equal(
executor
The executor configuration to pass to `GPUEngine`. If not specified
uses the module level `Executor` attribute.
blocksize_mode
The "mode" to use for choosing the blocksize for the streaming executor.
If not specified, uses the module level ``DEFAULT_BLOCKSIZE_MODE`` attribute.
Set to "small" to configure small values for ``max_rows_per_partition``
and ``target_partition_size``, which will typically cause many partitions
to be created while executing the query.

Raises
------
Expand All @@ -95,13 +104,23 @@ def assert_gpu_result_equal(
If GPU collection failed in some way.
"""
if engine is None:
executor_options: dict[str, Any] = {}
executor = executor or DEFAULT_EXECUTOR
if executor == "streaming":
executor_options["scheduler"] = DEFAULT_SCHEDULER

blocksize_mode = blocksize_mode or DEFAULT_BLOCKSIZE_MODE

if blocksize_mode == "small": # pragma: no cover
executor_options["max_rows_per_partition"] = 4
executor_options["target_partition_size"] = 10
# We expect many tests to fall back, so silence the warnings
executor_options["fallback_mode"] = StreamingFallbackMode.SILENT

engine = GPUEngine(
raise_on_fail=True,
executor=executor,
executor_options=(
{"scheduler": DEFAULT_SCHEDULER} if executor == "streaming" else {}
),
executor_options=executor_options,
)

final_polars_collect_kwargs, final_cudf_collect_kwargs = _process_kwargs(
Expand Down
14 changes: 14 additions & 0 deletions python/cudf_polars/tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,17 @@ def pytest_addoption(parser):
help="Scheduler to use for 'streaming' executor.",
)

parser.addoption(
"--blocksize-mode",
action="store",
default="default",
choices=("small", "default"),
help=(
"Blocksize to use for 'streaming' executor. Set to 'small' "
"to run most tests with multiple partitions."
),
)


def pytest_configure(config):
import cudf_polars.testing.asserts
Expand All @@ -43,6 +54,9 @@ def pytest_configure(config):

cudf_polars.testing.asserts.DEFAULT_EXECUTOR = config.getoption("--executor")
cudf_polars.testing.asserts.DEFAULT_SCHEDULER = config.getoption("--scheduler")
cudf_polars.testing.asserts.DEFAULT_BLOCKSIZE_MODE = config.getoption(
"--blocksize-mode"
)


def pytest_sessionstart(session):
Expand Down
19 changes: 19 additions & 0 deletions python/cudf_polars/tests/containers/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,25 @@ def test_non_scalar_access_raises():
_ = column.obj_scalar


def test_check_sorted():
    """check_sorted reports ascending order both before and after set_sorted."""
    int8 = DataType(pl.Int8())
    ascending = plc.types.Order.ASCENDING
    nulls_after = plc.types.NullOrder.AFTER
    col = Column(
        plc.Column.from_iterable_of_py([0, 1, 2], int8.plc),
        dtype=int8,
    )
    # Checked before any sortedness metadata has been set on the column.
    assert col.check_sorted(order=ascending, null_order=nulls_after)
    # Mark the column as sorted, then verify check_sorted still agrees.
    col.set_sorted(
        is_sorted=plc.types.Sorted.YES,
        order=ascending,
        null_order=nulls_after,
    )
    assert col.check_sorted(order=ascending, null_order=nulls_after)


@pytest.mark.parametrize("length", [0, 1])
def test_length_leq_one_always_sorted(length):
dtype = DataType(pl.Int8())
Expand Down
7 changes: 6 additions & 1 deletion python/cudf_polars/tests/expressions/test_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

from cudf_polars.dsl import expr
from cudf_polars.testing.asserts import (
DEFAULT_BLOCKSIZE_MODE,
assert_gpu_result_equal,
assert_ir_translation_raises,
)
Expand Down Expand Up @@ -74,7 +75,11 @@ def test_agg(df, agg):

# https://github.com/rapidsai/cudf/issues/15852
check_dtypes = agg not in {"n_unique", "median"}
if not check_dtypes and q.collect_schema()["a"] != pl.Float64:
if (
not check_dtypes
and q.collect_schema()["a"] != pl.Float64
and DEFAULT_BLOCKSIZE_MODE == "default"
Copy link
Contributor Author

@TomAugspurger TomAugspurger Jun 26, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This line is the new condition. The IR graph produced by the multi-partition executor includes some Cast nodes that mean we do match polars, so we don't raise an AssertionError in this block (since we've worked around #15852).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: Should we introduce those casts in the in-memory executor instead (during translation time).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe... Matching the output type is obviously nice. But I do like that it's obvious that we're not sacrificing performance with an ad-hoc cast.

):
with pytest.raises(AssertionError):
assert_gpu_result_equal(q)
assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False)
Expand Down
3 changes: 2 additions & 1 deletion python/cudf_polars/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,8 @@ def test_groupby_maintain_order_random(nrows, nkeys, with_nulls):
)
)
q = df.lazy().group_by(key_names, maintain_order=True).agg(pl.col("value").sum())
assert_gpu_result_equal(q)
# The streaming executor is too slow for large n_rows with blocksize_mode="small"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't know if there's a better way to find slow tests, other than watching out for tests that use a large number of rows with the default engine / blocksize_mode. pytest does have a --durations flag we could include.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you saying we could use --durations to find the slow tests and then possibly set blocksize="default" on those tests?

assert_gpu_result_equal(q, blocksize_mode="default" if nrows > 30 else None)


def test_groupby_len_with_nulls():
Expand Down
12 changes: 8 additions & 4 deletions python/cudf_polars/tests/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import polars as pl

from cudf_polars.testing.asserts import (
DEFAULT_BLOCKSIZE_MODE,
assert_gpu_result_equal,
assert_ir_translation_raises,
)
Expand Down Expand Up @@ -71,7 +72,7 @@ def test_non_coalesce_join(left, right, how, nulls_equal, join_expr):
query = left.join(
right, on=join_expr, how=how, nulls_equal=nulls_equal, coalesce=False
)
assert_gpu_result_equal(query, check_row_order=how == "left")
assert_gpu_result_equal(query, check_row_order=False)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here, and in many of the other join tests, we no longer check row order.

We could make this slightly more complicated like check_row_order=(how == "left" and BLOCKSIZE_MODE="default"). My understanding is that newer versions of polars don't even guarantee the row order for left joins so I opted to just not check that.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a good decision. We should also remove the "by default reorder" for left joins if we don't already.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.



@pytest.mark.parametrize(
Expand All @@ -85,16 +86,19 @@ def test_coalesce_join(left, right, how, nulls_equal, join_expr):
query = left.join(
right, on=join_expr, how=how, nulls_equal=nulls_equal, coalesce=True
)
assert_gpu_result_equal(query, check_row_order=how == "left")
assert_gpu_result_equal(query, check_row_order=False)


def test_left_join_with_slice(left, right, nulls_equal, zlice):
q = left.join(right, on="a", how="left", nulls_equal=nulls_equal, coalesce=True)

if zlice is not None:
if DEFAULT_BLOCKSIZE_MODE == "small" and zlice != (0, None):
pytest.skip("Cannot match polars' ordering with multiple partitions.")
Comment on lines +96 to +97
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For left.join(right).slice(...), the actual output depends on the row ordering, which I'm calling an implementation detail here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's reasonable. It also depends on row ordering for a single partition, so how do we match polars for single partitions? Do we sort afterwards?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just speculation, but I think that for polars and the streaming-executor-with-one-partition just happens to still maintain the order for left joins. Perhaps as https://docs.pola.rs/api/python/stable/reference/dataframe/api/polars.DataFrame.join.html notes

Do not rely on any observed ordering without explicitly setting this parameter, as your code may break in a future release.

and we should relax this test to just check that the number of rows is 2 and that the schema matches.


q = q.slice(*zlice)

assert_gpu_result_equal(q)
assert_gpu_result_equal(q, check_row_order=False)


def test_cross_join(left, right, zlice):
Expand All @@ -114,7 +118,7 @@ def test_cross_join(left, right, zlice):
)
def test_join_literal_key(left, right, left_on, right_on):
    """Inner join where one side of the key pair may be a literal expression."""
    query = left.join(right, left_on=left_on, right_on=right_on, how="inner")
    # Row order after a join is an implementation detail (and differs across
    # partition counts), so compare results without checking row order.
    assert_gpu_result_equal(query, check_row_order=False)


@pytest.mark.parametrize(
Expand Down
7 changes: 5 additions & 2 deletions python/cudf_polars/tests/test_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,10 +331,13 @@ def test_scan_parquet_only_row_index_raises(df, tmp_path):
assert_ir_translation_raises(q, NotImplementedError)


def test_scan_include_file_path(request, tmp_path, format, scan_fn, df):
@pytest.mark.parametrize("n_rows", [None, 2])
def test_scan_include_file_path(request, tmp_path, format, scan_fn, df, n_rows):
if n_rows is not None:
df = df.head(n_rows)
Comment on lines +336 to +337
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was needed for test coverage with blocksize_mode="small".

make_partitioned_source(df, tmp_path / "file", format)

q = scan_fn(tmp_path / "file", include_file_paths="files")
q = scan_fn(tmp_path / "file", include_file_paths="files", n_rows=n_rows)

if format == "ndjson":
assert_ir_translation_raises(q, NotImplementedError)
Expand Down
Loading