Skip to content

Conversation

peter-toth
Copy link
Contributor

@peter-toth peter-toth commented Sep 4, 2025

What changes were proposed in this pull request?

Latest improvements to CollapseProject rule (like #33958) prevented duplicating expressive expressons, which brings considerable performance improvement in many cases.
But there is one particular case when it can introduce significant perfoamance degradation. Consider a query where the adjacent project nodes don't get collapsed because they contain expensive, multiple referenced expressions, but the nodes also contain Python UDF expressions that otherwise wouldn't prevent project node collapsion. E.g.:

Project a + a as a_plus_a, PythonUDF(...) as udf2, udf1
  Project <expensive calculation> as a, PythonUDF(...) as udf1
    ...

In the above example CollapseProject doesn't modify the 2 project nodes, which then causes 2 BatchEvalPython nodes to appear in the plan when ExtractPythonUDFs extracts them:

Project a + a as a_plus_a, udf2, udf1
  BatchEvalPython PythonUDF(...) -> udf2
    Project <expensive calculation> as a, udf1
      BatchEvalPython PythonUDF(...) -> udf1
        ...

The 2 BatchEvalPython nodes can cause significant serialization/deserialization overhead compared to the case when the original project nodes were collapsed and we had only 1 BatchEvalPython node.

The old behaviour can be restored with setting spark.sql.optimizer.collapseProjectAlwaysInline=true, but it is still not ideal as we lose the performance improvement in other cases.

This PR improves to the CollapseProject rule to force merging Python UDFs in project groups (multiple adjacent project nodes) when they can be executed in one run.

Why are the changes needed?

To fix performance regression caused by latest changes to CollapseProject.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

New and existing UTs.

Was this patch authored or co-authored using generative AI tooling?

No.

@peter-toth
Copy link
Contributor Author

cc @cloud-fan , @dongjoon-hyun

@peter-toth peter-toth force-pushed the SPARK-53399-collapse-python-udfs branch from c7eb6cb to 4814344 Compare September 4, 2025 17:36

def correctEvalType(udf: PythonUDF): Int = {
if (udf.evalType == PythonEvalType.SQL_ARROW_BATCHED_UDF) {
if (conf.pythonUDFArrowFallbackOnUDT &&
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we simply get the value of conf.pythonUDFArrowFallbackOnUDT as a parameter? It looks too much for me to bring SQLConfHelper simply in order to use conf in this object. WDYT, @peter-toth ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in 50664c5.

case Seq(child: PythonUDF) => correctEvalType(e) == correctEvalType(child) &&
shouldExtractUDFExpressionTree(child)
case Seq(child: PythonUDF) =>
PythonUDF.correctEvalType(e) == PythonUDF.correctEvalType(child) &&
Copy link
Member

@dongjoon-hyun dongjoon-hyun Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we simply import PythonUDF.correctEvalType method directly?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, done in 50664c5.

@@ -51,6 +51,27 @@ object PythonUDF {
// support new types in the future, e.g, N -> N transform.
e.isInstanceOf[PythonUDAF]
}

def correctEvalType(udf: PythonUDF): Int = {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what does "correct" mean here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nvm, the code was moved from somwhere else.

@cloud-fan
Copy link
Contributor

Shall we add a new rule for merging Python UDFs? I think it's orthogonal to CollapseProject: The new rule will pull up the lower Python UDF to the upper Project, even if it duplicates expressions or make the lower Project very wide. Then we leave it to CollapseProject to decide if we can completely merge the two Projects. I think it can produce better plan as we only duplicate minimal expressions to make Python UDFs live in the same Project.

@peter-toth
Copy link
Contributor Author

Shall we add a new rule for merging Python UDFs? I think it's orthogonal to CollapseProject: The new rule will pull up the lower Python UDF to the upper Project, even if it duplicates expressions or make the lower Project very wide. Then we leave it to CollapseProject to decide if we can completely merge the two Projects. I think it can produce better plan as we only duplicate minimal expressions to make Python UDFs live in the same Project.

I want to revisit #52149 a bit later to not just collapse or don't collapse, but be able to merge certain expressions into upper and keep others in lower. I think once we do that in CollapseProject we don't need 2 separate rules (traversals on the plan).
Do you think we can merge this PR to fix the regression and I can work on an improvement next week?

case p1 @ Project(_, p2: Project)
if canCollapseExpressions(p1.projectList, p2.projectList, alwaysInline) =>
if canCollapseExpressions(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm OK to special-case Python UDF in CollpaseProject for now and refactor it later, but we should still make it efiicient.

Can we use the idea from #52149 to not completely merge two Projects for Python UDF? We can pull up the Python UDF from the lower Project to the upper one so that they live in the same Project. I.e. we add extra pattern matches after case ... if canCollapseExpressions so that if we can't fully collapse two Projects, we check if we can partially merge them for Python UDF.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the merge idea, but wanted handle the case fully in mergeProjectExpressions().

@peter-toth peter-toth force-pushed the SPARK-53399-collapse-python-udfs branch from 50664c5 to ebff22c Compare September 9, 2025 22:54
@@ -1319,7 +1447,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper {

def buildCleanedProjectList(
upper: Seq[NamedExpression],
lower: Seq[NamedExpression]): Seq[NamedExpression] = {
lower: Iterable[NamedExpression]): Seq[NamedExpression] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question. Where does this PR hand over non-Seq type with this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@@ -51,6 +51,27 @@ object PythonUDF {
// support new types in the future, e.g, N -> N transform.
e.isInstanceOf[PythonUDAF]
}

def correctEvalType(udf: PythonUDF, pythonUDFArrowFallbackOnUDT: Boolean): Int = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for avoiding SparkConf.

alwaysInline,
newPythonUDFEvalTypesInUpperProjects,
pythonUDFArrowFallbackOnUDT)
&& canCollapseAggregate(p, agg) =>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit. maybe more indentation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, done in 5ef3869.


private def cheapToInlineProducer(
producer: NamedExpression,
relatedConsumers: Iterable[Expression]) = trimAliases(producer) match {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use Seq[Expression] here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can restore the Seq[] type, but here: https://github.com/apache/spark/pull/52238/files#diff-11264d807efa58054cca2d220aae8fba644ee0f0f2a4722c46d52828394846efR1323 we pass in an ExpressionSet type relatedConsumers argument and all we do in cheapToInlineProducer() is to iterate on relatedConsumers.

val pythonUDFArrowFallbackOnUDT = conf.pythonUDFArrowFallbackOnUDT

traverse(plan, alwaysInline, Set.empty, pythonUDFArrowFallbackOnUDT)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shall we simplify a little more like the following?

def apply(plan: LogicalPlan, alwaysInline: Boolean): LogicalPlan = {
  traverse(plan, alwaysInline, Set.empty, conf.pythonUDFArrowFallbackOnUDT)
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed in e2db127.

@peter-toth peter-toth changed the title [SPARK-53399][SQL] Collapse Python UDFs [SPARK-53399][SQL] Merge Python UDFs Sep 10, 2025
@peter-toth peter-toth force-pushed the SPARK-53399-collapse-python-udfs branch from 61632fe to 30f3be9 Compare September 10, 2025 04:37
val neverInlines = ListBuffer.empty[NamedExpression]
val mustInlines = ListBuffer.empty[NamedExpression]
val maybeInlines = ListBuffer.empty[NamedExpression]
val others = ListBuffer.empty[NamedExpression]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This category is not explained in the comment above.

}
}

case o => others += o
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is always Attribute?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants