diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala index 8806142589275..cbb34d6d484f7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala @@ -895,10 +895,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] { // We should match the combination of limit and offset first, to get the optimal physical // plan, instead of planning limit and offset separately. case LimitAndOffset(limit, offset, child) => - GlobalLimitExec(limit, planLater(child), offset) :: Nil + GlobalLimitExec(limit, + LocalLimitExec(limit, planLater(child)), offset) :: Nil case OffsetAndLimit(offset, limit, child) => // 'Offset a' then 'Limit b' is the same as 'Limit a + b' then 'Offset a'. - GlobalLimitExec(limit = offset + limit, child = planLater(child), offset = offset) :: Nil + GlobalLimitExec(offset + limit, + LocalLimitExec(offset + limit, planLater(child)), offset) :: Nil case logical.LocalLimit(IntegerLiteral(limit), child) => execution.LocalLimitExec(limit, planLater(child)) :: Nil case logical.GlobalLimit(IntegerLiteral(limit), child) => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala index 8565e06ba9fa2..71a86d599c0c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/PlannerSuite.scala @@ -1446,6 +1446,21 @@ class PlannerSuite extends SharedSparkSession with AdaptiveSparkPlanHelper { } } } + + test("Limit and offset should not drop LocalLimitExec operator") { + val df = sql("SELECT * FROM (SELECT * FROM RANGE(100) LIMIT 25 OFFSET 3) WHERE id > 10") + val planned = df.queryExecution.sparkPlan + assert(planned.exists(_.isInstanceOf[GlobalLimitExec])) + assert(planned.exists(_.isInstanceOf[LocalLimitExec])) + } + + test("Offset and limit should not drop LocalLimitExec operator") { + val df = sql("""SELECT * FROM (SELECT * FROM + (SELECT * FROM RANGE(100) LIMIT 25) OFFSET 3) WHERE id > 10""".stripMargin) + val planned = df.queryExecution.sparkPlan + assert(planned.exists(_.isInstanceOf[GlobalLimitExec])) + assert(planned.exists(_.isInstanceOf[LocalLimitExec])) + } } // Used for unit-testing EnsureRequirements