Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions crates/polars-time/src/chunkedarray/rolling_window/dispatch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,35 @@ where
InvalidOperation: "`window_size` must be strictly positive"
);

if by.null_count() > 0 {
let n = ca.len();
let mut valid_indices: Vec<IdxSize> = Vec::with_capacity(n - by.null_count());
let mut gather_indices: Vec<Option<IdxSize>> = Vec::with_capacity(n);
let mut rank: IdxSize = 0;

for (orig_pos, not_null) in by.is_not_null().iter().enumerate() {
if not_null == Some(true) {
valid_indices.push(orig_pos as IdxSize);
gather_indices.push(Some(rank));
rank += 1;
} else {
gather_indices.push(None);
}
}

let valid_idx_ca = IdxCa::from_vec(PlSmallStr::EMPTY, valid_indices);
let ca_rechunked = ca.rechunk();
let ca_filtered = unsafe { ca_rechunked.take_unchecked(&valid_idx_ca) };
let by_rechunked = by.rechunk();
let by_filtered = unsafe { by_rechunked.take_unchecked(&valid_idx_ca) };

let computed =
rolling_agg_by::<T, Out, NoNullsAgg, NullsAgg>(&ca_filtered, &by_filtered, options)?;

let gather_ca: IdxCa = gather_indices.into_iter().collect();
return Ok(unsafe { computed.take_unchecked(&gather_ca) });
}

let (by, tz) = match by.dtype() {
DataType::Datetime(tu, tz) => (by.cast(&DataType::Datetime(*tu, None))?, tz),
DataType::Date => (
Expand Down
40 changes: 40 additions & 0 deletions py-polars/tests/unit/series/test_rolling.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,43 @@ def test_series_rolling_rank_by_temporal(
actual = values.rolling_rank_by(by_col_temporal, "2h", method="average")
expected = pl.Series([1.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 2.0])
assert_series_equal(actual, expected)


def test_series_rolling_mean_by_null_by_single() -> None:
s = pl.Series([1.0, 2.0, 3.0, 4.0, 5.0])
by = pl.Series([1, 2, None, 4, 5], dtype=pl.Int64)
actual = s.rolling_mean_by(by, window_size="2i")
expected = pl.Series([1.0, 1.5, None, 4.0, 4.5])
assert_series_equal(actual, expected)


def test_series_rolling_mean_by_null_by_all() -> None:
s = pl.Series([1.0, 2.0, 3.0])
by = pl.Series([None, None, None], dtype=pl.Int64)
actual = s.rolling_mean_by(by, window_size="2i")
expected = pl.Series([None, None, None], dtype=pl.Float64)
assert_series_equal(actual, expected)


def test_series_rolling_mean_by_null_by_unsorted() -> None:
s = pl.Series([3.0, 1.0, 2.0, 4.0])
by = pl.Series([3, None, 1, 2], dtype=pl.Int64)
actual = s.rolling_mean_by(by, window_size="2i")
expected = pl.Series([3.5, None, 2.0, 3.0])
assert_series_equal(actual, expected)


def test_series_rolling_mean_by_null_by_and_values() -> None:
s = pl.Series([None, 2.0, 3.0, None, 5.0])
by = pl.Series([1, None, 3, 4, 5], dtype=pl.Int64)
actual = s.rolling_mean_by(by, window_size="2i")
expected = pl.Series([None, None, 3.0, 3.0, 5.0])
assert_series_equal(actual, expected)


def test_series_rolling_mean_by_null_by_multiple() -> None:
s = pl.Series([1.0, 2.0, 3.0, 4.0, 5.0])
by = pl.Series([None, 2, None, 4, None], dtype=pl.Int64)
actual = s.rolling_mean_by(by, window_size="2i")
expected = pl.Series([None, 2.0, None, 4.0, None])
assert_series_equal(actual, expected)
Loading