[SPARK-46693][SQL] Inject LocalLimitExec when matching OffsetAndLimit or LimitAndOffset

### What changes were proposed in this pull request?

- Add `LocalLimitExec` to `SparkStrategies` in the limit + offset cases
- Add unit tests

### Why are the changes needed?

Previously, the `OffsetAndLimit` and `LimitAndOffset` match cases matched a logical plan containing a `LocalLimit` node and then dropped that node when building the physical plan. This change adds the corresponding `LocalLimitExec` node to the physical plan to improve efficiency. This was not a correctness bug: omitting the local limit only leads to larger intermediate shuffles and nodes.
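
The efficiency argument above can be illustrated with a toy model in plain Scala. This is only a sketch under simplifying assumptions, not Spark code: partitions are ordinary collections, and `localLimit`, `globalLimit`, and `LocalLimitSketch` are hypothetical names that merely mimic what a per-partition limit and a global limit with offset do.

```scala
// Toy model only: partitions are plain Scala collections, not Spark RDDs.
// All names here are hypothetical and chosen for illustration.
object LocalLimitSketch {
  type Partition = Seq[Long]

  // Models a per-partition (local) limit: keep at most `limit` rows in each partition.
  def localLimit(limit: Int, parts: Seq[Partition]): Seq[Partition] =
    parts.map(_.take(limit))

  // Models a global limit with offset: gather every remaining row into one
  // sequence (standing in for the shuffle), keep the first `limit` rows, then
  // skip the first `offset` rows. Also returns how many rows were gathered.
  def globalLimit(limit: Int, offset: Int, parts: Seq[Partition]): (Seq[Long], Int) = {
    val gathered = parts.flatten
    (gathered.take(limit).drop(offset), gathered.size)
  }

  def main(args: Array[String]): Unit = {
    // 4 partitions of 25 rows each, holding ids 0 to 99.
    val parts: Seq[Partition] =
      (0 until 4).map(p => (0 until 25).map(i => (p * 25 + i).toLong))

    val (withoutLocal, gatheredWithout) = globalLimit(10, 3, parts)
    val (withLocal, gatheredWith) = globalLimit(10, 3, localLimit(10, parts))

    // Same answer either way; only the amount of gathered data differs.
    assert(withoutLocal == withLocal)
    println(s"rows gathered without local limit: $gatheredWithout") // 100
    println(s"rows gathered with local limit: $gatheredWith")       // 40
  }
}
```

In this model both variants return ids 3 to 9, but the variant with the local limit gathers 40 rows instead of 100, which mirrors the "larger intermediate shuffles" described above.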

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit tests.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44699 from n-young-db/limit-offset-drops-local-limit.

Authored-by: Nick Young <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
n-young-db authored and dongjoon-hyun committed Jan 13, 2024
1 parent 4be05f5 commit 11890cc
Showing 2 changed files with 19 additions and 2 deletions.
@@ -895,10 +895,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
       // We should match the combination of limit and offset first, to get the optimal physical
       // plan, instead of planning limit and offset separately.
       case LimitAndOffset(limit, offset, child) =>
-        GlobalLimitExec(limit, planLater(child), offset) :: Nil
+        GlobalLimitExec(limit,
+          LocalLimitExec(limit, planLater(child)), offset) :: Nil
       case OffsetAndLimit(offset, limit, child) =>
         // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'.
-        GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil
+        GlobalLimitExec(offset + limit,
+          LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil
       case logical.LocalLimit(IntegerLiteral(limit), child) =>
         execution.LocalLimitExec(limit, planLater(child)) :: Nil
       case logical.GlobalLimit(IntegerLiteral(limit), child) =>
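
The comment in the `OffsetAndLimit` case relies on the identity "'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'". A minimal sketch of that identity on ordinary Scala sequences follows; it assumes non-negative `a` and `b` whose sum does not overflow, and the object and method names are hypothetical, not part of Spark.

```scala
// Illustration on plain collections; names are hypothetical, not Spark APIs.
object OffsetLimitEquivalence {
  // 'Offset a' then 'Limit b': skip the first a rows, then keep b rows.
  def offsetThenLimit[A](rows: Seq[A], a: Int, b: Int): Seq[A] =
    rows.drop(a).take(b)

  // 'Limit a + b' then 'Offset a': keep the first a + b rows, then skip a rows.
  def limitThenOffset[A](rows: Seq[A], a: Int, b: Int): Seq[A] =
    rows.take(a + b).drop(a)

  def main(args: Array[String]): Unit = {
    val rows: Seq[Int] = 0 until 100
    // Check a spread of offsets and limits, including values past the end of the data.
    for (a <- 0 to 120 by 7; b <- 0 to 120 by 11)
      assert(offsetThenLimit(rows, a, b) == limitThenOffset(rows, a, b))
    // The shape used in the unit tests: a limit of 25 rows after an offset of 3.
    assert(offsetThenLimit(rows, 3, 25) == (3 until 28))
    println("both orderings agree")
  }
}
```

This is why both match cases can pass the same combined value to the inner and outer limit operators: the first `a + b` rows are sufficient to answer the query, so nothing beyond them needs to survive the local limit.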
@@ -1446,6 +1446,21 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper {
       }
     }
   }
+
+  test("Limit and offset should not drop LocalLimitExec operator") {
+    val df = sql("SELECT * FROM (SELECT * FROM RANGE(100) LIMIT 25 OFFSET 3) WHERE id > 10")
+    val planned = df.queryExecution.sparkPlan
+    assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
+    assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
+  }
+
+  test("Offset and limit should not drop LocalLimitExec operator") {
+    val df = sql("""SELECT * FROM (SELECT * FROM
+      (SELECT * FROM RANGE(100) LIMIT 25) OFFSET 3) WHERE id > 10""".stripMargin)
+    val planned = df.queryExecution.sparkPlan
+    assert(planned.exists(_.isInstanceOf[GlobalLimitExec]))
+    assert(planned.exists(_.isInstanceOf[LocalLimitExec]))
+  }
 }

 // Used for unit-testing EnsureRequirements