Kernel: Support schema evolution through existing withSchema API on T… #4196

Draft pull request: wants to merge 1 commit into base: master
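For context, the change aims to let a connector evolve an existing table's schema through the same withSchema call used at table creation. A rough usage sketch against the public Kernel API follows; the "example-connector" string and the evolveSchema helper are illustrative, and the exact behavior is still under review in this PR.

import io.delta.kernel.Operation;
import io.delta.kernel.Table;
import io.delta.kernel.Transaction;
import io.delta.kernel.TransactionBuilder;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;

class SchemaEvolutionUsageSketch {
  // Sketch: update the schema of an EXISTING table through the existing withSchema API.
  // Assumes column mapping is enabled on the table and that newSchema already carries the
  // column ids / physical names that this PR's validation expects.
  static void evolveSchema(Engine engine, String tablePath, StructType newSchema) {
    Table table = Table.forPath(engine, tablePath);
    TransactionBuilder builder =
        table
            .createTransactionBuilder(engine, "example-connector", Operation.WRITE)
            .withSchema(engine, newSchema); // previously rejected for existing tables
    Transaction txn = builder.build(engine); // schema/partition-column validation happens here
    // txn.commit(engine, dataActions) would then write the updated Metadata action.
  }
}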
@@ -59,6 +59,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
private final Set<String> domainMetadatasRemoved = new HashSet<>();
private Optional<StructType> schema = Optional.empty();
private boolean updatedSchema;
Collaborator: I don't think we need this? If schema is non-empty, it was updated.

private Optional<List<String>> partitionColumns = Optional.empty();
private Optional<SetTransaction> setTxnOpt = Optional.empty();
private Optional<Map<String, String>> tableProperties = Optional.empty();
@@ -79,6 +80,7 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper
@Override
public TransactionBuilder withSchema(Engine engine, StructType newSchema) {
Collaborator: I thought we were also discussing restricting this schema-update path to a separate non-public API, so that the public withSchema method always fails if it's not a new table?

this.schema = Optional.of(newSchema); // will be verified as part of the build() call
this.updatedSchema = true;
Collaborator: Can we remove this and just use this.schema.isPresent()?

OR, you can move this to a private isSchemaUpdate method that uses this.schema.isPresent() -- but let's not introduce a new member variable that represents coupled state.

return this;
}

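A minimal sketch of the helper the reviewer suggests above (illustrative, not part of the diff): derive "schema was updated" from the existing Optional instead of a second member variable.

  // Sketch of the suggested helper: no extra boolean member, no coupled state.
  private boolean isSchemaUpdate() {
    return this.schema.isPresent();
  }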
@@ -166,15 +168,23 @@ public Transaction build(Engine engine) {
boolean shouldUpdateProtocol = false;
Metadata metadata = snapshot.getMetadata();
Protocol protocol = snapshot.getProtocol();
-    Map<String, String> validatedProperties =
-        TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
-    Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);
-
-    ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);
-
-    if (!newProperties.isEmpty()) {
-      shouldUpdateMetadata = true;
-      metadata = metadata.withNewConfiguration(newProperties);
-    }
+    if (tableProperties.isPresent()) {
+      Map<String, String> validatedProperties =
+          TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
+      Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);
+
+      ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);
+
+      if (!newProperties.isEmpty()) {
+        shouldUpdateMetadata = true;
+        metadata = metadata.withNewConfiguration(newProperties);
+      }
+    }

Collaborator (on lines -169 to +184): What are all these changes for?

+    if (updatedSchema) {
+      shouldUpdateMetadata = true;
+      metadata = metadata.withNewSchema(schema.get());
+    }

Optional<Tuple2<Protocol, Set<TableFeature>>> newProtocolAndFeatures =
@@ -204,7 +214,8 @@ public Transaction build(Engine engine) {
shouldUpdateProtocol,
maxRetries,
table.getClock(),
getDomainMetadatasToCommit(snapshot));
getDomainMetadatasToCommit(snapshot),
!isNewTable && updatedSchema);
Collaborator: Can you add a comment saying what this parameter, preserveFieldIds, is?

Collaborator: +1 please; unnamed boolean params are confusing.

}

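One way to address the "unnamed boolean params are confusing" comment above, sketched as a self-describing enum (illustrative only; the PR as written keeps a plain boolean, and FieldIdBehavior is a hypothetical name):

// Hypothetical alternative to the unnamed boolean parameter.
public enum FieldIdBehavior {
  // New table, or no schema change: Kernel assigns/updates column mapping field ids itself.
  ASSIGN_FIELD_IDS,
  // Existing table with a connector-supplied schema: keep the field ids already on the schema.
  PRESERVE_FIELD_IDS;

  static FieldIdBehavior fromFlags(boolean isNewTable, boolean updatedSchema) {
    return (!isNewTable && updatedSchema) ? PRESERVE_FIELD_IDS : ASSIGN_FIELD_IDS;
  }
}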
/** Validate the given parameters for the transaction. */
@@ -213,14 +224,26 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
// Validate the table has no features that Kernel doesn't yet support writing into it.
TableFeatures.validateKernelCanWriteToTable(
snapshot.getProtocol(), snapshot.getMetadata(), tablePath);
ColumnMappingMode mappingMode =
ColumnMapping.getColumnMappingMode(
isNewTable
? tableProperties.orElse(Collections.emptyMap())
: snapshot.getMetadata().getConfiguration());
Collaborator (on lines +229 to +231): Pass the new configuration to this method? I'm not sure this is safe, since we need to combine any new properties with the existing ones in the case of a table property update.

if (!isNewTable) {
if (schema.isPresent()) {
throw tableAlreadyExists(
tablePath,
"Table already exists, but provided a new schema. "
+ "Schema can only be set on a new table.");
boolean columnMappingEnabled = isColumnMappingModeEnabled(mappingMode);
if (!columnMappingEnabled && updatedSchema) {
throw new IllegalArgumentException(
Collaborator: Is this a temporary restriction or a permanent restriction?

Collaborator: Also -- this should be a KernelException; the user is performing an action that is invalid against the current table state.

"Cannot update schema for table when column mapping is disabled");
}

if (updatedSchema) {
// If overriding a schema on the existing table, the actual column IDs on the new schema
// should be validated
SchemaUtils.validateUpdatedSchema(snapshot.getSchema(), schema.get());
SchemaUtils.validatePartitionColumns(schema.get(), snapshot.getPartitionColumnNames());
}

if (partitionColumns.isPresent()) {
throw tableAlreadyExists(
tablePath,
@@ -229,9 +252,6 @@ private void validate(Engine engine, SnapshotImpl snapshot, boolean isNewTable)
}
} else {
// New table verify the given schema and partition columns
ColumnMappingMode mappingMode =
ColumnMapping.getColumnMappingMode(tableProperties.orElse(Collections.emptyMap()));

SchemaUtils.validateSchema(schema.get(), isColumnMappingModeEnabled(mappingMode));
SchemaUtils.validatePartitionColumns(
schema.get(), partitionColumns.orElse(Collections.emptyList()));
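To make the reviewer's concern about lines +229 to +231 concrete: for an existing table whose properties are also being updated, the column mapping mode arguably has to be derived from the merged configuration rather than from either map alone. A sketch of that merge, assuming last-writer-wins semantics (illustrative helper, not part of the PR):

import java.util.HashMap;
import java.util.Map;

final class ConfigMergeSketch {
  // Overlay the transaction's new properties on top of the table's existing configuration.
  static Map<String, String> effectiveConfiguration(
      Map<String, String> existingConfiguration, Map<String, String> newProperties) {
    Map<String, String> merged = new HashMap<>(existingConfiguration);
    merged.putAll(newProperties); // new values win over existing ones
    return merged;
  }
}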
@@ -73,7 +73,9 @@ public class TransactionImpl implements Transaction {
private final Optional<SetTransaction> setTxnOpt;
private final boolean shouldUpdateProtocol;
private final Clock clock;
private List<DomainMetadata> domainMetadatas;
private final boolean preserveFieldIds;

private List<DomainMetadata> domainMetadatas = new ArrayList<>();
private Metadata metadata;
private boolean shouldUpdateMetadata;
private int maxRetries;
@@ -94,7 +96,8 @@ public TransactionImpl(
boolean shouldUpdateProtocol,
int maxRetries,
Clock clock,
List<DomainMetadata> domainMetadatas) {
List<DomainMetadata> domainMetadatas,
boolean preserveFieldIds) {
this.isNewTable = isNewTable;
this.dataPath = dataPath;
this.logPath = logPath;
@@ -109,6 +112,7 @@ public TransactionImpl(
this.maxRetries = maxRetries;
this.clock = clock;
this.domainMetadatas = domainMetadatas;
this.preserveFieldIds = preserveFieldIds;
}

@Override
@@ -301,11 +305,14 @@ private TransactionCommitResult doCommit(
List<Row> metadataActions = new ArrayList<>();
metadataActions.add(createCommitInfoSingleAction(attemptCommitInfo.toRow()));
if (shouldUpdateMetadata || isNewTable) {
this.metadata =
ColumnMapping.updateColumnMappingMetadata(
metadata,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()),
isNewTable);
if (!preserveFieldIds) {
Collaborator: I don't really understand why we need to predicate on this? If the fieldIds are present, we should never update/override them, right? That would be incorrect.

Collaborator: Maybe you can explain what preserveFieldIds means and why we use it as !isNewTable && updatedSchema; I think it's just not clear in the code when we should/shouldn't preserve them.

this.metadata =
ColumnMapping.updateColumnMappingMetadata(
metadata,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()),
isNewTable);
}

metadataActions.add(createMetadataSingleAction(metadata.toRow()));
}
if (shouldUpdateProtocol || isNewTable) {
Collaborator: @vkorukanti do we need to validate somewhere the specific requirements for fieldIds for IcebergCompatV2, e.g. for complex types? Do we already do that somewhere else?

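A sketch of the explanation the reviewers ask for, written as a doc comment on the new field (the wording is mine, inferred from how the flag is computed and used in this diff, not the author's):

  /**
   * Sketch of a possible doc comment (not the author's wording). When true, the schema set on
   * this transaction already carries column mapping field ids and physical names assigned by
   * the caller (schema evolution on an existing table), so doCommit() must not re-run
   * ColumnMapping.updateColumnMappingMetadata(), which would reassign them. When false (new
   * table, or no schema change), Kernel assigns and updates the ids itself as before.
   */
  private final boolean preserveFieldIds;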
@@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) {
return maxColumnId;
}

static void validateColumnIds(StructType currentSchema, StructType updatedSchema) {
Collaborator: Method docs for all of these, please; we should be able to know what this does without reading the code.

Collaborator: (1) Is this intentionally package-private? (2) Can you add some method docs that say what "validate" means? I'd prefer to read a short method comment than read all this code to get the gist of it.

Collaborator: Would some sort of schema visitor help here?

Map<Long, String> currentFieldIdToPhysicalName = new HashMap<>();
for (StructField field : currentSchema.fields()) {
validateColumnIds(field, currentFieldIdToPhysicalName);
}

Map<Long, String> updatedFieldIdToPhysicalName = new HashMap<>();
for (StructField field : updatedSchema.fields()) {
validateColumnIds(field, updatedFieldIdToPhysicalName);
}

Set<String> dedupedPhysicalNames = new HashSet<>(updatedFieldIdToPhysicalName.values());
if (dedupedPhysicalNames.size() != updatedFieldIdToPhysicalName.size()) {
throw new IllegalArgumentException("Assigned physical names must be unique");
}

for (Map.Entry<Long, String> field : updatedFieldIdToPhysicalName.entrySet()) {
String existingPhysicalName = currentFieldIdToPhysicalName.get(field.getKey());
// Found an existing field, verify the physical name is preserved
if (existingPhysicalName != null && !existingPhysicalName.equals(field.getValue())) {
Contributor (author): Validation that physical names for existing fields are preserved between updates.

Collaborator: Not sure if this is already done here, but I think we should also validate that for complex types the nested fieldIds are consistent (unchanged) as well.

throw new IllegalArgumentException(
String.format(
"Existing field with id %s in current schema has physical name %s which is different from %s",
field.getKey(), existingPhysicalName, field.getValue()));
}
}
}

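Responding to the comment above about nested fieldIds for complex types, a hedged sketch of what an additional check could look like; helper names such as hasNestedColumnIds and getNestedFieldIds mirror the ones in this file, and the exact rule (nested ids must be unchanged for fields present in both schemas) is one reading of the reviewer's request:

  // Sketch only: for a field present in both schemas, the nested ids recorded under
  // delta.columnMapping.nested.ids in the current schema should still be present on the
  // corresponding field in the updated schema.
  private static void validateNestedIdsUnchanged(StructField currentField, StructField newField) {
    if (!hasNestedColumnIds(currentField)) {
      return; // nothing recorded on the current field, nothing to preserve
    }
    Set<Long> currentIds = new HashSet<>(getNestedFieldIds(currentField));
    Set<Long> newIds =
        hasNestedColumnIds(newField) ? new HashSet<>(getNestedFieldIds(newField)) : new HashSet<>();
    if (!newIds.containsAll(currentIds)) {
      throw new IllegalArgumentException(
          String.format(
              "Field %s changed its nested column ids from %s to %s",
              currentField.getName(), currentIds, newIds));
    }
  }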
private static void validateColumnIds(
Collaborator: A comment describing what this function does, please.

StructField field, Map<Long, String> fieldIdToPhysicalName) {
if (!hasPhysicalName(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing physical name",
field.getName()));
}

if (!hasColumnId(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing column id", field.getName()));
}
Contributor (author) (on lines +189 to +200): Validation that physical names and column ids are defined.

Contributor (author): Need to fix the message since it applies to both column mapping modes.

Collaborator: Do we do this after we populate them for the non-DBI connector? We don't expect other connectors to populate their own physical names/column ids. (Referring to table creation. For this updateSchema API I still think we should maybe make it a separate internal API and restrict it to usage by DBI only -- not opening up schema evolution for all connectors yet.)

long columnId = getColumnId(field);

if (fieldIdToPhysicalName.containsKey(columnId)) {
throw new IllegalArgumentException(
String.format("Field %s with id %d already exists", field.getName(), columnId));
}

String physicalName = getPhysicalName(field);
fieldIdToPhysicalName.put(columnId, physicalName);

if (field.getDataType() instanceof MapType) {
if (!hasNestedColumnIds(field)) {
Contributor (author): Technically the Delta protocol doesn't specify that delta.columnMapping.nested.ids needs to be set, but I believe it's a requirement for Iceberg compat.

Collaborator: I think I just commented about this :) yeah, we need to do this for IcebergCompat I think (but we should only do this when it's enabled!).

Collaborator: Can you please clearly highlight these sorts of irregularities in the code? Leave a TODO. I.e. if this is a requirement for Iceberg compat but not for column mapping, that's good to know; we may want to update this code in the future to enable better column mapping compatibility, right?

throw new IllegalArgumentException(
String.format("Map field %s must have exactly 2 nested IDs", field.getName()));
}

List<Long> nestedFieldIds = getNestedFieldIds(field);

if (nestedFieldIds.size() != 2) {
throw new IllegalArgumentException(
String.format("Map field %s must have exactly 2 nested IDs", field.getName()));
}

Set<Long> dedupedNestedFieldIds = new HashSet<>(nestedFieldIds);
if (nestedFieldIds.size() != dedupedNestedFieldIds.size()) {
throw new IllegalArgumentException(
String.format("Map field %s cannot contain duplicate nested IDs", field.getName()));
}

for (Long id : dedupedNestedFieldIds) {
if (fieldIdToPhysicalName.containsKey(id)) {
throw new IllegalArgumentException(
String.format("Nested field with id %s already exists", id));
}
}

} else if (field.getDataType() instanceof ArrayType) {
if (!hasNestedColumnIds(field)) {
throw new IllegalArgumentException(
String.format("Array field %s must have exactly 1 nested ID", field.getName()));
Collaborator: Can you make sure we have unit tests for each of these error cases?

}

List<Long> nestedFieldIds = getNestedFieldIds(field);
if (nestedFieldIds.size() != 1) {
throw new IllegalArgumentException(
String.format("Array field %s must have exactly 1 nested ID", field.getName()));
}
} else if (field.getDataType() instanceof StructType) {
StructType structType = (StructType) field.getDataType();
for (StructField nestedField : structType.fields()) {
validateColumnIds(nestedField, fieldIdToPhysicalName);
}
}
}

private static List<Long> getNestedFieldIds(StructField field) {
return getNestedColumnIds(field).getEntries().values().stream()
.filter(Long.class::isInstance)
.map(Long.class::cast)
.collect(Collectors.toList());
}

private static int findMaxColumnId(StructField field, int maxColumnId) {
if (hasColumnId(field)) {
maxColumnId = Math.max(maxColumnId, getColumnId(field));
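Regarding the request above for unit tests of each error case, a JUnit-style sketch of one such test (the existing Kernel suites are Scala-based; the FieldMetadata builder calls and the COLUMN_MAPPING_PHYSICAL_NAME_KEY constant are assumed to match the current types/ColumnMapping API, and the test class name is hypothetical):

package io.delta.kernel.internal.util;

import static org.junit.Assert.assertThrows;

import io.delta.kernel.types.FieldMetadata;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import org.junit.Test;

public class ColumnMappingValidateColumnIdsTest {

  // Build a nullable int field carrying a column mapping id and physical name.
  private static StructField field(String name, long id, String physicalName) {
    FieldMetadata metadata =
        FieldMetadata.builder()
            .putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, id)
            .putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, physicalName)
            .build();
    return new StructField(name, IntegerType.INTEGER, true /* nullable */, metadata);
  }

  @Test
  public void duplicatePhysicalNamesInUpdatedSchemaAreRejected() {
    StructType current = new StructType().add(field("a", 1, "col-1"));
    // Two different ids mapped to the same physical name must be rejected.
    StructType updated = new StructType().add(field("a", 1, "col-1")).add(field("b", 2, "col-1"));
    assertThrows(
        IllegalArgumentException.class, () -> ColumnMapping.validateColumnIds(current, updated));
  }
}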
@@ -16,6 +16,7 @@
package io.delta.kernel.internal.util;

import static io.delta.kernel.internal.DeltaErrors.*;
import static io.delta.kernel.internal.util.ColumnMapping.COLUMN_MAPPING_ID_KEY;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;

import io.delta.kernel.expressions.Literal;
@@ -78,6 +79,59 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
validateSupportedType(schema);
}

public static void validateUpdatedSchema(StructType currentSchema, StructType schema) {
Collaborator: Nit: can we rename schema -> newSchema so it's more clear?

validateSchema(schema, true);
ColumnMapping.validateColumnIds(currentSchema, schema);
validateUpdatedSchemaCompatibility(currentSchema, schema);
}

Collaborator: Docs please.

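In response to the "docs" request, a possible method comment for validateUpdatedSchemaCompatibility below (wording is illustrative, derived from what the code currently enforces):

  /**
   * Checks that {@code newSchema} is a compatible evolution of {@code currentSchema}: fields are
   * matched by column mapping id; a field that exists only in the new schema must be nullable,
   * and a field that exists in both schemas must keep its data type (no type changes yet).
   */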
private static void validateUpdatedSchemaCompatibility(
StructType currentSchema, StructType newSchema) {
// Identify added columns based on field IDs
Map<Integer, StructField> newSchemaIdToField = idToField(newSchema);
Map<Integer, StructField> currentSchemaIdToField = idToField(currentSchema);
for (Map.Entry<Integer, StructField> newFieldEntry : newSchemaIdToField.entrySet()) {
if (!currentSchemaIdToField.containsKey(newFieldEntry.getKey())) {
Contributor (author): @tdas, this validates that a non-nullable field cannot be added.

if (!newFieldEntry.getValue().isNullable()) {
throw new IllegalArgumentException(
String.format(
"Cannot add a non-nullable field %s", newFieldEntry.getValue().getName()));
}
} else {
StructField currentField = currentSchemaIdToField.get(newFieldEntry.getKey());
StructField newField = newSchemaIdToField.get(newFieldEntry.getKey());
if (newField.getDataType() != currentField.getDataType()) {
Contributor (author): Validates no type promotion; this will need to be loosened, though, as we support those features.

throw new IllegalArgumentException(
String.format(
"Cannot change existing field %s from %s to %s",
Collaborator: Suggested change:
-                "Cannot change existing field %s from %s to %s",
+                "Cannot change the type of existing field %s from %s to %s",

currentField.getName(), currentField.getDataType(), newField.getDataType()));
}
}
}
}

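On the author's note that the "no type promotion" rule will need to be loosened once type widening is supported, a hedged sketch of what a widening allowlist might look like (illustrative only; nothing in this PR implements it, and TypeWideningSketch is a hypothetical name):

import io.delta.kernel.types.ByteType;
import io.delta.kernel.types.DataType;
import io.delta.kernel.types.IntegerType;
import io.delta.kernel.types.LongType;
import io.delta.kernel.types.ShortType;

final class TypeWideningSketch {
  // A small allowlist of safe widenings, e.g. byte/short/int -> long; everything else
  // would still be rejected as an incompatible type change.
  static boolean isWideningAllowed(DataType from, DataType to) {
    boolean fromIsSmallInt =
        from instanceof ByteType || from instanceof ShortType || from instanceof IntegerType;
    return fromIsSmallInt && to instanceof LongType;
  }
}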
private static Map<Integer, StructField> idToField(StructType schema) {
Map<Integer, StructField> idToField = new HashMap<>();
for (StructField field : schema.fields()) {
idToField.putAll(idToField(field));
}

return idToField;
}

private static Map<Integer, StructField> idToField(StructField field) {
Map<Integer, StructField> idToField = new HashMap<>();
idToField.put(field.getMetadata().getLong(COLUMN_MAPPING_ID_KEY).intValue(), field);
if (field.getDataType() instanceof StructType) {
StructType structType = (StructType) field.getDataType();
for (StructField nestedField : structType.fields()) {
idToField.putAll(idToField(nestedField));
}
}

return idToField;
}

/**
* Verify the partition columns exists in the table schema and a supported data type for a
* partition column.