
Commit 75aed56

sunchao authored and dongjoon-hyun committed
[SPARK-45652][SQL][3.4] SPJ: Handle empty input partitions after dynamic filtering
This is a cherry-pick of apache#43531 to branch-3.4, with a few modifications.

### What changes were proposed in this pull request?

Handle the case where input partitions become empty after V2 dynamic filtering, when SPJ is enabled.

### Why are the changes needed?

Currently, when all input partitions are filtered out via dynamic filtering, SPJ doesn't work but instead panics:

```
java.util.NoSuchElementException: None.get
  at scala.None$.get(Option.scala:529)
  at scala.None$.get(Option.scala:527)
  at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions$lzycompute(BatchScanExec.scala:108)
  at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.filteredPartitions(BatchScanExec.scala:65)
  at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD$lzycompute(BatchScanExec.scala:136)
  at org.apache.spark.sql.execution.datasources.v2.BatchScanExec.inputRDD(BatchScanExec.scala:135)
  at org.apache.spark.sql.boson.BosonBatchScanExec.inputRDD$lzycompute(BosonBatchScanExec.scala:28)
```

This is because the `groupPartitions` method returns `None` in this scenario. We should handle that case.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a test case for this.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#43539 from sunchao/SPARK-45652-branch-3.4.

Authored-by: Chao Sun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
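The failure boils down to calling `.get` on an `Option` that is `None` once dynamic filtering prunes every partition. The sketch below is a minimal illustration in plain Scala, not the actual Spark internals: `groupPartitions` here is a simplified, hypothetical stand-in for the method in `BatchScanExec`, used only to show why `.get` throws `java.util.NoSuchElementException: None.get` and why `getOrElse(Seq.empty)` lets the scan proceed with zero input partitions.

```scala
// Minimal sketch, assuming a simplified stand-in for BatchScanExec.groupPartitions:
// it returns None when there is nothing to group (e.g. every partition was pruned
// by dynamic filtering).
object EmptyPartitionSketch {
  def groupPartitions(parts: Seq[Int]): Option[Seq[(Int, Seq[Int])]] =
    if (parts.isEmpty) None else Some(parts.groupBy(identity).toSeq)

  def main(args: Array[String]): Unit = {
    val pruned = Seq.empty[Int] // dynamic filtering removed all partitions

    // Before the fix: .get on None throws java.util.NoSuchElementException: None.get
    // groupPartitions(pruned).get.map(_._2)

    // After the fix: fall back to an empty partition list, so the scan simply
    // produces no input splits instead of crashing.
    val grouped = groupPartitions(pruned).getOrElse(Seq.empty).map(_._2)
    println(grouped) // List()
  }
}
```

The design choice is to treat "no surviving partitions" as an empty scan, which is exactly what the new test asserts by checking that the join returns an empty result.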
1 parent ecdb69f · commit 75aed56

File tree: 2 files changed (+45, −2 lines)

sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/BatchScanExec.scala

Lines changed: 3 additions & 2 deletions
```diff
@@ -105,7 +105,7 @@ case class BatchScanExec(
             "partition values that are not present in the original partitioning.")
         }
 
-        groupPartitions(newPartitions).get.map(_._2)
+        groupPartitions(newPartitions).getOrElse(Seq.empty).map(_._2)
 
       case _ =>
         // no validation is needed as the data source did not report any specific partitioning
@@ -148,7 +148,8 @@ case class BatchScanExec(
           s"${SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key} " +
           "is enabled")
 
-      val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true).get
+      val groupedPartitions = groupPartitions(finalPartitions.map(_.head), true)
+        .getOrElse(Seq.empty)
 
       // This means the input partitions are not grouped by partition values. We'll need to
       // check `groupByPartitionValues` and decide whether to group and replicate splits
```

sql/core/src/test/scala/org/apache/spark/sql/connector/KeyGroupedPartitioningSuite.scala

Lines changed: 42 additions & 0 deletions
```diff
@@ -1093,4 +1093,46 @@ class KeyGroupedPartitioningSuite extends DistributionAndOrderingSuiteBase {
       }
     }
   }
+
+  test("SPARK-45652: SPJ should handle empty partition after dynamic filtering") {
+    withSQLConf(
+        SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1",
+        SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
+        SQLConf.DYNAMIC_PARTITION_PRUNING_FALLBACK_FILTER_RATIO.key -> "10") {
+      val items_partitions = Array(identity("id"))
+      createTable(items, items_schema, items_partitions)
+      sql(s"INSERT INTO testcat.ns.$items VALUES " +
+          s"(1, 'aa', 40.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 'aa', 41.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 'bb', 10.0, cast('2020-01-01' as timestamp)), " +
+          s"(2, 'bb', 10.5, cast('2020-01-01' as timestamp)), " +
+          s"(3, 'cc', 15.5, cast('2020-02-01' as timestamp))")
+
+      val purchases_partitions = Array(identity("item_id"))
+      createTable(purchases, purchases_schema, purchases_partitions)
+      sql(s"INSERT INTO testcat.ns.$purchases VALUES " +
+          s"(1, 42.0, cast('2020-01-01' as timestamp)), " +
+          s"(1, 44.0, cast('2020-01-15' as timestamp)), " +
+          s"(1, 45.0, cast('2020-01-15' as timestamp)), " +
+          s"(2, 11.0, cast('2020-01-01' as timestamp)), " +
+          s"(3, 19.5, cast('2020-02-01' as timestamp))")
+
+      Seq(true, false).foreach { pushDownValues =>
+        Seq(true, false).foreach { partiallyClustered => {
+          withSQLConf(
+              SQLConf.V2_BUCKETING_PARTIALLY_CLUSTERED_DISTRIBUTION_ENABLED.key ->
+                  partiallyClustered.toString,
+              SQLConf.V2_BUCKETING_PUSH_PART_VALUES_ENABLED.key -> pushDownValues.toString) {
+            // The dynamic filtering effectively filtered out all the partitions
+            val df = sql(s"SELECT p.price from testcat.ns.$items i, testcat.ns.$purchases p " +
+                "WHERE i.id = p.item_id AND i.price > 50.0")
+            checkAnswer(df, Seq.empty)
+          }
+        }
+        }
+      }
+    }
+  }
 }
```
