diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index f1904c2436ab8..ccb60278f492f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -402,6 +402,8 @@ package object dsl { def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan) + def globalLimit(limitExpr: Expression): LogicalPlan = GlobalLimit(limitExpr, logicalPlan) + def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan) def join( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9d269f37e58b9..1067be49cab98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -101,6 +101,7 @@ abstract class Optimizer(catalogManager: CatalogManager) // Operator push down PushProjectionThroughUnion, PushProjectionThroughLimit, + PushProjectionThroughOffset, ReorderJoin, EliminateOuterJoin, PushDownPredicates, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala new file mode 100644 index 0000000000000..498b0131b1b62 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Offset, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{OFFSET, PROJECT} + +/** + * Pushes a Project operator down through an Offset operator when all project expressions are deterministic, since Offset only drops rows and does not change their contents. + */ +object PushProjectionThroughOffset extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( + _.containsAllPatterns(PROJECT, OFFSET)) { + + case p @ Project(projectList, offset @ Offset(_, child)) + if projectList.forall(_.deterministic) => + offset.copy(child = p.copy(projectList, child)) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index caff207d28a16..52b7dabf252bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1399,6 +1399,8 @@ case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPrese } override protected def withNewChildInternal(newChild: LogicalPlan): Offset = copy(child = newChild) + + override val nodePatterns: Seq[TreePattern] = Seq(OFFSET) } 
/** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 9856a26346f6a..3ea32f3cc464f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -131,6 +131,7 @@ object TreePattern extends Enumeration { val LOCAL_RELATION: Value = Value val LOGICAL_QUERY_STAGE: Value = Value val NATURAL_LIKE_JOIN: Value = Value + val OFFSET: Value = Value val OUTER_JOIN: Value = Value val PROJECT: Value = Value val PYTHON_DATA_SOURCE: Value = Value diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala new file mode 100644 index 0000000000000..7b0a5b0fd2e0b --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Add +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class PushProjectionThroughOffsetSuite extends PlanTest { + + object Optimize extends RuleExecutor[LogicalPlan] { + val batches = Batch("Optimizer Batch", + FixedPoint(100), + PushProjectionThroughLimit, + PushProjectionThroughOffset, + EliminateLimits, + LimitPushDown) :: Nil + } + + test("push projection through offset") { + val testRelation = LocalRelation.fromExternalRows( + Seq("a".attr.int, "b".attr.int, "c".attr.int), + 1.to(30).map(_ => Row(1, 2, 3))) + + val query1 = testRelation + .offset(5) + .select($"a", $"b", $"c") + .analyze + val optimized1 = Optimize.execute(query1) + val expected1 = testRelation + .select($"a", $"b", $"c") + .offset(5).analyze + comparePlans(optimized1, expected1) + + val query2 = testRelation + .limit(15).offset(5) + .select($"a", $"b", $"c") + .analyze + val optimized2 = Optimize.execute(query2) + val expected2 = testRelation + .select($"a", $"b", $"c") + .limit(15).offset(5).analyze + comparePlans(optimized2, expected2) + + val query3 = testRelation + .offset(5).limit(15) + .select($"a", $"b", $"c") + .analyze + val optimized3 = Optimize.execute(query3) + val expected3 = testRelation + .select($"a", $"b", $"c") + .localLimit(Add(15, 5)).offset(5).globalLimit(15) + .analyze + comparePlans(optimized3, expected3) + + val query4 = testRelation + .offset(5).limit(15) + .select($"a", $"b", $"c") + .limit(10).analyze + val optimized4 = Optimize.execute(query4) + val expected4 = testRelation + .select($"a", $"b", $"c") + .localLimit(Add(10, 5)).offset(5).globalLimit(10) + .analyze + 
comparePlans(optimized4, expected4) + + val query5 = testRelation + .localLimit(10) + .select($"a", $"b", $"c") + .offset(5).limit(10).analyze + val optimized5 = Optimize.execute(query5) + val expected5 = testRelation + .select($"a", $"b", $"c") + .localLimit(10).offset(5).globalLimit(10) + .analyze + comparePlans(optimized5, expected5) + + val query6 = testRelation + .localLimit(20) + .select($"a", $"b", $"c") + .offset(5).limit(10).analyze + val optimized6 = Optimize.execute(query6) + val expected6 = testRelation + .select($"a", $"b", $"c") + .localLimit(15).offset(5).globalLimit(10) + .analyze + comparePlans(optimized6, expected6) + } +}