[SPARK] Support CTAS from a table with a deleted default value (#4142)
#### Which Delta project/connector is this regarding?

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

We are currently leaking the 'EXISTS_DEFAULT' field metadata key, which
Spark uses for default column values, into Delta table schemas. The
'EXISTS_DEFAULT' key holds the values used for rows that already existed
when a new column with a default value was added to the table. This use
case is not supported in Delta.

In this PR, we remove the 'EXISTS_DEFAULT' entries every time we update
the metadata of a table, including when a table is created. This ensures
that new tables do not get 'EXISTS_DEFAULT' entries and that operations
which update metadata strip the entries written by earlier versions.
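
For illustration, here is a minimal, self-contained sketch (not code from this PR; the column name and the stored values are made up, and the literal key strings mirror Spark's `ResolveDefaultColumnsUtils` constants) of the two default-related field metadata keys and of stripping only 'EXISTS_DEFAULT' while keeping 'CURRENT_DEFAULT':

```scala
import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StructField, StructType}

// A column declared as `int_col INT DEFAULT 2` ends up carrying both metadata keys.
val leaked = StructField(
  "int_col",
  IntegerType,
  nullable = true,
  metadata = new MetadataBuilder()
    .putString("CURRENT_DEFAULT", "2") // active default, backed by the Delta table feature
    .putString("EXISTS_DEFAULT", "2")  // Spark-only key for pre-existing rows; unused by Delta
    .build())

// Strip only EXISTS_DEFAULT, analogous to what the metadata update now does for every field.
val cleaned = leaked.copy(
  metadata = new MetadataBuilder()
    .withMetadata(leaked.metadata)
    .remove("EXISTS_DEFAULT")
    .build())

val schema = StructType(Seq(cleaned))
assert(!schema.fields.exists(_.metadata.contains("EXISTS_DEFAULT")))
assert(schema.fields.exists(_.metadata.contains("CURRENT_DEFAULT")))
```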

This change now also allows CTAS from a table where a default value
existed in the past but has since been dropped. It does not enable CTAS
from a table with an active default value.

The change is guarded by a kill switch:
`REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE`

## How was this patch tested?

Added new tests

## Does this PR introduce _any_ user-facing changes?

Previously, CTAS from a table with a removed default value failed unless
the `delta.feature.allowColumnDefaults` table property was set on the new
table, even though no default value was actually part of it. This is now
possible without the property, as sketched below.
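
Roughly, the newly possible flow looks like this sketch (table names are illustrative; the fully qualified kill-switch key assumes the usual `spark.databricks.delta.` prefix that `DeltaSQLConf` adds to the conf introduced in this PR):

```scala
// Source table with a column default; this still requires the table feature on the source.
spark.sql(
  """CREATE TABLE source_tbl (int_col INT DEFAULT 2)
    |USING delta
    |TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)

// Drop the default again; previously the unused EXISTS_DEFAULT key lingered in the schema.
spark.sql("ALTER TABLE source_tbl ALTER COLUMN int_col DROP DEFAULT")

// This CTAS used to fail unless delta.feature.allowColumnDefaults was also set on the new table.
spark.sql("CREATE TABLE target_tbl USING delta AS SELECT * FROM source_tbl")

// Kill switch, if the old behaviour is needed (assumed fully qualified key):
// spark.conf.set(
//   "spark.databricks.delta.allowColumnDefaults.removeExistsDefaultFromSchemaOnMetadataChange",
//   "false")
```
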
olaky authored Feb 26, 2025
1 parent 602edad commit 6e9498c
Showing 4 changed files with 158 additions and 1 deletion.
@@ -683,6 +683,15 @@ trait OptimisticTransactionImpl extends DeltaTransaction
}
}

    if (spark.sessionState.conf
        .getConf(DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE)) {
      val schemaWithRemovedExistsDefaults =
        SchemaUtils.removeExistsDefaultMetadata(newMetadataTmp.schema)
      if (schemaWithRemovedExistsDefaults != newMetadataTmp.schema) {
        newMetadataTmp = newMetadataTmp.copy(schemaString = schemaWithRemovedExistsDefaults.json)
      }
    }

// Table features Part 2: add manually-supported features specified in table properties, aka
// those start with [[FEATURE_PROP_PREFIX]].
//
@@ -39,7 +39,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
-import org.apache.spark.sql.catalyst.util.CharVarcharUtils
+import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, ResolveDefaultColumnsUtils}
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.internal.SQLConf
@@ -1546,6 +1546,30 @@ def normalizeColumnNamesInDataType(
  def areLogicalNamesEqual(col1: Seq[String], col2: Seq[String]): Boolean = {
    col1.length == col2.length && col1.zip(col2).forall(DELTA_COL_RESOLVER.tupled)
  }

  def removeExistsDefaultMetadata(schema: StructType): StructType = {
    // 'EXISTS_DEFAULT' is not used in Delta because adding a column with a default value is not
    // allowed. Spark does, however, still add the metadata key when a column with a default
    // value is defined at table creation.
    // We remove the metadata field here because it is not part of the Delta protocol and
    // having it in the schema prevents CTAS from a table with a dropped default value.
    // @TODO: Clarify whether active default values should be propagated to the target table in
    // CTAS and, if not, also remove 'CURRENT_DEFAULT' in CTAS.
    SchemaUtils.transformSchema(schema) {
      case (_, StructType(fields), _)
          if fields.exists(_.metadata.contains(
            ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY)) =>
        val newFields = fields.map { field =>
          val builder = new MetadataBuilder()
            .withMetadata(field.metadata)
            .remove(ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY)

          field.copy(metadata = builder.build())
        }
        StructType(newFields)
      case (_, other, _) => other
    }
  }
}

/**
@@ -618,6 +618,18 @@ trait DeltaSQLConfBase
      .booleanConf
      .createWithDefault(true)

  val REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE =
    buildConf("allowColumnDefaults.removeExistsDefaultFromSchemaOnMetadataChange")
      .internal()
      .doc("When enabled, remove all field metadata entries using the 'EXISTS_DEFAULT' key " +
        "from the schema whenever the table metadata is updated. 'EXISTS_DEFAULT' holds the " +
        "values that Spark uses for existing rows when a new column with a default value is " +
        "added to a table. Since adding columns with a default value is not supported in " +
        "Delta, this key should always be removed, even when it was written into the schema " +
        "by an older version.")
      .booleanConf
      .createWithDefault(true)

//////////////////////////////////////////////
// DynamoDB Commit Coordinator-specific configs
/////////////////////////////////////////////
@@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
@@ -2409,6 +2410,117 @@ class DeltaTableCreationSuite
}
}
}

  private def schemaContainsExistsDefaultKey(testTableName: String): Boolean = {
    val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTableName))
    snapshot.metadata.schema.fields.exists(
      _.metadata.contains(ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY))
  }

test("Default column values: A table metadata update removes EXISTS_DEFAULT from a table") {
val testTableName = "test_table"
val metadataOperations = Seq(
s"ALTER TABLE $testTableName ALTER COLUMN int_col SET DEFAULT 2",
s"ALTER TABLE $testTableName CLUSTER BY (int_col)",
s"COMMENT ON TABLE $testTableName IS 'test comment'"
)

metadataOperations.foreach { metadataUpdatingQuery =>
withTable(testTableName) {
// Create the table and ensure that EXISTS_DEFAULT is part of the schema.
withSQLConf(DeltaSQLConf
.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key -> "false") {
sql(s"""CREATE TABLE $testTableName(int_col INT DEFAULT 2)
|USING delta
|TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
assert(schemaContainsExistsDefaultKey(testTableName))
}

// Execute the metadata operation and assert that it removed EXISTS_DEFAULT from the schema.
withSQLConf(DeltaSQLConf
.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key -> "true") {
sql(metadataUpdatingQuery)
assert(!schemaContainsExistsDefaultKey(testTableName),
s"Operation '$metadataUpdatingQuery' did not remove EXISTS_DEFAULT from the schema.")
}
}
}
}

test("Default column values: Writes to a table do not remove EXISTS_DEFAULT from a table") {
val testTableName = "test_table"
withTable(testTableName) {
// Add an EXISTS_DEFAULT entry to the schema by disabling the feature flag.
withSQLConf(DeltaSQLConf
.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key -> "false") {
sql(s"""CREATE TABLE $testTableName(int_col INT DEFAULT 2)
|USING delta
|TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
assert(schemaContainsExistsDefaultKey(testTableName))

withSQLConf(DeltaSQLConf
.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key -> "true") {
sql(s"INSERT INTO $testTableName VALUES (1)")
// Validate that EXISTS_DEFAULT is still part of the schema.
assert(schemaContainsExistsDefaultKey(testTableName))
}
}
}
}

test("Default column values: CREATE TABLE selecting from a table with dropped column defaults") {
for (sourceTableSchemaContainsKey <- Seq(true, false)) {
withTable("test_table", "test_table_2", "test_table_3") {
// To test with the 'EXISTS_DEFAULT' key present in the source table, we disable removal.
withSQLConf(DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key
-> (!sourceTableSchemaContainsKey).toString) {
// Defaults are only possible for top level columns.
sql("""CREATE TABLE test_table(int_col INT DEFAULT 2)
|USING delta
|TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
}

def schemaContainsExistsKey(tableName: String): Boolean = {
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(tableName))
snapshot.schema.fields.exists { field =>
field.metadata.contains(ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY)
}
}

def defaultsTableFeatureEnabled(tableName: String): Boolean = {
val (_, snapshot) = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(tableName))
val isEnabled =
snapshot.protocol.writerFeatureNames.contains(AllowColumnDefaultsTableFeature.name)
val schemaContainsCurrentDefaultKey = snapshot.schema.fields.exists { field =>
field.metadata.contains(
ResolveDefaultColumnsUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY)
}
assert(schemaContainsCurrentDefaultKey === isEnabled)
isEnabled
}

assert(schemaContainsExistsKey("test_table") == sourceTableSchemaContainsKey)
assert(defaultsTableFeatureEnabled("test_table"))

// It is not possible to add a column with a default to a Delta table.
assertThrows[DeltaAnalysisException] {
sql("ALTER TABLE test_table ADD COLUMN new_column_with_a_default INT DEFAULT 0")
}

// @TODO: It is currently not possible to CTAS from a table with an active column default
// without explicitly enabling the table feature.
assertThrows[AnalysisException] {
sql("CREATE TABLE test_table_2 USING DELTA AS SELECT * FROM test_table")
}

sql("ALTER TABLE test_table ALTER COLUMN int_col DROP DEFAULT")
sql("CREATE TABLE test_table_3 USING DELTA AS SELECT * FROM test_table")

assert(schemaContainsExistsKey("test_table_3") === false)
assert(!defaultsTableFeatureEnabled("test_table_3"))
}
}
}
}

trait DeltaTableCreationColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {
