Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: Add config for enabling SMJ with join condition #937

Merged
merged 4 commits into from
Sep 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,12 @@ object CometConf extends ShimCometConf {
val COMET_EXEC_TAKE_ORDERED_AND_PROJECT_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig("takeOrderedAndProject", defaultValue = true)

val COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED: ConfigEntry[Boolean] =
conf("spark.comet.exec.sortMergeJoinWithJoinFilter.enabled")
.doc("Experimental support for Sort Merge Join with filter")
.booleanConf
.createWithDefault(false)

val COMET_EXPR_STDDEV_ENABLED: ConfigEntry[Boolean] =
createExecEnabledConfig(
"stddev",
Expand Down
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ Comet provides the following configuration settings.
| spark.comet.exec.shuffle.enabled | Whether to enable Comet native shuffle. Note that this requires setting 'spark.shuffle.manager' to 'org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager'. 'spark.shuffle.manager' must be set before starting the Spark application and cannot be changed during the application. | true |
| spark.comet.exec.sort.enabled | Whether to enable sort by default. | true |
| spark.comet.exec.sortMergeJoin.enabled | Whether to enable sortMergeJoin by default. | true |
| spark.comet.exec.sortMergeJoinWithJoinFilter.enabled | Experimental support for Sort Merge Join with filter | false |
| spark.comet.exec.stddev.enabled | Whether to enable stddev by default. stddev is slower than Spark's implementation. | true |
| spark.comet.exec.takeOrderedAndProject.enabled | Whether to enable takeOrderedAndProject by default. | true |
| spark.comet.exec.union.enabled | Whether to enable union by default. | true |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2961,6 +2961,13 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
}
}

if (join.condition.isDefined &&
!CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED
.get(conf)) {
withInfo(join, join.condition.get)
return None
}

val condition = join.condition.map { cond =>
val condProto = exprToProto(cond, join.left.output ++ join.right.output)
if (condProto.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ class CometJoinSuite extends CometTestBase {

test("SortMergeJoin with join filter") {
withSQLConf(
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
SQLConf.ADAPTIVE_AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withParquetTable((0 until 10).map(i => (i, i % 5)), "tbl_a") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ abstract class CometTestBase
conf.set(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key, "true")
conf.set(CometConf.COMET_SPARK_TO_ARROW_ENABLED.key, "true")
conf.set(CometConf.COMET_MEMORY_OVERHEAD.key, "2g")
conf.set(CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key, "true")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ trait CometPlanStabilitySuite extends DisableAdaptiveExecutionSuite with TPCDSBa
CometConf.COMET_EXEC_ENABLED.key -> "true",
CometConf.COMET_DPP_FALLBACK_ENABLED.key -> "false",
CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "true",
CometConf.COMET_EXEC_SORT_MERGE_JOIN_WITH_JOIN_FILTER_ENABLED.key -> "true",
CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true", // needed for v1.4/q9, v1.4/q44, v2.7.0/q6, v2.7.0/q64
"spark.sql.readSideCharPadding" -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "10MB") {
Expand Down
Loading