Skip to content

Commit

Permalink
chore: Add config for enabling SMJ with join condition (#937)
Browse files Browse the repository at this point in the history
* Add config for enabling SMJ with join condition

* Update common/src/main/scala/org/apache/comet/CometConf.scala

Co-authored-by: Oleks V <[email protected]>

* Update docs/source/user-guide/configs.md

Co-authored-by: Oleks V <[email protected]>

* enable config in stability suite

---------

Co-authored-by: Oleks V <[email protected]>
  • Loading branch information
andygrove and comphead authored Sep 16, 2024
1 parent e98ca3c commit 4ede214
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 0 deletions.
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ object CometConf extends ShimCometConf {
// Toggle for the "takeOrderedAndProject" operator, built through the shared
// createExecEnabledConfig helper; enabled (true) unless the user overrides it.
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)

// Opt-in flag (key: spark.comet.exec.sortMergeJoinWithJoinFilter.enabled) for the
// experimental Sort Merge Join support when the join carries a filter/condition.
// Declared with the generic conf(...) builder and defaults to false, so users must
// enable it explicitly.
val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
.doc("Experimental support for Sort Merge Join with filter")
.booleanConf
.createWithDefault(false)

val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(
"stddev",
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
}

// A join that carries a condition is only serialized when the experimental
// join-filter flag is switched on. Otherwise the condition is reported through
// withInfo and this join is not converted (None is returned to the caller).
join.condition match {
  case Some(joinFilter)
      if !CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.get(conf) =>
    withInfo(join, joinFilter)
    return None
  case _ => // no join condition, or the feature is enabled: keep going
}

val condition = join.condition.map { cond =>
val condProto = exprToProto(cond, join.left.output ++ join.right.output)
if (condProto.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase {

test("SortMergeJoin with join filter") {
withSQLConf(
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
"spark.sql.readSideCharPadding" -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
Expand Down

0 comments on commit 4ede214

Please sign in to comment.