From 64c9c376e6af8bca3b2324d84f07ad2d43f71379 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Wed, 18 Sep 2024 12:47:55 +0200 Subject: [PATCH 1/2] Core: Add Data/Delete file comparators --- .../org/apache/iceberg/types/Comparators.java | 32 +++++++++ .../apache/iceberg/BaseOverwriteFiles.java | 3 +- .../org/apache/iceberg/BaseRewriteFiles.java | 3 +- .../java/org/apache/iceberg/FastAppend.java | 7 +- .../iceberg/MergingSnapshotProducer.java | 17 ++--- .../RewriteDataFilesCommitManager.java | 5 +- .../iceberg/actions/RewriteFileGroup.java | 9 ++- .../actions/RewritePositionDeletesGroup.java | 9 ++- .../iceberg/TestContentFileComparator.java | 67 +++++++++++++++++++ .../apache/iceberg/hive/HiveTableTest.java | 2 +- .../iceberg/nessie/TestNessieTable.java | 2 +- .../source/SparkPositionDeletesRewrite.java | 8 ++- .../iceberg/spark/source/SparkWrite.java | 8 ++- .../spark/TestFileRewriteCoordinator.java | 8 ++- .../iceberg/spark/data/TestHelpers.java | 3 +- 15 files changed, 152 insertions(+), 31 deletions(-) create mode 100644 core/src/test/java/org/apache/iceberg/TestContentFileComparator.java diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index bfbffc64b673..c487327d878d 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -22,6 +22,8 @@ import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.util.UnicodeUtil; @@ -179,6 +181,14 @@ public static Comparator filePath() { return FilePathComparator.INSTANCE; } + public static Comparator dataFile() { + return DataFileComparator.INSTANCE; + } + + public static Comparator deleteFile() { + return DeleteFileComparator.INSTANCE; + } + private static class NullsFirst implements Comparator { private static final NullsFirst INSTANCE = new NullsFirst<>(); @@ -394,4 +404,26 @@ public int compare(CharSequence s1, CharSequence s2) { return 0; } } + + private static class DataFileComparator implements Comparator { + private static final DataFileComparator INSTANCE = new DataFileComparator(); + + private DataFileComparator() {} + + @Override + public int compare(DataFile s1, DataFile s2) { + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); + } + } + + private static class DeleteFileComparator implements Comparator { + private static final DeleteFileComparator INSTANCE = new DeleteFileComparator(); + + private DeleteFileComparator() {} + + @Override + public int compare(DeleteFile s1, DeleteFile s2) { + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); + } + } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index d929bc068ec2..427625051a9c 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -27,10 +27,11 @@ import org.apache.iceberg.expressions.StrictMetricsEvaluator; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { - private final Set deletedDataFiles = Sets.newHashSet(); + private final Set deletedDataFiles = Sets.newTreeSet(Comparators.dataFile()); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index d231536d0642..97981135916e 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -22,9 +22,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { - private final Set replacedDataFiles = Sets.newHashSet(); + private final Set replacedDataFiles = Sets.newTreeSet(Comparators.dataFile()); private Long startingSnapshotId = null; BaseRewriteFiles(String tableName, TableOperations ops) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 1bae2e2fc5a0..7e54b49087a1 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -30,7 +30,8 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; -import org.apache.iceberg.util.CharSequenceSet; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * {@link AppendFiles Append} implementation that adds a new manifest file for the write. @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final PartitionSpec spec; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); - private final CharSequenceSet newFilePaths = CharSequenceSet.empty(); + private final Set newFileSet = Sets.newTreeSet(Comparators.dataFile()); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private List newManifests = null; @@ -86,7 +87,7 @@ protected Map summary() { @Override public FastAppend appendFile(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newFilePaths.add(file.path())) { + if (newFileSet.add(file)) { this.hasNewFiles = true; newFiles.add(file); summaryBuilder.addedFile(spec, file); diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index 6a4da2abc9b6..cc0f9d024a72 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -49,6 +49,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.Pair; import org.apache.iceberg.util.PartitionSet; @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty(); - private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty(); + private final Set newDataFiles = Sets.newTreeSet(Comparators.dataFile()); + private final Set newDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); private Long newDataFilesDataSequenceNumber; private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() { /** Add a data file to the new snapshot. */ protected void add(DataFile file) { Preconditions.checkNotNull(file, "Invalid data file: null"); - if (newDataFilePaths.add(file.path())) { + if (newDataFiles.add(file)) { PartitionSpec fileSpec = ops.current().spec(file.specId()); Preconditions.checkArgument( fileSpec != null, @@ -244,9 +245,9 @@ protected void add(DataFile file) { addedFilesSummary.addedFile(fileSpec, file); hasNewDataFiles = true; - List newDataFiles = + List dataFiles = newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList()); - newDataFiles.add(file); + dataFiles.add(file); } } @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) { List deleteFiles = newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList()); - if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) { + if (newDeleteFiles.add(fileHolder.deleteFile())) { deleteFiles.add(fileHolder); addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile()); hasNewDeleteFiles = true; @@ -970,9 +971,9 @@ private List newDataFilesAsManifests() { if (cachedNewDataManifests.isEmpty()) { newDataFilesBySpec.forEach( - (dataSpec, newDataFiles) -> { + (dataSpec, dataFiles) -> { List newDataManifests = - writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec); + writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec); cachedNewDataManifests.addAll(newDataManifests); }); this.hasNewDataFiles = false; diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java index 45b4bcf0a4d9..f882af0b618b 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java @@ -28,6 +28,7 @@ import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.Tasks; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager( * @param fileGroups fileSets to commit */ public void commitFileGroups(Set fileGroups) { - Set rewrittenDataFiles = Sets.newHashSet(); - Set addedDataFiles = Sets.newHashSet(); + Set rewrittenDataFiles = Sets.newTreeSet(Comparators.dataFile()); + Set addedDataFiles = Sets.newTreeSet(Comparators.dataFile()); for (RewriteFileGroup group : fileGroups) { rewrittenDataFiles.addAll(group.rewrittenFiles()); addedDataFiles.addAll(group.addedFiles()); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index dd1358f2ed40..fe0d3df46c6d 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -29,6 +28,8 @@ import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of files to be rewritten by a RewriteAction and the new files @@ -38,7 +39,7 @@ public class RewriteFileGroup { private final FileGroupInfo info; private final List fileScanTasks; - private Set addedFiles = Collections.emptySet(); + private Set addedFiles = Sets.newTreeSet(Comparators.dataFile()); public RewriteFileGroup(FileGroupInfo info, List fileScanTasks) { this.info = info; @@ -58,7 +59,9 @@ public void setOutputFiles(Set files) { } public Set rewrittenFiles() { - return fileScans().stream().map(FileScanTask::file).collect(Collectors.toSet()); + return fileScans().stream() + .map(FileScanTask::file) + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); } public Set addedFiles() { diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index 2be7145bcd34..cb65340a7311 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -18,7 +18,6 @@ */ package org.apache.iceberg.actions; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Set; @@ -30,6 +29,8 @@ import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult; import org.apache.iceberg.relocated.com.google.common.base.MoreObjects; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; /** * Container class representing a set of position delete files to be rewritten by a {@link @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup { private final List tasks; private final long maxRewrittenDataSequenceNumber; - private Set addedDeleteFiles = Collections.emptySet(); + private Set addedDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); public RewritePositionDeletesGroup(FileGroupInfo info, List tasks) { Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty"); @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() { } public Set rewrittenDeleteFiles() { - return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet()); + return tasks().stream() + .map(PositionDeletesScanTask::file) + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile()))); } public Set addedDeleteFiles() { diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java new file mode 100644 index 000000000000..9d6ee651c797 --- /dev/null +++ b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.apache.iceberg.types.Comparators; +import org.junit.jupiter.api.Test; + +/** + * Testing {@link Comparators#dataFile()} / {@link Comparators#deleteFile()} is easier in + * iceberg-core since the data/delete file builders are located here + */ +public class TestContentFileComparator extends TestBase { + + @Test + public void compareDataFiles() { + // path is same as FILE_A + DataFile dataFile = + DataFiles.builder(SPEC) + .withPath(FILE_A.path().toString()) + .withFileSizeInBytes(100) + .withRecordCount(100) + .build(); + assertThat(Comparators.dataFile().compare(FILE_A, dataFile)).isEqualTo(0); + assertThat(Comparators.dataFile().compare(FILE_A, FILE_A)).isEqualTo(0); + + assertThat(Comparators.dataFile().compare(FILE_A, FILE_B)).isLessThan(0); + assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0); + } + + @Test + public void compareDeleteFiles() { + // same path as FILE_A_DELETES + DeleteFile deleteFile = + FileMetadata.deleteFileBuilder(SPEC) + .ofPositionDeletes() + .withPath(FILE_A_DELETES.path().toString()) + .withFileSizeInBytes(100) + .withRecordCount(100) + .build(); + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0); + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0); + + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0); + assertThat(Comparators.deleteFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0); + + assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0); + assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0); + } +} diff --git a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java index 9ae3c97db47c..13c459128dec 100644 --- a/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java +++ b/hive-metastore/src/test/java/org/apache/iceberg/hive/HiveTableTest.java @@ -213,7 +213,7 @@ public void testDropTable() throws IOException { table.newAppend().appendFile(file1).appendFile(file2).commit(); // delete file2 - table.newDelete().deleteFile(file2.path()).commit(); + table.newDelete().deleteFile(file2).commit(); String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", ""); diff --git a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java index f0f75c842429..ca507eae575a 100644 --- a/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java +++ b/nessie/src/test/java/org/apache/iceberg/nessie/TestNessieTable.java @@ -407,7 +407,7 @@ public void testDropTable() throws IOException { table.newAppend().appendFile(file1).appendFile(file2).commit(); // delete file2 - table.newDelete().deleteFile(file2.path()).commit(); + table.newDelete().deleteFile(file2).commit(); String manifestListLocation = table.currentSnapshot().manifestListLocation().replace("file:", ""); diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index d91779475845..7a973bd4c215 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -36,11 +36,12 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkWriteConf; +import org.apache.iceberg.types.Comparators; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.SparkSession; @@ -148,7 +149,10 @@ public boolean useCommitCoordinator() { @Override public void commit(WriterCommitMessage[] messages) { PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get(); - coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages))); + coordinator.stageRewrite( + table, + fileSetId, + ImmutableSortedSet.orderedBy(Comparators.deleteFile()).addAll(files(messages)).build()); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index e4a0eb700be6..fed297d61579 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -52,12 +52,13 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; import org.apache.iceberg.spark.SparkWriteRequirements; +import org.apache.iceberg.types.Comparators; import org.apache.spark.TaskContext; import org.apache.spark.TaskContext$; import org.apache.spark.api.java.JavaSparkContext; @@ -491,7 +492,10 @@ private RewriteFiles(String fileSetID) { @Override public void commit(WriterCommitMessage[] messages) { FileRewriteCoordinator coordinator = FileRewriteCoordinator.get(); - coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages))); + coordinator.stageRewrite( + table, + fileSetID, + ImmutableSortedSet.orderedBy(Comparators.dataFile()).addAll(files(messages)).build()); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java index 3955d0395474..cc7eab7c6047 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java @@ -33,7 +33,9 @@ import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.source.SimpleRecord; +import org.apache.iceberg.types.Comparators; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; @@ -93,7 +95,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -165,7 +167,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -247,7 +249,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio Set addedFiles = fileSetIDs.stream() .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream()) - .collect(Collectors.toSet()); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); table.refresh(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index c73ef630ac48..7be9ee2f7a05 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -57,6 +57,7 @@ import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.relocated.com.google.common.collect.Streams; import org.apache.iceberg.spark.SparkSchemaUtil; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.types.Type; import org.apache.iceberg.types.Types; import org.apache.orc.storage.serde2.io.DateWritable; @@ -787,7 +788,7 @@ public static List dataFiles(Table table, String branch) { } public static Set deleteFiles(Table table) { - Set deleteFiles = Sets.newHashSet(); + Set deleteFiles = Sets.newTreeSet(Comparators.deleteFile()); for (FileScanTask task : table.newScan().planFiles()) { deleteFiles.addAll(task.deletes()); From 6fc4e3593c93d33cd08ba44bda5c285b0e23d097 Mon Sep 17 00:00:00 2001 From: Eduard Tudenhoefner Date: Thu, 19 Sep 2024 15:58:12 +0200 Subject: [PATCH 2/2] updates --- .../org/apache/iceberg/types/Comparators.java | 31 ++++++------ .../apache/iceberg/BaseOverwriteFiles.java | 2 +- .../org/apache/iceberg/BaseRewriteFiles.java | 2 +- .../java/org/apache/iceberg/FastAppend.java | 2 +- .../apache/iceberg/ManifestFilterManager.java | 49 ++++++++++++------- .../iceberg/MergingSnapshotProducer.java | 4 +- .../RewriteDataFilesCommitManager.java | 4 +- .../iceberg/actions/RewriteFileGroup.java | 4 +- .../actions/RewritePositionDeletesGroup.java | 4 +- .../iceberg/TestContentFileComparator.java | 24 ++++----- .../source/SparkPositionDeletesRewrite.java | 6 ++- .../iceberg/spark/source/SparkWrite.java | 5 +- .../spark/TestFileRewriteCoordinator.java | 6 +-- .../iceberg/spark/data/TestHelpers.java | 2 +- 14 files changed, 80 insertions(+), 65 deletions(-) diff --git a/api/src/main/java/org/apache/iceberg/types/Comparators.java b/api/src/main/java/org/apache/iceberg/types/Comparators.java index c487327d878d..d57247e19572 100644 --- a/api/src/main/java/org/apache/iceberg/types/Comparators.java +++ b/api/src/main/java/org/apache/iceberg/types/Comparators.java @@ -22,7 +22,7 @@ import java.util.Comparator; import java.util.List; import java.util.function.IntFunction; -import org.apache.iceberg.DataFile; +import org.apache.iceberg.ContentFile; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.StructLike; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; @@ -181,12 +181,8 @@ public static Comparator filePath() { return FilePathComparator.INSTANCE; } - public static Comparator dataFile() { - return DataFileComparator.INSTANCE; - } - - public static Comparator deleteFile() { - return DeleteFileComparator.INSTANCE; + public static Comparator> contentFile() { + return ContentFileComparator.INSTANCE; } private static class NullsFirst implements Comparator { @@ -405,24 +401,29 @@ public int compare(CharSequence s1, CharSequence s2) { } } - private static class DataFileComparator implements Comparator { - private static final DataFileComparator INSTANCE = new DataFileComparator(); + private static class DeleteFileComparator implements Comparator { + private static final DeleteFileComparator INSTANCE = new DeleteFileComparator(); - private DataFileComparator() {} + private DeleteFileComparator() {} @Override - public int compare(DataFile s1, DataFile s2) { + public int compare(DeleteFile s1, DeleteFile s2) { + // this needs to be updated once support for DVs is added return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); } } - private static class DeleteFileComparator implements Comparator { - private static final DeleteFileComparator INSTANCE = new DeleteFileComparator(); + private static class ContentFileComparator implements Comparator> { + private static final ContentFileComparator INSTANCE = new ContentFileComparator(); - private DeleteFileComparator() {} + private ContentFileComparator() {} @Override - public int compare(DeleteFile s1, DeleteFile s2) { + public int compare(ContentFile s1, ContentFile s2) { + if (s1 instanceof DeleteFile && s2 instanceof DeleteFile) { + return DeleteFileComparator.INSTANCE.compare((DeleteFile) s1, (DeleteFile) s2); + } + return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path()); } } diff --git a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java index 427625051a9c..18b8a177eb66 100644 --- a/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseOverwriteFiles.java @@ -31,7 +31,7 @@ public class BaseOverwriteFiles extends MergingSnapshotProducer implements OverwriteFiles { - private final Set deletedDataFiles = Sets.newTreeSet(Comparators.dataFile()); + private final Set deletedDataFiles = Sets.newTreeSet(Comparators.contentFile()); private boolean validateAddedFilesMatchOverwriteFilter = false; private Long startingSnapshotId = null; private Expression conflictDetectionFilter = null; diff --git a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java index 97981135916e..510295e91271 100644 --- a/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java +++ b/core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java @@ -25,7 +25,7 @@ import org.apache.iceberg.types.Comparators; class BaseRewriteFiles extends MergingSnapshotProducer implements RewriteFiles { - private final Set replacedDataFiles = Sets.newTreeSet(Comparators.dataFile()); + private final Set replacedDataFiles = Sets.newTreeSet(Comparators.contentFile()); private Long startingSnapshotId = null; BaseRewriteFiles(String tableName, TableOperations ops) { diff --git a/core/src/main/java/org/apache/iceberg/FastAppend.java b/core/src/main/java/org/apache/iceberg/FastAppend.java index 7e54b49087a1..616e009439da 100644 --- a/core/src/main/java/org/apache/iceberg/FastAppend.java +++ b/core/src/main/java/org/apache/iceberg/FastAppend.java @@ -45,7 +45,7 @@ class FastAppend extends SnapshotProducer implements AppendFiles { private final PartitionSpec spec; private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder(); private final List newFiles = Lists.newArrayList(); - private final Set newFileSet = Sets.newTreeSet(Comparators.dataFile()); + private final Set newFileSet = Sets.newTreeSet(Comparators.contentFile()); private final List appendManifests = Lists.newArrayList(); private final List rewrittenAppendManifests = Lists.newArrayList(); private List newManifests = null; diff --git a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java index 106be74fa3ad..087e38d687b6 100644 --- a/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java +++ b/core/src/main/java/org/apache/iceberg/ManifestFilterManager.java @@ -25,6 +25,7 @@ import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.function.Supplier; +import java.util.stream.Collectors; import org.apache.iceberg.exceptions.RuntimeIOException; import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; @@ -36,10 +37,10 @@ import org.apache.iceberg.relocated.com.google.common.base.Joiner; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.Iterables; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; +import org.apache.iceberg.types.Comparators; import org.apache.iceberg.util.CharSequenceSet; import org.apache.iceberg.util.CharSequenceWrapper; import org.apache.iceberg.util.ManifestFileUtil; @@ -71,6 +72,7 @@ public String partition() { private final PartitionSet deleteFilePartitions; private final PartitionSet dropPartitions; private final CharSequenceSet deletePaths = CharSequenceSet.empty(); + private final Set filesToDelete = Sets.newTreeSet(Comparators.contentFile()); private Expression deleteExpression = Expressions.alwaysFalse(); private long minSequenceNumber = 0; private boolean hasPathOnlyDeletes = false; @@ -153,7 +155,7 @@ void caseSensitive(boolean newCaseSensitive) { void delete(F file) { Preconditions.checkNotNull(file, "Cannot delete file: null"); invalidateFilteredCache(); - deletePaths.add(file.path()); + filesToDelete.add(file); deleteFilePartitions.add(file.specId(), file.partition()); } @@ -167,6 +169,7 @@ void delete(CharSequence path) { boolean containsDeletes() { return !deletePaths.isEmpty() + || !filesToDelete.isEmpty() || deleteExpression != Expressions.alwaysFalse() || !dropPartitions.isEmpty(); } @@ -233,23 +236,28 @@ SnapshotSummary.Builder buildSummary(Iterable manifests) { @SuppressWarnings("CollectionUndefinedEquality") private void validateRequiredDeletes(ManifestFile... manifests) { if (failMissingDeletePaths) { - CharSequenceSet deletedFiles = deletedFiles(manifests); + Set deletedFiles = deletedFiles(manifests); + ValidationException.check( - deletedFiles.containsAll(deletePaths), + deletedFiles.containsAll(filesToDelete), "Missing required files to delete: %s", - COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path)))); + COMMA.join( + filesToDelete.stream() + .filter(f -> !deletedFiles.contains(f)) + .map(ContentFile::path) + .collect(Collectors.toList()))); } } - private CharSequenceSet deletedFiles(ManifestFile[] manifests) { - CharSequenceSet deletedFiles = CharSequenceSet.empty(); + private Set deletedFiles(ManifestFile[] manifests) { + Set deletedFiles = Sets.newTreeSet(Comparators.contentFile()); if (manifests != null) { for (ManifestFile manifest : manifests) { Iterable manifestDeletes = filteredManifestToDeletedFiles.get(manifest); if (manifestDeletes != null) { for (F file : manifestDeletes) { - deletedFiles.add(file.path()); + deletedFiles.add(file); } } } @@ -347,7 +355,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) { boolean canContainDroppedFiles; if (hasPathOnlyDeletes) { canContainDroppedFiles = true; - } else if (!deletePaths.isEmpty()) { + } else if (!filesToDelete.isEmpty()) { // because there were no path-only deletes, the set of deleted file partitions is valid canContainDroppedFiles = ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById); @@ -372,8 +380,14 @@ private boolean manifestHasDeletedFiles( for (ManifestEntry entry : reader.liveEntries()) { F file = entry.file(); + + // add path-based delete to set of files to be deleted + if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) { + filesToDelete.add(file); + } + boolean markedForDelete = - deletePaths.contains(file.path()) + filesToDelete.contains(file) || dropPartitions.contains(file.specId(), file.partition()) || (isDelete && entry.isLive() @@ -409,8 +423,7 @@ private ManifestFile filterManifestWithDeletedFiles( boolean isDelete = reader.isDeleteManifestReader(); // when this point is reached, there is at least one file that will be deleted in the // manifest. produce a copy of the manifest with all deleted files removed. - List deletedFiles = Lists.newArrayList(); - Set deletedPaths = Sets.newHashSet(); + Set deleted = Sets.newTreeSet(Comparators.contentFile()); try { ManifestWriter writer = newManifestWriter(reader.spec()); @@ -421,7 +434,7 @@ private ManifestFile filterManifestWithDeletedFiles( entry -> { F file = entry.file(); boolean markedForDelete = - deletePaths.contains(file.path()) + filesToDelete.contains(file) || dropPartitions.contains(file.specId(), file.partition()) || (isDelete && entry.isLive() @@ -441,18 +454,16 @@ private ManifestFile filterManifestWithDeletedFiles( if (allRowsMatch) { writer.delete(entry); - CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path()); - if (deletedPaths.contains(wrapper)) { + if (deleted.contains(entry.file())) { LOG.warn( "Deleting a duplicate path from manifest {}: {}", manifest.path(), - wrapper.get()); + entry.file().path()); duplicateDeleteCount += 1; } else { // only add the file to deletes if it is a new delete // this keeps the snapshot summary accurate for non-duplicate data - deletedFiles.add(entry.file().copyWithoutStats()); - deletedPaths.add(wrapper); + deleted.add(entry.file().copyWithoutStats()); } } else { writer.existing(entry); @@ -472,7 +483,7 @@ private ManifestFile filterManifestWithDeletedFiles( // update caches filteredManifests.put(manifest, filtered); - filteredManifestToDeletedFiles.put(filtered, deletedFiles); + filteredManifestToDeletedFiles.put(filtered, deleted); return filtered; diff --git a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java index cc0f9d024a72..c7f3ac3a8ec6 100644 --- a/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java +++ b/core/src/main/java/org/apache/iceberg/MergingSnapshotProducer.java @@ -82,8 +82,8 @@ abstract class MergingSnapshotProducer extends SnapshotProducer { // update data private final Map> newDataFilesBySpec = Maps.newHashMap(); - private final Set newDataFiles = Sets.newTreeSet(Comparators.dataFile()); - private final Set newDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); + private final Set newDataFiles = Sets.newTreeSet(Comparators.contentFile()); + private final Set newDeleteFiles = Sets.newTreeSet(Comparators.contentFile()); private Long newDataFilesDataSequenceNumber; private final Map> newDeleteFilesBySpec = Maps.newHashMap(); private final List appendManifests = Lists.newArrayList(); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java index f882af0b618b..435a5e4b118e 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteDataFilesCommitManager.java @@ -74,8 +74,8 @@ public RewriteDataFilesCommitManager( * @param fileGroups fileSets to commit */ public void commitFileGroups(Set fileGroups) { - Set rewrittenDataFiles = Sets.newTreeSet(Comparators.dataFile()); - Set addedDataFiles = Sets.newTreeSet(Comparators.dataFile()); + Set rewrittenDataFiles = Sets.newTreeSet(Comparators.contentFile()); + Set addedDataFiles = Sets.newTreeSet(Comparators.contentFile()); for (RewriteFileGroup group : fileGroups) { rewrittenDataFiles.addAll(group.rewrittenFiles()); addedDataFiles.addAll(group.addedFiles()); diff --git a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java index fe0d3df46c6d..8f80d07bf961 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewriteFileGroup.java @@ -39,7 +39,7 @@ public class RewriteFileGroup { private final FileGroupInfo info; private final List fileScanTasks; - private Set addedFiles = Sets.newTreeSet(Comparators.dataFile()); + private Set addedFiles = Sets.newTreeSet(Comparators.contentFile()); public RewriteFileGroup(FileGroupInfo info, List fileScanTasks) { this.info = info; @@ -61,7 +61,7 @@ public void setOutputFiles(Set files) { public Set rewrittenFiles() { return fileScans().stream() .map(FileScanTask::file) - .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); } public Set addedFiles() { diff --git a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java index cb65340a7311..8a24eafb412f 100644 --- a/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java +++ b/core/src/main/java/org/apache/iceberg/actions/RewritePositionDeletesGroup.java @@ -41,7 +41,7 @@ public class RewritePositionDeletesGroup { private final List tasks; private final long maxRewrittenDataSequenceNumber; - private Set addedDeleteFiles = Sets.newTreeSet(Comparators.deleteFile()); + private Set addedDeleteFiles = Sets.newTreeSet(Comparators.contentFile()); public RewritePositionDeletesGroup(FileGroupInfo info, List tasks) { Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty"); @@ -70,7 +70,7 @@ public long maxRewrittenDataSequenceNumber() { public Set rewrittenDeleteFiles() { return tasks().stream() .map(PositionDeletesScanTask::file) - .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile()))); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); } public Set addedDeleteFiles() { diff --git a/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java index 9d6ee651c797..2af78c6cc4a4 100644 --- a/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java +++ b/core/src/test/java/org/apache/iceberg/TestContentFileComparator.java @@ -24,8 +24,8 @@ import org.junit.jupiter.api.Test; /** - * Testing {@link Comparators#dataFile()} / {@link Comparators#deleteFile()} is easier in - * iceberg-core since the data/delete file builders are located here + * Testing {@link Comparators#contentFile()} is easier in iceberg-core since the data/delete file + * builders are located here */ public class TestContentFileComparator extends TestBase { @@ -38,11 +38,11 @@ public void compareDataFiles() { .withFileSizeInBytes(100) .withRecordCount(100) .build(); - assertThat(Comparators.dataFile().compare(FILE_A, dataFile)).isEqualTo(0); - assertThat(Comparators.dataFile().compare(FILE_A, FILE_A)).isEqualTo(0); + assertThat(Comparators.contentFile().compare(FILE_A, dataFile)).isEqualTo(0); + assertThat(Comparators.contentFile().compare(FILE_A, FILE_A)).isEqualTo(0); - assertThat(Comparators.dataFile().compare(FILE_A, FILE_B)).isLessThan(0); - assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0); + assertThat(Comparators.contentFile().compare(FILE_A, FILE_B)).isLessThan(0); + assertThat(Comparators.contentFile().compare(FILE_B, FILE_A)).isGreaterThan(0); } @Test @@ -55,13 +55,13 @@ public void compareDeleteFiles() { .withFileSizeInBytes(100) .withRecordCount(100) .build(); - assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0); - assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0); + assertThat(Comparators.contentFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0); + assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0); - assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0); - assertThat(Comparators.deleteFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0); + assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0); + assertThat(Comparators.contentFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0); - assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0); - assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0); + assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0); + assertThat(Comparators.contentFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0); } } diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java index 7a973bd4c215..5a596652e7aa 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkPositionDeletesRewrite.java @@ -22,6 +22,7 @@ import java.util.Arrays; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import org.apache.iceberg.DeleteFile; import org.apache.iceberg.FileFormat; import org.apache.iceberg.MetadataColumns; @@ -36,8 +37,8 @@ import org.apache.iceberg.io.FileIO; import org.apache.iceberg.io.OutputFileFactory; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator; import org.apache.iceberg.spark.ScanTaskSetManager; import org.apache.iceberg.spark.SparkWriteConf; @@ -152,7 +153,8 @@ public void commit(WriterCommitMessage[] messages) { coordinator.stageRewrite( table, fileSetId, - ImmutableSortedSet.orderedBy(Comparators.deleteFile()).addAll(files(messages)).build()); + files(messages).stream() + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())))); } @Override diff --git a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java index fed297d61579..5bd58476628b 100644 --- a/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java +++ b/spark/v3.5/spark/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java @@ -52,8 +52,8 @@ import org.apache.iceberg.io.PartitioningWriter; import org.apache.iceberg.io.RollingDataWriter; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; -import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSortedSet; import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.CommitMetadata; import org.apache.iceberg.spark.FileRewriteCoordinator; import org.apache.iceberg.spark.SparkWriteConf; @@ -495,7 +495,8 @@ public void commit(WriterCommitMessage[] messages) { coordinator.stageRewrite( table, fileSetID, - ImmutableSortedSet.orderedBy(Comparators.dataFile()).addAll(files(messages)).build()); + files(messages).stream() + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())))); } } diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java index cc7eab7c6047..d48f1f74cdc9 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/TestFileRewriteCoordinator.java @@ -95,7 +95,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -167,7 +167,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException { Set rewrittenFiles = taskSetManager.fetchTasks(table, fileSetID).stream() .map(t -> t.asFileScanTask().file()) - .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); Set addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); } @@ -249,7 +249,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio Set addedFiles = fileSetIDs.stream() .flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream()) - .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))); + .collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile()))); table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit(); table.refresh(); diff --git a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java index 7be9ee2f7a05..6fdde47905bd 100644 --- a/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java +++ b/spark/v3.5/spark/src/test/java/org/apache/iceberg/spark/data/TestHelpers.java @@ -788,7 +788,7 @@ public static List dataFiles(Table table, String branch) { } public static Set deleteFiles(Table table) { - Set deleteFiles = Sets.newTreeSet(Comparators.deleteFile()); + Set deleteFiles = Sets.newTreeSet(Comparators.contentFile()); for (FileScanTask task : table.newScan().planFiles()) { deleteFiles.addAll(task.deletes());