From c24c380a6d3f53991a705c7d8d55f0834cb1b8a4 Mon Sep 17 00:00:00 2001
From: Cuong Nguyen
Date: Fri, 17 Jan 2025 11:48:53 -0800
Subject: [PATCH] Support altering multiple columns in the same command

---
 .../resources/error/error-conditions.json     |  12 +
 .../sql/catalyst/parser/SqlBaseParser.g4      |  11 +-
 .../sql/catalyst/analysis/Analyzer.scala      |  39 +--
 .../sql/catalyst/analysis/CheckAnalysis.scala | 118 ++++++----
 .../analysis/ResolveDefaultStringTypes.scala  |  14 +-
 .../sql/catalyst/parser/AstBuilder.scala      | 122 +++++-----
 .../plans/logical/v2AlterTableCommands.scala  |  74 +++---
 .../sql/catalyst/parser/DDLParserSuite.scala  | 222 +++++++++++-------
 .../analysis/ReplaceCharWithVarchar.scala     |   8 +-
 .../analysis/ResolveSessionCatalog.scala      |  14 +-
 .../spark/sql/connector/AlterTableTests.scala |  81 ++++++-
 .../V2CommandsCaseSensitivitySuite.scala      |  14 +-
 .../command/PlanResolutionSuite.scala         |  87 ++++---
 13 files changed, 529 insertions(+), 287 deletions(-)

diff --git a/common/utils/src/main/resources/error/error-conditions.json b/common/utils/src/main/resources/error/error-conditions.json
index d949ece679036..263bec2d57729 100644
--- a/common/utils/src/main/resources/error/error-conditions.json
+++ b/common/utils/src/main/resources/error/error-conditions.json
@@ -3938,6 +3938,18 @@
     ],
     "sqlState" : "0A000"
   },
+  "NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN" : {
+    "message" : [
+      "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s parent field <parentField> and child field <childField> in the same command."
+    ],
+    "sqlState" : "0A000"
+  },
+  "NOT_SUPPORTED_CHANGE_SAME_COLUMN" : {
+    "message" : [
+      "ALTER TABLE ALTER/CHANGE COLUMN is not supported for changing <table>'s column <fieldName> multiple times in the same command."
+    ],
+    "sqlState" : "0A000"
+  },
   "NOT_SUPPORTED_COMMAND_FOR_V2_TABLE" : {
     "message" : [
       "<cmd> is not supported for v2 tables."
diff --git a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4 b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
index 667d200268cf8..8e6edac2108a6 100644
--- a/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
+++ b/sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4
@@ -212,8 +212,7 @@ statement
     | ALTER (TABLE | VIEW) identifierReference
         UNSET TBLPROPERTIES (IF EXISTS)? propertyList          #unsetTableProperties
     | ALTER TABLE table=identifierReference
-        (ALTER | CHANGE) COLUMN? column=multipartIdentifier
-        alterColumnAction?                                     #alterTableAlterColumn
+        (ALTER | CHANGE) COLUMN? columns=alterColumnSpecList   #alterTableAlterColumn
     | ALTER TABLE table=identifierReference partitionSpec?
         CHANGE COLUMN?
         colName=multipartIdentifier colType colPosition?       #hiveChangeColumn
@@ -1489,6 +1488,14 @@ number
     | MINUS? BIGDECIMAL_LITERAL          #bigDecimalLiteral
     ;

+alterColumnSpecList
+    : alterColumnSpec (COMMA alterColumnSpec)*
+    ;
+
+alterColumnSpec
+    : column=multipartIdentifier alterColumnAction?
+    ;
+
 alterColumnAction
     : TYPE dataType
     | commentSpec
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 92cfc4119dd0c..0bbb105016517 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -3847,24 +3847,29 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor
       resolved.copyTagsFrom(a)
       resolved

-    case a @ AlterColumn(
-        table: ResolvedTable, ResolvedFieldName(path, field), dataType, _, _, position, _) =>
-      val newDataType = dataType.flatMap { dt =>
-        // Hive style syntax provides the column type, even if it may not have changed.
-        val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
-        if (existing == dt) None else Some(dt)
-      }
-      val newPosition = position map {
-        case u @ UnresolvedFieldPosition(after: After) =>
-          // TODO: since the field name is already resolved, it's more efficient if
-          // `ResolvedFieldName` carries the parent struct and we resolve column position
-          // based on the parent struct, instead of re-resolving the entire column path.
-          val resolved = resolveFieldNames(table, path :+ after.column(), u)
-          ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
-        case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
-        case other => other
+    case a @ AlterColumns(table: ResolvedTable, columns, specs) =>
+      val resolvedSpecs = columns.zip(specs).map {
+        case (ResolvedFieldName(path, field), s @ AlterColumnSpec(dataType, _, _, position, _)) =>
+          val newDataType = dataType.flatMap { dt =>
+            // Hive style syntax provides the column type, even if it may not have changed.
+            val existing = CharVarcharUtils.getRawType(field.metadata).getOrElse(field.dataType)
+            if (existing == dt) None else Some(dt)
+          }
+          val newPosition = position map {
+            case u @ UnresolvedFieldPosition(after: After) =>
+              // TODO: since the field name is already resolved, it's more efficient if
+              // `ResolvedFieldName` carries the parent struct and we resolve column
+              // position based on the parent struct, instead of re-resolving the entire
+              // column path.
+              val resolved = resolveFieldNames(table, path :+ after.column(), u)
+              ResolvedFieldPosition(ColumnPosition.after(resolved.field.name))
+            case u: UnresolvedFieldPosition => ResolvedFieldPosition(u.position)
+            case other => other
+          }
+          s.copy(dataType = newDataType, position = newPosition)
+        case (_, spec) => spec
       }
-      val resolved = a.copy(dataType = newDataType, position = newPosition)
+      val resolved = a.copy(specs = resolvedSpecs)
       resolved.copyTagsFrom(a)
       resolved
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 269411a6fc616..0164dcf30d336 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1622,60 +1622,84 @@ trait CheckAnalysis extends PredicateHelper with LookupCatalog with QueryErrorsB
       case RenameColumn(table: ResolvedTable, col: ResolvedFieldName, newName) =>
         checkColumnNotExists("rename", col.path :+ newName, table.schema)

-      case a @ AlterColumn(table: ResolvedTable, col: ResolvedFieldName, _, _, _, _, _) =>
-        val fieldName = col.name.quoted
-        if (a.dataType.isDefined) {
-          val field = CharVarcharUtils.getRawType(col.field.metadata)
-            .map(dt => col.field.copy(dataType = dt))
-            .getOrElse(col.field)
-          val newDataType = a.dataType.get
-          newDataType match {
-            case _: StructType => alter.failAnalysis(
-              "CANNOT_UPDATE_FIELD.STRUCT_TYPE",
-              Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-            case _: MapType => alter.failAnalysis(
-              "CANNOT_UPDATE_FIELD.MAP_TYPE",
-              Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-            case _: ArrayType => alter.failAnalysis(
-              "CANNOT_UPDATE_FIELD.ARRAY_TYPE",
-              Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-            case u: UserDefinedType[_] => alter.failAnalysis(
-              "CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
-              Map(
-                "table" -> toSQLId(table.name),
-                "fieldName" -> toSQLId(fieldName),
-                "udtSql" -> toSQLType(u)))
-            case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
-              "CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
-              Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
-            case _ => // update is okay
-          }
-
-          // We don't need to handle nested types here which shall fail before.
-          def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
-            case (CharType(l1), CharType(l2)) => l1 == l2
-            case (CharType(l1), VarcharType(l2)) => l1 <= l2
-            case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
-            case _ => Cast.canUpCast(from, to)
-          }
-          if (!canAlterColumnType(field.dataType, newDataType)) {
+      case AlterColumns(table: ResolvedTable, columns, specs) =>
+        val groupedColumns = columns.groupBy(_.name)
+        groupedColumns.collect {
+          case (name, occurrences) if occurrences.length > 1 =>
             alter.failAnalysis(
-              errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+              errorClass = "NOT_SUPPORTED_CHANGE_SAME_COLUMN",
               messageParameters = Map(
                 "table" -> toSQLId(table.name),
-                "originName" -> toSQLId(fieldName),
-                "originType" -> toSQLType(field.dataType),
-                "newName" -> toSQLId(fieldName),
-                "newType" -> toSQLType(newDataType)))
-          }
+                "fieldName" -> toSQLId(name)))
         }
-        if (a.nullable.isDefined) {
-          if (!a.nullable.get && col.field.nullable) {
+        groupedColumns.keys.foreach { name =>
+          val child = groupedColumns.keys.find(child => child != name && child.startsWith(name))
+          if (child.isDefined) {
             alter.failAnalysis(
-              errorClass = "_LEGACY_ERROR_TEMP_2330",
-              messageParameters = Map("fieldName" -> fieldName))
+              errorClass = "NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN",
+              messageParameters = Map(
+                "table" -> toSQLId(table.name),
+                "parentField" -> toSQLId(name),
+                "childField" -> toSQLId(child.get)))
           }
         }
+        columns.zip(specs).foreach {
+          case (col: ResolvedFieldName, a: AlterColumnSpec) =>
+            val fieldName = col.name.quoted
+            if (a.dataType.isDefined) {
+              val field = CharVarcharUtils.getRawType(col.field.metadata)
+                .map(dt => col.field.copy(dataType = dt))
+                .getOrElse(col.field)
+              val newDataType = a.dataType.get
+              newDataType match {
+                case _: StructType => alter.failAnalysis(
+                  "CANNOT_UPDATE_FIELD.STRUCT_TYPE",
+                  Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+                case _: MapType => alter.failAnalysis(
+                  "CANNOT_UPDATE_FIELD.MAP_TYPE",
+                  Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+                case _: ArrayType => alter.failAnalysis(
+                  "CANNOT_UPDATE_FIELD.ARRAY_TYPE",
+                  Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+                case u: UserDefinedType[_] => alter.failAnalysis(
+                  "CANNOT_UPDATE_FIELD.USER_DEFINED_TYPE",
+                  Map(
+                    "table" -> toSQLId(table.name),
+                    "fieldName" -> toSQLId(fieldName),
+                    "udtSql" -> toSQLType(u)))
+                case _: CalendarIntervalType | _: AnsiIntervalType => alter.failAnalysis(
+                  "CANNOT_UPDATE_FIELD.INTERVAL_TYPE",
+                  Map("table" -> toSQLId(table.name), "fieldName" -> toSQLId(fieldName)))
+                case _ => // update is okay
+              }
+
+              // We don't need to handle nested types here which shall fail before.
+              def canAlterColumnType(from: DataType, to: DataType): Boolean = (from, to) match {
+                case (CharType(l1), CharType(l2)) => l1 == l2
+                case (CharType(l1), VarcharType(l2)) => l1 <= l2
+                case (VarcharType(l1), VarcharType(l2)) => l1 <= l2
+                case _ => Cast.canUpCast(from, to)
+              }
+              if (!canAlterColumnType(field.dataType, newDataType)) {
+                alter.failAnalysis(
+                  errorClass = "NOT_SUPPORTED_CHANGE_COLUMN",
+                  messageParameters = Map(
+                    "table" -> toSQLId(table.name),
+                    "originName" -> toSQLId(fieldName),
+                    "originType" -> toSQLType(field.dataType),
+                    "newName" -> toSQLId(fieldName),
+                    "newType" -> toSQLType(newDataType)))
+              }
+            }
+            if (a.nullable.isDefined) {
+              if (!a.nullable.get && col.field.nullable) {
+                alter.failAnalysis(
+                  errorClass = "_LEGACY_ERROR_TEMP_2330",
+                  messageParameters = Map("fieldName" -> fieldName))
+              }
+            }
+          case _ =>
+        }
       case _ =>
     }
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultStringTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultStringTypes.scala
index 75958ff3e1177..969a78a240c27 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultStringTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultStringTypes.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterViewAs, ColumnDefinition, CreateView, LogicalPlan, QualifiedColType, ReplaceColumns, V1CreateTablePlan, V2CreateTablePlan}
 import org.apache.spark.sql.catalyst.rules.{Rule, RuleExecutor}
 import org.apache.spark.sql.types.{DataType, StringType}

@@ -93,7 +93,7 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
     StringType(conf.defaultStringType.collationId)

   private def isDDLCommand(plan: LogicalPlan): Boolean = plan exists {
-    case _: AddColumns | _: ReplaceColumns | _: AlterColumn => true
+    case _: AddColumns | _: ReplaceColumns | _: AlterColumns => true
     case _ => isCreateOrAlterPlan(plan)
   }

@@ -115,9 +115,13 @@ object ResolveDefaultStringTypes extends Rule[LogicalPlan] {
     case replaceCols: ReplaceColumns =>
       replaceCols.copy(columnsToAdd = replaceColumnTypes(replaceCols.columnsToAdd, newType))

-    case alter: AlterColumn
-      if alter.dataType.isDefined && hasDefaultStringType(alter.dataType.get) =>
-      alter.copy(dataType = Some(replaceDefaultStringType(alter.dataType.get, newType)))
+    case a @ AlterColumns(_, _, specs: Seq[AlterColumnSpec]) =>
+      val newSpecs = specs.map {
+        case spec if spec.dataType.isDefined && hasDefaultStringType(spec.dataType.get) =>
+          spec.copy(dataType = Some(replaceDefaultStringType(spec.dataType.get, newType)))
+        case col => col
+      }
+      a.copy(specs = newSpecs)
   }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index b408fcefcfb26..ee0af469a0b2d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -4909,7 +4909,7 @@ class AstBuilder extends DataTypeAstBuilder
   }

   /**
-   * Parse a [[AlterColumn]] command to alter a column's property.
+   * Parse a [[AlterColumns]] command to alter a column's property.
    *
    * For example:
    * {{{
@@ -4919,67 +4919,76 @@ class AstBuilder extends DataTypeAstBuilder
    * ALTER TABLE table1 ALTER COLUMN a.b.c COMMENT 'new comment'
    * ALTER TABLE table1 ALTER COLUMN a.b.c FIRST
    * ALTER TABLE table1 ALTER COLUMN a.b.c AFTER x
+   * ALTER TABLE table1 ALTER COLUMN a.b.c TYPE bigint, x COMMENT 'new comment', y FIRST
    * }}}
    */
   override def visitAlterTableAlterColumn(
       ctx: AlterTableAlterColumnContext): LogicalPlan = withOrigin(ctx) {
-    val action = ctx.alterColumnAction
     val verb = if (ctx.CHANGE != null) "CHANGE" else "ALTER"
-    if (action == null) {
-      operationNotAllowed(
-        s"ALTER TABLE table $verb COLUMN requires a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
-        ctx)
-    }
-    val dataType = if (action.dataType != null) {
-      Some(typedVisit[DataType](action.dataType))
-    } else {
-      None
-    }
-    val nullable = if (action.setOrDrop != null) {
-      action.setOrDrop.getType match {
-        case SqlBaseParser.SET => Some(false)
-        case SqlBaseParser.DROP => Some(true)
+    val (columns, specs) = ctx.columns.alterColumnSpec.asScala.map { spec =>
+      val action = spec.alterColumnAction
+      if (action == null) {
+        operationNotAllowed(
+          s"ALTER TABLE table $verb COLUMN requires " +
+            "a TYPE, a SET/DROP, a COMMENT, or a FIRST/AFTER",
+          ctx)
       }
-    } else {
-      None
-    }
-    val comment = if (action.commentSpec != null) {
-      Some(visitCommentSpec(action.commentSpec()))
-    } else {
-      None
-    }
-    val position = if (action.colPosition != null) {
-      Some(UnresolvedFieldPosition(typedVisit[ColumnPosition](action.colPosition)))
-    } else {
-      None
-    }
-    val setDefaultExpression: Option[String] =
-      if (action.defaultExpression != null) {
-        Option(action.defaultExpression()).map(visitDefaultExpression).map(_.originalSQL)
-      } else if (action.dropDefault != null) {
-        Some("")
+      val dataType = if (action.dataType != null) {
+        Some(typedVisit[DataType](action.dataType))
       } else {
         None
       }
-    if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
-      throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
-    }
-
-    assert(Seq(dataType, nullable, comment, position, setDefaultExpression)
-      .count(_.nonEmpty) == 1)
+      val nullable = if (action.setOrDrop != null) {
+        action.setOrDrop.getType match {
+          case SqlBaseParser.SET => Some(false)
+          case SqlBaseParser.DROP => Some(true)
+        }
+      } else {
+        None
+      }
+      val comment = if (action.commentSpec != null) {
+        Some(visitCommentSpec(action.commentSpec()))
+      } else {
+        None
+      }
+      val position = if (action.colPosition != null) {
+        Some(UnresolvedFieldPosition(typedVisit[ColumnPosition](action.colPosition)))
+      } else {
+        None
+      }
+      val setDefaultExpression: Option[String] =
+        if (action.defaultExpression != null) {
+          Option(action.defaultExpression()).map(visitDefaultExpression).map(_.originalSQL)
+        } else if (action.dropDefault != null) {
+          Some("")
+        } else {
+          None
+        }
+      if (setDefaultExpression.isDefined && !conf.getConf(SQLConf.ENABLE_DEFAULT_COLUMNS)) {
+        throw QueryParsingErrors.defaultColumnNotEnabledError(ctx)
+      }

-    AlterColumn(
+      assert(Seq(dataType, nullable, comment, position, setDefaultExpression)
+        .count(_.nonEmpty) == 1)
+
+      (
+        UnresolvedFieldName(typedVisit[Seq[String]](spec.column)),
+        AlterColumnSpec(
+          dataType,
+          nullable,
+          comment,
+          position,
+          setDefaultExpression)
+      )
+    }.unzip
+    AlterColumns(
       createUnresolvedTable(ctx.table, s"ALTER TABLE ... $verb COLUMN"),
-      UnresolvedFieldName(typedVisit[Seq[String]](ctx.column)),
-      dataType = dataType,
-      nullable = nullable,
-      comment = comment,
-      position = position,
-      setDefaultExpression = setDefaultExpression)
+      columns.toSeq,
+      specs.toSeq)
   }

   /**
-   * Parse a [[AlterColumn]] command. This is Hive SQL syntax.
+   * Parse a [[AlterColumns]] command. This is Hive SQL syntax.
    *
    * For example:
    * {{{
@@ -5003,15 +5012,16 @@ class AstBuilder extends DataTypeAstBuilder
         Some("please run ALTER COLUMN ... SET/DROP NOT NULL instead"))
     }

-    AlterColumn(
+    AlterColumns(
       createUnresolvedTable(ctx.table, "ALTER TABLE ... CHANGE COLUMN"),
-      UnresolvedFieldName(columnNameParts),
-      dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
-      nullable = None,
-      comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
-      position = Option(ctx.colPosition).map(
-        pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))),
-      setDefaultExpression = None)
+      Seq(UnresolvedFieldName(columnNameParts)),
+      Seq(AlterColumnSpec(
+        dataType = Option(ctx.colType().dataType()).map(typedVisit[DataType]),
+        nullable = None,
+        comment = Option(ctx.colType().commentSpec()).map(visitCommentSpec),
+        position = Option(ctx.colPosition).map(
+          pos => UnresolvedFieldPosition(typedVisit[ColumnPosition](pos))),
+        setDefaultExpression = None)))
   }

   override def visitHiveReplaceColumns(
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
index dbd2c0ba8e420..3c3e0f998332b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/v2AlterTableCommands.scala
@@ -201,49 +201,55 @@ case class RenameColumn(
     copy(table = newChild)
 }

-/**
- * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
- */
-case class AlterColumn(
-    table: LogicalPlan,
-    column: FieldName,
+case class AlterColumnSpec(
     dataType: Option[DataType],
     nullable: Option[Boolean],
     comment: Option[String],
     position: Option[FieldPosition],
-    setDefaultExpression: Option[String]) extends AlterTableCommand {
+    setDefaultExpression: Option[String])
+
+/**
+ * The logical plan of the ALTER TABLE ... ALTER COLUMN command.
+ */
+case class AlterColumns(
+    table: LogicalPlan,
+    columns: Seq[FieldName],
+    specs: Seq[AlterColumnSpec]) extends AlterTableCommand {

   override def changes: Seq[TableChange] = {
-    require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
-    val colName = column.name.toArray
-    val typeChange = dataType.map { newDataType =>
-      TableChange.updateColumnType(colName, newDataType)
-    }
-    val nullabilityChange = nullable.map { nullable =>
-      TableChange.updateColumnNullability(colName, nullable)
-    }
-    val commentChange = comment.map { newComment =>
-      TableChange.updateColumnComment(colName, newComment)
-    }
-    val positionChange = position.map { newPosition =>
-      require(newPosition.resolved,
-        "FieldPosition should be resolved before it's converted to TableChange.")
-      TableChange.updateColumnPosition(colName, newPosition.position)
-    }
-    val defaultValueChange = setDefaultExpression.map { newDefaultExpression =>
-      if (newDefaultExpression.nonEmpty) {
-        // SPARK-45075: We call 'ResolveDefaultColumns.analyze' here to make sure that the default
-        // value parses successfully, and return an error otherwise
-        val newDataType = dataType.getOrElse(column.asInstanceOf[ResolvedFieldName].field.dataType)
-        ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression,
-          "ALTER TABLE ALTER COLUMN")
+    assert(columns.size == specs.size)
+    columns.zip(specs).flatMap { case (column, spec) =>
+      require(column.resolved, "FieldName should be resolved before it's converted to TableChange.")
+      val colName = column.name.toArray
+      val typeChange = spec.dataType.map { newDataType =>
+        TableChange.updateColumnType(colName, newDataType)
+      }
+      val nullabilityChange = spec.nullable.map { nullable =>
+        TableChange.updateColumnNullability(colName, nullable)
       }
-      TableChange.updateColumnDefaultValue(colName, newDefaultExpression)
+      val commentChange = spec.comment.map { newComment =>
+        TableChange.updateColumnComment(colName, newComment)
+      }
+      val positionChange = spec.position.map { newPosition =>
+        require(newPosition.resolved,
+          "FieldPosition should be resolved before it's converted to TableChange.")
+        TableChange.updateColumnPosition(colName, newPosition.position)
+      }
+      val defaultValueChange = spec.setDefaultExpression.map { newDefaultExpression =>
+        if (newDefaultExpression.nonEmpty) {
+          // SPARK-45075: We call 'ResolveDefaultColumns.analyze' here to make sure that the default
+          // value parses successfully, and return an error otherwise
+          val newDataType = spec.dataType.getOrElse(
+            column.asInstanceOf[ResolvedFieldName].field.dataType)
+          ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression,
+            "ALTER TABLE ALTER COLUMN")
+        }
+        TableChange.updateColumnDefaultValue(colName, newDefaultExpression)
+      }
+      typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange
     }
-    typeChange.toSeq ++ nullabilityChange ++ commentChange ++ positionChange ++ defaultValueChange
   }

-  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan =
-    copy(table = newChild)
+  override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(newChild)
 }

 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
index 0ec2c80282fc2..d17f8b87df9e2 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/parser/DDLParserSuite.scala
@@ -1218,14 +1218,15 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column type using ALTER") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c TYPE bigint"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        Some(LongType),
-        None,
-        None,
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          Some(LongType),
+          None,
+          None,
+          None,
+          None))))
   }

   test("alter table: update column type invalid type") {
@@ -1244,40 +1245,43 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: update column type") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c TYPE bigint"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        Some(LongType),
-        None,
-        None,
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          Some(LongType),
+          None,
+          None,
+          None,
+          None))))
   }

   test("alter table: update column comment") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c COMMENT 'new comment'"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        None,
-        Some("new comment"),
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          None,
+          Some("new comment"),
+          None,
+          None))))
   }

   test("alter table: update column position") {
     comparePlans(
       parsePlan("ALTER TABLE table_name CHANGE COLUMN a.b.c FIRST"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        None,
-        None,
-        Some(UnresolvedFieldPosition(first())),
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          None,
+          None,
+          Some(UnresolvedFieldPosition(first())),
+          None))))
   }

   test("alter table: multiple property changes are not allowed") {
@@ -1303,25 +1307,69 @@ class DDLParserSuite extends AnalysisTest {
   test("alter table: SET/DROP NOT NULL") {
     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c SET NOT NULL"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        Some(false),
-        None,
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          Some(false),
+          None,
+          None,
+          None))))

     comparePlans(
       parsePlan("ALTER TABLE table_name ALTER COLUMN a.b.c DROP NOT NULL"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        Some(true),
-        None,
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          Some(true),
+          None,
+          None,
+          None))))
+  }
+
+  test("alter table: bulk alter column") {
+    comparePlans(
+      parsePlan(
+        """ALTER TABLE table_name ALTER COLUMN
+          |  a.b.c SET NOT NULL,
+          |  d.e.f COMMENT 'new comment',
+          |  x TYPE INT,
+          |  y FIRST""".stripMargin),
+      AlterColumns(
+        UnresolvedTable(Seq("table_name"), "ALTER TABLE ... ALTER COLUMN"),
+        Seq(
+          UnresolvedFieldName(Seq("a", "b", "c")),
+          UnresolvedFieldName(Seq("d", "e", "f")),
+          UnresolvedFieldName(Seq("x")),
+          UnresolvedFieldName(Seq("y"))),
+        Seq(
+          AlterColumnSpec(
+            None,
+            Some(false),
+            None,
+            None,
+            None),
+          AlterColumnSpec(
+            None,
+            None,
+            Some("new comment"),
+            None,
+            None),
+          AlterColumnSpec(
+            Some(IntegerType),
+            None,
+            None,
+            None,
+            None),
+          AlterColumnSpec(
+            None,
+            None,
+            None,
+            Some(UnresolvedFieldPosition(first())),
+            None))))
   }

   test("alter table: hive style change column") {
@@ -1331,36 +1379,39 @@ class DDLParserSuite extends AnalysisTest {

     comparePlans(
       parsePlan(sql1),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        Some(IntegerType),
-        None,
-        None,
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          Some(IntegerType),
+          None,
+          None,
+          None,
+          None))))

     comparePlans(
       parsePlan(sql2),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        Some(IntegerType),
-        None,
-        Some("new_comment"),
-        None,
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          Some(IntegerType),
+          None,
+          Some("new_comment"),
+          None,
+          None))))

     comparePlans(
       parsePlan(sql3),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("table_name"), "ALTER TABLE ... CHANGE COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        Some(IntegerType),
-        None,
-        None,
-        Some(UnresolvedFieldPosition(after("other_col"))),
-        None))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          Some(IntegerType),
+          None,
+          None,
+          Some(UnresolvedFieldPosition(after("other_col"))),
+          None))))

     // renaming column not supported in hive style ALTER COLUMN.
     val sql4 = "ALTER TABLE table_name CHANGE COLUMN a.b.c new_name INT"
@@ -2676,25 +2727,27 @@ class DDLParserSuite extends AnalysisTest {
         Seq(QualifiedColType(None, "x", IntegerType, false, None, None, Some("42")))))
     comparePlans(
       parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT 42"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        None,
-        None,
-        None,
-        Some("42")))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          None,
+          None,
+          None,
+          Some("42")))))
     // It is possible to pass an empty string default value using quotes.
     comparePlans(
       parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT ''"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        None,
-        None,
-        None,
-        Some("''")))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          None,
+          None,
+          None,
+          Some("''")))))
     // It is not possible to pass an empty string default value without using quotes.
     // This results in a parsing error.
     val sql1 = "ALTER TABLE t1 ALTER COLUMN a.b.c SET DEFAULT "
@@ -2712,14 +2765,15 @@ class DDLParserSuite extends AnalysisTest {

     comparePlans(
       parsePlan("ALTER TABLE t1 ALTER COLUMN a.b.c DROP DEFAULT"),
-      AlterColumn(
+      AlterColumns(
         UnresolvedTable(Seq("t1"), "ALTER TABLE ... ALTER COLUMN"),
-        UnresolvedFieldName(Seq("a", "b", "c")),
-        None,
-        None,
-        None,
-        None,
-        Some("")))
+        Seq(UnresolvedFieldName(Seq("a", "b", "c"))),
+        Seq(AlterColumnSpec(
+          None,
+          None,
+          None,
+          None,
+          Some("")))))
     // Make sure that the parser returns an exception when the feature is disabled.
     withSQLConf(SQLConf.ENABLE_DEFAULT_COLUMNS.key -> "false") {
       val sql = "CREATE TABLE my_tab(a INT, b STRING NOT NULL DEFAULT \"abc\") USING parquet"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
index c0777ac6e6604..155687d15c176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ReplaceCharWithVarchar.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis

 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, ColumnDefinition, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, ColumnDefinition, CreateTable, LogicalPlan, ReplaceColumns, ReplaceTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.CharVarcharUtils
 import org.apache.spark.sql.execution.command.{AlterTableAddColumnsCommand, AlterTableChangeColumnCommand, CreateDataSourceTableCommand, CreateTableCommand}
@@ -39,8 +39,10 @@ object ReplaceCharWithVarchar extends Rule[LogicalPlan] {
       cmd.copy(columnsToAdd = cmd.columnsToAdd.map { col =>
         col.copy(dataType = CharVarcharUtils.replaceCharWithVarchar(col.dataType))
       })
-    case cmd: AlterColumn =>
-      cmd.copy(dataType = cmd.dataType.map(CharVarcharUtils.replaceCharWithVarchar))
+    case cmd: AlterColumns =>
+      cmd.copy(specs = cmd.specs.map { spec =>
+        spec.copy(dataType = spec.dataType.map(CharVarcharUtils.replaceCharWithVarchar))
+      })
     case cmd: ReplaceColumns =>
       cmd.copy(columnsToAdd = cmd.columnsToAdd.map { col =>
         col.copy(dataType = CharVarcharUtils.replaceCharWithVarchar(col.dataType))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
index b73ea2f80452b..7adf0b410f76b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala
@@ -74,9 +74,15 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
     case ReplaceColumns(ResolvedV1TableIdentifier(ident), _) =>
       throw QueryCompilationErrors.unsupportedTableOperationError(ident, "REPLACE COLUMNS")

-    case a @ AlterColumn(ResolvedTable(catalog, ident, table: V1Table, _), _, _, _, _, _, _)
+    case AlterColumns(ResolvedTable(catalog, ident, table: V1Table, _), columns, specs)
         if supportsV1Command(catalog) =>
-      if (a.column.name.length > 1) {
+      if (columns.size > 1) {
+        throw QueryCompilationErrors.unsupportedTableOperationError(
+          catalog, ident, "ALTER COLUMN in bulk")
+      }
+      val column = columns.head
+      val a = specs.head
+      if (column.name.length > 1) {
         throw QueryCompilationErrors.unsupportedTableOperationError(
           catalog, ident, "ALTER COLUMN with qualified column")
       }
@@ -91,7 +97,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
       val builder = new MetadataBuilder
       // Add comment to metadata
       a.comment.map(c => builder.putString("comment", c))
-      val colName = a.column.name(0)
+      val colName = column.name.head
       val dataType = a.dataType.getOrElse {
         table.schema.findNestedField(Seq(colName), resolver = conf.resolver)
           .map {
@@ -101,7 +107,7 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
           }
           .getOrElse {
             throw QueryCompilationErrors.unresolvedColumnError(
-              toSQLId(a.column.name), table.schema.fieldNames)
+              toSQLId(column.name), table.schema.fieldNames)
           }
       }
       // Add the current default column value string (if any) to the column metadata.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
index 00e1f2f93fdcb..3298f6595999b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/AlterTableTests.scala
@@ -24,8 +24,8 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.parser.ParseException
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
+import org.apache.spark.sql.connector.catalog.{Column, Table}
 import org.apache.spark.sql.connector.catalog.CatalogV2Util.withDefaultOwnership
-import org.apache.spark.sql.connector.catalog.Table
 import org.apache.spark.sql.errors.QueryErrorsBase
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
@@ -1017,6 +1017,85 @@ trait AlterTableTests extends SharedSparkSession with QueryErrorsBase {
     }
   }

+  test("AlterTable: alter multiple columns/fields in the same command") {
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
+      val t = fullTableName("table_name")
+      withTable(t) {
+        sql(s"""CREATE TABLE $t (
+               |  id int,
+               |  data string,
+               |  ts timestamp NOT NULL,
+               |  points array<struct<x: double, y: double>>) USING $v2Format""".stripMargin)
+        sql(s"""ALTER TABLE $t ALTER COLUMN
+               |  id TYPE bigint,
+               |  data FIRST,
+               |  ts DROP NOT NULL,
+               |  points.element.x SET DEFAULT (1.0 + 2.0),
+               |  points.element.y COMMENT 'comment on y'""".stripMargin)
+
+        val table = getTableMetadata(t)
+
+        assert(table.name === t)
+        assert(
+          table.columns() === Array(
+            Column.create("data", StringType),
+            Column.create("id", LongType),
+            Column.create("ts", TimestampType, true),
+            Column.create("points", ArrayType(
+              StructType(
+                Seq(
+                  StructField("x", DoubleType).withCurrentDefaultValue("(1.0 + 2.0)"),
+                  StructField("y", DoubleType).withComment("comment on y")
+                ))))))
+      }
+    }
+  }
+
+  test("AlterTable: cannot alter the same column in the same command") {
+    val t = fullTableName("table_name")
+    withTable(t) {
+      sql(s"CREATE TABLE $t (id int, data string, ts timestamp) USING $v2Format")
+      val sqlText = s"ALTER TABLE $t ALTER COLUMN id TYPE bigint, id COMMENT 'id', data FIRST"
+      checkError(
+        exception = intercept[AnalysisException] {
+          sql(sqlText)
+        },
+        condition = "NOT_SUPPORTED_CHANGE_SAME_COLUMN",
+        parameters = Map(
+          "table" -> s"${toSQLId(prependCatalogName(t))}",
+          "fieldName" -> "`id`"),
+        context = ExpectedContext(fragment = sqlText, start = 0, stop = sqlText.length - 1)
+      )
+    }
+  }
+
+  test("AlterTable: cannot alter parent and child fields in the same command") {
+    withSQLConf(SQLConf.DEFAULT_COLUMN_ALLOWED_PROVIDERS.key -> s"$v2Format, ") {
+      val t = fullTableName("table_name")
+      withTable(t) {
+        sql(s"CREATE TABLE $t (parent array<struct<child: int>>) USING $v2Format")
+        val sqlText = s"""ALTER TABLE $t ALTER COLUMN
+                         |  parent.element.child TYPE string,
+                         |  parent SET DEFAULT array(struct(1000))""".stripMargin
+
+        checkError(
+          exception = intercept[AnalysisException] {
+            sql(sqlText)
+          },
+          condition = "NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN",
+          parameters = Map(
+            "table" -> s"${toSQLId(prependCatalogName(t))}",
+            "parentField" -> "`parent`",
+            "childField" -> "`parent`.`element`.`child`"),
+          context = ExpectedContext(
+            fragment = sqlText,
+            start = 0,
+            stop = sqlText.length - 1)
+        )
+      }
+    }
+  }
+
   test("AlterTable: rename column") {
     val t = fullTableName("table_name")
     withTable(t) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
index 67fca09802139..68a0e714441b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/V2CommandsCaseSensitivitySuite.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.connector

 import org.apache.spark.sql.catalyst.analysis.{AnalysisTest, CreateTablePartitioningValidationSuite, ResolvedTable, TestRelation2, TestTable2, UnresolvedFieldName, UnresolvedFieldPosition, UnresolvedIdentifier}
-import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumn, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, OptionList, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
+import org.apache.spark.sql.catalyst.plans.logical.{AddColumns, AlterColumns, AlterColumnSpec, AlterTableCommand, CreateTableAsSelect, DropColumns, LogicalPlan, OptionList, QualifiedColType, RenameColumn, ReplaceColumns, ReplaceTableAsSelect, UnresolvedTableSpec}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.types.DataTypeUtils.toAttributes
 import org.apache.spark.sql.connector.catalog.Identifier
@@ -368,8 +368,8 @@ class V2CommandsCaseSensitivitySuite
   test("AlterTable: drop column nullability resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref.toImmutableArraySeq),
-          None, Some(true), None, None, None),
+        AlterColumns(table, Seq(UnresolvedFieldName(ref.toImmutableArraySeq)),
+          Seq(AlterColumnSpec(None, Some(true), None, None, None))),
         expectedErrorCondition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
         expectedMessageParameters = Map(
           "objectName" -> s"${toSQLId(ref.toImmutableArraySeq)}",
@@ -381,8 +381,8 @@ class V2CommandsCaseSensitivitySuite
   test("AlterTable: change column type resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref.toImmutableArraySeq),
-          Some(StringType), None, None, None, None),
+        AlterColumns(table, Seq(UnresolvedFieldName(ref.toImmutableArraySeq)),
+          Seq(AlterColumnSpec(Some(StringType), None, None, None, None))),
         expectedErrorCondition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
         expectedMessageParameters = Map(
           "objectName" -> s"${toSQLId(ref.toImmutableArraySeq)}",
@@ -394,8 +394,8 @@ class V2CommandsCaseSensitivitySuite
   test("AlterTable: change column comment resolution") {
     Seq(Array("ID"), Array("point", "X"), Array("POINT", "X"), Array("POINT", "x")).foreach { ref =>
       alterTableTest(
-        AlterColumn(table, UnresolvedFieldName(ref.toImmutableArraySeq),
-          None, None, Some("comment"), None, None),
+        AlterColumns(table, Seq(UnresolvedFieldName(ref.toImmutableArraySeq)),
+          Seq(AlterColumnSpec(None, None, Some("comment"), None, None))),
         expectedErrorCondition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
         expectedMessageParameters = Map(
           "objectName" -> s"${toSQLId(ref.toImmutableArraySeq)}",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
index 2cc203129817b..9b9c19e4ea1bb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/PlanResolutionSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogStorageFormat,
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Cast, EqualTo, Expression, InSubquery, IntegerLiteral, ListQuery, Literal, StringLiteral}
 import org.apache.spark.sql.catalyst.expressions.objects.StaticInvoke
 import org.apache.spark.sql.catalyst.parser.ParseException
-import org.apache.spark.sql.catalyst.plans.logical.{AlterColumn, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
+import org.apache.spark.sql.catalyst.plans.logical.{AlterColumns, AlterColumnSpec, AnalysisOnlyCommand, AppendData, Assignment, CreateTable, CreateTableAsSelect, DeleteAction, DeleteFromTable, DescribeRelation, DropTable, InsertAction, InsertIntoStatement, LocalRelation, LogicalPlan, MergeIntoTable, OneRowRelation, OverwriteByExpression, OverwritePartitionsDynamic, Project, SetTableLocation, SetTableProperties, ShowTableProperties, SubqueryAlias, UnsetTableProperties, UpdateAction, UpdateTable}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.catalyst.util.TypeUtils.toSQLId
 import org.apache.spark.sql.connector.FakeV2Provider
@@ -1317,6 +1317,10 @@
       case (tblName, useV1Command) =>
         val sql1 = s"ALTER TABLE $tblName ALTER COLUMN i TYPE bigint"
         val sql2 = s"ALTER TABLE $tblName ALTER COLUMN i COMMENT 'new comment'"
+        val sql3 =
+          s"""ALTER TABLE $tblName ALTER COLUMN
+             |  i TYPE bigint,
+             |  s SET DEFAULT 'value'""".stripMargin

         val parsed1 = parseAndResolve(sql1)
         val parsed2 = parseAndResolve(sql2)
@@ -1333,21 +1337,30 @@
           comparePlans(parsed1, expected1)
           comparePlans(parsed2, expected2)

-          val sql3 = s"ALTER TABLE $tblName ALTER COLUMN j COMMENT 'new comment'"
           checkError(
             exception = intercept[AnalysisException] {
               parseAndResolve(sql3)
             },
+            condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
+            sqlState = "0A000",
+            parameters = Map("tableName" -> "`spark_catalog`.`default`.`v1Table`",
+              "operation" -> "ALTER COLUMN in bulk"))
+
+          val sql4 = s"ALTER TABLE $tblName ALTER COLUMN j COMMENT 'new comment'"
+          checkError(
+            exception = intercept[AnalysisException] {
+              parseAndResolve(sql4)
+            },
             condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
             sqlState = "42703",
             parameters = Map(
               "objectName" -> "`j`",
               "proposal" -> "`i`, `s`, `point`"),
-            context = ExpectedContext(fragment = sql3, start = 0, stop = 55))
+            context = ExpectedContext(fragment = sql4, start = 0, stop = 55))

-          val sql4 = s"ALTER TABLE $tblName ALTER COLUMN point.x TYPE bigint"
+          val sql5 = s"ALTER TABLE $tblName ALTER COLUMN point.x TYPE bigint"
           val e2 = intercept[AnalysisException] {
-            parseAndResolve(sql4)
+            parseAndResolve(sql5)
           }
           checkError(
             exception = e2,
@@ -1366,30 +1379,47 @@
               "operation" -> "ALTER COLUMN ... SET NOT NULL"))
         } else {
           parsed1 match {
-            case AlterColumn(
+            case AlterColumns(
                 _: ResolvedTable,
-                column: ResolvedFieldName,
-                Some(LongType),
-                None,
-                None,
-                None,
-                None) =>
+                Seq(column: ResolvedFieldName),
+                Seq(AlterColumnSpec(
+                  Some(LongType),
+                  None,
+                  None,
+                  None,
+                  None))) =>
               assert(column.name == Seq("i"))
             case _ => fail("expect AlterTableAlterColumn")
           }

           parsed2 match {
-            case AlterColumn(
+            case AlterColumns(
                 _: ResolvedTable,
-                column: ResolvedFieldName,
-                None,
-                None,
-                Some("new comment"),
-                None,
-                None) =>
+                Seq(column: ResolvedFieldName),
+                Seq(AlterColumnSpec(
+                  None,
+                  None,
+                  Some("new comment"),
+                  None,
+                  None))) =>
               assert(column.name == Seq("i"))
             case _ => fail("expect AlterTableAlterColumn")
           }
+
+          val parsed3 = parseAndResolve(sql3)
+          parsed3 match {
+            case AlterColumns(
+                _: ResolvedTable,
+                Seq(
+                  column1: ResolvedFieldName,
+                  column2: ResolvedFieldName),
+                Seq(
+                  AlterColumnSpec(Some(LongType), None, None, None, None),
+                  AlterColumnSpec(None, None, None, None, Some("'value'")))) =>
+              assert(column1.name == Seq("i"))
+              assert(column2.name == Seq("s"))
+            case _ => fail("expect AlterTableAlterColumn")
+          }
         }
       }

@@ -1445,16 +1475,19 @@
   test("alter table: hive style change column") {
     Seq("v2Table", "testcat.tab").foreach { tblName =>
       parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i int COMMENT 'an index'") match {
-        case AlterColumn(
-            _: ResolvedTable, _: ResolvedFieldName, None, None, Some(comment), None, None) =>
+        case AlterColumns(
+            _: ResolvedTable,
+            Seq(_: ResolvedFieldName),
+            Seq(AlterColumnSpec(None, None, Some(comment), None, None))) =>
           assert(comment == "an index")
         case _ => fail("expect AlterTableAlterColumn with comment change only")
       }

       parseAndResolve(s"ALTER TABLE $tblName CHANGE COLUMN i i long COMMENT 'an index'") match {
-        case AlterColumn(
-            _: ResolvedTable, _: ResolvedFieldName, Some(dataType), None, Some(comment), None,
-            None) =>
+        case AlterColumns(
+            _: ResolvedTable,
+            Seq(_: ResolvedFieldName),
+            Seq(AlterColumnSpec(Some(dataType), None, Some(comment), None, None))) =>
           assert(comment == "an index")
           assert(dataType == LongType)
         case _ => fail("expect AlterTableAlterColumn with type and comment changes")
@@ -1489,14 +1522,14 @@
       val catalog = if (isSessionCatalog) v2SessionCatalog else testCat
       val tableIdent = if (isSessionCatalog) "v2Table" else "tab"
       parsed match {
-        case AlterColumn(r: ResolvedTable, _, _, _, _, _, _) =>
+        case AlterColumns(r: ResolvedTable, _, _) =>
          assert(r.catalog == catalog)
           assert(r.identifier.name() == tableIdent)
         case Project(_, AsDataSourceV2Relation(r)) =>
-          assert(r.catalog.exists(_ == catalog))
+          assert(r.catalog.contains(catalog))
           assert(r.identifier.exists(_.name() == tableIdent))
         case AppendData(r: DataSourceV2Relation, _, _, _, _, _) =>
-          assert(r.catalog.exists(_ == catalog))
+          assert(r.catalog.contains(catalog))
           assert(r.identifier.exists(_.name() == tableIdent))
         case DescribeRelation(r: ResolvedTable, _, _, _) =>
           assert(r.catalog == catalog)
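
Reviewer note (appended below the diff, not part of the patch): a usage sketch
of the new syntax, distilled from the AlterTableTests cases above. Table and
column names here are illustrative only. The bulk form is only accepted for
DataSource V2 tables; for V1 tables, ResolveSessionCatalog still rejects it
with UNSUPPORTED_FEATURE.TABLE_OPERATION ("ALTER COLUMN in bulk").

    -- One command, several columns, exactly one action per column:
    ALTER TABLE t ALTER COLUMN
      id TYPE bigint,       -- must be a supported up-cast (Cast.canUpCast)
      data FIRST,           -- reposition the column
      ts DROP NOT NULL;     -- relax nullability

    -- Rejected by CheckAnalysis: naming the same column twice
    -- (NOT_SUPPORTED_CHANGE_SAME_COLUMN):
    ALTER TABLE t ALTER COLUMN id TYPE bigint, id COMMENT 'id';

    -- Also rejected: altering a parent field and one of its children together
    -- (NOT_SUPPORTED_CHANGE_PARENT_CHILD_COLUMN):
    ALTER TABLE t ALTER COLUMN parent.element.child TYPE string, parent COMMENT 'p';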