Skip to content

Commit

Permalink
Merge branch 'master' into auto-compaction-bugfix-021925
Browse files Browse the repository at this point in the history
  • Loading branch information
mwc360 authored Feb 24, 2025
2 parents a1343ae + e628ff9 commit ceda681
Show file tree
Hide file tree
Showing 39 changed files with 2,775 additions and 201 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.commons.lang3.exception.ExceptionUtils
import org.apache.hadoop.conf.Configuration
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.{AppendFiles, DeleteFiles, OverwriteFiles, PartitionSpec, PendingUpdate, RewriteFiles, Transaction => IcebergTransaction}
import shadedForDelta.org.apache.iceberg.ExpireSnapshots
import shadedForDelta.org.apache.iceberg.mapping.MappingUtil
import shadedForDelta.org.apache.iceberg.mapping.NameMappingParser
Expand Down Expand Up @@ -64,7 +64,8 @@ class IcebergConversionTransaction(
protected val postCommitSnapshot: Snapshot,
protected val tableOp: IcebergTableOp = WRITE_TABLE,
protected val lastConvertedIcebergSnapshotId: Option[Long] = None,
protected val lastConvertedDeltaVersion: Option[Long] = None) extends DeltaLogging {
protected val lastConvertedDeltaVersion: Option[Long] = None
) extends DeltaLogging {

///////////////////////////
// Nested Helper Classes //
Expand Down Expand Up @@ -100,7 +101,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
add,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand Down Expand Up @@ -141,7 +142,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
add,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand All @@ -154,7 +155,7 @@ class IcebergConversionTransaction(
convertDeltaRemoveFileToIcebergDataFile(
remove,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
)
Expand All @@ -178,7 +179,7 @@ class IcebergConversionTransaction(
convertDeltaRemoveFileToIcebergDataFile(
f,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
postCommitSnapshot)
}.toSet.asJava
Expand All @@ -188,7 +189,7 @@ class IcebergConversionTransaction(
convertDeltaAddFileToIcebergDataFile(
f,
tablePath,
partitionSpec,
currentPartitionSpec,
logicalToPhysicalPartitionNames,
statsParser,
postCommitSnapshot
Expand All @@ -212,8 +213,15 @@ class IcebergConversionTransaction(
// Data path of the Delta log that the post-commit snapshot belongs to.
protected val tablePath = postCommitSnapshot.deltaLog.dataPath
// Iceberg schema converted from the schema in the post-commit snapshot's Delta metadata.
protected val icebergSchema =
convertDeltaSchemaToIcebergSchema(postCommitSnapshot.metadata.schema)
// Initial partition spec converted from Delta: built from the converted Iceberg schema and the
// partition columns recorded in the post-commit snapshot's metadata.
protected val partitionSpec =
createPartitionSpec(icebergSchema, postCommitSnapshot.metadata.partitionColumns)

/**
 * Partition spec currently reported by the Iceberg table of this transaction.
 *
 * Falls back to `partitionSpec` (the spec converted from the Delta metadata) when the
 * transaction does not expose a table.
 *
 * @return the table's current spec, or `partitionSpec` if `txn.table()` is null
 */
def currentPartitionSpec: PartitionSpec = {
  // Option(...) instead of Some(...): Some(null) is still a Some, so with Some the fallback
  // below could never be selected and a null table would fail with a NullPointerException.
  Option(txn.table()).map(_.spec()).getOrElse(partitionSpec)
}

// Partition-column name mapping computed from the Delta partition schema of the post-commit
// snapshot. Presumably logical name -> physical name, judging by the helper's name; confirm
// against getPartitionPhysicalNameMapping.
private val logicalToPhysicalPartitionNames =
getPartitionPhysicalNameMapping(postCommitSnapshot.metadata.partitionSchema)

Expand Down
9 changes: 9 additions & 0 deletions kernel/kernel-api/src/main/java/io/delta/kernel/Snapshot.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import java.util.List;
import java.util.Optional;

/**
* Represents the snapshot of a Delta table.
Expand Down Expand Up @@ -61,6 +62,14 @@ public interface Snapshot {
*/
StructType getSchema();

/**
* Returns the configuration for the provided {@code domain} if it exists in the snapshot. Returns
* empty if the {@code domain} is not present in the snapshot.
*
* @param domain the domain identifier to look up
* @return the configuration for the provided domain if it exists, otherwise an empty {@link
*     Optional}
*/
Optional<String> getDomainMetadata(String domain);

/**
* Create a scan builder to construct a {@link Scan} to read data from this snapshot.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.delta.kernel.annotation.Evolving;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.ConcurrentTransactionException;
import io.delta.kernel.exceptions.DomainDoesNotExistException;
import io.delta.kernel.exceptions.InvalidConfigurationValueException;
import io.delta.kernel.exceptions.UnknownConfigurationException;
import io.delta.kernel.internal.TableConfig;
Expand Down Expand Up @@ -91,6 +92,40 @@ TransactionBuilder withTransactionId(
*/
TransactionBuilder withMaxRetries(int maxRetries);

/**
* Commit the provided domain metadata as part of this transaction. If this is called more than
* once with the same {@code domain}, the latest provided {@code config} will be committed in the
* transaction. Only user-controlled domains are allowed (i.e. domains with a `delta.` prefix are
* not allowed). Adding and removing a domain with the same identifier in the same txn is not
* allowed.
*
* <p>See the Delta protocol for more information on how to use domain metadata <a
* href="https://github.com/delta-io/delta/blob/master/PROTOCOL.md#domain-metadata">Domain
* Metadata</a>.
*
* <p>Please note using this API will automatically upgrade the protocol of the table to support
* Domain Metadata if it is not already supported. See <a
* href="https://docs.delta.io/latest/versioning.html#how-does-delta-lake-manage-feature-compatibility">
* How does Delta Lake manage feature compatibility?</a> for more details. This may break existing
* writers that do not support the Domain Metadata feature; readers will be unaffected.
*
* @param domain the domain identifier
* @param config configuration string for this domain
* @return updated {@link TransactionBuilder} instance
*/
TransactionBuilder withDomainMetadata(String domain, String config);

/**
* Mark the domain metadata with identifier {@code domain} as removed in this transaction. If this
* domain does not exist in the latest version of the table, a {@link
* DomainDoesNotExistException} is thrown upon calling {@link TransactionBuilder#build(Engine)}.
* Adding and removing a domain with the same identifier in one txn is not allowed.
*
* @param domain the domain identifier for the domain to remove
* @return updated {@link TransactionBuilder} instance
*/
TransactionBuilder withDomainMetadataRemoved(String domain);

/**
* Build the transaction. Also validates the given info to ensure that a valid transaction can be
* created.
Expand All @@ -101,6 +136,8 @@ TransactionBuilder withTransactionId(
* @throws InvalidConfigurationValueException if the value of the property is invalid.
* @throws UnknownConfigurationException if any of the properties are unknown to {@link
* TableConfig}.
* @throws DomainDoesNotExistException if removing a domain that does not exist in the latest
* version of the table
*/
Transaction build(Engine engine);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (2025) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.delta.kernel.exceptions;

import io.delta.kernel.annotation.Evolving;

/**
 * Signals that a transaction asked to remove a domain metadata whose identifier does not exist in
 * the snapshot the transaction read.
 */
@Evolving
public class DomainDoesNotExistException extends KernelException {
  /**
   * @param tablePath path of the table; used as the prefix of the message
   * @param domain identifier of the domain that was requested to be removed
   * @param snapshotVersion version of the read snapshot that lacks the domain
   */
  public DomainDoesNotExistException(String tablePath, String domain, long snapshotVersion) {
    super(describe(tablePath, domain, snapshotVersion));
  }

  /** Renders the exception message from the table path, domain identifier and version. */
  private static String describe(String tablePath, String domain, long snapshotVersion) {
    final String template =
        "%s: Cannot remove domain metadata with identifier %s "
            + "because it does not exist in the read snapshot at version %s";
    return String.format(template, tablePath, domain, snapshotVersion);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,14 @@ enum PostCommitHookType {
* perform this operation, reading previous checkpoint + logs is required to construct a new
* checkpoint, with latency scaling based on log size (typically seconds to minutes).
*/
CHECKPOINT
CHECKPOINT,
/**
* Writes a checksum file at the version committed by the transaction. This hook is present when
* all required table statistics (e.g. table size) for checksum file are known when a
* transaction commits. This operation has a minimal latency with no requirement of reading
* previous checkpoint or logs.
*/
CHECKSUM_SIMPLE
}

/** Invokes the post commit operation whose implementation must be thread safe. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@
import io.delta.kernel.internal.actions.DomainMetadata;
import io.delta.kernel.internal.actions.Metadata;
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.checksum.CRCInfo;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.metrics.SnapshotQueryContext;
import io.delta.kernel.internal.metrics.SnapshotReportImpl;
import io.delta.kernel.internal.replay.CRCInfo;
import io.delta.kernel.internal.replay.CreateCheckpointIterator;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.snapshot.LogSegment;
Expand Down Expand Up @@ -117,6 +117,13 @@ public StructType getSchema() {
return getMetadata().getSchema();
}

@Override
public Optional<String> getDomainMetadata(String domain) {
  final DomainMetadata entry = getDomainMetadataMap().get(domain);
  // Only active domain metadatas count: a missing entry and a tombstone both yield empty.
  if (entry == null || entry.isRemoved()) {
    return Optional.empty();
  }
  // ofNullable keeps the "null configuration -> empty" outcome of mapping over an Optional.
  return Optional.ofNullable(entry.getConfiguration());
}

@Override
public ScanBuilder getScanBuilder() {
return new ScanBuilderImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static io.delta.kernel.internal.DeltaErrors.tableAlreadyExists;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_READ_VERSION;
import static io.delta.kernel.internal.TransactionImpl.DEFAULT_WRITE_VERSION;
import static io.delta.kernel.internal.tablefeatures.TableFeatures.DOMAIN_METADATA_W_FEATURE;
import static io.delta.kernel.internal.util.ColumnMapping.isColumnMappingModeEnabled;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
import static io.delta.kernel.internal.util.SchemaUtils.casePreservingPartitionColNames;
Expand All @@ -28,6 +29,7 @@

import io.delta.kernel.*;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.exceptions.DomainDoesNotExistException;
import io.delta.kernel.exceptions.TableNotFoundException;
import io.delta.kernel.internal.actions.*;
import io.delta.kernel.internal.fs.Path;
Expand All @@ -53,6 +55,8 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private final TableImpl table;
private final String engineInfo;
private final Operation operation;
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
private final Set<String> domainMetadatasRemoved = new HashSet<>();
private Optional<StructType> schema = Optional.empty();
private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();
Expand Down Expand Up @@ -110,6 +114,31 @@ public TransactionBuilder withMaxRetries(int maxRetries) {
return this;
}

@Override
public TransactionBuilder withDomainMetadata(String domain, String config) {
  // Only user-controlled domains may be set through the builder.
  final boolean userControlled = DomainMetadata.isUserControlledDomain(domain);
  checkArgument(userControlled, "Setting a system-controlled domain is not allowed: " + domain);

  // A domain cannot be both removed and added within the same transaction.
  final boolean removedInThisTxn = domainMetadatasRemoved.contains(domain);
  checkArgument(!removedInThisTxn, "Cannot add a domain that is removed in this transaction");

  // Last call wins: an earlier entry for the same domain is replaced.
  final DomainMetadata activeEntry = new DomainMetadata(domain, config, false /* removed */);
  domainMetadatasAdded.put(domain, activeEntry);
  return this;
}

@Override
public TransactionBuilder withDomainMetadataRemoved(String domain) {
  // Only user-controlled domains may be removed through the builder.
  final boolean userControlled = DomainMetadata.isUserControlledDomain(domain);
  checkArgument(userControlled, "Removing a system-controlled domain is not allowed: " + domain);

  // A domain cannot be both added and removed within the same transaction.
  final boolean addedInThisTxn = domainMetadatasAdded.containsKey(domain);
  checkArgument(!addedInThisTxn, "Cannot remove a domain that is added in this transaction");

  domainMetadatasRemoved.add(domain);
  return this;
}

@Override
public Transaction build(Engine engine) {
SnapshotImpl snapshot;
Expand Down Expand Up @@ -155,14 +184,29 @@ public Transaction build(Engine engine) {
if (!newWriterFeatures.isEmpty()) {
logger.info("Automatically enabling writer features: {}", newWriterFeatures);
shouldUpdateProtocol = true;
List<String> oldWriterFeatures = protocol.getWriterFeatures();
Set<String> oldWriterFeatures = protocol.getWriterFeatures();
protocol = protocol.withNewWriterFeatures(newWriterFeatures);
List<String> curWriterFeatures = protocol.getWriterFeatures();
Set<String> curWriterFeatures = protocol.getWriterFeatures();
checkArgument(!Objects.equals(oldWriterFeatures, curWriterFeatures));
TableFeatures.validateWriteSupportedTable(protocol, metadata, table.getPath(engine));
}
}

/* --------------- Domain Metadata Protocol upgrade if necessary------------ */
if (!TableFeatures.isDomainMetadataSupported(protocol)) {
if (!domainMetadatasAdded.isEmpty()) {
// This txn is setting a domain metadata, enable the feature in the protocol
logger.info(
"Automatically enabling writer feature: {}", DOMAIN_METADATA_W_FEATURE.featureName());
protocol =
protocol.withNewWriterFeatures(
Collections.singleton(DOMAIN_METADATA_W_FEATURE.featureName()));
shouldUpdateProtocol = true;
}
// If domainMetadatasRemoved is non-empty we do nothing. A DomainDoesNotExistException will be
// thrown in `getDomainMetadatasToCommit` since the domain cannot exist in the readSnapshot.
}

return new TransactionImpl(
isNewTable,
table.getDataPath(),
Expand All @@ -176,7 +220,8 @@ public Transaction build(Engine engine) {
shouldUpdateMetadata,
shouldUpdateProtocol,
maxRetries,
table.getClock());
table.getClock(),
getDomainMetadatasToCommit(snapshot));
}

/** Validate the given parameters for the transaction. */
Expand Down Expand Up @@ -287,10 +332,47 @@ private Metadata getInitialMetadata() {
}

private Protocol getInitialProtocol() {
return new Protocol(
DEFAULT_READ_VERSION,
DEFAULT_WRITE_VERSION,
null /* readerFeatures */,
null /* writerFeatures */);
return new Protocol(DEFAULT_READ_VERSION, DEFAULT_WRITE_VERSION);
}

/**
 * Collects the domain metadata actions this transaction will commit: every domain metadata added
 * via {@link TransactionBuilder#withDomainMetadata(String, String)}, followed by one tombstone
 * for each domain removed via {@link TransactionBuilder#withDomainMetadataRemoved(String)}.
 *
 * @param snapshot the read snapshot used to look up the domains being removed
 * @return the added domain metadatas plus the tombstones for the removed ones
 * @throws DomainDoesNotExistException if a domain to remove is absent from {@code snapshot} or is
 *     already marked as removed there
 */
private List<DomainMetadata> getDomainMetadatasToCommit(SnapshotImpl snapshot) {
  // Start with everything that was added in this transaction.
  List<DomainMetadata> toCommit = new ArrayList<>(domainMetadatasAdded.values());

  // Then append a tombstone per removed domain. A removed domain can never collide with an added
  // one, because adding and removing the same identifier in a single txn is rejected up front.
  Map<String, DomainMetadata> readSnapshotDomains = snapshot.getDomainMetadataMap();
  for (String removedDomain : domainMetadatasRemoved) {
    if (!readSnapshotDomains.containsKey(removedDomain)) {
      // A domain that does not exist must be an error; otherwise conflict resolution could
      // misbehave. Example:
      // 1. Table has no domains set in V0
      // 2. txnA is started and wants to remove domain "foo"
      // 3. txnB is started and adds domain "foo" and commits V1 before txnA
      // 4. txnA needs to perform conflict resolution against the V1 commit from txnB
      // Conflict resolution should fail, but without an existing domain there is nothing to
      // turn into a tombstone, so the conflict could not be detected correctly.
      throw new DomainDoesNotExistException(
          table.getDataPath().toString(), removedDomain, snapshot.getVersion());
    }

    DomainMetadata existing = readSnapshotDomains.get(removedDomain);
    if (existing.isRemoved()) {
      // Already removed: fail to avoid inconsistency or ambiguity. The snapshot read by the
      // connector disagrees with the one loaded here, where the domain no longer exists.
      throw new DomainDoesNotExistException(
          table.getDataPath().toString(), removedDomain, snapshot.getVersion());
    }

    toCommit.add(existing.removed());
  }
  return toCommit;
}
}
Loading

0 comments on commit ceda681

Please sign in to comment.