Skip to content

Commit

Permalink
Support altering multiple columns in the same command
Browse files Browse the repository at this point in the history
  • Loading branch information
ctring committed Jan 21, 2025
1 parent 3ef529b commit c24c380
Show file tree
Hide file tree
Showing 13 changed files with 529 additions and 287 deletions.
12 changes: 12 additions & 0 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -3938,6 +3938,18 @@
],
"sqlState" : "0A000"
},
"NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN" : {
"message" : [
"ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s parent field <parentField> and child field <childField> in the same command."
],
"sqlState" : "0A000"
},
"NOT_SUPPORTED_CHANGE_SAME_COLUMN" : {
"message" : [
"ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s column <fieldName> multiple times in the same command."
],
"sqlState" : "0A000"
},
"NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
"message" : [
"<cmd> is not supported for v2 tables."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,7 @@ statement
| ALTER (TABLE | VIEW) identifierReference
UNSET TBLPROPERTIES (IF EXISTS)? propertyList #unsetTableProperties
| ALTER TABLE table=identifierReference
(ALTER | CHANGE) COLUMN? column=multipartIdentifier
alterColumnAction? #alterTableAlterColumn
(ALTER | CHANGE) COLUMN? columns=alterColumnSpecList #alterTableAlterColumn
| ALTER TABLE table=identifierReference partitionSpec?
CHANGE COLUMN?
colName=multipartIdentifier colType colPosition? #hiveChangeColumn
Expand Down Expand Up @@ -1489,6 +1488,14 @@ number
| MINUS? BIGDECIMAL_LITERAL #bigDecimalLiteral
;

alterColumnSpecList
: alterColumnSpec (COMMA alterColumnSpec)*
;

alterColumnSpec
: column=multipartIdentifier alterColumnAction?
;

alterColumnAction
: TYPE dataType
| commentSpec
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3847,24 +3847,29 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
resolved.copyTagsFrom(a)
resolved

case a @ AlterColumn(
table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position, _) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (existing == dt) None else Some(dt)
}
val newPosition = position map {
case u @ UnresolvedFieldPosition(after: After) =>
// TODO: since the field name is already resolved, it's more efficient if
// `ResolvedFieldName` carries the parent struct and we resolve column position
// based on the parent struct, instead of re-resolving the entire column path.
val resolved = resolveFieldNames(table, path :+ after.column(), u)
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
case other => other
case a @ AlterColumns(table: ResolvedTable, columns, specs) =>
val resolvedSpecs = columns.zip(specs).map {
case (ResolvedFieldName(path, field), s @ AlterColumnSpec(dataType, _, _, position, _)) =>
val newDataType = dataType.flatMap { dt =>
// Hive style syntax provides the column type, even if it may not have changed.
val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
if (existing == dt) None else Some(dt)
}
val newPosition = position map {
case u @ UnresolvedFieldPosition(after: After) =>
// TODO: since the field name is already resolved, it's more efficient if
// `ResolvedFieldName` carries the parent struct and we resolve column
// position based on the parent struct, instead of re-resolving the entire
// column path.
val resolved = resolveFieldNames(table, path :+ after.column(), u)
ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
case other => other
}
s.copy(dataType = newDataType, position = newPosition)
case (_, spec) => spec
}
val resolved = a.copy(dataType = newDataType, position = newPosition)
val resolved = a.copy(specs = resolvedSpecs)
resolved.copyTagsFrom(a)
resolved
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1622,60 +1622,84 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
checkColumnNotExists("rename", col.path :+ newName, table.schema)

case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
newDataType match {
case _: StructType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.STRUCT_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _: MapType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.MAP_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _: ArrayType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.ARRAY_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case u: UserDefinedType[_] => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
Map(
"table" -> toSQLId(table.name),
"fieldName" -> toSQLId(fieldName),
"udtSql" -> toSQLType(u)))
case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _ => // update is okay
}

// We don't need to handle nested types here which shall fail before.
def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
}
if (!canAlterColumnType(field.dataType, newDataType)) {
case AlterColumns(table: ResolvedTable, columns, specs) =>
val groupedColumns = columns.groupBy(_.name)
groupedColumns.collect {
case (name, occurrences) if occurrences.length > 1 =>
alter.failAnalysis(
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
errorClass = "NOT_SUPPORTED_CHANGE_SAME_COLUMN",
messageParameters = Map(
"table" -> toSQLId(table.name),
"originName" -> toSQLId(fieldName),
"originType" -> toSQLType(field.dataType),
"newName" -> toSQLId(fieldName),
"newType" -> toSQLType(newDataType)))
}
"fieldName" -> toSQLId(name)))
}
if (a.nullable.isDefined) {
if (!a.nullable.get && col.field.nullable) {
groupedColumns.keys.foreach { name =>
val child = groupedColumns.keys.find(child => child != name && child.startsWith(name))
if (child.isDefined) {
alter.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2330",
messageParameters = Map("fieldName" -> fieldName))
errorClass = "NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN",
messageParameters = Map(
"table" -> toSQLId(table.name),
"parentField" -> toSQLId(name),
"childField" -> toSQLId(child.get)))
}
}
columns.zip(specs).foreach {
case (col: ResolvedFieldName, a: AlterColumnSpec) =>
val fieldName = col.name.quoted
if (a.dataType.isDefined) {
val field = CharVarcharUtils.getRawType(col.field.metadata)
.map(dt => col.field.copy(dataType = dt))
.getOrElse(col.field)
val newDataType = a.dataType.get
newDataType match {
case _: StructType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.STRUCT_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _: MapType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.MAP_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _: ArrayType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.ARRAY_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case u: UserDefinedType[_] => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
Map(
"table" -> toSQLId(table.name),
"fieldName" -> toSQLId(fieldName),
"udtSql" -> toSQLType(u)))
case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
"CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
case _ => // update is okay
}

// We don't need to handle nested types here which shall fail before.
def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
case (CharType(l1), CharType(l2)) => l1 == l2
case (CharType(l1), VarcharType(l2)) => l1 <= l2
case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
case _ => Cast.canUpCast(from, to)
}
if (!canAlterColumnType(field.dataType, newDataType)) {
alter.failAnalysis(
errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
messageParameters = Map(
"table" -> toSQLId(table.name),
"originName" -> toSQLId(fieldName),
"originType" -> toSQLType(field.dataType),
"newName" -> toSQLId(fieldName),
"newType" -> toSQLType(newDataType)))
}
}
if (a.nullable.isDefined) {
if (!a.nullable.get && col.field.nullable) {
alter.failAnalysis(
errorClass = "_LEGACY_ERROR_TEMP_2330",
messageParameters = Map("fieldName" -> fieldName))
}
}
case _ =>
}
case _ =>
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis

import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
import org.apache.spark.sql.types.{DataType, StringType}

Expand Down Expand Up @@ -93,7 +93,7 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
StringType(conf.defaultStringType.collationId)

private def isDDLCommand(plan: LogicalPlan): Boolean = plan exists {
case _: AddColumns | _: ReplaceColumns | _: AlterColumn => true
case _: AddColumns | _: ReplaceColumns | _: AlterColumns => true
case _ => isCreateOrAlterPlan(plan)
}

Expand All @@ -115,9 +115,13 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
case replaceCols: ReplaceColumns =>
replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))

case alter: AlterColumn
if alter.dataType.isDefined && hasDefaultStringType(alter.dataType.get) =>
alter.copy(dataType = Some(replaceDefaultStringType(alter.dataType.get, newType)))
case a @ AlterColumns(_, _, specs: Seq[AlterColumnSpec]) =>
val newSpecs = specs.map {
case spec if spec.dataType.isDefined && hasDefaultStringType(spec.dataType.get) =>
spec.copy(dataType = Some(replaceDefaultStringType(spec.dataType.get, newType)))
case col => col
}
a.copy(specs = newSpecs)
}
}

Expand Down
Loading

0 comments on commit c24c380

Please sign in to comment.