Skip to content

Commit

Permalink
Core: Optimize merging snapshot producer to use referenced manifest t…
Browse files Browse the repository at this point in the history
…o determine if a given manifest needs to be rewritten or not
  • Loading branch information
amogh-jahagirdar committed Oct 17, 2024
1 parent bbbfd1e commit 726fcfb
Show file tree
Hide file tree
Showing 4 changed files with 339 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import static org.apache.iceberg.types.Types.NestedField.required;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
Expand Down Expand Up @@ -70,14 +73,17 @@ public class ReplaceDeleteFilesBenchmark {
private static final HadoopTables TABLES = new HadoopTables();

private Table table;
private List<DeleteFile> deleteFiles;
private List<DeleteFile> deleteFilesToReplace;
private List<DeleteFile> pendingDeleteFiles;

@Param({"50000", "100000", "500000", "1000000", "2500000"})
@Param({"50000", "100000", "500000", "1000000", "2000000"})
private int numFiles;

@Param({"5", "25", "50", "100"})
private int percentDeleteFilesReplaced;

@Setup
public void setupBenchmark() {
public void setupBenchmark() throws IOException {
initTable();
initFiles();
}
Expand All @@ -90,8 +96,10 @@ public void tearDownBenchmark() {
@Benchmark
@Threads(1)
public void replaceDeleteFiles() {
Snapshot currentSnapshot = table.currentSnapshot();
RowDelta rowDelta = table.newRowDelta();
deleteFiles.forEach(rowDelta::removeDeletes);
rowDelta.validateFromSnapshot(currentSnapshot.snapshotId());
deleteFilesToReplace.forEach(rowDelta::removeDeletes);
pendingDeleteFiles.forEach(rowDelta::addDeletes);
rowDelta.commit();
}
Expand All @@ -104,27 +112,44 @@ private void dropTable() {
TABLES.dropTable(TABLE_IDENT);
}

private void initFiles() {
List<DeleteFile> generatedDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);
private void initFiles() throws IOException {
List<DeleteFile> generatedPendingDeleteFiles = Lists.newArrayListWithExpectedSize(numFiles);

int numDeleteFilesToReplace = (int) Math.ceil(numFiles * (percentDeleteFilesReplaced / 100.0));
Map<String, DeleteFile> filesToReplace =
Maps.newHashMapWithExpectedSize(numDeleteFilesToReplace);
RowDelta rowDelta = table.newRowDelta();

for (int ordinal = 0; ordinal < numFiles; ordinal++) {
DataFile dataFile = FileGenerationUtil.generateDataFile(table, null);
rowDelta.addRows(dataFile);

DeleteFile deleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
rowDelta.addDeletes(deleteFile);
generatedDeleteFiles.add(deleteFile);

DeleteFile pendingDeleteFile = FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
if (numDeleteFilesToReplace > 0) {
filesToReplace.put(deleteFile.location(), deleteFile);
DeleteFile pendingDeleteFile =
FileGenerationUtil.generatePositionDeleteFile(table, dataFile);
generatedPendingDeleteFiles.add(pendingDeleteFile);
numDeleteFilesToReplace--;
}
}

rowDelta.commit();

this.deleteFiles = generatedDeleteFiles;
List<DeleteFile> deleteFilesReadFromManifests = Lists.newArrayList();
for (ManifestFile deleteManifest : table.currentSnapshot().deleteManifests(table.io())) {
try (ManifestReader<DeleteFile> manifestReader =
ManifestFiles.readDeleteManifest(deleteManifest, table.io(), table.specs())) {
manifestReader
.iterator()
.forEachRemaining(
file -> {
if (filesToReplace.containsKey(file.location())) {
deleteFilesReadFromManifests.add(file);
}
});
}
}

this.pendingDeleteFiles = generatedPendingDeleteFiles;
this.deleteFilesToReplace = deleteFilesReadFromManifests;
}
}
82 changes: 54 additions & 28 deletions core/src/main/java/org/apache/iceberg/ManifestFilterManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.util.CharSequenceSet;
import org.apache.iceberg.util.ManifestFileUtil;
import org.apache.iceberg.util.Pair;
Expand Down Expand Up @@ -69,6 +70,7 @@ public String partition() {
private final Map<Integer, PartitionSpec> specsById;
private final PartitionSet deleteFilePartitions;
private final Set<F> deleteFiles = newFileSet();
private final Set<String> manifestsReferencedForDeletes = Sets.newHashSet();
private final PartitionSet dropPartitions;
private final CharSequenceSet deletePaths = CharSequenceSet.empty();
private Expression deleteExpression = Expressions.alwaysFalse();
Expand All @@ -77,6 +79,8 @@ public String partition() {
private boolean failMissingDeletePaths = false;
private int duplicateDeleteCount = 0;
private boolean caseSensitive = true;
private boolean allDeletesReferenceManifests = true;
private boolean trustReferencedManifests = false;

// cache filtered manifests to avoid extra work when commits fail.
private final Map<ManifestFile, ManifestFile> filteredManifests = Maps.newConcurrentMap();
Expand Down Expand Up @@ -121,13 +125,15 @@ protected void deleteByRowFilter(Expression expr) {
Preconditions.checkNotNull(expr, "Cannot delete files using filter: null");
invalidateFilteredCache();
this.deleteExpression = Expressions.or(deleteExpression, expr);
this.allDeletesReferenceManifests = false;
}

/** Add a partition tuple to drop from the table during the delete phase. */
protected void dropPartition(int specId, StructLike partition) {
Preconditions.checkNotNull(partition, "Cannot delete files in invalid partition: null");
invalidateFilteredCache();
dropPartitions.add(specId, partition);
this.allDeletesReferenceManifests = false;
}

/**
Expand All @@ -154,6 +160,13 @@ void caseSensitive(boolean newCaseSensitive) {
void delete(F file) {
Preconditions.checkNotNull(file, "Cannot delete file: null");
invalidateFilteredCache();

if (file.manifestLocation() == null) {
this.allDeletesReferenceManifests = false;
} else {
manifestsReferencedForDeletes.add(file.manifestLocation());
}

deleteFiles.add(file);
deleteFilePartitions.add(file.specId(), file.partition());
}
Expand All @@ -162,11 +175,13 @@ void delete(F file) {
void delete(CharSequence path) {
Preconditions.checkNotNull(path, "Cannot delete file path: null");
invalidateFilteredCache();
this.allDeletesReferenceManifests = false;
deletePaths.add(path);
}

boolean containsDeletes() {
return !deletePaths.isEmpty()
return !manifestsReferencedForDeletes.isEmpty()
|| !deletePaths.isEmpty()
|| !deleteFiles.isEmpty()
|| deleteExpression != Expressions.alwaysFalse()
|| !dropPartitions.isEmpty();
Expand All @@ -185,6 +200,12 @@ List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manife
return ImmutableList.of();
}

// The current set of referenced manifests can be trusted if it is a subset of the manifests
// being filtered
Set<String> manifestLocations =
manifests.stream().map(ManifestFile::path).collect(Collectors.toSet());
trustReferencedManifests = manifestLocations.containsAll(manifestsReferencedForDeletes);

ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
Tasks.range(filtered.length)
Expand Down Expand Up @@ -323,11 +344,15 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
PartitionSpec spec = reader.spec();
PartitionAndMetricsEvaluator evaluator =
new PartitionAndMetricsEvaluator(tableSchema, spec, deleteExpression);
boolean hasDeletedFiles = manifestsReferencedForDeletes.contains(manifest.path());
if (hasDeletedFiles) {
return filterManifestWithDeletedFiles(evaluator, manifest, reader);
}

// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
hasDeletedFiles = manifestHasDeletedFiles(evaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
Expand All @@ -341,43 +366,44 @@ private ManifestFile filterManifest(Schema tableSchema, ManifestFile manifest) {
}

private boolean canContainDeletedFiles(ManifestFile manifest) {
boolean canContainExpressionDeletes;
return canContainDropBySeq(manifest)
|| canContainExpressionDeletes(manifest)
|| canContainDroppedPartitions(manifest)
|| canContainDroppedFiles(manifest);
}

private boolean canContainDropBySeq(ManifestFile manifest) {
return manifest.content() == ManifestContent.DELETES
&& manifest.minSequenceNumber() < minSequenceNumber;
}

private boolean canContainExpressionDeletes(ManifestFile manifest) {
if (deleteExpression != null && deleteExpression != Expressions.alwaysFalse()) {
ManifestEvaluator manifestEvaluator =
ManifestEvaluator.forRowFilter(
deleteExpression, specsById.get(manifest.partitionSpecId()), caseSensitive);
canContainExpressionDeletes = manifestEvaluator.eval(manifest);
} else {
canContainExpressionDeletes = false;
return manifestEvaluator.eval(manifest);
}

boolean canContainDroppedPartitions;
return false;
}

private boolean canContainDroppedPartitions(ManifestFile manifest) {
if (!dropPartitions.isEmpty()) {
canContainDroppedPartitions =
ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
} else {
canContainDroppedPartitions = false;
return ManifestFileUtil.canContainAny(manifest, dropPartitions, specsById);
}

boolean canContainDroppedFiles;
if (!deletePaths.isEmpty()) {
canContainDroppedFiles = true;
} else if (!deleteFiles.isEmpty()) {
// because there were no path-only deletes, the set of deleted file partitions is valid
canContainDroppedFiles =
ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
return false;
}

private boolean canContainDroppedFiles(ManifestFile manifest) {
if (manifestsReferencedForDeletes.contains(manifest.path()) || !deletePaths.isEmpty()) {
return true;
} else if (allDeletesReferenceManifests && trustReferencedManifests) {
return false;
} else {
canContainDroppedFiles = false;
return ManifestFileUtil.canContainAny(manifest, deleteFilePartitions, specsById);
}

boolean canContainDropBySeq =
manifest.content() == ManifestContent.DELETES
&& manifest.minSequenceNumber() < minSequenceNumber;

return canContainExpressionDeletes
|| canContainDroppedPartitions
|| canContainDroppedFiles
|| canContainDropBySeq;
}

@SuppressWarnings({"CollectionUndefinedEquality", "checkstyle:CyclomaticComplexity"})
Expand Down
Loading

0 comments on commit 726fcfb

Please sign in to comment.