[SPARK-49653][SQL] Single join for correlated scalar subqueries #48145
@@ -456,6 +456,33 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     (newPlan, newCond)
   }

+  // Returns true if 'query' is guaranteed to return at most 1 row.
+  private def guaranteedToReturnOneRow(query: LogicalPlan): Boolean = {
+    if (query.maxRows.exists(_ <= 1)) {
+      return true
+    }
+    val aggNode = query match {
+      case havingPart@Filter(_, aggPart: Aggregate) => Some(aggPart)
+      case aggPart: Aggregate => Some(aggPart)
+      // LIMIT 1 is handled above, this is for all other types of LIMITs
+      case Limit(_, aggPart: Aggregate) => Some(aggPart)
+      case Project(_, aggPart: Aggregate) => Some(aggPart)
+      case _: LogicalPlan => None
+    }
+    if (!aggNode.isDefined) {
+      return false
+    }
+    // This is the logic we currently use in CheckAnalysis for aggregates in scalar subqueries.
+    val correlatedEquivalentExprs = getCorrelatedEquivalentInnerExpressions(query)
+    // Grouping expressions, except outer refs and constant expressions - grouping by an
+    // outer ref or a constant is always ok
+    val groupByExprs =
+      ExpressionSet(aggNode.get.groupingExpressions.filter(x => !x.isInstanceOf[OuterReference] &&
+        x.references.nonEmpty))
+    val nonEquivalentGroupByExprs = groupByExprs -- correlatedEquivalentExprs
+    nonEquivalentGroupByExprs.isEmpty
+  }
+
   private def rewriteSubQueries(plan: LogicalPlan): LogicalPlan = {
     /**
      * This function is used as a aid to enforce idempotency of pullUpCorrelatedPredicate rule.

Review comment on guaranteedToReturnOneRow: can we do some refactor to avoid duplicating code between this and …
Author reply: done, but the amount of savings is not spectacular, because in CheckAnalysis there is also a "legacy" path (that results in incorrect results) where the check is significantly weaker. Once the single join is rolled out, we will just remove all extra checks from CheckAnalysis.
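To make the new check concrete, here is a hedged illustration (the SparkSession value `spark` and the tables `outer_t(a)` and `inner_t(x, y)` are invented for this example; they are not part of the PR):

// A global aggregate with no extra GROUP BY returns at most one row per outer row,
// so guaranteedToReturnOneRow is true and the legacy LeftOuter rewrite stays safe.
spark.sql("""
  SELECT a, (SELECT max(y) FROM inner_t WHERE inner_t.x = outer_t.a) AS m
  FROM outer_t""")

// Grouping by an inner column that is not pinned by the correlation may produce several
// rows for a single outer row. With the new conf enabled, this is planned with a
// LeftSingle join that raises an error at runtime if that actually happens.
spark.sql("""
  SELECT a,
         (SELECT max(y) FROM inner_t WHERE inner_t.x = outer_t.a GROUP BY inner_t.y) AS m
  FROM outer_t""")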
@@ -481,7 +508,8 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
     }

     plan.transformExpressionsWithPruning(_.containsPattern(PLAN_EXPRESSION)) {
-      case ScalarSubquery(sub, children, exprId, conditions, hint, mayHaveCountBugOld)
+      case ScalarSubquery(sub, children, exprId, conditions, hint,
+          mayHaveCountBugOld, needSingleJoinOld)
           if children.nonEmpty =>

         def mayHaveCountBugAgg(a: Aggregate): Boolean = {
@@ -527,8 +555,13 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
           val (topPart, havingNode, aggNode) = splitSubquery(sub)
           (aggNode.isDefined && aggNode.get.groupingExpressions.isEmpty)
         }
+        val needSingleJoin = if (needSingleJoinOld.isDefined) {
+          needSingleJoinOld.get
+        } else {
+          conf.getConf(SQLConf.SCALAR_SUBQUERY_USE_SINGLE_JOIN) && !guaranteedToReturnOneRow(sub)
+        }
         ScalarSubquery(newPlan, children, exprId, getJoinCondition(newCond, conditions),
-          hint, Some(mayHaveCountBug))
+          hint, Some(mayHaveCountBug), Some(needSingleJoin))
       case Exists(sub, children, exprId, conditions, hint) if children.nonEmpty =>
         val (newPlan, newCond) = if (SQLConf.get.decorrelateInnerQueryEnabledForExistsIn) {
           decorrelate(sub, plan, handleCountBug = true)
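The `needSingleJoinOld` check above mirrors the existing `mayHaveCountBug` handling: the flag is computed once (from `SQLConf.SCALAR_SUBQUERY_USE_SINGLE_JOIN` and `guaranteedToReturnOneRow`) and then stored in the `ScalarSubquery` expression, so that re-running the rule cannot change the decision and `PullupCorrelatedPredicates` stays idempotent. A minimal sketch of that pattern; the helper name is hypothetical and not part of the PR:

// Hypothetical helper illustrating the "compute once, then reuse" pattern.
def resolveOnce(previouslyStored: Option[Boolean])(compute: => Boolean): Boolean =
  previouslyStored.getOrElse(compute)

// First run: nothing stored yet, so the flag is computed from the conf and the plan.
//   val needSingleJoin = resolveOnce(None)(singleJoinEnabled && !guaranteedToReturnOneRow(sub))
// Later runs: the stored Some(flag) wins and the computation is skipped.
//   resolveOnce(Some(needSingleJoin))(sys.error("never evaluated"))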
@@ -786,17 +819,22 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
       subqueries: ArrayBuffer[ScalarSubquery]): (LogicalPlan, AttributeMap[Attribute]) = {
     val subqueryAttrMapping = ArrayBuffer[(Attribute, Attribute)]()
     val newChild = subqueries.foldLeft(child) {
-      case (currentChild, ScalarSubquery(sub, _, _, conditions, subHint, mayHaveCountBug)) =>
+      case (currentChild, ScalarSubquery(sub, _, _, conditions, subHint, mayHaveCountBug,
+          needSingleJoin)) =>
        val query = DecorrelateInnerQuery.rewriteDomainJoins(currentChild, sub, conditions)
        val origOutput = query.output.head
        // The subquery appears on the right side of the join, hence add its hint to the right
        // of a join hint
        val joinHint = JoinHint(None, subHint)

        val resultWithZeroTups = evalSubqueryOnZeroTups(query)
+        val joinType = needSingleJoin match {
+          case Some(true) => LeftSingle
+          case _ => LeftOuter
+        }
        lazy val planWithoutCountBug = Project(
          currentChild.output :+ origOutput,
-          Join(currentChild, query, LeftOuter, conditions.reduceOption(And), joinHint))
+          Join(currentChild, query, joinType, conditions.reduceOption(And), joinHint))

        if (Utils.isTesting) {
          assert(mayHaveCountBug.isDefined)
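For readers unfamiliar with the new join type: LeftSingle is meant to behave like a left outer join, except that more than one match for a left row is an error rather than a source of duplicate output rows. A self-contained sketch of that semantics over plain Scala collections (this models the intent only, not Spark's physical operators):

def leftSingleJoin[L, R](left: Seq[L], right: Seq[R])(matches: (L, R) => Boolean): Seq[(L, Option[R])] =
  left.map { l =>
    right.filter(r => matches(l, r)) match {
      case Seq()  => (l, None)     // no match: keep the left row, the subquery value is NULL
      case Seq(r) => (l, Some(r))  // exactly one match: the scalar subquery value
      case _ => throw new IllegalStateException(
        "Correlated scalar subquery returned more than one row")
    }
  }

// leftSingleJoin(Seq(1, 2, 3), Seq(2 -> "a", 3 -> "b", 3 -> "c"))((l, r) => l == r._1)
// Keys 1 and 2 join fine ((1, None) and (2, Some((2, "a")))); key 3 has two matches, so this throws.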
@@ -845,7 +883,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
            currentChild.output :+ subqueryResultExpr,
            Join(currentChild,
              Project(query.output :+ alwaysTrueExpr, query),
-              LeftOuter, conditions.reduceOption(And), joinHint))
+              joinType, conditions.reduceOption(And), joinHint))

        } else {
          // CASE 3: Subquery with HAVING clause. Pull the HAVING clause above the join.
@@ -877,7 +915,7 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] with AliasHelpe
            currentChild.output :+ caseExpr,
            Join(currentChild,
              Project(subqueryRoot.output :+ alwaysTrueExpr, subqueryRoot),
-              LeftOuter, conditions.reduceOption(And), joinHint))
+              joinType, conditions.reduceOption(And), joinHint))
        }
      }
    }
@@ -1028,7 +1066,7 @@ object OptimizeOneRowRelationSubquery extends Rule[LogicalPlan] {

    case p: LogicalPlan => p.transformExpressionsUpWithPruning(
      _.containsPattern(SCALAR_SUBQUERY)) {
-      case s @ ScalarSubquery(OneRowSubquery(p @ Project(_, _: OneRowRelation)), _, _, _, _, _)
+      case s @ ScalarSubquery(OneRowSubquery(p @ Project(_, _: OneRowRelation)), _, _, _, _, _, _)
          if !hasCorrelatedSubquery(s.plan) && s.joinCond.isEmpty =>
        assert(p.projectList.size == 1)
        stripOuterReferences(p.projectList).head
@@ -258,6 +258,7 @@ message Join {
    JOIN_TYPE_LEFT_ANTI = 5;
    JOIN_TYPE_LEFT_SEMI = 6;
    JOIN_TYPE_CROSS = 7;
+    JOIN_TYPE_LEFT_SINGLE = 8;
  }

  // (Optional) Only used by joinWith. Set the left and right join data types.

Review comment on JOIN_TYPE_LEFT_SINGLE: do we need it? The parsed plan from Spark Connect client should never use it.
Author reply: removed
@@ -269,8 +269,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          }
        }

+        def canMerge(joinType: JoinType): Boolean = joinType match {
+          case LeftSingle => false
+          case _ => true
+        }
+
        def createSortMergeJoin() = {
-          if (RowOrdering.isOrderable(leftKeys)) {
+          if (canMerge(joinType) && RowOrdering.isOrderable(leftKeys)) {
            Some(Seq(joins.SortMergeJoinExec(
              leftKeys, rightKeys, joinType, nonEquiCond, planLater(left), planLater(right))))
          } else {

Review comment on canMerge: why can't SMJ do single join?
Author reply: in principle, it can do it, but I did not implement it.
@@ -297,7 +302,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          // This join could be very slow or OOM
          // Build the smaller side unless the join requires a particular build side
          // (e.g. NO_BROADCAST_AND_REPLICATION hint)
-          val requiredBuildSide = getBroadcastNestedLoopJoinBuildSide(hint)
+          val requiredBuildSide = getBroadcastNestedLoopJoinBuildSide(hint, joinType)
          val buildSide = requiredBuildSide.getOrElse(getSmallerSide(left, right))
          Seq(joins.BroadcastNestedLoopJoinExec(
            planLater(left), planLater(right), buildSide, joinType, j.condition))
@@ -390,7 +395,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
          // This join could be very slow or OOM
          // Build the desired side unless the join requires a particular build side
          // (e.g. NO_BROADCAST_AND_REPLICATION hint)
-          val requiredBuildSide = getBroadcastNestedLoopJoinBuildSide(hint)
+          val requiredBuildSide = getBroadcastNestedLoopJoinBuildSide(hint, joinType)
          val buildSide = requiredBuildSide.getOrElse(desiredBuildSide)
          Seq(joins.BroadcastNestedLoopJoinExec(
            planLater(left), planLater(right), buildSide, joinType, condition))
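Passing `joinType` into `getBroadcastNestedLoopJoinBuildSide` presumably lets the planner pin the build side for the new join type. The sketch below is only my reading of that intent; the logic is an assumption, not the actual Spark helper, and the import for `LeftSingle` assumes it lives next to the other Catalyst join types:

import org.apache.spark.sql.catalyst.optimizer.{BuildRight, BuildSide}
import org.apache.spark.sql.catalyst.plans.{JoinType, LeftSingle}

// Assumption: a single join streams the outer (left) side and builds the subquery (right)
// side, so that a second match for the same left row can be detected and reported.
def requiredBuildSideSketch(joinType: JoinType, hintPinsRight: Boolean): Option[BuildSide] =
  joinType match {
    case LeftSingle => Some(BuildRight)
    case _ if hintPinsRight => Some(BuildRight)
    case _ => None
  }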
Review comment: just for my curiosity, why are there two places checking scalar subqueries?
Author reply: The second check (in lines 1060-1080 of the current file) checks the places where a scalar subquery is allowed (e.g., it can occur in a project or filter but not in a join predicate). The first check ensures the properties of the subquery itself (1 column and at most 1 row).
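A hedged illustration of the two checks described in this answer (the tables are invented for the example):

// Rejected by the check on the subquery itself: a scalar subquery must return exactly one column.
spark.sql("SELECT (SELECT x, y FROM inner_t) AS v FROM outer_t")

// Rejected by the placement check: a correlated scalar subquery may appear in a project or
// filter, but (per the answer above) not in a join predicate.
spark.sql("""
  SELECT * FROM outer_t o JOIN other_t t
  ON o.a = (SELECT max(y) FROM inner_t i WHERE i.x = o.a)""")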