From 59277c4674830122122f5cbcc085d1fdf73e53ee Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Wed, 19 Mar 2025 13:29:36 +0100 Subject: [PATCH 1/3] Add rule that pushes Project through Offset and Suite that tests it --- .../spark/sql/catalyst/dsl/package.scala | 2 + .../sql/catalyst/optimizer/Optimizer.scala | 1 + .../PushProjectionThroughOffset.scala | 35 ++++++++ .../plans/logical/basicLogicalOperators.scala | 2 + .../sql/catalyst/trees/TreePatterns.scala | 1 + .../PushProjectionThroughOffsetSuite.scala | 82 +++++++++++++++++++ 6 files changed, 123 insertions(+) create mode 100644 sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala create mode 100644 sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala index f1904c2436ab8..ccb60278f492f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala @@ -402,6 +402,8 @@ package object dsl { def localLimit(limitExpr: Expression): LogicalPlan = LocalLimit(limitExpr, logicalPlan) + def globalLimit(limitExpr: Expression): LogicalPlan = GlobalLimit(limitExpr, logicalPlan) + def offset(offsetExpr: Expression): LogicalPlan = Offset(offsetExpr, logicalPlan) def join( diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 9d269f37e58b9..1067be49cab98 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -101,6 +101,7 @@ abstract class Optimizer(catalogManager: CatalogManager) 
// Operator push down PushProjectionThroughUnion, PushProjectionThroughLimit, + PushProjectionThroughOffset, ReorderJoin, EliminateOuterJoin, PushDownPredicates, diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala new file mode 100644 index 0000000000000..498b0131b1b62 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffset.scala @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Offset, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.catalyst.trees.TreePattern.{OFFSET, PROJECT} + +/** + * Pushes a Project operator below an Offset operator. This is only done when every project expression is deterministic, since Offset merely skips rows and does not evaluate the projected expressions. 
+ */ +object PushProjectionThroughOffset extends Rule[LogicalPlan] { + def apply(plan: LogicalPlan): LogicalPlan = plan.transformWithPruning( + _.containsAllPatterns(PROJECT, OFFSET)) { + + case p @ Project(projectList, offset @ Offset(_, child)) + if projectList.forall(_.deterministic) => + offset.copy(child = p.copy(projectList, child)) + } +} diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala index caff207d28a16..52b7dabf252bb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala @@ -1399,6 +1399,8 @@ case class Offset(offsetExpr: Expression, child: LogicalPlan) extends OrderPrese } override protected def withNewChildInternal(newChild: LogicalPlan): Offset = copy(child = newChild) + + override val nodePatterns: Seq[TreePattern] = Seq(OFFSET) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala index 9856a26346f6a..3ea32f3cc464f 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreePatterns.scala @@ -131,6 +131,7 @@ object TreePattern extends Enumeration { val LOCAL_RELATION: Value = Value val LOGICAL_QUERY_STAGE: Value = Value val NATURAL_LIKE_JOIN: Value = Value + val OFFSET: Value = Value val OUTER_JOIN: Value = Value val PROJECT: Value = Value val PYTHON_DATA_SOURCE: Value = Value diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala new file mode 100644 index 0000000000000..1eddba23054aa --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.Row +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.Add +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor + +class PushProjectionThroughOffsetSuite extends PlanTest { + object Optimize1 extends RuleExecutor[LogicalPlan] { + val batches = Batch("Optimizer Batch", + FixedPoint(100), + PushProjectionThroughLimit, + PushProjectionThroughOffset, + EliminateLimits) :: Nil + } + + object Optimize2 extends RuleExecutor[LogicalPlan] { + val batches = Batch("Optimizer Batch", + FixedPoint(100), + PushProjectionThroughLimit, + PushProjectionThroughOffset, + EliminateLimits, + LimitPushDown) :: Nil + } + + test("push projection through offset") { + val testRelation = LocalRelation.fromExternalRows( + Seq("a".attr.int, "b".attr.int, "c".attr.int), + 1.to(30).map(_ => Row(1, 2, 3))) + + val query1 = testRelation + .offset(5) + .select(Symbol("a"), Symbol("b"), 'c') + .analyze + val optimized1 = Optimize1.execute(query1) + val expected1 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .offset(5).analyze + comparePlans(optimized1, expected1) + + val query2 = testRelation + .offset(5).limit(15) + .select(Symbol("a"), Symbol("b"), 'c') + .analyze + val optimized2 = Optimize1.execute(query2) + val expected2 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .offset(5).limit(15).analyze + comparePlans(optimized2, expected2) + + val query3 = testRelation + .offset(5).limit(15) + .select(Symbol("a"), Symbol("b"), 'c') + .analyze + val optimized3 = Optimize2.execute(query3) + val expected3 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .localLimit(Add(15, 5)).offset(5).globalLimit(15) + .analyze + comparePlans(optimized3, 
expected3) + } +} From 90e1b505d6b62623992e51ec8b92cf0c1004b34c Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Wed, 19 Mar 2025 15:37:09 +0100 Subject: [PATCH 2/3] Replace to have just one Optimize, and change and add tests --- .../PushProjectionThroughOffsetSuite.scala | 52 ++++++++++++++----- 1 file changed, 39 insertions(+), 13 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala index 1eddba23054aa..1866f45551b9e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala @@ -26,15 +26,8 @@ import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor class PushProjectionThroughOffsetSuite extends PlanTest { - object Optimize1 extends RuleExecutor[LogicalPlan] { - val batches = Batch("Optimizer Batch", - FixedPoint(100), - PushProjectionThroughLimit, - PushProjectionThroughOffset, - EliminateLimits) :: Nil - } - object Optimize2 extends RuleExecutor[LogicalPlan] { + object Optimize extends RuleExecutor[LogicalPlan] { val batches = Batch("Optimizer Batch", FixedPoint(100), PushProjectionThroughLimit, @@ -52,31 +45,64 @@ class PushProjectionThroughOffsetSuite extends PlanTest { .offset(5) .select(Symbol("a"), Symbol("b"), 'c') .analyze - val optimized1 = Optimize1.execute(query1) + val optimized1 = Optimize.execute(query1) val expected1 = testRelation .select(Symbol("a"), Symbol("b"), 'c') .offset(5).analyze comparePlans(optimized1, expected1) val query2 = testRelation - .offset(5).limit(15) + .limit(15).offset(5) .select(Symbol("a"), Symbol("b"), 'c') .analyze - val optimized2 = Optimize1.execute(query2) + val optimized2 = 
Optimize.execute(query2) val expected2 = testRelation .select(Symbol("a"), Symbol("b"), 'c') - .offset(5).limit(15).analyze + .limit(15).offset(5).analyze comparePlans(optimized2, expected2) val query3 = testRelation .offset(5).limit(15) .select(Symbol("a"), Symbol("b"), 'c') .analyze - val optimized3 = Optimize2.execute(query3) + val optimized3 = Optimize.execute(query3) val expected3 = testRelation .select(Symbol("a"), Symbol("b"), 'c') .localLimit(Add(15, 5)).offset(5).globalLimit(15) .analyze comparePlans(optimized3, expected3) + + val query4 = testRelation + .offset(5).limit(15) + .select(Symbol("a"), Symbol("b"), 'c') + .limit(10).analyze + val optimized4 = Optimize.execute(query4) + val expected4 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .localLimit(Add(10, 5)).offset(5).globalLimit(10) + .analyze + comparePlans(optimized4, expected4) + + val query5 = testRelation + .localLimit(10) + .select(Symbol("a"), Symbol("b"), 'c') + .offset(5).limit(10).analyze + val optimized5 = Optimize.execute(query5) + val expected5 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .localLimit(10).offset(5).globalLimit(10) + .analyze + comparePlans(optimized5, expected5) + + val query6 = testRelation + .localLimit(20) + .select(Symbol("a"), Symbol("b"), 'c') + .offset(5).limit(10).analyze + val optimized6 = Optimize.execute(query6) + val expected6 = testRelation + .select(Symbol("a"), Symbol("b"), 'c') + .localLimit(15).offset(5).globalLimit(10) + .analyze + comparePlans(optimized6, expected6) } } From a565962bbe96e5ccc8d4683484cb69b2884110fb Mon Sep 17 00:00:00 2001 From: pavle-martinovic_data Date: Fri, 21 Mar 2025 14:55:03 +0100 Subject: [PATCH 3/3] Change select command --- .../PushProjectionThroughOffsetSuite.scala | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala index 1866f45551b9e..7b0a5b0fd2e0b 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PushProjectionThroughOffsetSuite.scala @@ -43,64 +43,64 @@ class PushProjectionThroughOffsetSuite extends PlanTest { val query1 = testRelation .offset(5) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .analyze val optimized1 = Optimize.execute(query1) val expected1 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .offset(5).analyze comparePlans(optimized1, expected1) val query2 = testRelation .limit(15).offset(5) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .analyze val optimized2 = Optimize.execute(query2) val expected2 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .limit(15).offset(5).analyze comparePlans(optimized2, expected2) val query3 = testRelation .offset(5).limit(15) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .analyze val optimized3 = Optimize.execute(query3) val expected3 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .localLimit(Add(15, 5)).offset(5).globalLimit(15) .analyze comparePlans(optimized3, expected3) val query4 = testRelation .offset(5).limit(15) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .limit(10).analyze val optimized4 = Optimize.execute(query4) val expected4 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .localLimit(Add(10, 5)).offset(5).globalLimit(10) .analyze comparePlans(optimized4, expected4) val query5 = testRelation .localLimit(10) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .offset(5).limit(10).analyze val optimized5 = 
Optimize.execute(query5) val expected5 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .localLimit(10).offset(5).globalLimit(10) .analyze comparePlans(optimized5, expected5) val query6 = testRelation .localLimit(20) - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .offset(5).limit(10).analyze val optimized6 = Optimize.execute(query6) val expected6 = testRelation - .select(Symbol("a"), Symbol("b"), 'c') + .select($"a", $"b", $"c") .localLimit(15).offset(5).globalLimit(10) .analyze comparePlans(optimized6, expected6)