Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Kernel: Support schema evolution through existing withSchema API on T… #4196

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from

Conversation

amogh-jahagirdar
Copy link
Contributor

…ransactionBuilder

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Description

How was this patch tested?

Does this PR introduce any user-facing changes?

Comment on lines 213 to 223
throw new IllegalArgumentException(
"Map field " + field.getName() + " must have exactly two nested IDs");
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests for this

Comment on lines 219 to 229
throw new IllegalArgumentException(
"Map field " + field.getName() + " cannot contain duplicate nested IDs");
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests for this

@amogh-jahagirdar amogh-jahagirdar force-pushed the with-schema-for-existing-tables branch from 031ba66 to 93b22a1 Compare February 27, 2025 09:33
@amogh-jahagirdar amogh-jahagirdar force-pushed the with-schema-for-existing-tables branch from 93b22a1 to 4278745 Compare February 27, 2025 10:52
== FieldMetadata.builder().putLong("map.key", 5).putLong("map.value", 6).build())
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add some tests with struct of struct, struct with map/array field, array of struct/map etc

Map<Integer, StructField> newSchemaIdToField = idToField(newSchema);
Map<Integer, StructField> currentSchemaIdToField = idToField(currentSchema);
for (Map.Entry<Integer, StructField> newFieldEntry : newSchemaIdToField.entrySet()) {
if (!currentSchemaIdToField.containsKey(newFieldEntry.getKey())) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tdas validates that a non-nullable field cannot be added.

} else {
StructField currentField = currentSchemaIdToField.get(newFieldEntry.getKey());
StructField newField = newSchemaIdToField.get(newFieldEntry.getKey());
if (newField.getDataType() != currentField.getDataType()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validates no type promotion, this will need to be loosened though as we support those features...

fieldIdToPhysicalName.put(columnId, physicalName);

if (field.getDataType() instanceof MapType) {
if (!hasNestedColumnIds(field)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically the Delta protoocol doesn't specify that delta.columnMapping.nested.ids needs to be set but I believe it's a requirement for Iceberg compat.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i just commented about this :) yeah we need to do this for IcebergCompat I think (but we should only do this when it's enabled!)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please clearly highlight in the code these sorts of irregularities? Leave a TODO

i.e. if this is a requirement for iceberg compat but not for column mapping .. that's good to know, we may want to update this code in the future to enable better column mapping compatibility, right?

for (Map.Entry<Long, String> field : updatedFieldIdToPhysicalName.entrySet()) {
String existingPhysicalName = currentFieldIdToPhysicalName.get(field.getKey());
// Found an existing field, verify the physical name is preserved
if (existingPhysicalName != null && !existingPhysicalName.equals(field.getValue())) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation that physical names for existing fields are preserved between updates

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is already done here, but I think we should also validate that for complex types the nested fieldIds are consistent (unchanged) as well

Comment on lines +1488 to +1492
FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 4)
.putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "d").build())
.add("e", IntegerType.INTEGER, true,
FieldMetadata.builder().putLong(ColumnMapping.COLUMN_MAPPING_ID_KEY, 5)
.putString(ColumnMapping.COLUMN_MAPPING_PHYSICAL_NAME_KEY, "e").build()), true,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put some of these field metadata building in some helper so it's easier to read

Comment on lines +189 to +200
if (!hasPhysicalName(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing physical name",
field.getName()));
}

if (!hasColumnId(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing column id", field.getName()));
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Validation that physical names and column ids are defined

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

need to fix the message since it applies for both column mapping modes

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we do this after we populate them for the non-DBI connector? We don't expect other connectors to populate their own physical names/column ids

(Referring to for table creation. For this updateSchema API I still think we should maybe make it a separate internal API and restrict it to just usage by DBI -- not opening up schema evolution for all connectors yet)

Copy link
Collaborator

@nicklan nicklan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flushing a few small things

@@ -204,7 +214,8 @@ public Transaction build(Engine engine) {
shouldUpdateProtocol,
maxRetries,
table.getClock(),
getDomainMetadatasToCommit(snapshot));
getDomainMetadatasToCommit(snapshot),
!isNewTable && updatedSchema);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment what this parameter is preserveFieldIds

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pls; unnamed boolean params are confusing

@@ -78,6 +79,59 @@ public static void validateSchema(StructType schema, boolean isColumnMappingEnab
validateSupportedType(schema);
}

public static void validateUpdatedSchema(StructType currentSchema, StructType schema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: can we rename schema -> newSchema so it's more clear

if (newField.getDataType() != currentField.getDataType()) {
throw new IllegalArgumentException(
String.format(
"Cannot change existing field %s from %s to %s",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"Cannot change existing field %s from %s to %s",
"Cannot change the type of existing field %s from %s to %s",

}
}

private static void validateColumnIds(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment describing what this function does

Copy link
Collaborator

@allisonport-db allisonport-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Skimmed it a bit. Can you please add more method docs throughout so it's easier to understand what's happening :)

Also still would like to have a more concrete list of what we are validating, and what we explicitly aren't validating (and why we aren't and the consequences of not doing so).

Thank you!!

@@ -79,6 +80,7 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper
@Override
public TransactionBuilder withSchema(Engine engine, StructType newSchema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

BI thought we were also discussing restricting this update schema method to a different non-public API? So that the public withSchema method always fails if it's not a new table

@@ -59,6 +59,7 @@ public class TransactionBuilderImpl implements TransactionBuilder {
private final Map<String, DomainMetadata> domainMetadatasAdded = new HashMap<>();
private final Set<String> domainMetadatasRemoved = new HashSet<>();
private Optional<StructType> schema = Optional.empty();
private boolean updatedSchema;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need this? If schema is non-empty it was updated

@@ -204,7 +214,8 @@ public Transaction build(Engine engine) {
shouldUpdateProtocol,
maxRetries,
table.getClock(),
getDomainMetadatasToCommit(snapshot));
getDomainMetadatasToCommit(snapshot),
!isNewTable && updatedSchema);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 pls; unnamed boolean params are confusing

Comment on lines -169 to +184
Map<String, String> validatedProperties =
TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);

ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);
if (tableProperties.isPresent()) {
Map<String, String> validatedProperties =
TableConfig.validateDeltaProperties(tableProperties.orElse(Collections.emptyMap()));
Map<String, String> newProperties = metadata.filterOutUnchangedProperties(validatedProperties);

if (!newProperties.isEmpty()) {
ColumnMapping.verifyColumnMappingChange(metadata.getConfiguration(), newProperties, isNewTable);

if (!newProperties.isEmpty()) {
shouldUpdateMetadata = true;
metadata = metadata.withNewConfiguration(newProperties);
}
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what are all these changes for?

Comment on lines +229 to +231
isNewTable
? tableProperties.orElse(Collections.emptyMap())
: snapshot.getMetadata().getConfiguration());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

pass new configuration to this method? I'm not sure this is safe since we need to combine any new properties with existing ones in the case of a table property update

metadata,
ColumnMapping.getColumnMappingMode(metadata.getConfiguration()),
isNewTable);
if (!preserveFieldIds) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't really understand why we need to predicate on this?

If the fieldIds are present -- we should never update/override them right? that would be incorrect?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe can you explain what preserveFieldIds means and why we use it as !isNewTable && updatedSchema; I think maybe it's just not clear in the code

Aka when we should/shouldn't preserve them

@@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) {
return maxColumnId;
}

static void validateColumnIds(StructType currentSchema, StructType updatedSchema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

method docs for all of these please; we should be able to know what this does without reading the code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vkorukanti do we need to validate somewhere the specific requirements for fieldIds for IcebergCompatV2? Like for complex types. Do we already do that somewhere else?

fieldIdToPhysicalName.put(columnId, physicalName);

if (field.getDataType() instanceof MapType) {
if (!hasNestedColumnIds(field)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think i just commented about this :) yeah we need to do this for IcebergCompat I think (but we should only do this when it's enabled!)

ColumnMapping.validateColumnIds(currentSchema, schema);
validateUpdatedSchemaCompatibility(currentSchema, schema);
}

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

docs pls

Copy link
Collaborator

@scottsand-db scottsand-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks great! Left some comments and questions!

@@ -79,6 +80,7 @@ public TransactionBuilderImpl(TableImpl table, String engineInfo, Operation oper
@Override
public TransactionBuilder withSchema(Engine engine, StructType newSchema) {
this.schema = Optional.of(newSchema); // will be verified as part of the build() call
this.updatedSchema = true;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this and just do this.schema.isPresent?

OR, you can move this to a private def isSchemaUpdate that uses the this.schema.isPresent -- but let's not introduce a new member varible that represents coupled state

+ "Schema can only be set on a new table.");
boolean columnMappingEnabled = isColumnMappingModeEnabled(mappingMode);
if (!columnMappingEnabled && updatedSchema) {
throw new IllegalArgumentException(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a temporary restriction or a permanent restriction?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also -- this should be a KernelException -- the user is performing an action that is invalid against the current table state.

@@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) {
return maxColumnId;
}

static void validateColumnIds(StructType currentSchema, StructType updatedSchema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(1) is this intentionally package private?
(2) can you add some method docs that say what "validate" means? I'd prefer to read a short method comment than read all this code to get the gist of this

fieldIdToPhysicalName.put(columnId, physicalName);

if (field.getDataType() instanceof MapType) {
if (!hasNestedColumnIds(field)) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please clearly highlight in the code these sorts of irregularities? Leave a TODO

i.e. if this is a requirement for iceberg compat but not for column mapping .. that's good to know, we may want to update this code in the future to enable better column mapping compatibility, right?

@@ -1321,6 +1321,369 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("Test set schema on existing table") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: for bervity you can ommit the "Test" in your test name

} else if (field.getDataType() instanceof ArrayType) {
if (!hasNestedColumnIds(field)) {
throw new IllegalArgumentException(
String.format("Array field %s must have exactly 1 nested ID", field.getName()));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you make sure we have unit tests for each of these error cases?

}
}

test("Test updating schema with adding an array and map type") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this test and the first test Test set schema on existing table are the only "positive" cases covered. Are there others we should cover here?

Should renaming, moving, etc. be covered here?

@@ -1321,6 +1321,369 @@ class DeltaTableWritesSuite extends DeltaTableWriteSuiteBase with ParquetSuiteBa
}
}

test("Test set schema on existing table") {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should split these tests off to their own suite -- what do you think?

Also -- @amogh-jahagirdar -- If you see the classdocs for this test suite, "Transaction commit in this suite IS REQUIRED TO use commitTransaction than .commit" -- the fact that you are not doing this and things are still working fine makes me (a) think these should be in a new suite, and (b) why we need to use commitTransaction ?

cc @vkorukanti

@@ -156,6 +156,111 @@ static int findMaxColumnId(StructType schema) {
return maxColumnId;
}

static void validateColumnIds(StructType currentSchema, StructType updatedSchema) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would some sort of schema visitor help here?

@nicklan nicklan requested a review from vkorukanti February 28, 2025 22:09
Copy link
Collaborator

@allisonport-db allisonport-db left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Commenting a few things that I think need to be checked but didn't see specifically (but didn't parse the code line-by-line so could be missing something)

  1. We need to forbid tightening nullability for existing columns. I think you mentioned this was implemented but didn't see it commented so not 100% sure.
  2. We need to forbid dropping partition columns. I tried this in spark SQL and it is not allowed (makes sense!).

(in the future)
3) No new generated columns AND generation expression is unchanged for existing columns.

for (Map.Entry<Long, String> field : updatedFieldIdToPhysicalName.entrySet()) {
String existingPhysicalName = currentFieldIdToPhysicalName.get(field.getKey());
// Found an existing field, verify the physical name is preserved
if (existingPhysicalName != null && !existingPhysicalName.equals(field.getValue())) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if this is already done here, but I think we should also validate that for complex types the nested fieldIds are consistent (unchanged) as well

Comment on lines +189 to +200
if (!hasPhysicalName(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing physical name",
field.getName()));
}

if (!hasColumnId(field)) {
throw new IllegalArgumentException(
String.format(
"Column mapping mode is name and field %s is missing column id", field.getName()));
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we do this after we populate them for the non-DBI connector? We don't expect other connectors to populate their own physical names/column ids

(Referring to for table creation. For this updateSchema API I still think we should maybe make it a separate internal API and restrict it to just usage by DBI -- not opening up schema evolution for all connectors yet)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants