[WIP] Track column statistics in cuDF-Polars #19130
Conversation
@@ -49,6 +50,40 @@ def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
        lifo.append(child)


def post_traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
NOTE: This was copied from wence-:more-tablestat-doodles.
/ok to test
Happy to see this coming together. Gave a quick pass and will look more closely later.
_SOURCE_STATS_CACHE_MAX_ITEMS: int = 10


def _update_source_stats_cache(
I haven't looked closely, but would `functools.lru_cache(maxsize=10)` work? https://docs.python.org/3/library/functools.html#functools.lru_cache
Looked through this some more. I think the tl/dr is that we could maybe force this into using `lru_cache`. It's unclear to me whether we should.

The idea here is to cache `tuple[path, ...]` -> `dict[str, ColumnSourceStats]`, i.e. the column statistics for a given tuple of paths. Note that the paths in the key are for everything in the IR, not just the ones sampled.

Some of the complexity seems to come from potentially hitting this cache with the same `paths`, but a different subset of columns of interest: We might have something like

```python
a = pl.scan_parquet("data.parquet", columns=["a", "b"])
b = pl.scan_parquet("data.parquet", columns=["b", "c"])
```

Note that `b` is in both, and the full table might have many columns. We want to avoid computing the stats for `b` twice, and we want to avoid computing the stats for columns that we'll never use. This means a simple `lru_cache` with an entry per `ir.paths` isn't going to give us what we want.
This seems doable with a `functools.lru_cache` on the `tuple[path, ...]` key by having it return a (mutable) `dict[str, ColumnSourceStats]`. Then any callers asking for the stats of a given set of paths will get a view on the same dict, which they can mutate in place. Something like

```python
@functools.lru_cache
def get_source_stats(paths: tuple[str, ...]) -> dict[str, ColumnSourceStats]:
    return {}

source_stats_cached = get_source_stats(paths)
for column in need_columns - set(source_stats_cached):
    # compute for that column
    source_stats_cached[column] = ColumnSourceStats(...)
```

Maybe this isn't much better. Mutating cached values is dangerous. But it does give us an LRU cache rather than a FIFO cache, along with all the nice things from `lru_cache` like `cache_info()`.
You could indirect things by having a two-level scheme:

```python
@functools.lru_cache
def stats_getter(paths: tuple[str, ...]) -> Callable[[str], ColumnSourceStats]:
    @functools.lru_cache
    def colstats(column: str) -> ColumnSourceStats:
        return ...
    ...

colstats = stats_getter(paths)("column")
```

?
Ah, I was trying to figure out how to get the two-layer thing to work, but couldn't do it in a way that didn't break the `maxsize` of the outer cache. But I see now that that should be equivalent: set a `maxsize` of 10 or whatever on the `stats_getter` and an unlimited size on the `colstats`. I think this should work out nicely.
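For concreteness, a runnable sketch of that two-level scheme. This is illustrative only: `ColumnSourceStats` is stood in for by a plain dict, and `_compute_stats` is a hypothetical placeholder for the real metadata sampling.

```python
import functools
from typing import Callable

def _compute_stats(paths: tuple[str, ...], column: str) -> dict:
    # Hypothetical stand-in for sampling parquet metadata for one column.
    return {"paths": paths, "column": column}

@functools.lru_cache(maxsize=10)  # bounds how many path-tuples we track
def stats_getter(paths: tuple[str, ...]) -> Callable[[str], dict]:
    @functools.lru_cache(maxsize=None)  # unbounded per-column cache
    def colstats(column: str) -> dict:
        return _compute_stats(paths, column)
    return colstats

getter = stats_getter(("data.parquet",))
getter("a")  # computed once; later calls for "a" hit the inner cache
```

A nice property of this shape: evicting a path-tuple from the outer LRU cache drops its inner cache with it, so per-column entries can't outlive their paths entry.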
self.cardinality: dict[IR, int] = {}
self.column_statistics: dict[IR, dict[str, ColumnStats]] = {}
Are the keys of these two dictionaries always the same?
If so, maybe we model this as something like a record or tuple with two elements: one for the cardinality and one for the column statistics? That way they can't get out of sync.
If these keys can differ, then disregard.
https://github.com/rapidsai/cudf/pull/19130/files#diff-fe5f5ae8b1b9a9f2ea2369b2ae9237d407170e4514c4a866ce0631b47b248511R42 is an example where we just update `column_statistics`, not `cardinality`, so these can differ.
Another comment: we should probably be consistent about `stats` vs. `statistics` in our names, unless there's a strong reason not to. I don't have a preference for which.
> is an example where we just update `column_statistics`, not `cardinality`, so these can differ.
Yeah, this is also one place where I'm trying to keep the API (`StatsCollector`, to be specific) similar to Lawrence's prototype/doodle.
> Another comment: we should probably be consistent about stats vs. statistics in our names, unless there's a strong reason not to. I don't have a preference for which.
Yup - it seems like the inconsistency is because I tend to use `stats` and Lawrence tends to use `statistics`. I'm also indifferent, but agree that we should be consistent.
Parameters
----------
table_source
I'm a bit surprised (not in a negative way) to see this modeled as `ColumnSourceStats` having a `TableSourceStats` field. I would naively expect something like

```python
class TableSourceStats:
    paths: tuple[str, ...]
    cardinality: int | None
    column_statistics: dict[str, ColumnSourceStats]
```

And then we have one `TableSourceStats` per source, and each `TableSourceStats` has one `ColumnSourceStats` per column.

But I haven't looked at how this is used yet. Maybe in practice we're only ever looking at stats for a particular column, and going through the table might be difficult / impossible?
This is a good question. I'm leaning pretty hard in the direction of: `TableSourceStats` is unnecessary, and we should just add a `cardinality` attribute to `ColumnSourceStats`.

I originally started with the design you sketched above. However, since we are mostly interested in working with `ColumnStats` at the IR level, we end up needing to do something like `column_stats.table_source.column_statistics[original_column_name]` to get to the column-source statistics. This is both verbose, and opens us up to confusion if the name of the column has changed.

The current design shortens this to `column_stats.source_stats`, and side-steps the possibility of column renaming causing a problem. However, I still don't see a clear reason why the `TableSourceStats` class needs to exist at all.
> and we should just add a cardinality attribute to ColumnSourceStats.

That seems pretty reasonable to me. We will end up with situations where we have the same key from different tables (e.g. primary key in one, foreign key in another), but that'll occur regardless of whether `cardinality` is on `ColumnSourceStats` or `TableSourceStats`, I think.
Went through building the `ColumnSourceStats` in parquet. I haven't gone through the caching yet.
    plc.io.SourceInfo(paths)
)
num_rows_per_file = int(metadata.num_rows() / len(paths))
num_rows_total = num_rows_per_file * file_count
Is this just `num_rows_total = metadata.num_rows()`? It seems like we're doing something close to that:

```
num_rows_total = num_rows_per_file * file_count
               = int(metadata.num_rows() / len(paths)) * len(paths)
```

so they should be the same other than rounding from the `int`.

Ah, not quite, because of the sampling. I'd suggest a comment explaining how we'll have some directly known values from the sampled files and estimates extrapolated from those. And then a convention of pre/post-fixing variable names with the kind of value we're dealing with. I see `num_rows_total` and `total_uncompressed_size`, so maybe align those to both use a prefix.

And maybe it's worth emphasizing that the total row count and total uncompressed size are estimates by including that in the variable names.
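As an illustration of that naming convention (all numbers below are made up), extrapolating exact per-sample values into clearly-labeled estimates might look like:

```python
# Assume 3 of 10 files were sampled; row counts of sampled files are exact.
file_count = 10
sampled_file_row_counts = [1_000, 1_200, 800]  # known from parquet metadata

# "known_" prefix: values read directly from sampled metadata.
known_num_rows_sampled = sum(sampled_file_row_counts)

# "est_" prefix: values extrapolated from the sample to all files.
est_num_rows_per_file = known_num_rows_sampled / len(sampled_file_row_counts)
est_num_rows_total = int(est_num_rows_per_file * file_count)
```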
rowgroup_offsets_per_file = np.insert(
    np.cumsum(num_row_groups_per_file_samples), 0, 0
Should be the same & probably faster:

```python
rowgroup_offsets_per_file = np.cumsum([0] + num_row_groups_per_file_samples)
```
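A quick check that the two spellings agree (assuming `num_row_groups_per_file_samples` is a plain Python list):

```python
import numpy as np

num_row_groups_per_file_samples = [3, 1, 4]

# Original: cumulative sum, then prepend a leading zero.
a = np.insert(np.cumsum(num_row_groups_per_file_samples), 0, 0)

# Suggested: prepend the zero before taking the cumulative sum.
b = np.cumsum([0] + num_row_groups_per_file_samples)

assert (a == b).all()  # both give [0, 3, 4, 8]
```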
column_sizes = {}
for name, uncompressed_sizes in metadata.columnchunk_metadata().items():
    if name in need_columns:
        column_sizes[name] = np.array(
Why do we need an array here, rather than an aggregated value? I see we derive `total_uncompressed_size` by taking a `mean` later. Does something else use the unaggregated values?

Note to self: the length of this array should exactly match the number of files sampled.
# We have un-cached column metadata to process

# Calculate the mean per-file `total_uncompressed_size` for each column
total_uncompressed_size = {
Is `total` correct here, or should this be `avg`? IIUC this represents our estimate of the in-memory size of a given column for any given source file.
for path, num_rgs in zip(
    paths, num_row_groups_per_file_samples, strict=True
):
    for rg_id in range(num_rgs):
        n += 1
        samples[path].append(rg_id)
        if n == num_rg_samples:
            break
    if n == num_rg_samples:
        break
IIUC, this is building up a list of specific row groups to sample. It will be biased to sample row groups from early files.

Maybe it'd be better to build up the full list of `(file, rowgroup_id)` and then slice through that, like we do with the files above? Roughly:

```python
samples = [
    (file, i)
    for file, num_rgs in zip(paths, num_row_groups_per_file_samples, strict=True)
    for i in range(num_rgs)
]
stride = max(1, int(len(samples) / num_rg_samples))
samples = samples[::stride]
```

Maybe not, though. The number of files will be limited by the config option, but the number of row groups per file could be very large (depends on the parquet file), so that list could be pretty large. There's probably a smart way to do this with some math.
unique_fraction_estimates[name] = max(
    min(1.0, row_group_unique_count / row_group_num_rows),
    0.00001,
What's the motivation behind this `max`? I suppose that with very large row groups, this fraction can get arbitrarily close to zero. And that's something we don't want callers having to think about?
Do we lose anything by truncating to 0.00001?
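For reference, the clamping being questioned, pulled out as a tiny standalone helper (the name is hypothetical; the bounds match the snippet above):

```python
def clamped_unique_fraction(unique_count: int, num_rows: int) -> float:
    # Clamp into [0.00001, 1.0] so downstream estimates never see 0
    # (or a fraction above 1 from degenerate inputs).
    return max(min(1.0, unique_count / num_rows), 0.00001)

clamped_unique_fraction(1, 10_000_000)  # tiny true fraction hits the floor
clamped_unique_fraction(50, 100)        # ordinary case passes through
```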
    Unique-count estimate.
unique_fraction
    Unique-fraction estimate.
file_size
This name confused me initially. It's not the size of the source (e.g. Parquet) file on disk. It's the estimated in-memory size of some column, derived by sampling a few source files.
# Leave out unique stats if they were defined by the
# user. This allows us to avoid collecting stats for
# columns that are known to be problematic.
user_fractions = ir.config_options.executor.unique_fraction
Thinking through how I feel about this design. IIUC, the intent is to always give preference to user-provided statistics over stats from the source, which makes sense. `_get_unique_fractions` merges these two here.

Initially I wondered why we had to worry about this in two places: here and in `_get_unique_fractions`. I think the answers are:

- We worry about them here as an optimization: we avoid computing stats for things that'll just be overridden later.
- We worry about them in `_get_unique_fractions` since we can (in principle) have other places generating these statistics (like a `DataFrameScan`).

So we could maybe cut out `_get_unique_fractions` having to merge these by requiring whoever produces these `ColumnSourceStats` to do the merging, with preference for user-provided stats. Dunno if that's worth it, but I wrote this up to understand things, so I'll submit it for discussion :)
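The merge-with-preference being discussed can be stated compactly. This is a sketch under assumed names, not the actual `_get_unique_fractions` implementation:

```python
def merge_unique_fractions(
    user_fractions: dict[str, float],
    estimated_fractions: dict[str, float],
) -> dict[str, float]:
    # Later entries win in a dict merge, so user-provided values
    # override estimates for any column present in both.
    return {**estimated_fractions, **user_fractions}

merged = merge_unique_fractions({"a": 0.5}, {"a": 0.1, "b": 0.9})
# merged == {"a": 0.5, "b": 0.9}
```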
/ok to test
target_partition_size: int = 0
parquet_metadata_samples: int = 3
The docs say `5` is the default.
    column_statistics,
)

unique_fraction = (
Is there a difference between `{}` and `None` here? I think that `unique_fraction_dict` could be `{}` here if both `column_statistics` and `config_options.executor.unique_fraction` are empty. Then we'll have `bool(unique_fraction_dict) is False` and get the `None`.
    config_options: ConfigOptions,
    column_statistics: MutableMapping[str, ColumnStats],
) -> dict[str, float]:
    assert config_options.executor.name == "streaming", (
One way to avoid this `assert` here is to change the function signature to take `user_unique_fraction: dict[str, float]` and then make the caller responsible for this assertion.

It's possible that this will lead to more of these assertions (e.g. two functions call this, and neither is already making this assertion), but it reduces the "types" of places where we have to make this assertion. I'd have a slight preference to push as many of these assertions to the "edge" as possible.

`_decompose_unique` is a somewhat good example of why pushing this to the edge might be better. It calls into `_get_unique_fractions` with `config_options`. I'm not sure whether `_decompose_unique` is only called with a streaming executor, though, so we'd have to keep walking all the way up to see. I'm not 100% sure, but it does look like we could hit here with the in-memory executor.
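A sketch of the proposed signature change (all names are simplified from the discussion, and `SimpleNamespace` stands in for the real config objects):

```python
from types import SimpleNamespace

def _get_unique_fractions_sketch(
    user_unique_fraction: dict[str, float],
    column_statistics: dict[str, object],
) -> dict[str, float]:
    # No executor assertion here: the caller already validated it.
    return dict(user_unique_fraction)

def caller(config_options, column_statistics):
    # The assertion lives once, at the "edge".
    assert config_options.executor.name == "streaming"
    return _get_unique_fractions_sketch(
        config_options.executor.unique_fraction, column_statistics
    )

config = SimpleNamespace(
    executor=SimpleNamespace(name="streaming", unique_fraction={"a": 0.5})
)
result = caller(config, {})
```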
Description
Supersedes #18865
An important goal of this PR is to lay the foundation for the kind of Join-based statistics gathering prototyped by @wence- in wence-:more-tablestat-doodles. This PR does NOT implement most of the logic in that branch. However, it does implement a compatible foundation for that work.
In wence-:more-tablestat-doodles, statistics are collected in two passes over the original logical plan. The first pass essentially collects statistics originating only from `Scan`/`DataFrameScan` IR nodes. The second pass updates these "base" statistics to account for Join/Filter/GroupBy/etc. This PR only implements the first pass. However, we now collect the "base" statistics (now referred to as "source" statistics) in a format that can also be used for partitioning, and to choose between shuffle- and reduction-based aggregations.