Fix incorrect results when rewriting all rows in Delta Lake
ebyhr committed Sep 18, 2024
1 parent ed4aaef commit 4c0e301
Showing 10 changed files with 56 additions and 26 deletions.
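The change threads an optional deletion vector through the write path for 'remove' entries: DeltaLakeMergeResult and RemoveFileEntry gain an Optional<DeletionVectorEntry>, the merge sink fills it in when it drops a source file that already carried a deletion vector, the metadata layer writes it to the transaction log, and the checkpoint reader parses it back. Previously the vector was silently dropped in that case.

As background, here is a minimal, self-contained sketch (my own illustration, not code from this commit) of why the vector matters. It assumes what the Delta protocol describes for tables with deletion vectors: add and remove actions are reconciled by path together with the deletion-vector identity, so a remove that omits the vector never cancels the matching add and the file's already-deleted rows stay visible.

import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class RemoveEntrySketch
{
    // Simplified stand-ins for add/remove file actions in a Delta transaction log
    record FileAction(String path, Optional<String> deletionVectorId) {}

    record LogEntry(boolean isAdd, FileAction file) {}

    // Replay the log: an add makes a file live, a remove cancels the add with the
    // same path and the same deletion-vector id
    static Set<FileAction> liveFiles(List<LogEntry> log)
    {
        Set<FileAction> live = new LinkedHashSet<>();
        for (LogEntry entry : log) {
            if (entry.isAdd()) {
                live.add(entry.file());
            }
            else {
                live.remove(entry.file());
            }
        }
        return live;
    }

    public static void main(String[] args)
    {
        LogEntry add = new LogEntry(true, new FileAction("part-0.parquet", Optional.of("dv-1")));

        // Before the fix: the remove written when all remaining rows are deleted lost the
        // deletion vector, so it does not match the add and the file incorrectly stays live
        LogEntry removeWithoutVector = new LogEntry(false, new FileAction("part-0.parquet", Optional.empty()));
        System.out.println(liveFiles(List.of(add, removeWithoutVector))); // [FileAction[path=part-0.parquet, ...]]

        // After the fix: the remove carries the old deletion vector and cancels the add
        LogEntry removeWithVector = new LogEntry(false, new FileAction("part-0.parquet", Optional.of("dv-1")));
        System.out.println(liveFiles(List.of(add, removeWithVector))); // []
    }
}

The FileAction and LogEntry types and liveFiles() are invented for the illustration; Trino's real reconciliation lives in its transaction-log and checkpoint handling.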
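First, the DeltaLakeMergeResult record gains an oldDeletionVector component so a merge result can report the deletion vector attached to the file it removed; a checkArgument keeps it consistent with oldFile.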
@@ -13,6 +13,8 @@
*/
package io.trino.plugin.deltalake;

import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
@@ -24,6 +26,7 @@
public record DeltaLakeMergeResult(
List<String> partitionValues,
Optional<String> oldFile,
Optional<DeletionVectorEntry> oldDeletionVector,
Optional<DataFileInfo> newFile)
{
public DeltaLakeMergeResult
@@ -32,7 +35,9 @@ public record DeltaLakeMergeResult(
// noinspection Java9CollectionFactory
partitionValues = unmodifiableList(new ArrayList<>(requireNonNull(partitionValues, "partitionValues is null")));
requireNonNull(oldFile, "oldFile is null");
requireNonNull(oldDeletionVector, "oldDeletionVector is null");
requireNonNull(newFile, "newFile is null");
checkArgument(oldFile.isPresent() || newFile.isPresent(), "old or new must be present");
checkArgument(oldDeletionVector.isEmpty() || oldFile.isPresent(), "oldDeletionVector is present only when oldFile is present");
}
}
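In the merge sink, onlySourceFile(), the branch that removes a source file without writing a replacement, now looks up the file's existing deletion vector and carries it in the merge result; the other call sites pass Optional.empty() for the new parameter.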
@@ -327,7 +327,7 @@ public CompletableFuture<Collection<Slice>> finish()
insertPageSink.finish().join().stream()
.map(Slice::getBytes)
.map(dataFileInfoCodec::fromJson)
.map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info)))
.map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info)))
.map(mergeResultJsonCodec::toJsonBytes)
.map(Slices::wrappedBuffer)
.forEach(fragments::add);
@@ -345,7 +345,7 @@ public CompletableFuture<Collection<Slice>> finish()
MoreFutures.getDone(cdfPageSink.finish()).stream()
.map(Slice::getBytes)
.map(dataFileInfoCodec::fromJson)
.map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.of(info)))
.map(info -> new DeltaLakeMergeResult(info.partitionValues(), Optional.empty(), Optional.empty(), Optional.of(info)))
.map(mergeResultJsonCodec::toJsonBytes)
.map(Slices::wrappedBuffer)
.forEach(fragments::add);
@@ -407,7 +407,7 @@ private Slice writeDeletionVector(
deletion.partitionValues,
readStatistics(parquetMetadata, dataColumns, rowCount),
Optional.of(deletionVectorEntry));
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.of(newFileInfo));
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues, Optional.of(sourceRelativePath), Optional.empty(), Optional.of(newFileInfo));
return utf8Slice(mergeResultJsonCodec.toJson(result));
}
catch (Throwable e) {
@@ -426,7 +426,8 @@ private Slice writeDeletionVector(
private Slice onlySourceFile(String sourcePath, FileDeletion deletion)
{
String sourceRelativePath = relativePath(rootTableLocation.toString(), sourcePath);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty());
DeletionVectorEntry deletionVector = deletionVectors.get(sourceRelativePath);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.ofNullable(deletionVector), Optional.empty());
return utf8Slice(mergeResultJsonCodec.toJson(result));
}

@@ -453,7 +454,7 @@ private List<Slice> rewriteFile(String sourcePath, FileDeletion deletion)

Optional<DataFileInfo> newFileInfo = rewriteParquetFile(sourceLocation, deletion, writer);

DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), newFileInfo);
DeltaLakeMergeResult result = new DeltaLakeMergeResult(deletion.partitionValues(), Optional.of(sourceRelativePath), Optional.empty(), newFileInfo);
return ImmutableList.of(utf8Slice(mergeResultJsonCodec.toJson(result)));
}
catch (IOException e) {
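The metadata layer passes the new argument everywhere it writes a RemoveFileEntry: commitMergeOperation forwards mergeResult.oldDeletionVector(), while createTable, finishCreateTable, finishOptimize, and commitDeleteOperation pass Optional.empty().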
@@ -1213,7 +1213,7 @@ public void createTable(ConnectorSession session, ConnectorTableMetadata tableMe
while (addFileEntryIterator.hasNext()) {
long writeTimestamp = Instant.now().toEpochMilli();
AddFileEntry addFileEntry = addFileEntryIterator.next();
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
}
}
protocolEntry = protocolEntryForTable(tableHandle.getProtocolEntry(), containsTimestampType, tableMetadata.getProperties());
@@ -1610,7 +1610,7 @@ public Optional<ConnectorOutputMetadata> finishCreateTable(
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
AddFileEntry addFileEntry = addFileEntryIterator.next();
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));
}
}
}
@@ -2564,7 +2564,7 @@ private long commitMergeOperation(
if (mergeResult.oldFile().isEmpty()) {
continue;
}
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true));
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(toUriFormat(mergeResult.oldFile().get()), createPartitionValuesMap(partitionColumns, mergeResult.partitionValues()), writeTimestamp, true, mergeResult.oldDeletionVector()));
}

appendAddFileEntries(transactionLogWriter, newFiles, partitionColumns, getExactColumnNames(handle.getMetadataEntry()), true);
@@ -2767,7 +2767,8 @@ private void finishOptimize(ConnectorSession session, DeltaLakeTableExecuteHandl
toUriFormat(relativePath),
createPartitionValuesMap(canonicalPartitionValues),
writeTimestamp,
false));
false,
Optional.empty()));
}

// Note: during writes we want to preserve original case of partition columns
@@ -4160,7 +4161,7 @@ private CommitDeleteOperationResult commitDeleteOperation(
Iterator<AddFileEntry> addFileEntryIterator = activeFiles.iterator();
while (addFileEntryIterator.hasNext()) {
AddFileEntry addFileEntry = addFileEntryIterator.next();
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true));
transactionLogWriter.appendRemoveFileEntry(new RemoveFileEntry(addFileEntry.getPath(), addFileEntry.getPartitionValues(), writeTimestamp, true, Optional.empty()));

Optional<Long> fileRecords = addFileEntry.getStats().flatMap(DeltaLakeFileStatistics::getNumRecords);
allDeletedFilesStatsPresent &= fileRecords.isPresent();
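RemoveFileEntry itself gains the Optional<DeletionVectorEntry> deletionVector component, matching the deletionVector field the Delta protocol allows on remove actions.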
@@ -16,17 +16,20 @@
import jakarta.annotation.Nullable;

import java.util.Map;
import java.util.Optional;

import static java.util.Objects.requireNonNull;

public record RemoveFileEntry(
String path,
@Nullable Map<String, String> partitionValues,
long deletionTimestamp,
boolean dataChange)
boolean dataChange,
Optional<DeletionVectorEntry> deletionVector)
{
public RemoveFileEntry
{
requireNonNull(path, "path is null");
requireNonNull(deletionVector, "deletionVector is null");
}
}
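CheckpointEntryIterator now parses the deletionVector struct from remove entries when deletion vectors are enabled, so the new field is also read back from checkpoints.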
@@ -153,6 +153,7 @@ public String getColumnName()
private final Optional<RowType> addDeletionVectorType;
private final Optional<RowType> addParsedStatsFieldType;
private final Optional<RowType> removeType;
private final Optional<RowType> removeDeletionVectorType;
private final Optional<RowType> metadataType;
private final Optional<RowType> protocolType;
private final Optional<RowType> commitType;
@@ -246,6 +247,7 @@ public CheckpointEntryIterator(
addDeletionVectorType = addType.flatMap(type -> getOptionalFieldType(type, "deletionVector"));
addParsedStatsFieldType = addType.flatMap(type -> getOptionalFieldType(type, "stats_parsed"));
removeType = getParquetType(fields, REMOVE);
removeDeletionVectorType = removeType.flatMap(type -> getOptionalFieldType(type, "deletionVector"));
metadataType = getParquetType(fields, METADATA);
protocolType = getParquetType(fields, PROTOCOL);
commitType = getParquetType(fields, COMMIT);
@@ -537,11 +539,17 @@ private DeltaLakeTransactionLogEntry buildRemoveEntry(ConnectorSession session,
format("Expected block %s to have %d children, but found %s", block, removeFields, removeEntryRow.getFieldCount()));
}
CheckpointFieldReader remove = new CheckpointFieldReader(session, removeEntryRow, type);
Optional<DeletionVectorEntry> deletionVector = Optional.empty();
if (deletionVectorsEnabled) {
deletionVector = Optional.ofNullable(remove.getRow("deletionVector"))
.map(row -> parseDeletionVectorFromParquet(session, row, removeDeletionVectorType.orElseThrow()));
}
RemoveFileEntry result = new RemoveFileEntry(
remove.getString("path"),
remove.getMap(stringMap, "partitionValues"),
remove.getLong("deletionTimestamp"),
remove.getBoolean("dataChange"));
remove.getBoolean("dataChange"),
deletionVector);
log.debug("Result: %s", result);
return DeltaLakeTransactionLogEntry.removeFileEntry(result);
}
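testDeletionVectorsAllRows now deletes the remaining rows of a file that has a deletion vector and asserts that the resulting remove entry in the transaction log carries the same deletion vector as the earlier add entry; the later DELETE expects 2 rows instead of 3 because one row was already removed.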
@@ -31,6 +31,7 @@
import io.trino.parquet.metadata.ParquetMetadata;
import io.trino.parquet.reader.MetadataReader;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.DeletionVectorEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeTransactionLogEntry;
import io.trino.plugin.deltalake.transactionlog.MetadataEntry;
import io.trino.plugin.deltalake.transactionlog.ProtocolEntry;
@@ -1163,8 +1164,15 @@ public void testDeletionVectorsAllRows()
copyDirectoryContents(new File(Resources.getResource("databricks122/deletion_vectors").toURI()).toPath(), tableLocation);
assertUpdate("CALL system.register_table('%s', '%s', '%s')".formatted(getSession().getSchema().orElseThrow(), tableName, tableLocation.toUri()));

assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 1);

// 'remove' entry should have the same deletion vector as the previous operation when deleting all rows
DeletionVectorEntry deletionVector = getEntriesFromJson(2, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(2).getAdd().getDeletionVector().orElseThrow();
assertThat(getEntriesFromJson(3, tableLocation + "/_delta_log", FILE_SYSTEM).orElseThrow().get(1).getRemove().deletionVector().orElseThrow())
.isEqualTo(deletionVector);

assertUpdate("INSERT INTO " + tableName + " VALUES (3, 31), (3, 32)", 2);
assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 3);
assertUpdate("DELETE FROM " + tableName + " WHERE a != 999", 2);
assertQueryReturnsEmptyResult("SELECT * FROM " + tableName);

assertUpdate("INSERT INTO " + tableName + " VALUES (1, 10), (2, 20)", 2);
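The remaining changes update tests (TestTransactionLogAccess and the checkpoint builder, reader, and writer tests) for the extra RemoveFileEntry constructor argument, passing Optional.empty().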
@@ -105,12 +105,12 @@ public class TestTransactionLogAccess
"age=29/part-00000-3794c463-cb0c-4beb-8d07-7cc1e3b5920f.c000.snappy.parquet");

private static final Set<RemoveFileEntry> EXPECTED_REMOVE_ENTRIES = ImmutableSet.of(
new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false),
new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false),
new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false),
new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false),
new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false),
new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false));
new RemoveFileEntry("age=30/part-00000-7e43a3c3-ea26-4ae7-8eac-8f60cbb4df03.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
new RemoveFileEntry("age=30/part-00000-72a56c23-01ba-483a-9062-dd0accc86599.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
new RemoveFileEntry("age=42/part-00000-951068bd-bcf4-4094-bb94-536f3c41d31f.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()),
new RemoveFileEntry("age=25/part-00000-609e34b1-5466-4dbc-a780-2708166e7adb.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()),
new RemoveFileEntry("age=42/part-00000-6aed618a-2beb-4edd-8466-653e67a9b380.c000.snappy.parquet", null, 1579190155406L, false, Optional.empty()),
new RemoveFileEntry("age=42/part-00000-b82d8859-84a0-4f05-872c-206b07dd54f0.c000.snappy.parquet", null, 1579190163932L, false, Optional.empty()));

private final TestingTelemetry testingTelemetry = TestingTelemetry.create("transaction-log-access");
private final TracingFileSystemFactory tracingFileSystemFactory = new TracingFileSystemFactory(testingTelemetry.getTracer(), new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS));
@@ -59,11 +59,11 @@ public void testCheckpointBuilder()
builder.addLogEntry(transactionEntry(app2TransactionV5));

AddFileEntry addA1 = new AddFileEntry("a", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true);
RemoveFileEntry removeA1 = new RemoveFileEntry("a", Map.of(), 1, true, Optional.empty());
AddFileEntry addA2 = new AddFileEntry("a", Map.of(), 2, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
AddFileEntry addB = new AddFileEntry("b", Map.of(), 1, 1, true, Optional.empty(), Optional.empty(), Map.of(), Optional.empty());
RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true);
RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true);
RemoveFileEntry removeB = new RemoveFileEntry("b", Map.of(), 1, true, Optional.empty());
RemoveFileEntry removeC = new RemoveFileEntry("c", Map.of(), 1, true, Optional.empty());
builder.addLogEntry(addFileEntry(addA1));
builder.addLogEntry(removeFileEntry(removeA1));
builder.addLogEntry(addFileEntry(addA2));
@@ -612,7 +612,8 @@ public void testReadAllEntries()
// partitionValues information is missing in the checkpoint
null,
1579190155406L,
false));
false,
Optional.empty()));

// CommitInfoEntry
// not found in the checkpoint, TODO add a test
@@ -925,7 +926,8 @@ public void testSkipRemoveEntries()
UUID.randomUUID().toString(),
ImmutableMap.of("part_key", "2023-01-01 00:00:00"),
1000,
true))
true,
Optional.empty()))
.collect(toImmutableSet());

CheckpointEntries entries = new CheckpointEntries(
@@ -184,7 +184,8 @@ public void testCheckpointWriteReadJsonRoundtrip()
"removeFilePath",
ImmutableMap.of("part_key", "7.0"),
1000,
true);
true,
Optional.empty());

CheckpointEntries entries = new CheckpointEntries(
metadataEntry,
@@ -325,7 +326,8 @@ public void testCheckpointWriteReadParquetStatisticsRoundtrip()
"removeFilePath",
ImmutableMap.of("part_key", "7.0"),
1000,
true);
true,
Optional.empty());

CheckpointEntries entries = new CheckpointEntries(
metadataEntry,