From ff125a1af8d2db03bf83749bb28356ae8ed3e730 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 18 Sep 2024 12:47:55 +0200 Subject: [PATCH] Core: Add Data/Delete file comparators --- .../org/apache/iceberg/types/Comparators.java | 32 +++++++++ .../apache/iceberg/BaseOverwriteFiles.java | 3 +- .../org/apache/iceberg/BaseRewriteFiles.java | 3 +- .../java/org/apache/iceberg/FastAppend.java | 7 +- .../iceberg/MergingSnapshotProducer.java | 17 ++--- .../RewriteDataFilesCommitManager.java | 5 +- .../iceberg/actions/RewriteFileGroup.java | 9 ++- .../actions/RewritePositionDeletesGroup.java | 9 ++- .../iceberg/TestContentFileComparator.java | 67 +++++++++++++++++++ .../source/SparkPositionDeletesRewrite.java | 10 ++- .../iceberg/spark/source/SparkWrite.java | 9 ++- .../spark/TestFileRewriteCoordinator.java | 8 ++- .../iceberg/spark/data/TestHelpers.java | 3 +- 13 files changed, 153 insertions(+), 29 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestContentFileComparator.java diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index bfbffc64b673..c487327d878d 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -22,6 +22,8 @@ import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.UnicodeUtil; @@ -179,6 +181,14 @@ public static Comparator filePath() { return FilePathComparator.INSTANCE; } + public static Comparator dataFile() { + return DataFileComparator.INSTANCE; + } + + public static Comparator deleteFile() { + return DeleteFileComparator.INSTANCE; + } + private static class NullsFirst implements Comparator { private static final NullsFirst INSTANCE = new NullsFirst<>(); @@ -394,4 +404,26 @@ public int compare(CharSequence s1, CharSequence s2) { return 0; } } + + private static class DataFileComparator implements Comparator { + private static final DataFileComparator INSTANCE = new DataFileComparator(); + + private DataFileComparator() {} + + @Override + public int compare(DataFile s1, DataFile s2) { + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); + } + } + + private static class DeleteFileComparator implements Comparator { + private static final DeleteFileComparator INSTANCE = new DeleteFileComparator(); + + private DeleteFileComparator() {} + + @Override + public int compare(DeleteFile s1, DeleteFile s2) { + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index d929bc068ec2..427625051a9c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -27,10 +27,11 @@ import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { - private final Set deletedDataFiles = Sets.newHashSet(); + private final Set deletedDataFiles = Sets.newTreeSet(Comparators.dataFile()); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index d231536d0642..97981135916e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -22,9 +22,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { - private final Set replacedDataFiles = Sets.newHashSet(); + private final Set replacedDataFiles = Sets.newTreeSet(Comparators.dataFile()); private Long startingSnapshotId = null; BaseRewriteFiles(String tableName, TableOperations ops) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 1bae2e2fc5a0..7e54b49087a1 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -30,7 +30,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * {@link AppendFiles Append} implementation that adds a new manifest file for the write. @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final PartitionSpec spec; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); - private final CharSequenceSet newFilePaths = CharSequenceSet.empty(); + private final Set newFileSet = Sets.newTreeSet(Comparators.dataFile()); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private List newManifests = null; @@ -86,7 +87,7 @@ protected Map summary() { @Override public FastAppend appendFile(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newFilePaths.add(file.path())) { + if (newFileSet.add(file)) { this.hasNewFiles = true; newFiles.add(file); summaryBuilder.addedFile(spec, file); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 6a4da2abc9b6..cc0f9d024a72 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PartitionSet; @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty(); - private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty(); + private final Set newDataFiles = Sets.newTreeSet(Comparators.dataFile()); + private final Set newDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); private Long newDataFilesDataSequenceNumber; private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newDataFilePaths.add(file.path())) { + if (newDataFiles.add(file)) { PartitionSpec fileSpec = ops.current().spec(file.specId()); Preconditions.checkArgument( fileSpec != null, @@ -244,9 +245,9 @@ protected void add(DataFile file) { addedFilesSummary.addedFile(fileSpec, file); hasNewDataFiles = true; - List newDataFiles = + List dataFiles = newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList()); - newDataFiles.add(file); + dataFiles.add(file); } } @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) { List deleteFiles = newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); - if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) { + if (newDeleteFiles.add(fileHolder.deleteFile())) { deleteFiles.add(fileHolder); addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); hasNewDeleteFiles = true; @@ -970,9 +971,9 @@ private List newDataFilesAsManifests() { if (cachedNewDataManifests.isEmpty()) { newDataFilesBySpec.forEach( - (dataSpec, newDataFiles) -> { + (dataSpec, dataFiles) -> { List newDataManifests = - writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec); + writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec); cachedNewDataManifests.addAll(newDataManifests); }); this.hasNewDataFiles = false; diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java index 45b4bcf0a4d9..f882af0b618b 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java @@ -28,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager( * @param fileGroups fileSets to commit */ public void commitFileGroups(Set fileGroups) { - Set rewrittenDataFiles = Sets.newHashSet(); - Set addedDataFiles = Sets.newHashSet(); + Set rewrittenDataFiles = Sets.newTreeSet(Comparators.dataFile()); + Set addedDataFiles = Sets.newTreeSet(Comparators.dataFile()); for (RewriteFileGroup group : fileGroups) { rewrittenDataFiles.addAll(group.rewrittenFiles()); addedDataFiles.addAll(group.addedFiles()); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index dd1358f2ed40..fe0d3df46c6d 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -29,6 +28,8 @@ import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of files to be rewritten by a RewriteAction and the new files @@ -38,7 +39,7 @@ public class RewriteFileGroup { private final FileGroupInfo info; private final List fileScanTasks; - private Set addedFiles = Collections.emptySet(); + private Set addedFiles = Sets.newTreeSet(Comparators.dataFile()); public RewriteFileGroup(FileGroupInfo info, List fileScanTasks) { this.info = info; @@ -58,7 +59,9 @@ public void setOutputFiles(Set files) { } public Set rewrittenFiles() { - return fileScans().stream().map(FileScanTask::file).collect(Collectors.toSet()); + return fileScans().stream() + .map(FileScanTask::file) + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); } public Set addedFiles() { diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index 2be7145bcd34..cb65340a7311 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -30,6 +29,8 @@ import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of position delete files to be rewritten by a {@link @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup { private final List tasks; private final long maxRewrittenDataSequenceNumber; - private Set addedDeleteFiles = Collections.emptySet(); + private Set addedDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); public RewritePositionDeletesGroup(FileGroupInfo info, List tasks) { Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty"); @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() { } public Set rewrittenDeleteFiles() { - return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet()); + return tasks().stream() + .map(PositionDeletesScanTask::file) + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile()))); } public Set addedDeleteFiles() { diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java new file mode 100644 index 000000000000..9d6ee651c797 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.types.Comparators; +import org.junit.jupiter.api.Test; + +/** + * Testing {@link Comparators#dataFile()} / {@link Comparators#deleteFile()} is easier in + * iceberg-core since the data/delete file builders are located here + */ +public class TestContentFileComparator extends TestBase { + + @Test + public void compareDataFiles() { + // path is same as FILE_A + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath(FILE_A.path().toString()) + .withFileSizeInBytes(100) + .withRecordCount(100) + .build(); + assertThat(Comparators.dataFile().compare(FILE_A, dataFile)).isEqualTo(0); + assertThat(Comparators.dataFile().compare(FILE_A, FILE_A)).isEqualTo(0); + + assertThat(Comparators.dataFile().compare(FILE_A, FILE_B)).isLessThan(0); + assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0); + } + + @Test + public void compareDeleteFiles() { + // same path as FILE_A_DELETES + DeleteFile deleteFile = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath(FILE_A_DELETES.path().toString()) + .withFileSizeInBytes(100) + .withRecordCount(100) + .build(); + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0); + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0); + + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0); + assertThat(Comparators.deleteFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0); + + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0); + assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0); + } +} diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index d91779475845..18abda62f689 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -36,11 +37,12 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.types.Comparators; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; @@ -148,7 +150,11 @@ public boolean useCommitCoordinator() { @Override public void commit(WriterCommitMessage[] messages) { PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); - coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + coordinator.stageRewrite( + table, + fileSetId, + files(messages).stream() + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile())))); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index e4a0eb700be6..cf4397ecc167 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -52,12 +52,13 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.spark.SparkWriteRequirements; +import org.apache.iceberg.types.Comparators; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; import org.apache.spark.api.java.JavaSparkContext; @@ -491,7 +492,11 @@ private RewriteFiles(String fileSetID) { @Override public void commit(WriterCommitMessage[] messages) { FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); - coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages))); + coordinator.stageRewrite( + table, + fileSetID, + files(messages).stream() + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())))); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java index 3955d0395474..cc7eab7c6047 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java @@ -33,7 +33,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.iceberg.types.Comparators; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -93,7 +95,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -165,7 +167,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -247,7 +249,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio Set addedFiles = fileSetIDs.stream() .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); table.refresh(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index c73ef630ac48..7be9ee2f7a05 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.storage.serde2.io.DateWritable; @@ -787,7 +788,7 @@ public static List dataFiles(Table table, String branch) { } public static Set deleteFiles(Table table) { - Set deleteFiles = Sets.newHashSet(); + Set deleteFiles = Sets.newTreeSet(Comparators.deleteFile()); for (FileScanTask task : table.newScan().planFiles()) { deleteFiles.addAll(task.deletes());