[WIP] Track column statistics in cuDF-Polars #19130


Status: Open. rjzamora wants to merge 28 commits into branch-25.08 from rjzamora:column-stats.

Commits (28)
b9bb173
start with lawrences doodle
rjzamora Jun 5, 2025
8699a9e
save work
rjzamora Jun 6, 2025
2d590dc
revise basic class structure
rjzamora Jun 9, 2025
6620a6f
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 9, 2025
511d059
tests passing
rjzamora Jun 10, 2025
2cf53d2
change the config name
rjzamora Jun 10, 2025
4505d7d
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 10, 2025
49ce228
Merge branch 'branch-25.08' into column-stats
rjzamora Jun 10, 2025
4fee62c
remove TableSourceStats
rjzamora Jun 11, 2025
88247f4
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 11, 2025
2d0c43d
minor cleanup
rjzamora Jun 11, 2025
455ad2d
Merge branch 'column-stats' of github.com:rjzamora/cudf into column-s…
rjzamora Jun 11, 2025
e4f284c
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
63e650a
test coverage
rjzamora Jun 12, 2025
c076ec4
Update python/cudf_polars/cudf_polars/dsl/traversal.py
rjzamora Jun 12, 2025
6672aa3
use LRU instead of FIFO
rjzamora Jun 12, 2025
2def2df
Merge branch 'column-stats' of github.com:rjzamora/cudf into column-s…
rjzamora Jun 12, 2025
daedd2d
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
adce001
avoid key errors
rjzamora Jun 12, 2025
b7f52e4
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 12, 2025
49802a9
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 26, 2025
afd1a9b
rename column_statistics to column_stats
rjzamora Jun 26, 2025
c6af3b4
more renaming of statistics to stats
rjzamora Jun 26, 2025
6b162b3
pull config_options back out of StatsCollector
rjzamora Jun 26, 2025
6d168eb
Merge remote-tracking branch 'upstream/branch-25.08' into column-stats
rjzamora Jun 26, 2025
2208fad
fix typo
rjzamora Jun 26, 2025
ab981c5
change _get_unique_fractions input types
rjzamora Jun 26, 2025
7cfd6bf
Merge branch 'branch-25.08' into column-stats
rjzamora Jun 26, 2025
89 changes: 89 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/base.py
@@ -10,6 +10,7 @@
from collections.abc import Generator, Iterator

from cudf_polars.dsl.expr import NamedExpr
from cudf_polars.dsl.ir import IR
from cudf_polars.dsl.nodebase import Node


@@ -44,3 +45,91 @@ def __rich_repr__(self) -> Generator[Any, None, None]:
def get_key_name(node: Node) -> str:
    """Generate the key name for a Node."""
    return f"{type(node).__name__.lower()}-{hash(node)}"


class ColumnSourceStats:
    """
    Column source statistics.

    Parameters
    ----------
    cardinality
        Cardinality (row count).
    unique_count
        Unique-value count.
    unique_fraction
        Unique-value fraction.
    storage_size_per_file
        Average uncompressed storage size for this
        column in a single file. This value is used to
        calculate the partition count for an IR node.
    exact
        Tuple of attribute names that are known exactly
        (i.e. not estimated by partial sampling).

    Notes
    -----
    Source statistics come from "source" nodes like
    ``Scan`` and ``DataFrameScan``.
    """

    __slots__ = (
        "cardinality",
        "exact",
        "storage_size_per_file",
        "unique_count",
        "unique_fraction",
    )

    def __init__(
        self,
        *,
        cardinality: int | None = None,
        storage_size_per_file: int | None = None,
        unique_count: int | None = None,
        unique_fraction: float | None = None,
        exact: tuple[str, ...] = (),
    ):
        self.cardinality = cardinality
        self.storage_size_per_file = storage_size_per_file
        self.unique_count = unique_count
        self.unique_fraction = unique_fraction
        self.exact = exact


class ColumnStats:
    """
    Column statistics.

    Parameters
    ----------
    name
        Column name.
    unique_count
        Unique-count estimate.
    source_stats
        Column-source statistics.
    """

    __slots__ = ("name", "source_stats", "unique_count")

    def __init__(
        self,
        *,
        name: str | None = None,
        unique_count: int | None = None,
        source_stats: ColumnSourceStats | None = None,
    ) -> None:
        self.name = name
        self.unique_count = unique_count
        self.source_stats = source_stats


class StatsCollector:
    """Column statistics collector."""

    __slots__ = ("cardinality", "column_stats")

    def __init__(self) -> None:
        self.cardinality: dict[IR, int] = {}
        self.column_stats: dict[IR, dict[str, ColumnStats]] = {}
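
A brief usage sketch of how these containers compose (illustrative values only; `some_ir_node` stands in for a real IR node and is not from the PR):

```python
# Hypothetical usage of the new statistics containers (not part of this PR).
# All numeric values are made up for illustration.
from cudf_polars.experimental.base import ColumnSourceStats, ColumnStats, StatsCollector

# Source-level statistics for one column, as a source node might record them.
source = ColumnSourceStats(
    cardinality=1_000_000,           # estimated row count
    unique_count=900_000,            # estimated distinct-value count
    unique_fraction=0.9,             # unique_count / cardinality
    storage_size_per_file=8_000_000,
    exact=("cardinality",),          # only the row count is known exactly
)

# Per-column statistics wrap the source statistics.
col = ColumnStats(name="l_orderkey", source_stats=source)

# The collector maps IR nodes to row counts and per-column statistics;
# ``some_ir_node`` would be a real IR node in practice.
collector = StatsCollector()
# collector.cardinality[some_ir_node] = 1_000_000
# collector.column_stats[some_ir_node] = {"l_orderkey": col}
```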
@@ -872,7 +872,7 @@ def run(options: Sequence[str] | None = None) -> None:
executor_options: dict[str, Any] = {}
if run_config.executor == "streaming":
executor_options = {
"cardinality_factor": {
"unique_fraction": {
"c_custkey": 0.05, # Q10
"l_orderkey": 1.0, # Q18
"l_partkey": 0.1, # Q20
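For reference, a user could plausibly supply the same per-column hints through the streaming executor options; a minimal sketch, assuming the `unique_fraction` key nests under `executor_options` exactly as in the benchmark configuration above:

```python
# Hedged sketch: passing per-column unique-fraction hints to the streaming
# executor.  The option layout is an assumption based on the diff above.
import polars as pl

engine = pl.GPUEngine(
    executor="streaming",
    executor_options={
        "unique_fraction": {
            "c_custkey": 0.05,  # Q10
            "l_orderkey": 1.0,  # Q18
        },
    },
)

# q = <some polars LazyFrame>
# result = q.collect(engine=engine)
```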
25 changes: 23 additions & 2 deletions python/cudf_polars/cudf_polars/experimental/dispatch.py
@@ -1,4 +1,4 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-FileCopyrightText: Copyright (c) 2024-2025, NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Multi-partition dispatch functions."""

@@ -12,8 +12,9 @@
from typing import TypeAlias

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.base import PartitionInfo, StatsCollector
from cudf_polars.typing import GenericTransformer
from cudf_polars.utils.config import ConfigOptions


LowerIRTransformer: TypeAlias = (
@@ -82,3 +83,23 @@ def generate_ir_tasks(
task_graph
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover


@singledispatch
def add_source_stats(
ir: IR, stats: StatsCollector, config_options: ConfigOptions
Reviewer comment (Contributor):
Is this source stats specific? I see we also do stuff for Join, which IIUC just does stuff with ColumnStats. Maybe rename to collect_stats?

) -> None:
"""
Add basic source statistics for an IR node.

Parameters
----------
ir
The IR node to collect source statistics for.
stats
The `StatsCollector` object to update with new
source statistics.
config_options
GPUEngine configuration options.
"""
raise AssertionError(f"Unhandled type {type(ir)}") # pragma: no cover
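
To show how the hook is intended to be used, here is a hedged sketch of registering a concrete implementation for a source node; `len(ir.df)` is an assumption standing in for however the node exposes its row count, and this is not the implementation from the PR:

```python
# Hypothetical registration for the add_source_stats singledispatch hook
# (illustrative only; not the implementation in this PR).
from cudf_polars.dsl.ir import DataFrameScan
from cudf_polars.experimental.base import ColumnSourceStats, ColumnStats, StatsCollector
from cudf_polars.experimental.dispatch import add_source_stats
from cudf_polars.utils.config import ConfigOptions


@add_source_stats.register(DataFrameScan)
def _(
    ir: DataFrameScan, stats: StatsCollector, config_options: ConfigOptions
) -> None:
    # An in-memory frame has an exactly known row count, so record the
    # cardinality as exact rather than sampled.
    row_count = len(ir.df)  # assumed way of getting the row count
    stats.cardinality[ir] = row_count
    stats.column_stats[ir] = {
        name: ColumnStats(
            name=name,
            source_stats=ColumnSourceStats(
                cardinality=row_count, exact=("cardinality",)
            ),
        )
        for name in ir.schema
    }
```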
44 changes: 27 additions & 17 deletions python/cudf_polars/cudf_polars/experimental/distinct.py
@@ -13,7 +13,11 @@
from cudf_polars.dsl.ir import Distinct
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.dispatch import lower_ir_node
from cudf_polars.experimental.utils import _fallback_inform, _lower_ir_fallback
from cudf_polars.experimental.utils import (
_fallback_inform,
_get_unique_fractions,
_lower_ir_fallback,
)

if TYPE_CHECKING:
from collections.abc import MutableMapping
@@ -29,7 +33,7 @@ def lower_distinct(
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
*,
cardinality: float | None = None,
unique_fraction: float | None = None,
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
"""
Lower a Distinct IR into partition-wise stages.
Expand All @@ -46,8 +50,8 @@ def lower_distinct(
associated partitioning information.
config_options
GPUEngine configuration options.
cardinality
Cardinality factor to use for algorithm selection.
unique_fraction
Fractional unique count to use for algorithm selection.

Returns
-------
@@ -112,14 +116,14 @@
# partitions. For now, we raise an error to fall back
# to one partition.
raise NotImplementedError("Unsupported slice for multiple partitions.")
elif cardinality is not None:
# Use cardinality to determine partitioning
n_ary = min(max(int(1.0 / cardinality), 2), child_count)
output_count = max(int(cardinality * child_count), 1)
elif unique_fraction is not None:
# Use unique_fraction to determine partitioning
n_ary = min(max(int(1.0 / unique_fraction), 2), child_count)
output_count = max(int(unique_fraction * child_count), 1)

if output_count > 1 and require_tree_reduction:
# Need to reduce down to a single partition even
# if the cardinality is large.
# if the unique_fraction is large.
output_count = 1
_fallback_inform(
"Unsupported unique options for multiple partitions.",
@@ -164,24 +168,30 @@ def _(
# Extract child partitioning
child, partition_info = rec(ir.children[0])
config_options = rec.state["config_options"]
column_stats = rec.state["stats"].column_stats.get(ir.children[0], {})

assert config_options.executor.name == "streaming", (
"'in-memory' executor not supported in 'lower_ir_node'"
)

subset: frozenset = ir.subset or frozenset(ir.schema)
cardinality_factor = {
c: max(min(f, 1.0), 0.00001)
for c, f in config_options.executor.cardinality_factor.items()
if c in subset
}
cardinality = max(cardinality_factor.values()) if cardinality_factor else None
subset: frozenset[str] = ir.subset or frozenset(ir.schema)
unique_fraction_dict = _get_unique_fractions(
tuple(subset),
config_options.executor.unique_fraction,
column_stats,
)

unique_fraction = (
Reviewer comment (Contributor):
Is there a difference between {} and None here? I think that unique_fraction_dict could be {} here if both column_statistics and config_options.executor.unique_fraction are empty. Then we'll have bool(unique_fraction_dict) is False and get the None.

max(unique_fraction_dict.values()) if unique_fraction_dict else None
)

try:
return lower_distinct(
ir,
child,
partition_info,
config_options,
cardinality=cardinality,
unique_fraction=unique_fraction,
)
except NotImplementedError as err:
return _lower_ir_fallback(ir, rec, msg=str(err))
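
As a concrete illustration of the `unique_fraction`-driven partitioning in `lower_distinct` above (the numbers are made up, not from the PR):

```python
# Worked example of the partitioning arithmetic in lower_distinct
# (illustrative values only).
unique_fraction = 0.05  # ~5% of rows are unique in the key column(s)
child_count = 32        # number of input partitions

# Branching factor for the tree reduction: roughly 1 / unique_fraction,
# clamped to the range [2, child_count].
n_ary = min(max(int(1.0 / unique_fraction), 2), child_count)

# Output partition count: shrink the input proportionally to the unique
# fraction, keeping at least one partition.
output_count = max(int(unique_fraction * child_count), 1)

print(n_ary, output_count)  # 20 1
```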
40 changes: 30 additions & 10 deletions python/cudf_polars/cudf_polars/experimental/expressions.py
@@ -49,14 +49,15 @@
from cudf_polars.dsl.utils.naming import unique_names
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.repartition import Repartition
from cudf_polars.experimental.utils import _leaf_column_names
from cudf_polars.experimental.utils import _get_unique_fractions, _leaf_column_names

if TYPE_CHECKING:
from collections.abc import Generator, MutableMapping, Sequence
from typing import TypeAlias

from cudf_polars.dsl.expressions.base import Expr
from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import ColumnStats
from cudf_polars.typing import GenericTransformer, Schema
from cudf_polars.utils.config import ConfigOptions

@@ -128,6 +129,7 @@ def _decompose_unique(
input_ir: IR,
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
column_stats: MutableMapping[str, ColumnStats],
*,
names: Generator[str, None, None],
) -> tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]:
@@ -145,6 +147,8 @@
associated partitioning information.
config_options
GPUEngine configuration options.
column_stats
Column statistics.
names
Generator of unique names for temporaries.

@@ -174,13 +178,15 @@
"'in-memory' executor not supported in '_decompose_unique'"
)

cardinality: float | None = None
if cardinality_factor := {
max(min(v, 1.0), 0.00001)
for k, v in config_options.executor.cardinality_factor.items()
if k in _leaf_column_names(child)
}:
cardinality = max(cardinality_factor)
unique_fraction_dict = _get_unique_fractions(
_leaf_column_names(child),
config_options.executor.unique_fraction,
column_stats,
)

unique_fraction = (
max(unique_fraction_dict.values()) if unique_fraction_dict else None
)

input_ir, partition_info = lower_distinct(
Distinct(
Expand All @@ -194,7 +200,7 @@ def _decompose_unique(
input_ir,
partition_info,
config_options,
cardinality=cardinality,
unique_fraction=unique_fraction,
)

return column, input_ir, partition_info
@@ -368,6 +374,7 @@ def _decompose_expr_node(
input_ir: IR,
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
column_stats: MutableMapping[str, ColumnStats],
*,
names: Generator[str, None, None],
) -> tuple[Expr, IR, MutableMapping[IR, PartitionInfo]]:
@@ -385,6 +392,8 @@
associated partitioning information.
config_options
GPUEngine configuration options.
column_stats
Column statistics.
names
Generator of unique names for temporaries.

@@ -416,7 +425,12 @@
)
elif isinstance(expr, UnaryFunction) and expr.name == "unique":
return _decompose_unique(
expr, input_ir, partition_info, config_options, names=names
expr,
input_ir,
partition_info,
config_options,
column_stats,
names=names,
)
else:
# This is an un-supported expression - raise.
@@ -437,6 +451,7 @@ def _decompose(
rec.state["input_ir"],
{rec.state["input_ir"]: rec.state["input_partition_info"]},
rec.state["config_options"],
rec.state["column_stats"],
names=rec.state["unique_names"],
)

@@ -474,6 +489,7 @@
input_ir,
partition_info,
rec.state["config_options"],
rec.state["column_stats"],
names=rec.state["unique_names"],
)

@@ -483,6 +499,7 @@ def decompose_expr_graph(
input_ir: IR,
partition_info: MutableMapping[IR, PartitionInfo],
config_options: ConfigOptions,
column_stats: MutableMapping[str, ColumnStats],
) -> tuple[NamedExpr, IR, MutableMapping[IR, PartitionInfo]]:
"""
Decompose a NamedExpr into stages.
@@ -499,6 +516,8 @@
associated partitioning information.
config_options
GPUEngine configuration options.
column_stats
Column statistics.

Returns
-------
@@ -519,6 +538,7 @@
"input_ir": input_ir,
"input_partition_info": partition_info[input_ir],
"config_options": config_options,
"column_stats": column_stats,
"unique_names": unique_names((named_expr.name, *input_ir.schema.keys())),
}
mapper = CachingVisitor(_decompose, state=state)
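Finally, a hedged sketch of a call site for the updated `decompose_expr_graph` signature; `stats` is assumed to be the `StatsCollector` threaded through lowering, and the surrounding names are assumed to be in scope (this is not code from the PR):

```python
# Hypothetical call site for the new decompose_expr_graph signature
# (illustrative only; named_expr, input_ir, partition_info, config_options
# and stats are assumed to be in scope).
from cudf_polars.experimental.expressions import decompose_expr_graph

named_expr, input_ir, partition_info = decompose_expr_graph(
    named_expr,
    input_ir,
    partition_info,
    config_options,
    # Per-column statistics for the input IR, defaulting to an empty
    # mapping when nothing has been collected yet.
    stats.column_stats.get(input_ir, {}),
)
```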