From a8121daca56542415902252987f6d4a405c7e1e4 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Tue, 25 Feb 2025 19:12:36 +0000
Subject: [PATCH] Support committing multiple column changes in one txn

---
 .../spark/sql/delta/DeltaOperations.scala     |  17 ++
 .../sql/delta/OptimisticTransaction.scala     |   5 +-
 .../sql/delta/catalog/DeltaCatalog.scala      |  72 ++---
 .../commands/alterDeltaTableCommands.scala    | 284 +++++++++++-------
 4 files changed, 229 insertions(+), 149 deletions(-)
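Notes: with this change, an ALTER TABLE that touches several columns is
collected into a single AlterTableChangeColumnDeltaCommand and committed as
one transaction (one Delta log entry), instead of one commit per column.
A minimal sketch of the new command shape, assuming an already-resolved
`table: DeltaTableV2` and an active `spark: SparkSession` are in scope (names
hypothetical; the spec and command classes are Delta internals introduced by
this patch, not public API):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.connector.catalog.TableChange.ColumnPosition
    import org.apache.spark.sql.delta.catalog.DeltaTableV2
    import org.apache.spark.sql.delta.commands.{AlterTableChangeColumnDeltaCommand, DeltaChangeColumnSpec}
    import org.apache.spark.sql.types.{LongType, StringType, StructField}

    def alterTwoColumnsInOneTxn(table: DeltaTableV2, spark: SparkSession): Unit = {
      val changes = Seq(
        // Comment a top-level column and move it to the front.
        DeltaChangeColumnSpec(
          columnPath = Seq.empty,
          columnName = "id",
          newColumn = StructField("id", LongType).withComment("primary key"),
          colPosition = Some(ColumnPosition.first()),
          syncIdentity = false),
        // Tighten the nullability of a second column in the same transaction.
        DeltaChangeColumnSpec(
          columnPath = Seq.empty,
          columnName = "name",
          newColumn = StructField("name", StringType, nullable = false),
          colPosition = None,
          syncIdentity = false))

      // Both specs land in a single OptimisticTransaction commit.
      AlterTableChangeColumnDeltaCommand(table, changes).run(spark)
    }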
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
index 351025883d5..7957260ab64 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaOperations.scala
@@ -565,6 +565,23 @@ object DeltaOperations {
     override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
   }
 
+  /** Recorded when columns are changed in bulk. */
+  case class ChangeColumns(columns: Seq[ChangeColumn]) extends Operation("CHANGE COLUMNS") {
+
+    override val parameters: Map[String, Any] = Map(
+      "columns" -> JsonUtils.toJson(
+        columns.map(col =>
+          structFieldToMap(col.columnPath, col.newColumn) ++ col.colPosition.map("position" -> _))
+      )
+    )
+
+    // This operation shouldn't be introducing AddFile actions at all. This check should be trivial.
+    override def checkAddFileWithDeletionVectorStatsAreNotTightBounds: Boolean = true
+
+    override val isInPlaceFileMetadataUpdate: Option[Boolean] = Some(false)
+  }
+
   /** Recorded when columns are replaced. */
   case class ReplaceColumns(
       columns: Seq[StructField]) extends Operation("REPLACE COLUMNS") {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
index af57cbaac90..c4935a7e060 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/OptimisticTransaction.scala
@@ -28,7 +28,7 @@ import scala.util.control.NonFatal
 
 import com.databricks.spark.util.TagDefinitions.TAG_LOG_STORE_CLASS
 import org.apache.spark.sql.delta.ClassicColumnConversions._
-import org.apache.spark.sql.delta.DeltaOperations.{ChangeColumn, CreateTable, Operation, ReplaceColumns, ReplaceTable, UpdateSchema}
+import org.apache.spark.sql.delta.DeltaOperations.{ChangeColumn, ChangeColumns, CreateTable, Operation, ReplaceColumns, ReplaceTable, UpdateSchema}
 import org.apache.spark.sql.delta.RowId.RowTrackingMetadataDomain
 import org.apache.spark.sql.delta.actions._
 import org.apache.spark.sql.delta.catalog.DeltaTableV2
@@ -2751,6 +2751,9 @@ trait OptimisticTransactionImpl extends DeltaTransaction
       case change: ChangeColumn if usesDefaults(change.newColumn) =>
         throwError("WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
           Array("ALTER TABLE"))
+      case changes: ChangeColumns if changes.columns.exists(c => usesDefaults(c.newColumn)) =>
+        throwError("WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
+          Array("ALTER TABLE"))
       case create: CreateTable if create.metadata.schema.fields.exists(usesDefaults) =>
         throwError("WRONG_COLUMN_DEFAULTS_FOR_DELTA_FEATURE_NOT_ENABLED",
           Array("CREATE TABLE"))
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
index 6d827928b69..d756b9b0179 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/catalog/DeltaCatalog.scala
@@ -660,9 +660,7 @@ class DeltaCatalog extends DelegatingCatalogExtension
       case _ => return super.alterTable(ident, changes: _*)
     }
 
-    // Whether this is an ALTER TABLE ALTER COLUMN SYNC IDENTITY command.
-    var syncIdentity = false
-    val columnUpdates = new mutable.HashMap[Seq[String], (StructField, Option[ColumnPosition])]()
+    val columnUpdates = new mutable.HashMap[Seq[String], DeltaChangeColumnSpec]()
     val isReplaceColumnsCommand = grouped.get(classOf[DeleteColumn]) match {
       case Some(deletes) if grouped.contains(classOf[AddColumn]) =>
         // Convert to Seq so that contains method works
@@ -745,7 +743,8 @@ class DeltaCatalog extends DelegatingCatalogExtension
       } else {
         snapshotSchema
       }
-      def getColumn(fieldNames: Seq[String]): (StructField, Option[ColumnPosition]) = {
+      def getColumn(fieldNames: Seq[String])
+        : DeltaChangeColumnSpec = {
         columnUpdates.getOrElseUpdate(fieldNames, {
           val colName = UnresolvedAttribute(fieldNames).name
           val fieldOpt = schema.findNestedField(fieldNames, includeCollections = true,
@@ -754,7 +753,12 @@ class DeltaCatalog extends DelegatingCatalogExtension
           val field = fieldOpt.getOrElse {
             throw DeltaErrors.nonExistentColumnInSchema(colName, schema.treeString)
           }
-          field -> None
+          DeltaChangeColumnSpec(
+            fieldNames.init,
+            fieldNames.last,
+            field,
+            colPosition = None,
+            syncIdentity = false)
         })
       }
 
@@ -768,8 +772,8 @@ class DeltaCatalog extends DelegatingCatalogExtension
       disallowedColumnChangesOnIdentityColumns.foreach {
         case change: ColumnChange =>
           val field = change.fieldNames()
-          val (existingField, _) = getColumn(field)
-          if (ColumnWithDefaultExprUtils.isIdentityColumn(existingField)) {
+          val spec = getColumn(field)
+          if (ColumnWithDefaultExprUtils.isIdentityColumn(spec.newColumn)) {
             throw DeltaErrors.identityColumnAlterColumnNotSupported()
           }
       }
@@ -777,50 +781,54 @@ class DeltaCatalog extends DelegatingCatalogExtension
       columnChanges.foreach {
         case comment: UpdateColumnComment =>
           val field = comment.fieldNames()
-          val (oldField, pos) = getColumn(field)
-          columnUpdates(field) = oldField.withComment(comment.newComment()) -> pos
+          val spec = getColumn(field)
+          columnUpdates(field) = spec.copy(
+            newColumn = spec.newColumn.withComment(comment.newComment()))
 
         case dataType: UpdateColumnType =>
           val field = dataType.fieldNames()
-          val (oldField, pos) = getColumn(field)
-          columnUpdates(field) = oldField.copy(dataType = dataType.newDataType()) -> pos
+          val spec = getColumn(field)
+          columnUpdates(field) = spec.copy(
+            newColumn = spec.newColumn.copy(dataType = dataType.newDataType()))
 
         case position: UpdateColumnPosition =>
           val field = position.fieldNames()
-          val (oldField, pos) = getColumn(field)
-          columnUpdates(field) = oldField -> Option(position.position())
+          val spec = getColumn(field)
+          columnUpdates(field) = spec.copy(colPosition = Option(position.position()))
 
         case nullability: UpdateColumnNullability =>
           val field = nullability.fieldNames()
-          val (oldField, pos) = getColumn(field)
-          columnUpdates(field) = oldField.copy(nullable = nullability.nullable()) -> pos
+          val spec = getColumn(field)
+          columnUpdates(field) = spec.copy(
+            newColumn = spec.newColumn.copy(nullable = nullability.nullable()))
 
         case rename: RenameColumn =>
           val field = rename.fieldNames()
-          val (oldField, pos) = getColumn(field)
-          columnUpdates(field) = oldField.copy(name = rename.newName()) -> pos
+          val spec = getColumn(field)
+          columnUpdates(field) = spec.copy(
+            newColumn = spec.newColumn.copy(name = rename.newName()))
 
         case sync: SyncIdentity =>
-          syncIdentity = true
           val field = sync.fieldNames
-          val (oldField, pos) = getColumn(field)
-          if (!ColumnWithDefaultExprUtils.isIdentityColumn(oldField)) {
+          val spec = getColumn(field).copy(syncIdentity = true)
+          columnUpdates(field) = spec
+          if (!ColumnWithDefaultExprUtils.isIdentityColumn(spec.newColumn)) {
            throw DeltaErrors.identityColumnAlterNonIdentityColumnError()
          }
           // If the IDENTITY column does not allow explicit insert, high water mark should
-          // always be sync'ed and this is an no-op.
-          if (IdentityColumn.allowExplicitInsert(oldField)) {
-            columnUpdates(field) = oldField.copy() -> pos
+          // always be sync'ed and this is a no-op.
+          if (IdentityColumn.allowExplicitInsert(spec.newColumn)) {
+            columnUpdates(field) = spec
          }
 
         case updateDefault: UpdateColumnDefaultValue =>
           val field = updateDefault.fieldNames()
-          val (oldField, pos) = getColumn(field)
+          val spec = getColumn(field)
           val updatedField = updateDefault.newDefaultValue() match {
-            case "" => oldField.clearCurrentDefaultValue()
-            case newDefault => oldField.withCurrentDefaultValue(newDefault)
+            case "" => spec.newColumn.clearCurrentDefaultValue()
+            case newDefault => spec.newColumn.withCurrentDefaultValue(newDefault)
          }
-          columnUpdates(field) = updatedField -> pos
+          columnUpdates(field) = spec.copy(newColumn = updatedField)
 
         case other =>
           throw DeltaErrors.unrecognizedColumnChange(s"${other.getClass}")
@@ -872,14 +880,8 @@ class DeltaCatalog extends DelegatingCatalogExtension
         }
       }
 
-      columnUpdates.foreach { case (fieldNames, (newField, newPositionOpt)) =>
-        AlterTableChangeColumnDeltaCommand(
-          table,
-          fieldNames.dropRight(1),
-          fieldNames.last,
-          newField,
-          newPositionOpt,
-          syncIdentity = syncIdentity).run(spark)
+      if (columnUpdates.nonEmpty) {
+        AlterTableChangeColumnDeltaCommand(table, columnUpdates.values.toSeq).run(spark)
       }
 
       loadTable(ident)
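Note on the catalog path above: alterTable now merges every supported column
change into `columnUpdates`, keyed by field path, so multiple TableChange
objects touching the same column collapse into one DeltaChangeColumnSpec and
the whole batch runs as a single command. A hedged sketch of driving this
through the DataSourceV2 API, assuming `catalog` is the session's configured
DeltaCatalog and the table identifier is hypothetical:

    import org.apache.spark.sql.connector.catalog.{Identifier, TableChange}
    import org.apache.spark.sql.delta.catalog.DeltaCatalog

    def commentAndTightenInOneCall(catalog: DeltaCatalog): Unit = {
      // Both changes arrive in one alterTable() call and, after this patch,
      // end up in the same Delta commit.
      val changes: Array[TableChange] = Array(
        TableChange.updateColumnComment(Array("id"), "primary key"),
        TableChange.updateColumnNullability(Array("name"), false))
      catalog.alterTable(Identifier.of(Array("default"), "events"), changes: _*)
    }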
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
index f684714c39f..83fa8ee5f30 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/alterDeltaTableCommands.scala
@@ -717,6 +717,13 @@ case class AlterTableDropColumnsDeltaCommand(
   }
 }
 
+case class DeltaChangeColumnSpec(
+    columnPath: Seq[String],
+    columnName: String,
+    newColumn: StructField,
+    colPosition: Option[ColumnPosition],
+    syncIdentity: Boolean)
+
 /**
  * A command to change the column for a Delta table, support changing the comment of a column and
  * reordering columns.
@@ -730,11 +737,7 @@ case class AlterTableDropColumnsDeltaCommand(
  */
 case class AlterTableChangeColumnDeltaCommand(
     table: DeltaTableV2,
-    columnPath: Seq[String],
-    columnName: String,
-    newColumn: StructField,
-    colPosition: Option[ColumnPosition],
-    syncIdentity: Boolean)
+    columnChanges: Seq[DeltaChangeColumnSpec])
   extends LeafRunnableCommand with AlterDeltaTableCommand with IgnoreCachedData {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
@@ -751,89 +754,107 @@ case class AlterTableChangeColumnDeltaCommand(
     }
 
     val resolver = sparkSession.sessionState.conf.resolver
-    // Verify that the columnName provided actually exists in the schema
-    SchemaUtils.findColumnPosition(columnPath :+ columnName, oldSchema, resolver)
-
-    val transformedSchema = transformSchema(oldSchema, Some(columnName)) {
-      case (`columnPath`, struct @ StructType(fields), _) =>
-        val oldColumn = struct(columnName)
-
-        // Analyzer already validates the char/varchar type change of ALTER COLUMN in
-        // `CheckAnalysis.checkAlterTableCommand`. We should normalize char/varchar type
-        // to string type first, then apply Delta-specific checks.
-        val oldColumnForVerification = if (bypassCharVarcharToStringFix) {
-          oldColumn
-        } else {
-          CharVarcharUtils.replaceCharVarcharWithStringInSchema(StructType(Seq(oldColumn))).head
-        }
-        verifyColumnChange(sparkSession, oldColumnForVerification, resolver, txn)
-
-        val newField = {
-          if (syncIdentity) {
-            assert(oldColumn == newColumn)
-            val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
-            val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf
-              .get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK)
-            val field = IdentityColumn.syncIdentity(
-              deltaLog,
-              newColumn,
-              df,
-              allowLoweringHighWaterMarkForSyncIdentity
-            )
-            txn.setSyncIdentity()
-            txn.readWholeTable()
-            field
+    columnChanges.foreach(change => {
+      val columnName = change.columnName
+      val columnPath = change.columnPath
+      val newColumn = change.newColumn
+      if (newColumn.name != columnName) {
+        // need to validate the changes if the column is renamed
+        checkDependentExpressions(
+          sparkSession, columnPath :+ columnName, metadata, txn.protocol)
+      }
+      // Verify that the columnName provided actually exists in the schema
+      SchemaUtils.findColumnPosition(columnPath :+ columnName, oldSchema, resolver)
+    })
+
+    def transformSchemaOnce(prevSchema: StructType, change: DeltaChangeColumnSpec) = {
+      val columnPath = change.columnPath
+      val columnName = change.columnName
+      val newColumn = change.newColumn
+      transformSchema(prevSchema, Some(columnName)) {
+        case (`columnPath`, struct @ StructType(fields), _) =>
+          val oldColumn = struct(columnName)
+
+          // Analyzer already validates the char/varchar type change of ALTER COLUMN in
+          // `CheckAnalysis.checkAlterTableCommand`. We should normalize char/varchar type
+          // to string type first, then apply Delta-specific checks.
+          val oldColumnForVerification = if (bypassCharVarcharToStringFix) {
+            oldColumn
           } else {
-            // Take the name, comment, nullability and data type from newField
-            // It's crucial to keep the old column's metadata, which may contain column mapping
-            // metadata.
-            var result = newColumn.getComment().map(oldColumn.withComment).getOrElse(oldColumn)
-            // Apply the current default value as well, if any.
-            result = newColumn.getCurrentDefaultValue() match {
-              case Some(newDefaultValue) => result.withCurrentDefaultValue(newDefaultValue)
-              case None => result.clearCurrentDefaultValue()
+            CharVarcharUtils.replaceCharVarcharWithStringInSchema(StructType(Seq(oldColumn))).head
+          }
+          verifyColumnChange(change, sparkSession, oldColumnForVerification, resolver, txn)
+
+          val newField = {
+            if (change.syncIdentity) {
+              assert(oldColumn == newColumn)
+              val df = txn.snapshot.deltaLog.createDataFrame(txn.snapshot, txn.filterFiles())
+              val allowLoweringHighWaterMarkForSyncIdentity = sparkSession.conf
+                .get(DeltaSQLConf.DELTA_IDENTITY_ALLOW_SYNC_IDENTITY_TO_LOWER_HIGH_WATER_MARK)
+              val field = IdentityColumn.syncIdentity(
+                deltaLog,
+                newColumn,
+                df,
+                allowLoweringHighWaterMarkForSyncIdentity
+              )
+              txn.setSyncIdentity()
+              txn.readWholeTable()
+              field
+            } else {
+              // Take the name, comment, nullability and data type from newField
+              // It's crucial to keep the old column's metadata, which may contain column mapping
+              // metadata.
+              var result = newColumn.getComment().map(oldColumn.withComment).getOrElse(oldColumn)
+              // Apply the current default value as well, if any.
+              result = newColumn.getCurrentDefaultValue() match {
+                case Some(newDefaultValue) => result.withCurrentDefaultValue(newDefaultValue)
+                case None => result.clearCurrentDefaultValue()
+              }
+
+              result
+                .copy(
+                  name = newColumn.name,
+                  dataType =
+                    SchemaUtils.changeDataType(oldColumn.dataType, newColumn.dataType, resolver),
+                  nullable = newColumn.nullable)
             }
-
-            result
-              .copy(
-                name = newColumn.name,
-                dataType =
-                  SchemaUtils.changeDataType(oldColumn.dataType, newColumn.dataType, resolver),
-                nullable = newColumn.nullable)
           }
-        }
-
-        // Replace existing field with new field
-        val newFieldList = fields.map { field =>
-          if (DeltaColumnMapping.getPhysicalName(field) ==
-            DeltaColumnMapping.getPhysicalName(newField)) {
-            newField
-          } else field
-        }
-
-        // Reorder new field to correct position if necessary
-        StructType(colPosition.map { position =>
-          reorderFieldList(struct, newFieldList, newField, position, resolver)
-        }.getOrElse(newFieldList.toSeq))
-
-      case (`columnPath`, m: MapType, _) if columnName == "key" =>
-        val originalField = StructField(columnName, m.keyType, nullable = false)
-        verifyMapArrayChange(sparkSession, originalField, resolver, txn)
-        m.copy(keyType = SchemaUtils.changeDataType(m.keyType, newColumn.dataType, resolver))
-      case (`columnPath`, m: MapType, _) if columnName == "value" =>
-        val originalField = StructField(columnName, m.valueType, nullable = m.valueContainsNull)
-        verifyMapArrayChange(sparkSession, originalField, resolver, txn)
-        m.copy(valueType = SchemaUtils.changeDataType(m.valueType, newColumn.dataType, resolver))
-
-      case (`columnPath`, a: ArrayType, _) if columnName == "element" =>
-        val originalField = StructField(columnName, a.elementType, nullable = a.containsNull)
-        verifyMapArrayChange(sparkSession, originalField, resolver, txn)
-        a.copy(elementType =
-          SchemaUtils.changeDataType(a.elementType, newColumn.dataType, resolver))
+          // Replace existing field with new field
+          val newFieldList = fields.map { field =>
+            if (DeltaColumnMapping.getPhysicalName(field) ==
+              DeltaColumnMapping.getPhysicalName(newField)) {
+              newField
+            } else field
+          }
 
-      case (_, other @ (_: StructType | _: ArrayType | _: MapType), _) => other
+          // Reorder new field to correct position if necessary
+          StructType(change.colPosition.map { position =>
+            reorderFieldList(columnName, struct, newFieldList, newField, position, resolver)
+          }.getOrElse(newFieldList.toSeq))
+
+        case (`columnPath`, m: MapType, _) if columnName == "key" =>
+          val originalField = StructField(columnName, m.keyType, nullable = false)
+          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
+          m.copy(keyType = SchemaUtils.changeDataType(m.keyType, newColumn.dataType, resolver))
+
+        case (`columnPath`, m: MapType, _) if columnName == "value" =>
+          val originalField = StructField(columnName, m.valueType, nullable = m.valueContainsNull)
+          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
+          m.copy(
+            valueType = SchemaUtils.changeDataType(m.valueType, newColumn.dataType, resolver))
+
+        case (`columnPath`, a: ArrayType, _) if columnName == "element" =>
+          val originalField = StructField(columnName, a.elementType, nullable = a.containsNull)
+          verifyMapArrayChange(change, sparkSession, originalField, resolver, txn)
+          a.copy(elementType =
+            SchemaUtils.changeDataType(a.elementType, newColumn.dataType, resolver))
+
+        case (_, other @ (_: StructType | _: ArrayType | _: MapType), _) => other
+      }
     }
+
+    val transformedSchema = columnChanges.foldLeft(oldSchema)(transformSchemaOnce)
     val newSchema = if (bypassCharVarcharToStringFix) {
       transformedSchema
     } else {
@@ -844,47 +865,70 @@
       CharVarcharUtils.replaceCharVarcharWithStringInSchema(transformedSchema)
     }
 
-    // update `partitionColumns` if the changed column is a partition column
-    val newPartitionColumns = if (columnPath.isEmpty) {
-      metadata.partitionColumns.map { partCol =>
-        if (partCol == columnName) newColumn.name else partCol
-      }
-    } else metadata.partitionColumns
-
-    val oldColumnPath = columnPath :+ columnName
-    val newColumnPath = columnPath :+ newColumn.name
-    // Rename the column in the delta statistics columns configuration, if present.
-    val newConfiguration = metadata.configuration ++
-      StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)
-
     val newSchemaWithTypeWideningMetadata = TypeWideningMetadata.addTypeWideningMetadata(
       txn,
       schema = newSchema,
       oldSchema = metadata.schema)
 
-    val newMetadata = metadata.copy(
-      schemaString = newSchemaWithTypeWideningMetadata.json,
-      partitionColumns = newPartitionColumns,
-      configuration = newConfiguration
-    )
+    val metadataWithNewSchema = metadata.copy(
+      schemaString = newSchemaWithTypeWideningMetadata.json)
+
+    def updateMetadataOnce(prevMetadata: actions.Metadata, change: DeltaChangeColumnSpec) = {
+      val columnPath = change.columnPath
+      val columnName = change.columnName
+      val newColumn = change.newColumn
+      // update `partitionColumns` if the changed column is a partition column
+      val newPartitionColumns = if (columnPath.isEmpty) {
+        metadata.partitionColumns.map { partCol =>
+          if (partCol == columnName) newColumn.name else partCol
+        }
+      } else metadata.partitionColumns
+
+      val oldColumnPath = columnPath :+ columnName
+      val newColumnPath = columnPath :+ newColumn.name
+      // Rename the column in the delta statistics columns configuration, if present.
+      val newConfiguration = metadata.configuration ++
+        StatisticsCollection.renameDeltaStatsColumn(metadata, oldColumnPath, newColumnPath)
+
+      val updatedMetadata = prevMetadata.copy(
+        partitionColumns = newPartitionColumns,
+        configuration = newConfiguration
+      )
 
-    if (newColumn.name != columnName) {
-      // need to validate the changes if the column is renamed
-      checkDependentExpressions(
-        sparkSession, columnPath :+ columnName, metadata, txn.protocol)
+      updatedMetadata
     }
+    val newMetadata = columnChanges.foldLeft(metadataWithNewSchema)(updateMetadataOnce)
 
     txn.updateMetadata(newMetadata)
 
-    if (newColumn.name != columnName) {
-      // record column rename separately
-      txn.commit(Nil, DeltaOperations.RenameColumn(oldColumnPath, newColumnPath))
+    def getDeltaChangeColumnOperation(change: DeltaChangeColumnSpec) =
+      DeltaOperations.ChangeColumn(
+        change.columnPath,
+        change.columnName,
+        change.newColumn,
+        change.colPosition.map(_.toString))
+
+    val operation = if (columnChanges.size == 1) {
+      val change = columnChanges.head
+      val columnName = change.columnName
+      val newColumn = change.newColumn
+      if (newColumn.name != columnName) {
+        val columnPath = change.columnPath
+        val oldColumnPath = columnPath :+ columnName
+        val newColumnPath = columnPath :+ newColumn.name
+        // record column rename separately
+        DeltaOperations.RenameColumn(oldColumnPath, newColumnPath)
+      } else {
+        getDeltaChangeColumnOperation(change)
+      }
     } else {
-      txn.commit(Nil, DeltaOperations.ChangeColumn(
-        columnPath, columnName, newColumn, colPosition.map(_.toString)))
+      val changes = columnChanges.map(getDeltaChangeColumnOperation)
+      DeltaOperations.ChangeColumns(changes)
     }
+    txn.commit(Nil, operation)
 
     Seq.empty[Row]
   }
@@ -893,6 +937,7 @@ case class AlterTableChangeColumnDeltaCommand(
   /**
    * Reorder the given fieldList to place `field` at the given `position` in `fieldList`
    *
+   * @param columnName Name of the column being reordered
    * @param struct The initial StructType with the original field at its original position
    * @param fieldList List of fields with the changed field in the original position
    * @param field The field that is to be added
@@ -900,6 +945,7 @@ case class AlterTableChangeColumnDeltaCommand(
    * @return Returns a new list of fields with the changed field in the new position
    */
   private def reorderFieldList(
+      columnName: String,
       struct: StructType,
       fieldList: Array[StructField],
       field: StructField,
@@ -936,14 +982,18 @@ case class AlterTableChangeColumnDeltaCommand(
    * Note that this requires a full table scan in the case of SET NOT NULL to verify that all
    * existing values are valid.
    *
+   * @param change Information about the column change
    * @param originalField The existing column
    */
   private def verifyColumnChange(
+      change: DeltaChangeColumnSpec,
       spark: SparkSession,
       originalField: StructField,
       resolver: Resolver,
       txn: OptimisticTransaction): Unit = {
-
+    val columnPath = change.columnPath
+    val columnName = change.columnName
+    val newColumn = change.newColumn
     originalField.dataType match {
       case same if same == newColumn.dataType =>
         // just changing comment or position so this is fine
@@ -1006,10 +1056,18 @@ case class AlterTableChangeColumnDeltaCommand(
    * Verify whether replacing the original map key/value or array element with a new data type is a
    * valid operation.
    *
+   * @param change Information about the column change
    * @param originalField the original map key/value or array element to update.
    */
-  private def verifyMapArrayChange(spark: SparkSession, originalField: StructField,
-      resolver: Resolver, txn: OptimisticTransaction): Unit = {
+  private def verifyMapArrayChange(
+      change: DeltaChangeColumnSpec,
+      spark: SparkSession,
+      originalField: StructField,
+      resolver: Resolver,
+      txn: OptimisticTransaction): Unit = {
+    val columnPath = change.columnPath
+    val columnName = change.columnName
+    val newColumn = change.newColumn
     // Map key/value and array element can't have comments.
     if (newColumn.getComment().nonEmpty) {
       throw DeltaErrors.addCommentToMapArrayException(
@@ -1024,7 +1082,7 @@ case class AlterTableChangeColumnDeltaCommand(
         newField = newColumn
       )
     }
-    verifyColumnChange(spark, originalField, resolver, txn)
+    verifyColumnChange(change, spark, originalField, resolver, txn)
   }
 }
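Follow-up note: with a single spec, the command still records a ChangeColumn
(or RenameColumn) operation exactly as before; only the multi-spec path records
the new CHANGE COLUMNS operation carrying all specs. An illustrative way to
observe this in the table history (table name hypothetical, `spark` assumed in
scope):

    import io.delta.tables.DeltaTable

    // After an ALTER TABLE that changed several columns, the latest history
    // entry should show one CHANGE COLUMNS operation rather than a separate
    // CHANGE COLUMN commit per column.
    DeltaTable.forName(spark, "events")
      .history(1)
      .select("operation", "operationParameters")
      .show(truncate = false)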