Skip to content

Commit

Permalink
Core: Add Data/Delete file comparators
Browse files Browse the repository at this point in the history
  • Loading branch information
nastra committed Sep 19, 2024
1 parent 40ffcb9 commit ff125a1
Show file tree
Hide file tree
Showing 13 changed files with 153 additions and 29 deletions.
32 changes: 32 additions & 0 deletions api/src/main/java/org/apache/iceberg/types/Comparators.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.function.IntFunction;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.StructLike;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.util.UnicodeUtil;
Expand Down Expand Up @@ -179,6 +181,14 @@ public static Comparator<CharSequence> filePath() {
return FilePathComparator.INSTANCE;
}

public static Comparator<DataFile> dataFile() {
return DataFileComparator.INSTANCE;
}

public static Comparator<DeleteFile> deleteFile() {
return DeleteFileComparator.INSTANCE;
}

private static class NullsFirst<T> implements Comparator<T> {
private static final NullsFirst<?> INSTANCE = new NullsFirst<>();

Expand Down Expand Up @@ -394,4 +404,26 @@ public int compare(CharSequence s1, CharSequence s2) {
return 0;
}
}

private static class DataFileComparator implements Comparator<DataFile> {
private static final DataFileComparator INSTANCE = new DataFileComparator();

private DataFileComparator() {}

@Override
public int compare(DataFile s1, DataFile s2) {
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}

private static class DeleteFileComparator implements Comparator<DeleteFile> {
private static final DeleteFileComparator INSTANCE = new DeleteFileComparator();

private DeleteFileComparator() {}

@Override
public int compare(DeleteFile s1, DeleteFile s2) {
return CharSeqComparator.INSTANCE.compare(s1.path(), s2.path());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@
import org.apache.iceberg.expressions.StrictMetricsEvaluator;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

public class BaseOverwriteFiles extends MergingSnapshotProducer<OverwriteFiles>
implements OverwriteFiles {
private final Set<DataFile> deletedDataFiles = Sets.newHashSet();
private final Set<DataFile> deletedDataFiles = Sets.newTreeSet(Comparators.dataFile());
private boolean validateAddedFilesMatchOverwriteFilter = false;
private Long startingSnapshotId = null;
private Expression conflictDetectionFilter = null;
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseRewriteFiles.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
private final Set<DataFile> replacedDataFiles = Sets.newHashSet();
private final Set<DataFile> replacedDataFiles = Sets.newTreeSet(Comparators.dataFile());
private Long startingSnapshotId = null;

BaseRewriteFiles(String tableName, TableOperations ops) {
Expand Down
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/FastAppend.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* {@link AppendFiles Append} implementation that adds a new manifest file for the write.
Expand All @@ -44,7 +45,7 @@ class FastAppend extends SnapshotProducer<AppendFiles> implements AppendFiles {
private final PartitionSpec spec;
private final SnapshotSummary.Builder summaryBuilder = SnapshotSummary.builder();
private final List<DataFile> newFiles = Lists.newArrayList();
private final CharSequenceSet newFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newFileSet = Sets.newTreeSet(Comparators.dataFile());
private final List<ManifestFile> appendManifests = Lists.newArrayList();
private final List<ManifestFile> rewrittenAppendManifests = Lists.newArrayList();
private List<ManifestFile> newManifests = null;
Expand Down Expand Up @@ -86,7 +87,7 @@ protected Map<String, String> summary() {
@Override
public FastAppend appendFile(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newFilePaths.add(file.path())) {
if (newFileSet.add(file)) {
this.hasNewFiles = true;
newFiles.add(file);
summaryBuilder.addedFile(spec, file);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.Pair;
import org.apache.iceberg.util.PartitionSet;
Expand Down Expand Up @@ -81,8 +82,8 @@ abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {

// update data
private final Map<PartitionSpec, List<DataFile>> newDataFilesBySpec = Maps.newHashMap();
private final CharSequenceSet newDataFilePaths = CharSequenceSet.empty();
private final CharSequenceSet newDeleteFilePaths = CharSequenceSet.empty();
private final Set<DataFile> newDataFiles = Sets.newTreeSet(Comparators.dataFile());
private final Set<DeleteFile> newDeleteFiles = Sets.newTreeSet(Comparators.deleteFile());
private Long newDataFilesDataSequenceNumber;
private final Map<Integer, List<DeleteFileHolder>> newDeleteFilesBySpec = Maps.newHashMap();
private final List<ManifestFile> appendManifests = Lists.newArrayList();
Expand Down Expand Up @@ -234,7 +235,7 @@ protected boolean addsDeleteFiles() {
/** Add a data file to the new snapshot. */
protected void add(DataFile file) {
Preconditions.checkNotNull(file, "Invalid data file: null");
if (newDataFilePaths.add(file.path())) {
if (newDataFiles.add(file)) {
PartitionSpec fileSpec = ops.current().spec(file.specId());
Preconditions.checkArgument(
fileSpec != null,
Expand All @@ -244,9 +245,9 @@ protected void add(DataFile file) {

addedFilesSummary.addedFile(fileSpec, file);
hasNewDataFiles = true;
List<DataFile> newDataFiles =
List<DataFile> dataFiles =
newDataFilesBySpec.computeIfAbsent(fileSpec, ignored -> Lists.newArrayList());
newDataFiles.add(file);
dataFiles.add(file);
}
}

Expand All @@ -268,7 +269,7 @@ private void add(DeleteFileHolder fileHolder) {
List<DeleteFileHolder> deleteFiles =
newDeleteFilesBySpec.computeIfAbsent(specId, s -> Lists.newArrayList());

if (newDeleteFilePaths.add(fileHolder.deleteFile().path())) {
if (newDeleteFiles.add(fileHolder.deleteFile())) {
deleteFiles.add(fileHolder);
addedFilesSummary.addedFile(fileSpec, fileHolder.deleteFile());
hasNewDeleteFiles = true;
Expand Down Expand Up @@ -970,9 +971,9 @@ private List<ManifestFile> newDataFilesAsManifests() {

if (cachedNewDataManifests.isEmpty()) {
newDataFilesBySpec.forEach(
(dataSpec, newDataFiles) -> {
(dataSpec, dataFiles) -> {
List<ManifestFile> newDataManifests =
writeDataManifests(newDataFiles, newDataFilesDataSequenceNumber, dataSpec);
writeDataManifests(dataFiles, newDataFilesDataSequenceNumber, dataSpec);
cachedNewDataManifests.addAll(newDataManifests);
});
this.hasNewDataFiles = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;
import org.apache.iceberg.util.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -73,8 +74,8 @@ public RewriteDataFilesCommitManager(
* @param fileGroups fileSets to commit
*/
public void commitFileGroups(Set<RewriteFileGroup> fileGroups) {
Set<DataFile> rewrittenDataFiles = Sets.newHashSet();
Set<DataFile> addedDataFiles = Sets.newHashSet();
Set<DataFile> rewrittenDataFiles = Sets.newTreeSet(Comparators.dataFile());
Set<DataFile> addedDataFiles = Sets.newTreeSet(Comparators.dataFile());
for (RewriteFileGroup group : fileGroups) {
rewrittenDataFiles.addAll(group.rewrittenFiles());
addedDataFiles.addAll(group.addedFiles());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -29,6 +28,8 @@
import org.apache.iceberg.actions.RewriteDataFiles.FileGroupInfo;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of files to be rewritten by a RewriteAction and the new files
Expand All @@ -38,7 +39,7 @@ public class RewriteFileGroup {
private final FileGroupInfo info;
private final List<FileScanTask> fileScanTasks;

private Set<DataFile> addedFiles = Collections.emptySet();
private Set<DataFile> addedFiles = Sets.newTreeSet(Comparators.dataFile());

public RewriteFileGroup(FileGroupInfo info, List<FileScanTask> fileScanTasks) {
this.info = info;
Expand All @@ -58,7 +59,9 @@ public void setOutputFiles(Set<DataFile> files) {
}

public Set<DataFile> rewrittenFiles() {
return fileScans().stream().map(FileScanTask::file).collect(Collectors.toSet());
return fileScans().stream()
.map(FileScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile())));
}

public Set<DataFile> addedFiles() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
*/
package org.apache.iceberg.actions;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Set;
Expand All @@ -30,6 +29,8 @@
import org.apache.iceberg.actions.RewritePositionDeleteFiles.FileGroupRewriteResult;
import org.apache.iceberg.relocated.com.google.common.base.MoreObjects;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Comparators;

/**
* Container class representing a set of position delete files to be rewritten by a {@link
Expand All @@ -40,7 +41,7 @@ public class RewritePositionDeletesGroup {
private final List<PositionDeletesScanTask> tasks;
private final long maxRewrittenDataSequenceNumber;

private Set<DeleteFile> addedDeleteFiles = Collections.emptySet();
private Set<DeleteFile> addedDeleteFiles = Sets.newTreeSet(Comparators.deleteFile());

public RewritePositionDeletesGroup(FileGroupInfo info, List<PositionDeletesScanTask> tasks) {
Preconditions.checkArgument(!tasks.isEmpty(), "Tasks must not be empty");
Expand All @@ -67,7 +68,9 @@ public long maxRewrittenDataSequenceNumber() {
}

public Set<DeleteFile> rewrittenDeleteFiles() {
return tasks().stream().map(PositionDeletesScanTask::file).collect(Collectors.toSet());
return tasks().stream()
.map(PositionDeletesScanTask::file)
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile())));
}

public Set<DeleteFile> addedDeleteFiles() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import static org.assertj.core.api.Assertions.assertThat;

import org.apache.iceberg.types.Comparators;
import org.junit.jupiter.api.Test;

/**
* Testing {@link Comparators#dataFile()} / {@link Comparators#deleteFile()} is easier in
* iceberg-core since the data/delete file builders are located here
*/
public class TestContentFileComparator extends TestBase {

@Test
public void compareDataFiles() {
// path is same as FILE_A
DataFile dataFile =
DataFiles.builder(SPEC)
.withPath(FILE_A.path().toString())
.withFileSizeInBytes(100)
.withRecordCount(100)
.build();
assertThat(Comparators.dataFile().compare(FILE_A, dataFile)).isEqualTo(0);
assertThat(Comparators.dataFile().compare(FILE_A, FILE_A)).isEqualTo(0);

assertThat(Comparators.dataFile().compare(FILE_A, FILE_B)).isLessThan(0);
assertThat(Comparators.dataFile().compare(FILE_B, FILE_A)).isGreaterThan(0);
}

@Test
public void compareDeleteFiles() {
// same path as FILE_A_DELETES
DeleteFile deleteFile =
FileMetadata.deleteFileBuilder(SPEC)
.ofPositionDeletes()
.withPath(FILE_A_DELETES.path().toString())
.withFileSizeInBytes(100)
.withRecordCount(100)
.build();
assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, deleteFile)).isEqualTo(0);
assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A_DELETES)).isEqualTo(0);

assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_B_DELETES)).isLessThan(0);
assertThat(Comparators.deleteFile().compare(FILE_B_DELETES, FILE_A_DELETES)).isGreaterThan(0);

assertThat(Comparators.deleteFile().compare(FILE_A_DELETES, FILE_A2_DELETES)).isLessThan(0);
assertThat(Comparators.deleteFile().compare(FILE_A2_DELETES, FILE_A_DELETES)).isGreaterThan(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.MetadataColumns;
Expand All @@ -36,11 +37,12 @@
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.PositionDeletesRewriteCoordinator;
import org.apache.iceberg.spark.ScanTaskSetManager;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.types.Comparators;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.SparkSession;
Expand Down Expand Up @@ -148,7 +150,11 @@ public boolean useCommitCoordinator() {
@Override
public void commit(WriterCommitMessage[] messages) {
PositionDeletesRewriteCoordinator coordinator = PositionDeletesRewriteCoordinator.get();
coordinator.stageRewrite(table, fileSetId, ImmutableSet.copyOf(files(messages)));
coordinator.stageRewrite(
table,
fileSetId,
files(messages).stream()
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.deleteFile()))));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,13 @@
import org.apache.iceberg.io.PartitioningWriter;
import org.apache.iceberg.io.RollingDataWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.spark.CommitMetadata;
import org.apache.iceberg.spark.FileRewriteCoordinator;
import org.apache.iceberg.spark.SparkWriteConf;
import org.apache.iceberg.spark.SparkWriteRequirements;
import org.apache.iceberg.types.Comparators;
import org.apache.spark.TaskContext;
import org.apache.spark.TaskContext$;
import org.apache.spark.api.java.JavaSparkContext;
Expand Down Expand Up @@ -491,7 +492,11 @@ private RewriteFiles(String fileSetID) {
@Override
public void commit(WriterCommitMessage[] messages) {
FileRewriteCoordinator coordinator = FileRewriteCoordinator.get();
coordinator.stageRewrite(table, fileSetID, ImmutableSet.copyOf(files(messages)));
coordinator.stageRewrite(
table,
fileSetID,
files(messages).stream()
.collect(Collectors.toCollection(() -> Sets.newTreeSet(Comparators.dataFile()))));
}
}

Expand Down
Loading

0 comments on commit ff125a1

Please sign in to comment.