-
Notifications
You must be signed in to change notification settings - Fork 28.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[SPARK-50883][SQL] Support altering multiple columns in the same command #49559
Conversation
7524bee
to
18cc4bb
Compare
18cc4bb
to
b69f332
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@ctring Need to explicitly define semantic of new command: is it atomic or not. If it fails on one of columns, does it leave modified columns as is? Please, describe this in PR's description.
b69f332
to
c24c380
Compare
@MaxGekk I updated the PR |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not a committer... but I've seen enough workloads with wide tables and frequent tool-generated schema changes that this looks appealing.
if (a.nullable.isDefined) { | ||
if (!a.nullable.get && col.field.nullable) { | ||
groupedColumns.keys.foreach { name => | ||
val child = groupedColumns.keys.find(child => child != name && child.startsWith(name)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is trying to handle e.g. x.y.z
vs. x.y
, but wouldn't this check also flag two (non-nested) fields where one happens to be a prefix of the other? e.g. customer
and customer_status
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
name
and child
are Seq
of the name parts so startsWith
is matching the name part prefix.
} | ||
|
||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = | ||
copy(table = newChild) | ||
override protected def withNewChildInternal(newChild: LogicalPlan): LogicalPlan = copy(newChild) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how does this change work? Is there some overload of copy
available that only takes a table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I didn't intent to write it this way and will fix this, but I think it still works because the child argument happens to be at the first position.
val newDataType = dataType.getOrElse(column.asInstanceOf[ResolvedFieldName].field.dataType) | ||
ResolveDefaultColumns.analyze(column.name.last, newDataType, newDefaultExpression, | ||
"ALTER TABLE ALTER COLUMN") | ||
assert(columns.size == specs.size) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shall we include FieldName
in AlterColumnSpec
to make it more type-safe? The AlterColumnSpec
can be an expression as well, similar to MergeAction
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It would be much nicer that way. I didn't know making AlterColumnSpec
an expression was an option. I've updated the PR with this change.
thanks, merging to master/4.0! |
### What changes were proposed in this pull request? We propose the following new syntax for altering multiple columns at the same time: ``` ALTER TABLE table_name ALTER COLUMN { { column_identifier | field_name } { COMMENT comment | { FIRST | AFTER identifier } | { SET | DROP } NOT NULL | TYPE data_type | SET DEFAULT clause | DROP DEFAULT } } [, ...] ``` For example: ``` ALTER TABLE test_table ALTER COLUMN a COMMENT "new comment", b TYPE BIGINT, x.y.z FIRST ``` This new syntax is backward compatible with the current syntax. To bound the complexity of the initial support of this syntax we place the following restrictions: + Altering the same column multiple times is not allowed + Altering a parent and a child column (for nested data type) is not allowed. + Altering v1 tables with this new syntax is not allowed. In terms of implementation, we modify the current `AlterColumn` logical plan to be `AlterColumns` that can take in multiple columns and `AlterColumnSpec`s. All `AlterColumnSpec`s are checked during analyzing phase, so if one of them is invalid (e.g., non-existent column, wrong type conversion, etc), the entire command will fail. The `AlterColumnSpec`s are transformed into `TableChange`s, which are passed to the `TableCatalog::alterTable` method. Therefore, the semantics of this new command (atomic vs non-atomic) depends on the implementation of this method. The `V2SessionCatalog::alterTable` currently applies all table changes to the catalog table and then send to the catalog in one request. As a result, column changes are by default applied to the catalog (HMS) atomically: either all changes are made or none are. For example, the above command produces the following plans: ``` == Physical Plan == AlterTable org.apache.spark.sql.delta.catalog.DeltaCatalog6d89c923, default.test_table, [org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnCommentff58ec42, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnType7e7c730c, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnPositionbc842915] == Parsed Logical Plan == 'AlterColumns [unresolvedfieldname(a), unresolvedfieldname(b), unresolvedfieldname(x, y, z)], [AlterColumnSpec(None,None,Some(new comment),None,None), AlterColumnSpec(Some(LongType),None,None,None,None), AlterColumnSpec(None,None,None,Some(unresolvedfieldposition(FIRST)),None)] +- 'UnresolvedTable [test_table], ALTER TABLE ... ALTER COLUMN == Analyzed Logical Plan == AlterColumns [resolvedfieldname(StructField(a,IntegerType,true)), resolvedfieldname(StructField(b,IntegerType,true)), resolvedfieldname(x, y, StructField(z,IntegerType,true))], [AlterColumnSpec(None,None,Some(new comment),None,None), AlterColumnSpec(Some(LongType),None,None,None,None), AlterColumnSpec(None,None,None,Some(resolvedfieldposition(FIRST)),None)] +- ResolvedTable org.apache.spark.sql.delta.catalog.DeltaCatalog6d89c923, default.test_table, DeltaTableV2(...)),Some(default.test_table),None,Map()), [a#163, b#164, x#165] == Physical Plan == AlterTable org.apache.spark.sql.delta.catalog.DeltaCatalog6d89c923, default.test_table, [org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnCommentff58ec42, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnType7e7c730c, org.apache.spark.sql.connector.catalog.TableChange$UpdateColumnPositionbc842915] ``` ### Why are the changes needed? The current ALTER TABLE ... ALTER COLUMN syntax allows altering only one column at a time. For a large table with many columns, a command must be run for each column, which can be slow due to the repeated preprocessing and I/O costs. A new syntax that enables specifying multiple columns could allow these costs to be shared across multiple column changes. ### Does this PR introduce _any_ user-facing change? Yes ### How was this patch tested? New unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #49559 from ctring/bulk-alter-column. Authored-by: Cuong Nguyen <[email protected]> Signed-off-by: Wenchen Fan <[email protected]> (cherry picked from commit 4b35282) Signed-off-by: Wenchen Fan <[email protected]>
What changes were proposed in this pull request?
We propose the following new syntax for altering multiple columns at the same time:
For example:
This new syntax is backward compatible with the current syntax. To bound the complexity of the initial support of this syntax we place the following restrictions:
In terms of implementation, we modify the current
AlterColumn
logical plan to beAlterColumns
that can take in multiple columns andAlterColumnSpec
s.All
AlterColumnSpec
s are checked during analyzing phase, so if one of them is invalid (e.g., non-existent column, wrong type conversion, etc), the entire command will fail.The
AlterColumnSpec
s are transformed intoTableChange
s, which are passed to theTableCatalog::alterTable
method. Therefore, the semantics of this new command (atomic vs non-atomic) depends on the implementation of this method.The
V2SessionCatalog::alterTable
currently applies all table changes to the catalog table and then send to the catalog in one request. As a result, column changes are by default applied to the catalog (HMS) atomically: either all changes are made or none are.For example, the above command produces the following plans:
Why are the changes needed?
The current ALTER TABLE ... ALTER COLUMN syntax allows altering only one column at a time. For a large table with many columns, a command must be run for each column, which can be slow due to the repeated preprocessing and I/O costs. A new syntax that enables specifying multiple columns could allow these costs to be shared across multiple column changes.
Does this PR introduce any user-facing change?
Yes
How was this patch tested?
New unit tests
Was this patch authored or co-authored using generative AI tooling?
No