Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK] Support CTAS from a table with a deleted default value #4142

Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -683,6 +683,15 @@ trait OptimisticTransactionImpl extends DeltaTransaction
}
}

// When the removal flag is enabled in the session conf, run the pending metadata's schema
// through SchemaUtils.removeExistsDefaultMetadata. The metadata is only copied (with the new
// schema JSON) when the rewritten schema actually differs from the current one.
if (spark.sessionState.conf
.getConf(DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE)) {
val schemaWithRemovedExistsDefaults =
SchemaUtils.removeExistsDefaultMetadata(newMetadataTmp.schema)
if (schemaWithRemovedExistsDefaults != newMetadataTmp.schema) {
newMetadataTmp = newMetadataTmp.copy(schemaString = schemaWithRemovedExistsDefaults.json)
}
}

// Table features Part 2: add manually-supported features specified in table properties, i.e.
// those that start with [[FEATURE_PROP_PREFIX]].
//
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,13 @@ import org.apache.spark.sql.delta.schema.SchemaMergingUtils._
import org.apache.spark.sql.delta.sources.DeltaSourceUtils.GENERATION_EXPRESSION_METADATA_KEY
import org.apache.spark.sql.delta.sources.{DeltaSQLConf, DeltaStreamUtils}
import org.apache.spark.sql.util.ScalaExtensions._

import org.apache.spark.internal.MDC
import org.apache.spark.sql._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{Resolver, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeReference, Expression, GetArrayItem, GetArrayStructFields, GetMapValue, GetStructField}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
import org.apache.spark.sql.catalyst.util.{CharVarcharUtils, ResolveDefaultColumnsUtils}
import org.apache.spark.sql.execution.streaming.IncrementalExecution
import org.apache.spark.sql.functions.{col, struct}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -1546,6 +1545,30 @@ def normalizeColumnNamesInDataType(
def areLogicalNamesEqual(col1: Seq[String], col2: Seq[String]): Boolean = {
col1.length == col2.length && col1.zip(col2).forall(DELTA_COL_RESOLVER.tupled)
}

/**
 * Returns a copy of `schema` in which the 'EXISTS_DEFAULT' metadata entry has been removed
 * from the fields of every struct that `SchemaUtils.transformSchema` visits.
 *
 * 'EXISTS_DEFAULT' is not used in Delta because it is not allowed to add a column with a
 * default value. Spark still adds the metadata key when a column with a default value is
 * declared at table creation. The key is not part of the Delta protocol, and having it in the
 * schema prohibits CTAS from a table with a dropped default value.
 *
 * @param schema the schema to clean up
 * @return the schema without 'EXISTS_DEFAULT' field metadata entries
 */
def removeExistsDefaultMetadata(schema: StructType): StructType = {
  // @TODO: Clarify if active default values should be propagated to the target table in CTAS or
  // not and if not also remove 'CURRENT_DEFAULT' in CTAS.
  val existsDefaultKey = ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY

  SchemaUtils.transformSchema(schema) {
    // Only rebuild a struct when at least one of its fields carries the key.
    case (_, StructType(structFields), _)
        if structFields.exists(_.metadata.contains(existsDefaultKey)) =>
      StructType(structFields.map { structField =>
        val cleanedMetadata = new MetadataBuilder()
          .withMetadata(structField.metadata)
          .remove(existsDefaultKey)
          .build()
        structField.copy(metadata = cleanedMetadata)
      })
    // Every other data type is passed through untouched.
    case (_, unchanged, _) => unchanged
  }
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,6 +618,18 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(true)

// Internal boolean flag (default: true) that controls whether 'EXISTS_DEFAULT' field metadata
// entries are stripped from the schema when the table metadata is updated.
val REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE =
  buildConf("allowColumnDefaults.removeExistsDefaultFromSchemaOnMetadataChange")
    .internal()
    .doc("When enabled, remove all field metadata entries using the 'EXISTS_DEFAULT' key " +
      "from the schema whenever the table metadata is updated. 'EXISTS_DEFAULT' holds values " +
      "that are used in Spark for existing rows when a new column with a default value is " +
      "added to a table. Since we do not support adding columns with a default value in " +
      "Delta, this metadata entry should always be removed, even when it was written by an " +
      "older version that still put it into the schema.")
    .booleanConf
    .createWithDefault(true)

//////////////////////////////////////////////
// DynamoDB Commit Coordinator-specific configs
/////////////////////////////////////////////
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType, ExternalCatalogUtils, SessionCatalog}
import org.apache.spark.sql.catalyst.parser.ParseException
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumnsUtils
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Identifier, Table, TableCatalog}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand Down Expand Up @@ -2409,6 +2410,117 @@ class DeltaTableCreationSuite
}
}
}

/**
 * Returns true when at least one top-level field of the named table's current snapshot schema
 * carries the 'EXISTS_DEFAULT' metadata key.
 */
private def schemaContainsExistsDefaultKey(testTableName: String): Boolean = {
  val existsDefaultKey = ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY
  val currentSnapshot =
    DeltaLog.forTableWithSnapshot(spark, TableIdentifier(testTableName))._2
  currentSnapshot.metadata.schema.fields.exists { topLevelField =>
    topLevelField.metadata.contains(existsDefaultKey)
  }
}

// For each metadata-changing statement: create a table that still has EXISTS_DEFAULT in its
// schema (removal flag off), run the statement with the flag on, and check the key is gone.
test("Default column values: A table metadata update removes EXISTS_DEFAULT from a table") {
  val testTableName = "test_table"
  val removalFlagKey =
    DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key

  val metadataOperations = Seq(
    s"ALTER TABLE $testTableName ALTER COLUMN int_col SET DEFAULT 2",
    s"ALTER TABLE $testTableName CLUSTER BY (int_col)",
    s"COMMENT ON TABLE $testTableName IS 'test comment'"
  )

  for (metadataUpdatingQuery <- metadataOperations) {
    withTable(testTableName) {
      // Setup: with removal disabled, table creation leaves EXISTS_DEFAULT in the schema.
      withSQLConf(removalFlagKey -> "false") {
        sql(
          s"""CREATE TABLE $testTableName(int_col INT DEFAULT 2)
             |USING delta
             |TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
        assert(schemaContainsExistsDefaultKey(testTableName))
      }

      // Exercise: with removal enabled, the metadata operation must strip the key.
      withSQLConf(removalFlagKey -> "true") {
        sql(metadataUpdatingQuery)
        assert(!schemaContainsExistsDefaultKey(testTableName),
          s"Operation '$metadataUpdatingQuery' did not remove EXISTS_DEFAULT from the schema.")
      }
    }
  }
}

// A plain data write must leave the schema alone, even when the removal flag is enabled.
test("Default column values: Writes to a table do not remove EXISTS_DEFAULT from a table") {
  val testTableName = "test_table"
  val removalFlagKey =
    DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key

  withTable(testTableName) {
    // Setup: create the table with removal disabled so EXISTS_DEFAULT ends up in the schema.
    withSQLConf(removalFlagKey -> "false") {
      sql(
        s"""CREATE TABLE $testTableName(int_col INT DEFAULT 2)
           |USING delta
           |TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
      assert(schemaContainsExistsDefaultKey(testTableName))
    }

    // Exercise: insert with removal enabled and validate that EXISTS_DEFAULT is still part of
    // the schema afterwards.
    withSQLConf(removalFlagKey -> "true") {
      sql(s"INSERT INTO $testTableName VALUES (1)")
      assert(schemaContainsExistsDefaultKey(testTableName))
    }
  }
}

// Runs the CTAS scenario twice: once with 'EXISTS_DEFAULT' present in the source table schema
// and once without. In both cases CTAS must succeed after the default was dropped, and the
// target table must neither contain the key nor have the column defaults feature enabled.
test("Default column values: CREATE TABLE selecting from a table with dropped column defaults") {
  // True when any top-level field of the table's snapshot schema has the 'EXISTS_DEFAULT' key.
  def schemaContainsExistsKey(tableName: String): Boolean = {
    val snapshot = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(tableName))._2
    snapshot.schema.fields.exists(
      _.metadata.contains(ResolveDefaultColumnsUtils.EXISTS_DEFAULT_COLUMN_METADATA_KEY))
  }

  // Reports whether the column defaults writer feature is listed in the table's protocol.
  // Also asserts that this agrees with the presence of a 'CURRENT_DEFAULT' key in the schema.
  def defaultsTableFeatureEnabled(tableName: String): Boolean = {
    val snapshot = DeltaLog.forTableWithSnapshot(spark, TableIdentifier(tableName))._2
    val isEnabled =
      snapshot.protocol.writerFeatureNames.contains(AllowColumnDefaultsTableFeature.name)
    val schemaContainsCurrentDefaultKey = snapshot.schema.fields.exists(
      _.metadata.contains(ResolveDefaultColumnsUtils.CURRENT_DEFAULT_COLUMN_METADATA_KEY))
    assert(schemaContainsCurrentDefaultKey === isEnabled)
    isEnabled
  }

  val removalFlagKey =
    DeltaSQLConf.REMOVE_EXISTS_DEFAULT_FROM_SCHEMA_ON_EVERY_METADATA_CHANGE.key

  Seq(true, false).foreach { sourceTableSchemaContainsKey =>
    withTable("test_table", "test_table_2", "test_table_3") {
      // To test with the 'EXISTS_DEFAULT' key present in the source table, we disable removal.
      val removalEnabled = (!sourceTableSchemaContainsKey).toString
      withSQLConf(removalFlagKey -> removalEnabled) {
        // Defaults are only possible for top level columns.
        sql(
          """CREATE TABLE test_table(int_col INT DEFAULT 2)
            |USING delta
            |TBLPROPERTIES ('delta.feature.allowColumnDefaults' = 'supported')""".stripMargin)
      }

      assert(schemaContainsExistsKey("test_table") == sourceTableSchemaContainsKey)
      assert(defaultsTableFeatureEnabled("test_table"))

      // It is not possible to add a column with a default to a Delta table.
      assertThrows[DeltaAnalysisException] {
        sql("ALTER TABLE test_table ADD COLUMN new_column_with_a_default INT DEFAULT 0")
      }

      // @TODO: It is currently not possible to CTAS from a table with an active column default
      // without explicitly enabling the table feature.
      assertThrows[AnalysisException] {
        sql("CREATE TABLE test_table_2 USING DELTA AS SELECT * FROM test_table")
      }

      // Once the default is dropped, CTAS from the source table is expected to work.
      sql("ALTER TABLE test_table ALTER COLUMN int_col DROP DEFAULT")
      sql("CREATE TABLE test_table_3 USING DELTA AS SELECT * FROM test_table")

      assert(schemaContainsExistsKey("test_table_3") === false)
      assert(!defaultsTableFeatureEnabled("test_table_3"))
    }
  }
}
}

trait DeltaTableCreationColumnMappingSuiteBase extends DeltaColumnMappingSelectedTestMixin {
Expand Down
Loading