-
Notifications
You must be signed in to change notification settings - Fork 952
Leverage new pylibcudf grouped_range_rolling_window for cuDF classic rolling(window: int)
#19162
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
742d486
a2c62f5
621e1b3
2006e06
142c985
36c5694
92cd617
47106ad
c612780
4eaeb1c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
# Copyright (c) 2020-2025, NVIDIA CORPORATION | ||
from __future__ import annotations | ||
|
||
import itertools | ||
import warnings | ||
from typing import TYPE_CHECKING | ||
|
||
|
@@ -23,6 +24,7 @@ | |
|
||
if TYPE_CHECKING: | ||
from cudf.core.dataframe import DataFrame | ||
from cudf.core.index import Index | ||
from cudf.core.series import Series | ||
|
||
|
||
|
@@ -32,6 +34,7 @@ class _RollingBase: | |
""" | ||
|
||
obj: DataFrame | Series | ||
_group_keys: Index | None = None | ||
|
||
def _apply_agg_column( | ||
self, source_column: ColumnBase, agg_name: str | ||
|
@@ -194,6 +197,7 @@ class Rolling(GetAttrGetItemMixin, _RollingBase, Reducible): | |
_PROTECTED_KEYS = frozenset(("obj",)) | ||
|
||
_time_window = False | ||
_group_keys = None | ||
|
||
_VALID_REDUCTIONS = { | ||
"sum", | ||
|
@@ -259,69 +263,91 @@ def __getitem__(self, arg): | |
center=self.center, | ||
) | ||
|
||
def _apply_agg_column(self, source_column, agg_name): | ||
def _apply_agg_column(self, source_column, agg_name) -> ColumnBase: | ||
min_periods = self.min_periods or 1 | ||
if isinstance(self.window, int): | ||
preceding_window = None | ||
following_window = None | ||
window = self.window | ||
elif isinstance(self.window, BaseIndexer): | ||
start, end = self.window.get_window_bounds( | ||
num_values=len(self.obj), | ||
min_periods=self.min_periods, | ||
center=self.center, | ||
closed=None, | ||
step=None, | ||
) | ||
start = as_column(start, dtype=SIZE_TYPE_DTYPE) | ||
end = as_column(end, dtype=SIZE_TYPE_DTYPE) | ||
|
||
idx = as_column(range(len(start))) | ||
preceding_window = (idx - start + np.int32(1)).astype( | ||
SIZE_TYPE_DTYPE | ||
) | ||
following_window = (end - idx - np.int32(1)).astype( | ||
SIZE_TYPE_DTYPE | ||
if self.center: | ||
pre = (self.window // 2) + 1 | ||
fwd = self.window - (pre) | ||
else: | ||
pre = self.window | ||
fwd = 0 | ||
rolling_request = plc.rolling.RollingRequest( | ||
source_column.to_pylibcudf(mode="read"), | ||
min_periods, | ||
aggregation.make_aggregation( | ||
agg_name, | ||
{"dtype": source_column.dtype} | ||
if callable(agg_name) | ||
else self.agg_params, | ||
).plc_obj, | ||
) | ||
window = None | ||
orderby_obj = as_column(range(len(source_column))) | ||
if self._group_keys is not None: | ||
group_cols: list[plc.Column] = [ | ||
col.to_pylibcudf(mode="read") | ||
for col in self._group_keys._columns | ||
] | ||
else: | ||
group_cols = [] | ||
group_keys = plc.Table(group_cols) | ||
with acquire_spill_lock(): | ||
(plc_result,) = plc.rolling.grouped_range_rolling_window( | ||
group_keys, | ||
orderby_obj.to_pylibcudf(mode="read"), | ||
plc.types.Order.ASCENDING, | ||
plc.types.NullOrder.BEFORE, | ||
plc.rolling.BoundedOpen(plc.Scalar.from_py(pre)), | ||
plc.rolling.BoundedClosed(plc.Scalar.from_py(fwd)), | ||
[rolling_request], | ||
).columns() | ||
return ColumnBase.from_pylibcudf(plc_result) | ||
else: | ||
preceding_window = as_column(self.window) | ||
following_window = as_column( | ||
0, length=self.window.size, dtype=self.window.dtype | ||
) | ||
window = None | ||
if isinstance(self.window, BaseIndexer): | ||
start, end = self.window.get_window_bounds( | ||
num_values=len(self.obj), | ||
min_periods=self.min_periods, | ||
center=self.center, | ||
closed=None, | ||
step=None, | ||
) | ||
start = as_column(start, dtype=SIZE_TYPE_DTYPE) | ||
end = as_column(end, dtype=SIZE_TYPE_DTYPE) | ||
|
||
with acquire_spill_lock(): | ||
if window is None: | ||
idx = as_column(range(len(start))) | ||
preceding_window = (idx - start + np.int32(1)).astype( | ||
SIZE_TYPE_DTYPE | ||
) | ||
following_window = (end - idx - np.int32(1)).astype( | ||
SIZE_TYPE_DTYPE | ||
) | ||
else: | ||
if self.center: | ||
# TODO: we can support this even though Pandas currently does not | ||
raise NotImplementedError( | ||
"center is not implemented for offset-based windows" | ||
) | ||
pre = preceding_window.to_pylibcudf(mode="read") | ||
fwd = following_window.to_pylibcudf(mode="read") | ||
else: | ||
if self.center: | ||
pre = (window // 2) + 1 | ||
fwd = window - (pre) | ||
else: | ||
pre = window | ||
fwd = 0 | ||
|
||
return ColumnBase.from_pylibcudf( | ||
plc.rolling.rolling_window( | ||
source_column.to_pylibcudf(mode="read"), | ||
pre, | ||
fwd, | ||
min_periods, | ||
aggregation.make_aggregation( | ||
agg_name, | ||
{"dtype": source_column.dtype} | ||
if callable(agg_name) | ||
else self.agg_params, | ||
).plc_obj, | ||
preceding_window = as_column(self.window) | ||
following_window = as_column( | ||
0, length=self.window.size, dtype=self.window.dtype | ||
) | ||
pre = preceding_window.to_pylibcudf(mode="read") | ||
fwd = following_window.to_pylibcudf(mode="read") | ||
with acquire_spill_lock(): | ||
return ColumnBase.from_pylibcudf( | ||
plc.rolling.rolling_window( | ||
source_column.to_pylibcudf(mode="read"), | ||
pre, | ||
fwd, | ||
min_periods, | ||
aggregation.make_aggregation( | ||
agg_name, | ||
{"dtype": source_column.dtype} | ||
if callable(agg_name) | ||
else self.agg_params, | ||
).plc_obj, | ||
) | ||
) | ||
) | ||
|
||
def _reduce( | ||
self, | ||
|
@@ -560,31 +586,39 @@ def __init__(self, groupby, window, min_periods=None, center=False): | |
sort_order | ||
) | ||
|
||
gb_size = groupby.size().sort_index() | ||
self._group_starts = ( | ||
gb_size.cumsum().shift(1).fillna(0).repeat(gb_size) | ||
) | ||
if not is_integer(window): | ||
gb_size = groupby.size().sort_index() | ||
self._group_starts = ( | ||
gb_size.cumsum().shift(1).fillna(0).repeat(gb_size) | ||
) | ||
|
||
super().__init__(obj, window, min_periods=min_periods, center=center) | ||
|
||
@acquire_spill_lock() | ||
def _window_to_window_sizes(self, window): | ||
if is_integer(window): | ||
return cudautils.grouped_window_sizes_from_offset( | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we delete this function from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is still used when doing a |
||
as_column(range(len(self.obj))).data_array_view(mode="read"), | ||
self._group_starts, | ||
window, | ||
) | ||
return super()._window_to_window_sizes(window) | ||
else: | ||
return cudautils.grouped_window_sizes_from_offset( | ||
self.obj.index._column.data_array_view(mode="read"), | ||
self._group_starts, | ||
window, | ||
) | ||
with acquire_spill_lock(): | ||
return cudautils.grouped_window_sizes_from_offset( | ||
self.obj.index._column.data_array_view(mode="read"), | ||
self._group_starts, | ||
window, | ||
) | ||
|
||
def _apply_agg(self, agg_name): | ||
index = MultiIndex._from_data( | ||
{**self._group_keys._data, **self.obj.index._data} | ||
dict( | ||
enumerate( | ||
itertools.chain( | ||
self._group_keys._columns, self.obj.index._columns | ||
) | ||
) | ||
) | ||
) | ||
index.names = list( | ||
itertools.chain( | ||
self._group_keys._column_names, self.obj.index._column_names | ||
) | ||
) | ||
result = super()._apply_agg(agg_name) | ||
result.index = index | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found it a bit hard to follow why
_group_starts
is getting set this way, and that led me down a bit of a rabbit hole. Do you think it would be cleaner to move input validation from__init__
into separate helper functions so thatRollingGroupby
andRolling
could have completely separate__init__
functions that call helpers, and then inline the code for converting windows to sizes? The current way that_normalize
and_window_to_window_sizes
is set up seems to improve code reuse but at the significant expense of readability.Out of scope for this PR, but a suggestion for future improvement.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah agreed there's an opportunity here to make things clearer. Additionally, we may not even need this if we can convert this to purely use pylibcudf as described in #19162 (comment)