Conversation

@Yicong-Huang (Contributor) commented Oct 23, 2025

What changes were proposed in this pull request?

This PR adds support for the Iterator[pandas.DataFrame] API in groupBy().applyInPandas(), enabling batch-by-batch processing of grouped data for improved memory efficiency and scalability.

Key Changes:

  1. New PythonEvalType: Added SQL_GROUPED_MAP_PANDAS_ITER_UDF (216) to distinguish iterator-based UDFs from standard grouped map UDFs

  2. Type Inference: Implemented automatic detection of iterator signatures (see the sketch after this list):

    • Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]
    • Tuple[Any, ...], Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]
  3. Streaming Serialization: Created GroupPandasIterUDFSerializer that streams results without materializing all DataFrames in memory

  4. Configuration Change: Updated FlatMapGroupsInPandasExec, which was hardcoding pythonEvalType = 201 instead of extracting it from the UDF expression (mirroring the fix already made in FlatMapGroupsInArrowExec)
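
For illustration, here is a minimal standalone sketch of how such type-hint-based detection could work. The PR's diff references a helper named infer_group_pandas_eval_type_from_func in pyspark.sql.pandas.typehints; the names below (uses_iterator_api, _is_iter_of_pandas_df) are hypothetical and this is an approximation, not the PR's actual implementation:

import collections.abc
import inspect
from typing import Any, Iterator, get_args, get_origin, get_type_hints

import pandas as pd

def _is_iter_of_pandas_df(hint: Any) -> bool:
    # typing.Iterator[...] resolves its origin to collections.abc.Iterator
    return get_origin(hint) is collections.abc.Iterator and get_args(hint) == (pd.DataFrame,)

def uses_iterator_api(func) -> bool:
    # Hypothetical check: True if the UDF's hints match one of the two iterator signatures
    hints = get_type_hints(func)
    params = list(inspect.signature(func).parameters)
    if not _is_iter_of_pandas_df(hints.get("return")):
        return False
    if len(params) == 1:
        # Iterator[pd.DataFrame] -> Iterator[pd.DataFrame]
        return _is_iter_of_pandas_df(hints.get(params[0]))
    if len(params) == 2:
        # (Tuple[Any, ...], Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]
        return _is_iter_of_pandas_df(hints.get(params[1]))
    return False

In this sketch, a UDF matching neither form would be treated as the existing DataFrame-to-DataFrame variant.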

Why are the changes needed?

The existing applyInPandas() API loads entire groups into memory as single DataFrames. For large groups, this can cause OOM errors. The iterator API allows:

  • Memory Efficiency: Process data batch-by-batch instead of materializing entire groups
  • Scalability: Handle arbitrarily large groups that don't fit in memory
  • Consistency: Mirrors the existing applyInArrow() iterator API design

Does this PR introduce any user-facing changes?

Yes, this PR adds a new API variant for applyInPandas():

Before (existing API, still supported):

import pandas as pd

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.assign(v=(pdf.v - pdf.v.mean()) / pdf.v.std())

df.groupBy("id").applyInPandas(normalize, schema="id long, v double")

After (new iterator API):

from typing import Iterator

import pandas as pd

def normalize(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    # Process data batch-by-batch
    for batch in batches:
        yield batch.assign(v=(batch.v - batch.v.mean()) / batch.v.std())

df.groupBy("id").applyInPandas(normalize, schema="id long, v double")

With Grouping Keys:

from typing import Iterator, Tuple, Any

import pandas as pd

def sum_by_key(key: Tuple[Any, ...], batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    total = 0
    for batch in batches:
        total += batch['v'].sum()
    yield pd.DataFrame({"id": [key[0]], "total": [total]})

df.groupBy("id").applyInPandas(sum_by_key, schema="id long, total double")

Backward Compatibility: The existing DataFrame-to-DataFrame API is fully preserved and continues to work without changes.

How was this patch tested?

  • Added test_apply_in_pandas_iterator_basic - Basic functionality test
  • Added test_apply_in_pandas_iterator_with_keys - Test with grouping keys
  • Added test_apply_in_pandas_iterator_batch_slicing - Pressure test with 10M rows, 20 columns
  • Added test_apply_in_pandas_iterator_with_keys_batch_slicing - Pressure test with keys

Was this patch authored or co-authored using generative AI tooling?

Yes, tests generated by Cursor.

@Yicong-Huang changed the title [WIP][SPARK-53614] Add applyInPandas [WIP][SPARK-53614] Add Iterator[pandas.DataFrame] support to applyInPandas Oct 27, 2025
@Yicong-Huang changed the title [WIP][SPARK-53614] Add Iterator[pandas.DataFrame] support to applyInPandas [SPARK-53614] Add Iterator[pandas.DataFrame] support to applyInPandas Oct 27, 2025
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.pandas.typehints import infer_group_pandas_eval_type_from_func
from pyspark.sql.pandas.functions import PythonEvalType
import warnings
Contributor

no need to re-import PythonEvalType and warnings

Contributor Author

removed


# Yield the generator for this group
# The generator must be fully consumed before the next group is processed
yield series_batches_gen
Contributor

let's keep in line with

import pyarrow as pa

def process_group(batches: "Iterator[pa.RecordBatch]"):
    for batch in batches:
        struct = batch.column(0)
        yield pa.RecordBatch.from_arrays(struct.flatten(), schema=pa.schema(struct.type))

dataframes_in_group = None
while dataframes_in_group is None or dataframes_in_group > 0:
    dataframes_in_group = read_int(stream)
    if dataframes_in_group == 1:
        batch_iter = process_group(ArrowStreamSerializer.load_stream(self, stream))
        yield batch_iter
        # Make sure the batches are fully iterated before getting the next group
        for _ in batch_iter:
            pass
    elif dataframes_in_group != 0:
        raise PySparkValueError(
            errorClass="INVALID_NUMBER_OF_DATAFRAMES_IN_GROUP",
            messageParameters={"dataframes_in_group": str(dataframes_in_group)},
        )

Contributor Author

aligned

)

# Yield the generator for this group
# The generator must be fully consumed before the next group is processed
Contributor

this is not true; a UDF can partially consume a group. Please refer to GroupArrowUDFSerializer and the test:

def test_apply_in_arrow_partial_iteration(self):
    with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}):

        def func(group: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
            first = next(group)
            yield pa.RecordBatch.from_pylist(
                [{"value": r.as_py() % 4} for r in first.column(0)]
            )

        df = self.spark.range(20)
        grouped_df = df.groupBy((col("id") % 4).cast("int"))
        # Should get two records for each group
        expected = [Row(value=x) for x in [0, 0, 1, 1, 2, 2, 3, 3]]
        actual = grouped_df.applyInArrow(func, "value long").collect()
        self.assertEqual(actual, expected)

Contributor Author

that's true. updated the comment and added a partial test.
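
For reference, a sketch of what such a partial-consumption test could look like for the pandas iterator API, mirroring test_apply_in_arrow_partial_iteration above and assuming the same test-class context and imports (Iterator, pd, col, Row); this is illustrative only, not necessarily the exact test added in the PR:

def test_apply_in_pandas_partial_iteration(self):
    with self.sql_conf({"spark.sql.execution.arrow.maxRecordsPerBatch": 2}):

        def func(batches: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
            # Consume only the first batch of each group, then stop
            first = next(batches)
            yield pd.DataFrame({"value": first["id"] % 4})

        df = self.spark.range(20)
        grouped_df = df.groupBy((col("id") % 4).cast("int"))
        # Each group emits only its first (two-row) batch
        expected = [Row(value=x) for x in [0, 0, 1, 1, 2, 2, 3, 3]]
        actual = grouped_df.applyInPandas(func, "value long").collect()
        self.assertEqual(actual, expected)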

@zhengruifeng changed the title [SPARK-53614] Add Iterator[pandas.DataFrame] support to applyInPandas [SPARK-53614][PYTHON] Add Iterator[pandas.DataFrame] support to applyInPandas Oct 29, 2025