From 722a251df7147f39500b691edde61c21684703ba Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 11 Sep 2024 06:58:44 -0600 Subject: [PATCH 1/4] Add config for enabling SMJ with join condition --- common/src/main/scala/org/apache/comet/CometConf.scala | 6 ++++++ docs/source/user-guide/configs.md | 1 + .../main/scala/org/apache/comet/serde/QueryPlanSerde.scala | 7 +++++++ .../test/scala/org/apache/comet/exec/CometJoinSuite.scala | 1 + .../test/scala/org/apache/spark/sql/CometTestBase.scala | 1 + 5 files changed, 16 insertions(+) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 8828b70f8..1f2297ca5 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -145,6 +145,12 @@ object CometConf extends ShimCometConf { val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig("takeOrderedAndProject", defaultValue = true) + val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = + conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") + .doc("Experimental support for SMJ with join condition") + .booleanConf + .createWithDefault(false) + val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] = createExecEnabledConfig( "stddev", diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 1b5fe7368..6c4852c02 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -54,6 +54,7 @@ Comet provides the following configuration settings. | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true | +| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for SMJ with join condition | false | | spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true | | spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true | | spark.comet.exec.union.enabled | Whether to enable union by default. | true | diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala index dbc3a1d80..50d921650 100644 --- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala +++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala @@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim } } + if (join.condition.isDefined && + !CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED + .get(conf)) { + withInfo(join, join.condition.get) + return None + } + val condition = join.condition.map { cond => val condProto = exprToProto(cond, join.left.output ++ join.right.output) if (condProto.isEmpty) { diff --git a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala index 1bd0e1b7a..d787a9b1e 100644 --- a/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala +++ b/spark/src/test/scala/org/apache/comet/exec/CometJoinSuite.scala @@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase { test("SortMergeJoin with join filter") { withSQLConf( + CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") { diff --git a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala index d49095e25..1709cce61 100644 --- a/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala +++ b/spark/src/test/scala/org/apache/spark/sql/CometTestBase.scala @@ -80,6 +80,7 @@ abstract class CometTestBase conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true") conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true") conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g") + conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true") conf } From cf597223dfdaa37979db9808a4cd737bbc1b47bd Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 11 Sep 2024 09:45:33 -0600 Subject: [PATCH 2/4] Update common/src/main/scala/org/apache/comet/CometConf.scala Co-authored-by: Oleks V --- common/src/main/scala/org/apache/comet/CometConf.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index 1f2297ca5..03b7a2a41 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -147,7 +147,7 @@ object CometConf extends ShimCometConf { val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] = conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled") - .doc("Experimental support for SMJ with join condition") + .doc("Experimental support for Sort Merge Join with filter") .booleanConf .createWithDefault(false) From b7c5c42d1a7fbc8a158f34ffb82ad3b91b7f5330 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Wed, 11 Sep 2024 09:45:40 -0600 Subject: [PATCH 3/4] Update docs/source/user-guide/configs.md Co-authored-by: Oleks V --- docs/source/user-guide/configs.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 6c4852c02..ff2db342a 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -54,7 +54,7 @@ Comet provides the following configuration settings. | spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true | | spark.comet.exec.sort.enabled | Whether to enable sort by default. | true | | spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true | -| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for SMJ with join condition | false | +| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false | | spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true | | spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true | | spark.comet.exec.union.enabled | Whether to enable union by default. | true | From d6f5b904c2e66d44917090ec3246fbc561239652 Mon Sep 17 00:00:00 2001 From: Andy Grove Date: Mon, 16 Sep 2024 07:29:56 -0600 Subject: [PATCH 4/4] enable config in stability suite --- .../org/apache/spark/sql/comet/CometPlanStabilitySuite.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala index 83cc89827..a553e61c7 100644 --- a/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala +++ b/spark/src/test/scala/org/apache/spark/sql/comet/CometPlanStabilitySuite.scala @@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa CometConf.COMET_EXEC_ENABLED.key -> "true", CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false", CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true", + CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true", CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64 "spark.sql.readSideCharPadding" -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {