Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion core/src/main/scala/org/apache/spark/Dependency.scala
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](
val mapSideCombine: Boolean = false,
val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor,
val rowBasedChecksums: Array[RowBasedChecksum] = ShuffleDependency.EMPTY_ROW_BASED_CHECKSUMS,
val checksumMismatchFullRetryEnabled: Boolean = false)
private val _checksumMismatchFullRetryEnabled: Boolean = false)
extends Dependency[Product2[K, V]] with Logging {

def this(
Expand Down Expand Up @@ -144,6 +144,9 @@ class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](

def shuffleMergeAllowed : Boolean = _shuffleMergeAllowed

def checksumMismatchFullRetryEnabled: Boolean =
_checksumMismatchFullRetryEnabled && !canShuffleMergeBeEnabled()

/**
* Stores the location of the list of chosen external shuffle services for handling the
* shuffle merge requests from mappers in this shuffle map stage.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3488,15 +3488,15 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep1 = new ShuffleDependency(
shuffleMapRdd1,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId1 = shuffleDep1.shuffleId
val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)

val shuffleDep2 = new ShuffleDependency(
shuffleMapRdd2,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId2 = shuffleDep2.shuffleId
val finalRdd = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)
Expand Down Expand Up @@ -3528,7 +3528,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep = new ShuffleDependency(
mapRdd,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId = shuffleDep.shuffleId
val finalRdd = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
Expand Down Expand Up @@ -3627,7 +3627,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep1 = new ShuffleDependency(
shuffleMapRdd1,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId1 = shuffleDep1.shuffleId

Expand All @@ -3636,7 +3636,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep2 = new ShuffleDependency(
shuffleMapRdd2,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId2 = shuffleDep2.shuffleId

Expand All @@ -3645,7 +3645,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep3 = new ShuffleDependency(
shuffleMapRdd3,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true
_checksumMismatchFullRetryEnabled = true
)
val shuffleId3 = shuffleDep3.shuffleId
val finalRdd = new MyRDD(sc, 2, List(shuffleDep3), tracker = mapOutputTracker)
Expand Down Expand Up @@ -3859,21 +3859,21 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep1 = new ShuffleDependency(
shuffleMapRdd1,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true)
_checksumMismatchFullRetryEnabled = true)
val shuffleId1 = shuffleDep1.shuffleId

val shuffleMapRdd2 = new MyRDD(sc, 2, List(shuffleDep1), tracker = mapOutputTracker)
val shuffleDep2 = new ShuffleDependency(
shuffleMapRdd2,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true)
_checksumMismatchFullRetryEnabled = true)
val shuffleId2 = shuffleDep2.shuffleId

val shuffleMapRdd3 = new MyRDD(sc, 2, List(shuffleDep2), tracker = mapOutputTracker)
val shuffleDep3 = new ShuffleDependency(
shuffleMapRdd3,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true)
_checksumMismatchFullRetryEnabled = true)
val shuffleId3 = shuffleDep3.shuffleId

val finalRdd = new MyRDD(sc, 2, List(shuffleDep1, shuffleDep3), tracker = mapOutputTracker)
Expand Down Expand Up @@ -3923,7 +3923,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
val shuffleDep1 = new ShuffleDependency(
shuffleMapRdd1,
new HashPartitioner(2),
checksumMismatchFullRetryEnabled = true)
_checksumMismatchFullRetryEnabled = true)
val shuffleId1 = shuffleDep1.shuffleId

// Submit a job depending on shuffleDep1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ object ShuffleExchangeExec {
serializer,
shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics),
rowBasedChecksums = UnsafeRowChecksum.createUnsafeRowChecksums(checksumSize),
checksumMismatchFullRetryEnabled = SQLConf.get.shuffleChecksumMismatchFullRetryEnabled)
_checksumMismatchFullRetryEnabled = SQLConf.get.shuffleChecksumMismatchFullRetryEnabled)

dependency
}
Expand Down