
[SPARK-49653][SQL] Single join for correlated scalar subqueries #48145

Open
wants to merge 9 commits into base: master
Conversation

Contributor

@agubichev agubichev commented Sep 18, 2024

What changes were proposed in this pull request?

A single join is a left outer join that additionally checks that there is at most one matching build row for every probe row.

This PR adds a single join implementation to support correlated scalar subqueries for which the optimizer cannot guarantee that at most one row is returned, e.g.:

select *, (select t1.x from t1 where t1.y >= t_outer.y) from t_outer;
-- This subquery is rewritten as a single join that verifies there is at most one
-- matching build row for every probe row, and raises a Spark runtime error otherwise.

Design doc: https://docs.google.com/document/d/1NTsvtBTB9XvvyRvH62QzWIZuw4hXktALUG1fBP7ha1Q/edit

The optimizer introduces a single join in cases that previously returned incorrect results (or were unsupported).
Only a hash-based implementation is provided; the optimizer makes sure we never plan a single join as a sort-merge join.
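To make the semantics concrete, here is a minimal Python sketch of what a LeftSingle join computes (illustrative only, not Spark code): a left outer join that raises when a probe row has more than one match.

```python
def left_single_join(probe, build, condition):
    """Left outer join that errors when a probe row matches more than one build row."""
    out = []
    for p in probe:
        matches = [b for b in build if condition(p, b)]
        if len(matches) > 1:
            # Spark raises a runtime error in this case
            raise ValueError("scalar subquery returned more than one row")
        # None models the null-padded output of a non-matching outer row
        out.append((p, matches[0] if matches else None))
    return out

# t_outer values probed against t1 rows (x, y) with the predicate t1.y >= t_outer.y
t1 = [(10, 3), (20, 7)]
print(left_single_join([5], t1, lambda p, b: b[1] >= p))  # one match: [(5, (20, 7))]
```

Probing with the outer value 1 would match both t1 rows and raise, which is exactly the behavior the PR adds for subqueries the optimizer cannot prove to be single-row.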

Why are the changes needed?

Expands our subquery coverage.

Does this PR introduce any user-facing change?

Yes, previously unsupported scalar subqueries should now work.

How was this patch tested?

Unit tests for the single join operator. Query tests for the subqueries.

Was this patch authored or co-authored using generative AI tooling?

No.

@agubichev agubichev changed the title [SPARK-49653] Single join for scalar subqueries [SQL] [SPARK-49653] Single join for scalar subqueries Sep 18, 2024
@agubichev agubichev changed the title [SQL] [SPARK-49653] Single join for scalar subqueries [SQL] [SPARK-49653] Single join for correlated scalar subqueries Sep 18, 2024
@dongjoon-hyun dongjoon-hyun changed the title [SQL] [SPARK-49653] Single join for correlated scalar subqueries [SPARK-49653][SQL] Single join for correlated scalar subqueries Sep 18, 2024
@cloud-fan
Contributor

Do all the existing optimizer rules work fine with this single join? I understand that we need to implement the single-match check in all the physical join nodes, but semantics-wise, is there anything we need to take care of?

@agubichev
Contributor Author

Do all the existing optimizer rules work fine with this single join? I understand that we need to implement the single-match check in all the physical join nodes, but semantics-wise, is there anything we need to take care of?

@cloud-fan

I've traced all the usages of LeftOuter in the catalyst rules (see the full list below). In general, the rules act on an allow-list basis: if a join type is not explicitly matched by a rule, the rule is not applied. As LeftOuter is a close relative of LeftSingle (in fact, at HEAD we use LeftOuter in place of LeftSingle), it is enough to check the rules that already reference LeftOuter explicitly. Since LeftOuter joins are already very restrictive as to which optimizations can be applied to them (and they frequently restrict optimizations in the surrounding plan nodes too), there are few join-type-agnostic rules. The ones I know of, like ReplaceNullWithFalseInPredicate, apply to both LeftOuter and LeftSingle without change.

These rules have been updated for LeftSingle join:

  • EliminateOuterJoin -- should not apply to LeftSingle, updated
  • PushPredicateThroughJoin -- not all cases should apply to LeftSingle, updated
  • FoldablePropagation

The following rules only match the LeftOuter join for now, and therefore skip the LeftSingle join unchanged. Semantics-wise, it is OK to skip every one of these rules for the LeftSingle join; further analysis is needed on whether we can or should enable them for LeftSingle joins:

  • InferFiltersFromConstraints
  • LimitPushDown
  • PropagateEmptyRelation
  • PushLeftSemiLeftAntiThroughJoin
  • PushExtraPredicateThroughJoin

There are a couple of rules that apply to LeftOuter but do not make sense for LeftSingle. In both cases LeftOuter is explicitly matched, so they skip LeftSingle, as they should:

  • CheckCartesianProducts
  • RewriteAsOfJoin
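The allow-list behavior described above can be sketched in Python (illustrative only; the rule and join-type names mirror Catalyst, but this is not the actual Scala implementation):

```python
def eliminate_outer_join(join_type, filter_rejects_nulls):
    """Sketch of an allow-list style rule: only explicitly matched types change.

    EliminateOuterJoin may rewrite a LeftOuter join to Inner when a filter
    above it rejects nulls. LeftSingle must fall through unchanged, because
    rewriting it would silently drop its at-most-one-match runtime check.
    """
    if join_type == "LeftOuter" and filter_rejects_nulls:
        return "Inner"
    return join_type  # LeftSingle (and any unmatched type) is left as-is
```

Because the match is explicit, adding a new join type is safe by default: every rule that does not name LeftSingle simply leaves it alone.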

if (!SQLConf.get.getConf(SQLConf.SCALAR_SUBQUERY_USE_SINGLE_JOIN)) {
  expr.failAnalysis(
    "MUST_AGGREGATE_CORRELATED_SCALAR_SUBQUERY",
    messageParameters = Map.empty)
Contributor

just for my curiosity, why there are two places checking scalar subqueries?

Contributor Author

@agubichev agubichev Sep 20, 2024

The second check (in lines 1060-1080 of the current file) verifies the places where a scalar subquery is allowed (e.g., it can occur in a project or filter, but not in a join predicate).
The first check enforces the properties of the subquery itself (one column and at most one row).

@@ -456,6 +456,33 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
(newPlan, newCond)
}

// Returns true if 'query' is guaranteed to return at most 1 row.
private def guaranteedToReturnOneRow(query: LogicalPlan): Boolean = {
Contributor

@cloud-fan cloud-fan Sep 20, 2024

can we do some refactor to avoid duplicating code between this and CheckAnalysis?

Contributor Author

Done, but the amount of savings is not spectacular, because CheckAnalysis also has a "legacy" path (which can produce incorrect results) where the check is significantly weaker.

Once the single join is rolled out, we will remove all the extra checks from CheckAnalysis.

@@ -258,6 +258,7 @@ message Join {
JOIN_TYPE_LEFT_ANTI = 5;
JOIN_TYPE_LEFT_SEMI = 6;
JOIN_TYPE_CROSS = 7;
JOIN_TYPE_LEFT_SINGLE = 8;
Contributor

do we need it? The parsed plan from Spark Connect client should never use it.

Contributor Author

removed

@@ -269,8 +269,13 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
}
}

def canMerge(joinType: JoinType): Boolean = joinType match {
case LeftSingle => false
Contributor

why can't SMJ do single join?

Contributor Author

In principle it can, but I did not implement it.
Since we only need single joins for correlated scalar subqueries with non-equality predicates, sort-merge joins are unlikely to be useful there anyway.
(For equi-predicates like 'col = outer(col) ... group by col', we can reason that the query returns at most one row, so no single join is needed.)
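The equi-predicate reasoning can be illustrated with a small Python sketch (illustrative only; not Catalyst code): grouping by the column that is equated with the outer column yields at most one group per outer row, so the subquery becomes a plain lookup and needs no single join.

```python
from collections import Counter

# select *, (select count(*) from y where y.col = outer(col) group by y.col) from x
# Grouping by y.col produces one count per distinct y.col value, so each
# outer value selects at most one group.
y_col = [1, 1, 2]
groups = Counter(y_col)  # one (group key -> count) entry per distinct y.col

def scalar_subquery(outer_value):
    return groups.get(outer_value)  # at most one row; None models SQL NULL

print(scalar_subquery(1))  # 2
print(scalar_subquery(3))  # None
```

With a non-equality correlation like `y1 > x1`, an outer value can match several groups at once, which is exactly the case the single join has to guard at runtime.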

private def outerJoin(relation: Broadcast[Array[InternalRow]]): RDD[InternalRow] = {
private def outerJoin(
relation: Broadcast[Array[InternalRow]],
checkMatches: Int => Int): RDD[InternalRow] = {
Contributor

shouldn't it be Int => Unit?

Contributor Author

removed the checkMatches in favor of your next suggestion.

}
while (nextIndex < buildRows.length) {
resultRow = joinedRow(streamRow, buildRows(nextIndex))
nextIndex += 1
if (boundCondition(resultRow)) {
foundMatch = true
matches = checkMatches(matches)
Contributor

Seems we only need a single singleJoin boolean flag?

if (boundCondition(resultRow)) {
  if (foundMatch && singleJoin) throw ...
  foundMatch = true
}

Contributor Author

neat!
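For illustration, the flag-based check above integrated into a probe loop could look like the following Python sketch (hypothetical names; the actual implementation is Scala inside Spark's join exec nodes):

```python
def probe_one_row(stream_row, build_rows, bound_condition, single_join):
    """Probe one stream-side row against the broadcast build rows.

    For a single join, a second match raises instead of producing a second
    output row; a non-matching row is null-padded (modeled here as None).
    """
    found_match = False
    result = None
    for build_row in build_rows:
        if bound_condition(stream_row, build_row):
            if found_match and single_join:
                raise RuntimeError("scalar subquery returned more than one row")
            found_match = True
            result = (stream_row, build_row)
    return result if found_match else (stream_row, None)
```

The single boolean replaces the `checkMatches: Int => Int` callback: no per-row match counter is needed, because the only state the check requires is "have we already seen a match".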

select * from x where (select count(*) from y where y1 > x1 group by y1) = 1;
select *, (select count(*) from y where y1 + y2 = x1 group by y1) from x;
select *, (select count(*) from y where x1 = y1 and y2 + 10 = x1 + 1 group by y2) from x;
reset spark.sql.optimizer.scalarSubqueryUseSingleJoin;
Contributor

Seems not needed, as the following test queries set the config explicitly.

Contributor Author

removed

select * from x where (select count(*) from y where y1 > x1 group by y1) = 1;
reset spark.sql.legacy.scalarSubqueryAllowGroupByNonEqualityCorrelatedPredicate;
reset spark.sql.optimizer.scalarSubqueryUseSingleJoin;
Contributor

it's actually not needed as each golden file is tested with a fresh SparkSession

Contributor Author

removed



private def testSingleJoin(
testName: String,
Contributor

nit: 4 spaces indentation

Contributor Author

done


testWithWholeStageCodegenOnAndOff(s"$testName using BroadcastHashJoin") { _ =>
extractJoinParts().foreach { case (_, leftKeys, rightKeys, boundCondition, _, _, _, _) =>
withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "1") {
Contributor

is there a specific reason to set this conf?

Contributor Author

No reason, just copied it from a similar test. Removed it now.

@github-actions github-actions bot removed the CONNECT label Sep 20, 2024