Skip to content

Commit d02c244

Browse files
committed
fix(spark): Increment segmentIndex when skipping segment due to crc check failure
1 parent ae417e6 commit d02c244

2 files changed

Lines changed: 29 additions & 4 deletions

File tree

client/src/main/java/org/apache/uniffle/client/impl/ShuffleReadClientImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,11 @@ public ShuffleBlock readShuffleBlockData() {
297297
if (shuffleServerInfoList.size() > 1) {
298298
LOG.warn(errMsg);
299299
clientReadHandler.updateConsumedBlockInfo(bs, true);
300+
if (decompressionWorker != null) {
301+
decompressionWorker.get(batchIndex - 1, segmentIndex++);
302+
} else {
303+
segmentIndex += 1;
304+
}
300305
continue;
301306
} else {
302307
throw new RssFetchFailedException(errMsg);

client/src/test/java/org/apache/uniffle/client/impl/ShuffleReadClientImplTest.java

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.uniffle.client.impl;
1919

2020
import java.nio.ByteBuffer;
21+
import java.util.LinkedHashMap;
2122
import java.util.List;
2223
import java.util.Map;
2324
import java.util.Random;
@@ -59,6 +60,7 @@
5960
import static org.junit.jupiter.api.Assertions.assertTrue;
6061
import static org.junit.jupiter.api.Assertions.fail;
6162
import static org.mockito.ArgumentMatchers.any;
63+
import static org.mockito.ArgumentMatchers.anyInt;
6264

6365
public class ShuffleReadClientImplTest extends HadoopTestBase {
6466

@@ -371,11 +373,14 @@ public void readTest8(Supplier<ShuffleClientFactory.ReadClientBuilder> builderSu
371373
String basePath = uniq(HDFS_URI + "clientReadTest8");
372374
HadoopShuffleWriteHandler writeHandler =
373375
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
376+
HadoopShuffleWriteHandler writeHandler2 =
377+
new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
374378

375-
Map<Long, byte[]> expectedData = Maps.newHashMap();
379+
LinkedHashMap<Long, byte[]> expectedData = Maps.newLinkedHashMap();
376380
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
377381
Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
378382
writeTestData(writeHandler, 2, 30, 0, 0, expectedData, blockIdBitmap);
383+
writeTestData(writeHandler2, 2, 30, 0, 0, expectedData, blockIdBitmap);
379384
ShuffleReadClientImpl readClient =
380385
builderSupplier
381386
.get()
@@ -396,8 +401,16 @@ public void readTest8(Supplier<ShuffleClientFactory.ReadClientBuilder> builderSu
396401
.shuffleServerInfoList(Lists.newArrayList(ssi1, ssi2))
397402
.build();
398403
// crc32 is incorrect
404+
AtomicInteger readCount = new AtomicInteger(0);
399405
try (MockedStatic<ChecksumUtils> checksumUtilsMock = Mockito.mockStatic(ChecksumUtils.class)) {
400-
checksumUtilsMock.when(() -> ChecksumUtils.getCrc32((ByteBuffer) any())).thenReturn(-1L);
406+
checksumUtilsMock.when(() -> ChecksumUtils.getCrc32(any(ByteBuffer.class), anyInt(), anyInt())).then(invocation -> {
407+
// crc check fails for readClient1 and the first block of readClient2
408+
if (readCount.getAndIncrement() < 2) {
409+
return -1;
410+
} else {
411+
return invocation.callRealMethod();
412+
}
413+
});
401414
try {
402415
ByteBuffer bb = readClient.readShuffleBlockData().getByteBuffer();
403416
while (bb != null) {
@@ -408,8 +421,15 @@ public void readTest8(Supplier<ShuffleClientFactory.ReadClientBuilder> builderSu
408421
assertTrue(e.getMessage().startsWith("Unexpected crc value"), e.getMessage());
409422
}
410423

411-
ShuffleBlock block = readClient2.readShuffleBlockData();
412-
assertNull(block);
424+
// the first block has been skipped due to crc check failure
425+
Long firstKey = expectedData.keySet().iterator().next();
426+
expectedData.remove(firstKey);
427+
TestUtils.validateResult(readClient2, expectedData);
428+
try {
429+
readClient2.checkProcessedBlockIds();
430+
} catch (Exception e) {
431+
assertTrue(e.getMessage().contains("expected 4 blocks, actual 3 blocks"), e.getMessage());
432+
}
413433
}
414434
readClient.close();
415435
readClient2.close();

0 commit comments

Comments
 (0)