Skip to content

Commit

Permalink
Remove type widening metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
johanl-db committed Feb 26, 2025
1 parent 9c932bf commit dad2c5a
Show file tree
Hide file tree
Showing 18 changed files with 138 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ package io.delta.sharing.spark
import scala.collection.JavaConverters._

import org.apache.spark.sql.delta.{
DeltaColumnMapping,
DeltaErrors,
DeltaTableUtils => TahoeDeltaTableUtils
}
Expand Down Expand Up @@ -409,13 +408,15 @@ private[sharing] class DeltaSharingDataSource
HadoopFsRelation(
location = fileIndex,
// This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex.
// Dropping column mapping metadata because it is not relevant for partition schema.
partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(fileIndex.partitionSchema),
// Dropping delta metadata because it is not relevant for partition schema.
partitionSchema =
TahoeDeltaTableUtils.removeInternalDeltaMetadata(spark, fileIndex.partitionSchema),
// This is copied from DeltaLog.buildHadoopFsRelationWithFileIndex, original comment:
// We pass all table columns as `dataSchema` so that Spark will preserve the partition
// column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
// just append them to the end of `dataSchema`.
dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
dataSchema = TahoeDeltaTableUtils.removeInternalDeltaMetadata(
spark,
TahoeDeltaTableUtils.removeInternalWriterMetadata(
spark,
SchemaUtils.dropNullTypeColumns(deltaSharingTableMetadata.metadata.schema)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,12 @@ class DeltaSharingDataSourceTypeWideningSuite
}
}

/** Short-hand for the type widening metadata for column `value` for the test table above. */
private val typeWideningMetadata: Metadata =
new MetadataBuilder()
.putMetadataArray(
"delta.typeChanges", Array(
new MetadataBuilder()
.putString("fromType", "short")
.putString("toType", "integer")
.build()))
.build()

test(s"Delta sharing with type widening") {
withTestTable { tableName =>
testReadingDeltaShare(
tableName,
versionAsOf = None,
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedSchema = new StructType().add("value", IntegerType),
expectedResult = Seq(1, 2, 3, Int.MaxValue, 4, 5).toDF("value"))
}
}
Expand All @@ -125,15 +113,13 @@ class DeltaSharingDataSourceTypeWideningSuite
testReadingDeltaShare(
tableName,
versionAsOf = Some(3),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedSchema = new StructType().add("value", IntegerType),
expectedResult = Seq(1, 2, 3, Int.MaxValue).toDF("value"))

testReadingDeltaShare(
tableName,
versionAsOf = Some(2),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedSchema = new StructType().add("value", IntegerType),
expectedResult = Seq(1, 2).toDF("value"))

testReadingDeltaShare(
Expand All @@ -151,8 +137,7 @@ class DeltaSharingDataSourceTypeWideningSuite
tableName,
versionAsOf = None,
filter = Some(col("value") === Int.MaxValue),
expectedSchema = new StructType()
.add("value", IntegerType, nullable = true, metadata = typeWideningMetadata),
expectedSchema = new StructType().add("value", IntegerType),
expectedResult = Seq(Int.MaxValue).toDF("value"),
expectedJsonPredicate = Seq(
"""
Expand Down Expand Up @@ -187,7 +172,7 @@ class DeltaSharingDataSourceTypeWideningSuite
versionAsOf = None,
filter = Some(col("part") === Int.MaxValue),
expectedSchema = new StructType()
.add("part", IntegerType, nullable = true, metadata = typeWideningMetadata)
.add("part", IntegerType)
.add("value", ShortType),
expectedResult = Seq((Int.MaxValue, 4)).toDF("part", "value"),
expectedJsonPredicate = Seq(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -620,13 +620,13 @@ class DeltaLog private(
}
HadoopFsRelation(
fileIndex,
partitionSchema = DeltaColumnMapping.dropColumnMappingMetadata(
snapshot.metadata.partitionSchema),
partitionSchema = DeltaTableUtils.removeInternalDeltaMetadata(
spark, snapshot.metadata.partitionSchema),
// We pass all table columns as `dataSchema` so that Spark will preserve the partition
// column locations. Otherwise, for any partition columns not in `dataSchema`, Spark would
// just append them to the end of `dataSchema`.
dataSchema = DeltaColumnMapping.dropColumnMappingMetadata(
DeltaTableUtils.removeInternalWriterMetadata(spark, dataSchema)
dataSchema = DeltaTableUtils.removeInternalDeltaMetadata(
spark, DeltaTableUtils.removeInternalWriterMetadata(spark, dataSchema)
),
bucketSpec = bucketSpec,
fileFormat(snapshot.protocol, snapshot.metadata),
Expand Down
17 changes: 17 additions & 0 deletions spark/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,23 @@ object DeltaTableUtils extends PredicateHelper
)
}

/**
 * Strips Delta-internal metadata from `schema` so that it does not leak outside of the table.
 * Column mapping metadata is always dropped; type widening metadata is dropped only when
 * [[DeltaSQLConf.DELTA_TYPE_WIDENING_REMOVE_SCHEMA_METADATA]] is enabled. This typically covers
 * metadata used by reader-writer table features. Use [[removeInternalWriterMetadata]] in
 * addition / instead to remove metadata for writer-only table features.
 *
 * @param spark session whose SQL conf decides whether type widening metadata is removed
 * @param schema schema to clean up
 * @return the schema without the internal Delta metadata described above
 */
def removeInternalDeltaMetadata(spark: SparkSession, schema: StructType): StructType = {
  val withoutColumnMapping = DeltaColumnMapping.dropColumnMappingMetadata(schema)
  val dropTypeWidening = spark.sessionState.conf
    .getConf(DeltaSQLConf.DELTA_TYPE_WIDENING_REMOVE_SCHEMA_METADATA)

  if (!dropTypeWidening) {
    withoutColumnMapping
  } else {
    // Only the first element of the returned tuple, the cleaned schema, is needed.
    TypeWideningMetadata.removeTypeWideningMetadata(withoutColumnMapping)._1
  }
}

}

sealed abstract class UnresolvedPathBasedDeltaTableBase(path: String) extends UnresolvedLeafNode {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ case class DeltaTableV2(
val baseSchema = cdcRelation.map(_.schema).getOrElse {
DeltaTableUtils.removeInternalWriterMetadata(spark, initialSnapshot.schema)
}
DeltaColumnMapping.dropColumnMappingMetadata(baseSchema)
DeltaTableUtils.removeInternalDeltaMetadata(spark, baseSchema)
}

override def schema(): StructType = tableSchema
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import java.util.concurrent.TimeUnit

import org.apache.spark.sql.delta.skipping.clustering.ClusteredTableUtils
import org.apache.spark.sql.delta._
import org.apache.spark.sql.delta.DeltaColumnMapping.{dropColumnMappingMetadata, filterColumnMappingProperties}
import org.apache.spark.sql.delta.DeltaColumnMapping.filterColumnMappingProperties
import org.apache.spark.sql.delta.actions.{Action, Metadata, Protocol, TableFeatureProtocolUtils}
import org.apache.spark.sql.delta.actions.DomainMetadata
import org.apache.spark.sql.delta.commands.DMLUtils.TaggedCommitData
Expand Down Expand Up @@ -398,7 +398,7 @@ case class CreateDeltaTableCommand(
ClusteredTableUtils.getDomainMetadataFromTransaction(
ClusteredTableUtils.getClusterBySpecOptional(table), txn).toSeq
} else {
verifyTableMetadata(txn, tableWithLocation)
verifyTableMetadata(sparkSession, txn, tableWithLocation)
Nil
}
}
Expand Down Expand Up @@ -539,6 +539,7 @@ case class CreateDeltaTableCommand(
* table.
*/
private def verifyTableMetadata(
sparkSession: SparkSession,
txn: OptimisticTransaction,
tableDesc: CatalogTable): Unit = {
val existingMetadata = txn.metadata
Expand All @@ -554,7 +555,7 @@ case class CreateDeltaTableCommand(
// However, internal Delta metadata in the existing schema (column mapping metadata and, when
// enabled by conf, type widening metadata) can safely be ignored here because the new table
// desc will not have the related metadata assigned yet
val differences = SchemaUtils.reportDifferences(
dropColumnMappingMetadata(existingMetadata.schema),
DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, existingMetadata.schema),
tableDesc.schema)
if (differences.nonEmpty) {
throw DeltaErrors.createTableWithDifferentSchemaException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1134,7 +1134,7 @@ case class AlterTableSetLocationDeltaCommand(
val bypassSchemaCheck = sparkSession.sessionState.conf.getConf(
DeltaSQLConf.DELTA_ALTER_LOCATION_BYPASS_SCHEMA_CHECK)

if (!bypassSchemaCheck && !schemasEqual(oldMetadata, newMetadata)) {
if (!bypassSchemaCheck && !schemasEqual(sparkSession, oldMetadata, newMetadata)) {
throw DeltaErrors.alterTableSetLocationSchemaMismatchException(
oldMetadata.schema, newMetadata.schema)
}
Expand All @@ -1144,12 +1144,12 @@ case class AlterTableSetLocationDeltaCommand(
}

private def schemasEqual(
sparkSession: SparkSession,
oldMetadata: actions.Metadata, newMetadata: actions.Metadata): Boolean = {
import DeltaColumnMapping._
dropColumnMappingMetadata(oldMetadata.schema) ==
dropColumnMappingMetadata(newMetadata.schema) &&
dropColumnMappingMetadata(oldMetadata.partitionSchema) ==
dropColumnMappingMetadata(newMetadata.partitionSchema)
DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, oldMetadata.schema) ==
DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, newMetadata.schema) &&
DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, oldMetadata.partitionSchema) ==
DeltaTableUtils.removeInternalDeltaMetadata(sparkSession, newMetadata.partitionSchema)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ class DeltaDataSource
throw DeltaErrors.specifySchemaAtReadTimeException
}

val schemaToUse = DeltaColumnMapping.dropColumnMappingMetadata(
val schemaToUse = DeltaTableUtils.removeInternalDeltaMetadata(
sqlContext.sparkSession,
DeltaTableUtils.removeInternalWriterMetadata(sqlContext.sparkSession, readSchema)
)
if (schemaToUse.isEmpty) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1295,6 +1295,14 @@ trait DeltaSQLConfBase {
.booleanConf
.createWithDefault(false)

// Internal boolean flag, enabled by default. It is read by
// `DeltaTableUtils.removeInternalDeltaMetadata`, which strips type widening metadata from the
// schema it returns only when this flag is true; column mapping metadata is dropped regardless.
val DELTA_TYPE_WIDENING_REMOVE_SCHEMA_METADATA =
buildConf("typeWidening.removeSchemaMetadata")
.doc("When true, type widening metadata is removed from schemas that are surfaced outside " +
"of Delta or used for schema comparisons")
.internal()
.booleanConf
.createWithDefault(true)

val DELTA_IS_DELTA_TABLE_THROW_ON_ERROR =
buildConf("isDeltaTable.throwOnError")
.internal()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ trait DeltaSourceBase extends Source
} else {
readSchema
}
DeltaColumnMapping.dropColumnMappingMetadata(readSchemaWithCdc)
DeltaTableUtils.removeInternalDeltaMetadata(spark, readSchemaWithCdc)
}

// A dummy empty dataframe that can be returned at various point during streaming
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,34 +107,9 @@ trait TypeWideningAlterTableNestedTests {

assert(readDeltaTable(tempPath).schema === new StructType()
.add("s", new StructType()
.add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "short")
.putString("fromType", "byte")
.build()
)).build()))
.add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "byte")
.putString("fieldPath", "key")
.build(),
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putString("fieldPath", "value")
.build()
)).build())
.add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putString("fieldPath", "element")
.build()
)).build()))
.add("a", ShortType))
.add("m", MapType(IntegerType, IntegerType))
.add("a", ArrayType(IntegerType)))

append(Seq((5, 6, 7, 8))
.toDF("a", "b", "c", "d")
Expand All @@ -153,34 +128,9 @@ trait TypeWideningAlterTableNestedTests {
"(s struct<a: short>, m map<int, int>, a array<int>)")
assert(readDeltaTable(tempPath).schema === new StructType()
.add("s", new StructType()
.add("a", ShortType, nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "short")
.putString("fromType", "byte")
.build()
)).build()))
.add("m", MapType(IntegerType, IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "byte")
.putString("fieldPath", "key")
.build(),
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putString("fieldPath", "value")
.build()
)).build())
.add("a", ArrayType(IntegerType), nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.putString("fieldPath", "element")
.build()
)).build()))
.add("a", ShortType))
.add("m", MapType(IntegerType, IntegerType))
.add("a", ArrayType(IntegerType)))

append(Seq((5, 6, 7, 8))
.toDF("a", "b", "c", "d")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,15 +135,7 @@ trait TypeWideningAlterTableTests
append(Seq(1, 2).toDF("value").select($"value".cast(ShortType)))
assert(readDeltaTable(tempPath).schema === new StructType().add("value", ShortType))
sql(s"ALTER TABLE delta.`$tempPath` REPLACE COLUMNS (value INT)")
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("value", IntegerType, nullable = true, metadata = new MetadataBuilder()
.putMetadataArray("delta.typeChanges", Array(
new MetadataBuilder()
.putString("toType", "integer")
.putString("fromType", "short")
.build()
)).build()))
assert(readDeltaTable(tempPath).schema === new StructType().add("value", IntegerType))
checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2)))
append(Seq(3, 4).toDF("value"))
checkAnswer(readDeltaTable(tempPath), Seq(Row(1), Row(2), Row(3), Row(4)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,7 @@ trait TypeWideningCompatibilityTests {
append(Seq((2.toShort, "ghi", "jkl")).toDF("a", "c", "v"))
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(ByteType, ShortType))
.add("a", ShortType)
.add("c", StringType, nullable = true,
metadata = new MetadataBuilder()
.putString("__CHAR_VARCHAR_TYPE_STRING", "char(3)")
Expand All @@ -148,8 +147,7 @@ trait TypeWideningCompatibilityTests {
append(Seq((3.toShort, "longer string 1", "longer string 2")).toDF("a", "c", "v"))
assert(readDeltaTable(tempPath).schema ===
new StructType()
.add("a", ShortType, nullable = true,
metadata = typeWideningMetadata(ByteType, ShortType))
.add("a", ShortType)
.add("c", StringType)
.add("v", StringType))
checkAnswer(readDeltaTable(tempPath),
Expand Down
Loading

0 comments on commit dad2c5a

Please sign in to comment.