Skip to content

Commit

Permalink
updates
Browse files (browse the repository at this point in the history)
  • Loading branch information
nastra committed Sep 19, 2024
1 parent 64c9c37 commit 70dcd1a
Show file tree
Hide file tree
Showing 14 changed files with 75 additions and 63 deletions.
31 changes: 16 additions & 15 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -181,12 +181,8 @@ public static Comparator<CharSequence> filePath() {
return FilePathComparator.INSTANCE;
}

public static Comparator<DataFile> dataFile() {
return DataFileComparator.INSTANCE;
}

public static Comparator<DeleteFile> deleteFile() {
return DeleteFileComparator.INSTANCE;
public static Comparator<ContentFile<?>> contentFile() {
return ContentFileComparator.INSTANCE;
}

private static class NullsFirst<T> implements Comparator<T> {
Expand Down Expand Up @@ -405,24 +401,29 @@ public int compare(CharSequence s1, CharSequence s2) {
}
}

private static class DataFileComparator implements Comparator<DataFile> {
private static final DataFileComparator INSTANCE = new DataFileComparator();
private static class DeleteFileComparator implements Comparator<DeleteFile> {
private static final DeleteFileComparator INSTANCE = new DeleteFileComparator();

private DataFileComparator() {}
private DeleteFileComparator() {}

@Override
public int compare(DataFile s1, DataFile s2) {
public int compare(DeleteFile s1, DeleteFile s2) {
// this needs to be updated once support for deletion vectors (DVs) is added
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}

private static class DeleteFileComparator implements Comparator<DeleteFile> {
private static final DeleteFileComparator INSTANCE = new DeleteFileComparator();
private static class ContentFileComparator implements Comparator<ContentFile<?>> {
private static final ContentFileComparator INSTANCE = new ContentFileComparator();

private DeleteFileComparator() {}
private ContentFileComparator() {}

@Override
public int compare(DeleteFile s1, DeleteFile s2) {
public int compare(ContentFile<?> s1, ContentFile<?> s2) {
if (s1 instanceof DeleteFile && s2 instanceof DeleteFile) {
return DeleteFileComparator.INSTANCE.compare((DeleteFile) s1, (DeleteFile) s2);
}

return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newTreeSet(Comparators.dataFile());
private final Set<DataFile> deletedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.iceberg.types.Comparators;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newTreeSet(Comparators.dataFile());
private final Set<DataFile> replacedDataFiles = Sets.newTreeSet(Comparators.contentFile());
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final Set<DataFile> newFileSet = Sets.newTreeSet(Comparators.dataFile());
private final Set<DataFile> newFileSet = Sets.newTreeSet(Comparators.contentFile());
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down
49 changes: 30 additions & 19 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.iceberg.exceptions.RuntimeIOException;
import org.apache.iceberg.exceptions.ValidationException;
import org.apache.iceberg.expressions.Expression;
Expand All @@ -36,10 +37,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Joiner;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.CharSequenceWrapper;
import org.apache.iceberg.util.ManifestFileUtil;
Expand Down Expand Up @@ -71,6 +72,7 @@ public String partition() {
private final PartitionSet deleteFilePartitions;
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private final Set<F> filesToDelete = Sets.newTreeSet(Comparators.contentFile());
private Expression deleteExpression = Expressions.alwaysFalse();
private long minSequenceNumber = 0;
private boolean hasPathOnlyDeletes = false;
Expand Down Expand Up @@ -153,7 +155,7 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();
deletePaths.add(file.path());
filesToDelete.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}

Expand All @@ -167,6 +169,7 @@ void delete(CharSequence path) {

boolean containsDeletes() {
return !deletePaths.isEmpty()
|| !filesToDelete.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
}
Expand Down Expand Up @@ -233,23 +236,28 @@ SnapshotSummary.Builder buildSummary(Iterable<ManifestFile> manifests) {
@SuppressWarnings("CollectionUndefinedEquality")
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
CharSequenceSet deletedFiles = deletedFiles(manifests);
Set<F> deletedFiles = deletedFiles(manifests);

ValidationException.check(
deletedFiles.containsAll(deletePaths),
deletedFiles.containsAll(filesToDelete),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
COMMA.join(
filesToDelete.stream()
.filter(f -> !deletedFiles.contains(f))
.map(ContentFile::path)
.collect(Collectors.toList())));
}
}

private CharSequenceSet deletedFiles(ManifestFile[] manifests) {
CharSequenceSet deletedFiles = CharSequenceSet.empty();
private Set<F> deletedFiles(ManifestFile[] manifests) {
Set<F> deletedFiles = Sets.newTreeSet(Comparators.contentFile());

if (manifests != null) {
for (ManifestFile manifest : manifests) {
Iterable<F> manifestDeletes = filteredManifestToDeletedFiles.get(manifest);
if (manifestDeletes != null) {
for (F file : manifestDeletes) {
deletedFiles.add(file.path());
deletedFiles.add(file);
}
}
}
Expand Down Expand Up @@ -347,7 +355,7 @@ private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainDroppedFiles;
if (hasPathOnlyDeletes) {
canContainDroppedFiles = true;
} else if (!deletePaths.isEmpty()) {
} else if (!filesToDelete.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
Expand All @@ -372,8 +380,14 @@ private boolean manifestHasDeletedFiles(

for (ManifestEntry<F> entry : reader.liveEntries()) {
F file = entry.file();

// add path-based delete to set of files to be deleted
if (deletePaths.contains(CharSequenceWrapper.wrap(file.path()))) {
filesToDelete.add(file);
}

boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand Down Expand Up @@ -409,8 +423,7 @@ private ManifestFile filterManifestWithDeletedFiles(
boolean isDelete = reader.isDeleteManifestReader();
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
Set<F> deleted = Sets.newTreeSet(Comparators.contentFile());

try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
Expand All @@ -421,7 +434,7 @@ private ManifestFile filterManifestWithDeletedFiles(
entry -> {
F file = entry.file();
boolean markedForDelete =
deletePaths.contains(file.path())
filesToDelete.contains(file)
|| dropPartitions.contains(file.specId(), file.partition())
|| (isDelete
&& entry.isLive()
Expand All @@ -441,18 +454,16 @@ private ManifestFile filterManifestWithDeletedFiles(
if (allRowsMatch) {
writer.delete(entry);

CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
if (deleted.contains(entry.file())) {
LOG.warn(
"Deleting a duplicate path from manifest {}: {}",
manifest.path(),
wrapper.get());
entry.file().path());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
deletedPaths.add(wrapper);
deleted.add(entry.file().copyWithoutStats());
}
} else {
writer.existing(entry);
Expand All @@ -472,7 +483,7 @@ private ManifestFile filterManifestWithDeletedFiles(

// update caches
filteredManifests.put(manifest, filtered);
filteredManifestToDeletedFiles.put(filtered, deletedFiles);
filteredManifestToDeletedFiles.put(filtered, deleted);

return filtered;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final Set<DataFile> newDataFiles = Sets.newTreeSet(Comparators.dataFile());
private final Set<DeleteFile> newDeleteFiles = Sets.newTreeSet(Comparators.deleteFile());
private final Set<DataFile> newDataFiles = Sets.newTreeSet(Comparators.contentFile());
private final Set<DeleteFile> newDeleteFiles = Sets.newTreeSet(Comparators.contentFile());
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public RewriteDataFilesCommitManager(
* @param fileGroups fileSets to commit
*/
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
Set<DataFile> rewrittenDataFiles = Sets.newTreeSet(Comparators.dataFile());
Set<DataFile> addedDataFiles = Sets.newTreeSet(Comparators.dataFile());
Set<DataFile> rewrittenDataFiles = Sets.newTreeSet(Comparators.contentFile());
Set<DataFile> addedDataFiles = Sets.newTreeSet(Comparators.contentFile());
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class RewriteFileGroup {
private final FileGroupInfo info;
private final List<FileScanTask> fileScanTasks;

private Set<DataFile> addedFiles = Sets.newTreeSet(Comparators.dataFile());
private Set<DataFile> addedFiles = Sets.newTreeSet(Comparators.contentFile());

public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
this.info = info;
Expand All @@ -61,7 +61,7 @@ public void setOutputFiles(Set<DataFile> files) {
public Set<DataFile> rewrittenFiles() {
return fileScans().stream()
.map(FileScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())));
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
}

public Set<DataFile> addedFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public class RewritePositionDeletesGroup {
private final List<PositionDeletesScanTask> tasks;
private final long maxRewrittenDataSequenceNumber;

private Set<DeleteFile> addedDeleteFiles = Sets.newTreeSet(Comparators.deleteFile());
private Set<DeleteFile> addedDeleteFiles = Sets.newTreeSet(Comparators.contentFile());

public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
Expand Down Expand Up @@ -70,7 +70,7 @@ public long maxRewrittenDataSequenceNumber() {
public Set<DeleteFile> rewrittenDeleteFiles() {
return tasks().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile())));
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
}

public Set<DeleteFile> addedDeleteFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.junit.jupiter.api.Test;

/**
* Testing {@link Comparators#dataFile()} / {@link Comparators#deleteFile()} is easier in
* iceberg-core since the data/delete file builders are located here
* Testing {@link Comparators#contentFile()} is easier in iceberg-core since the data/delete file
* builders are located in this module.
*/
public class TestContentFileComparator extends TestBase {

Expand All @@ -38,11 +38,11 @@ public void compareDataFiles() {
.withFileSizeInBytes(100)
.withRecordCount(100)
.build();
assertThat(Comparators.dataFile().compare(FILE_A, dataFile)).isEqualTo(0);
assertThat(Comparators.dataFile().compare(FILE_A, FILE_A)).isEqualTo(0);
assertThat(Comparators.contentFile().compare(FILE_A, dataFile)).isEqualTo(0);
assertThat(Comparators.contentFile().compare(FILE_A, FILE_A)).isEqualTo(0);

assertThat(Comparators.dataFile().compare(FILE_A, FILE_B)).isLessThan(0);
assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0);
assertThat(Comparators.contentFile().compare(FILE_A, FILE_B)).isLessThan(0);
assertThat(Comparators.contentFile().compare(FILE_B, FILE_A)).isGreaterThan(0);
}

@Test
Expand All @@ -55,13 +55,13 @@ public void compareDeleteFiles() {
.withFileSizeInBytes(100)
.withRecordCount(100)
.build();
assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0);
assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0);
assertThat(Comparators.contentFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0);
assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0);

assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0);
assertThat(Comparators.deleteFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0);
assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0);
assertThat(Comparators.contentFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0);

assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0);
assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0);
assertThat(Comparators.contentFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0);
assertThat(Comparators.contentFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void commit(WriterCommitMessage[] messages) {
coordinator.stageRewrite(
table,
fileSetId,
ImmutableSortedSet.orderedBy(Comparators.deleteFile()).addAll(files(messages)).build());
ImmutableSortedSet.orderedBy(Comparators.contentFile()).addAll(files(messages)).build());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -495,7 +495,7 @@ public void commit(WriterCommitMessage[] messages) {
coordinator.stageRewrite(
table,
fileSetID,
ImmutableSortedSet.orderedBy(Comparators.dataFile()).addAll(files(messages)).build());
ImmutableSortedSet.orderedBy(Comparators.contentFile()).addAll(files(messages)).build());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public void testBinPackRewrite() throws NoSuchTableException, IOException {
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())));
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
Expand Down Expand Up @@ -167,7 +167,7 @@ public void testSortRewrite() throws NoSuchTableException, IOException {
Set<DataFile> rewrittenFiles =
taskSetManager.fetchTasks(table, fileSetID).stream()
.map(t -> t.asFileScanTask().file())
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())));
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
Set<DataFile> addedFiles = rewriteCoordinator.fetchNewFiles(table, fileSetID);
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();
}
Expand Down Expand Up @@ -249,7 +249,7 @@ public void testCommitMultipleRewrites() throws NoSuchTableException, IOExceptio
Set<DataFile> addedFiles =
fileSetIDs.stream()
.flatMap(fileSetID -> rewriteCoordinator.fetchNewFiles(table, fileSetID).stream())
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())));
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.contentFile())));
table.newRewrite().rewriteFiles(rewrittenFiles, addedFiles).commit();

table.refresh();
Expand Down
Loading

0 comments on commit 70dcd1a

Please sign in to comment.