Skip to content

Commit 89cdd63

Browse files
yhuang-dbgengliangwang
authored andcommitted
[SPARK-54761][SQL] Throw unsupportedTableOperation for constraint operation on DSv1/HMS table
### What changes were proposed in this pull request? This PR throws unsupportedTableOperation for constraint operations on DSv1/HMS table. The operations include: - CreateTable/CreateTableAsSelect with constraint - AddConstraint - DropConstraint - AddCheckConstraint ### Why are the changes needed? Constraints (DSv2) are not supported on DSv1/HMS tables. Currently, it does not throw exceptions and causes [confusion](#50761 (comment)). ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit tests on dsv1 and hive table. ### Was this patch authored or co-authored using generative AI tooling? Generated-by: claude-4.5-opus Closes #53532 from yhuang-db/SPARK-54761_unsupportedTableOperation-for-constraint-on-dsv1-hms. Authored-by: yhuang-db <[email protected]> Signed-off-by: Gengliang Wang <[email protected]> (cherry picked from commit 4b90a26) Signed-off-by: Gengliang Wang <[email protected]>
1 parent 9443122 commit 89cdd63

File tree

6 files changed

+217
-37
lines changed

6 files changed

+217
-37
lines changed

sql/core/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveSessionCatalog.scala

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.SparkException
2121
import org.apache.spark.internal.LogKeys.CONFIG
2222
import org.apache.spark.sql.SaveMode
2323
import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
24-
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec}
24+
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType, CatalogUtils, ClusterBySpec, HiveTableRelation}
2525
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute}
2626
import org.apache.spark.sql.catalyst.plans.logical._
2727
import org.apache.spark.sql.catalyst.rules.Rule
@@ -31,7 +31,7 @@ import org.apache.spark.sql.connector.catalog.{CatalogExtension, CatalogManager,
3131
import org.apache.spark.sql.connector.expressions.Transform
3232
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
3333
import org.apache.spark.sql.execution.command._
34-
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1}
34+
import org.apache.spark.sql.execution.datasources.{CreateTable => CreateTableV1, LogicalRelation}
3535
import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
3636
import org.apache.spark.sql.internal.{HiveSerDe, SQLConf}
3737
import org.apache.spark.sql.internal.connector.V1Function
@@ -128,6 +128,25 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
128128
case DropColumns(ResolvedV1TableIdentifier(ident), _, _) =>
129129
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP COLUMN")
130130

131+
// V1 and hive tables do not support constraints
132+
case AddConstraint(ResolvedV1TableIdentifier(ident), _) =>
133+
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "ADD CONSTRAINT")
134+
135+
case DropConstraint(ResolvedV1TableIdentifier(ident), _, _, _) =>
136+
throw QueryCompilationErrors.unsupportedTableOperationError(ident, "DROP CONSTRAINT")
137+
138+
case a: AddCheckConstraint
139+
if a.child.exists {
140+
case _: LogicalRelation => true
141+
case _: HiveTableRelation => true
142+
case _ => false
143+
} =>
144+
val tableIdent = a.child.collectFirst {
145+
case l: LogicalRelation => l.catalogTable.get.identifier
146+
case h: HiveTableRelation => h.tableMeta.identifier
147+
}.get
148+
throw QueryCompilationErrors.unsupportedTableOperationError(tableIdent, "ADD CONSTRAINT")
149+
131150
case SetTableProperties(ResolvedV1TableIdentifier(ident), props) =>
132151
AlterTableSetPropertiesCommand(ident, props, isView = false)
133152

@@ -187,6 +206,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
187206
c.tableSpec.provider, tableSpec.options, c.tableSpec.location, c.tableSpec.serde,
188207
ctas = false)
189208
if (!isV2Provider(provider)) {
209+
if (tableSpec.constraints.nonEmpty) {
210+
throw QueryCompilationErrors.unsupportedTableOperationError(
211+
ident, "CONSTRAINT")
212+
}
190213
constructV1TableCmd(None, c.tableSpec, ident, c.tableSchema, c.partitioning,
191214
c.ignoreIfExists, storageFormat, provider)
192215
} else {
@@ -203,6 +226,10 @@ class ResolveSessionCatalog(val catalogManager: CatalogManager)
203226
ctas = true)
204227

205228
if (!isV2Provider(provider)) {
229+
if (tableSpec.constraints.nonEmpty) {
230+
throw QueryCompilationErrors.unsupportedTableOperationError(
231+
ident, "CONSTRAINT")
232+
}
206233
constructV1TableCmd(Some(c.query), c.tableSpec, ident, new StructType, c.partitioning,
207234
c.ignoreIfExists, storageFormat, provider)
208235
} else {
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.execution.command.v1
19+
20+
import org.apache.spark.sql.AnalysisException
21+
import org.apache.spark.sql.execution.command.DDLCommandTestUtils
22+
23+
/**
24+
* This base suite contains unified tests for table constraints (CHECK, PRIMARY KEY, UNIQUE,
25+
* FOREIGN KEY) that check V1 table catalogs. V1 tables do not support table constraints.
26+
* The tests that cannot run for all V1 catalogs are located in more specific test suites:
27+
*
28+
* - V1 In-Memory catalog:
29+
* `org.apache.spark.sql.execution.command.v1.TableConstraintSuite`
30+
* - V1 Hive External catalog:
31+
* `org.apache.spark.sql.hive.execution.command.TableConstraintSuite`
32+
*/
33+
trait TableConstraintSuiteBase extends DDLCommandTestUtils {
34+
override val command = "TABLE CONSTRAINT"
35+
36+
private val constraintTypes = Seq(
37+
"CHECK (id > 0)",
38+
"PRIMARY KEY (id)",
39+
"UNIQUE (id)",
40+
"FOREIGN KEY (id) REFERENCES t2(id)"
41+
)
42+
43+
gridTest("SPARK-54761: create table with constraint - should fail")(constraintTypes)
44+
{ constraint =>
45+
withNamespaceAndTable("ns", "table_1") { t =>
46+
val createTableSql = s"CREATE TABLE $t (id INT, CONSTRAINT c1 $constraint) $defaultUsing"
47+
val error = intercept[AnalysisException] {
48+
sql(createTableSql)
49+
}
50+
checkError(
51+
exception = error,
52+
condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
53+
parameters = Map(
54+
"tableName" -> s"`$catalog`.`ns`.`table_1`",
55+
"operation" -> "CONSTRAINT"
56+
)
57+
)
58+
}
59+
}
60+
61+
gridTest("SPARK-54761: alter table add constraint - should fail")(constraintTypes) { constraint =>
62+
withNamespaceAndTable("ns", "table_1") { t =>
63+
sql(s"CREATE TABLE $t (id INT) $defaultUsing")
64+
val alterTableSql = s"ALTER TABLE $t ADD CONSTRAINT c1 $constraint"
65+
val error = intercept[AnalysisException] {
66+
sql(alterTableSql)
67+
}
68+
checkError(
69+
exception = error,
70+
condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
71+
parameters = Map(
72+
"tableName" -> s"`$catalog`.`ns`.`table_1`",
73+
"operation" -> "ADD CONSTRAINT"
74+
)
75+
)
76+
}
77+
}
78+
79+
test("SPARK-54761: alter table drop constraint - should fail") {
80+
withNamespaceAndTable("ns", "table_1") { t =>
81+
sql(s"CREATE TABLE $t (id INT) $defaultUsing")
82+
val error = intercept[AnalysisException] {
83+
sql(s"ALTER TABLE $t DROP CONSTRAINT c1")
84+
}
85+
checkError(
86+
exception = error,
87+
condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
88+
parameters = Map(
89+
"tableName" -> s"`$catalog`.`ns`.`table_1`",
90+
"operation" -> "DROP CONSTRAINT"
91+
)
92+
)
93+
}
94+
}
95+
96+
// REPLACE TABLE is not supported for V1 tables, so the error should be about
97+
// REPLACE TABLE, not about CONSTRAINT
98+
gridTest("SPARK-54761: replace table with constraint - should fail")(constraintTypes)
99+
{ constraint =>
100+
withNamespaceAndTable("ns", "table_1") { t =>
101+
val replaceTableSql = s"REPLACE TABLE $t (id INT, CONSTRAINT c1 $constraint) $defaultUsing"
102+
val error = intercept[AnalysisException] {
103+
sql(replaceTableSql)
104+
}
105+
checkError(
106+
exception = error,
107+
condition = "UNSUPPORTED_FEATURE.TABLE_OPERATION",
108+
parameters = Map(
109+
"tableName" -> s"`$catalog`.`ns`.`table_1`",
110+
"operation" -> "REPLACE TABLE"
111+
)
112+
)
113+
}
114+
}
115+
}
116+
117+
/**
118+
* The class contains tests for table constraints to check V1 In-Memory table catalog.
119+
*/
120+
class TableConstraintSuite extends TableConstraintSuiteBase with CommandSuiteBase
121+

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/CheckConstraintSuite.scala

Lines changed: 32 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ package org.apache.spark.sql.execution.command.v2
2020
import org.apache.spark.SparkRuntimeException
2121
import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
2222
import org.apache.spark.sql.catalyst.plans.logical.Filter
23+
import org.apache.spark.sql.catalyst.util.AttributeNameParser
24+
import org.apache.spark.sql.catalyst.util.QuotingUtils.quoteNameParts
2325
import org.apache.spark.sql.connector.catalog.Table
2426
import org.apache.spark.sql.connector.catalog.constraints.Check
2527
import org.apache.spark.sql.execution.command.DDLCommandTestUtils
@@ -29,11 +31,11 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
2931
override protected def command: String = "Check CONSTRAINT"
3032

3133
test("Nondeterministic expression -- alter table") {
32-
withTable("t") {
33-
sql("create table t(i double)")
34+
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
35+
sql(s"CREATE TABLE $t (i DOUBLE) $defaultUsing")
3436
val query =
35-
"""
36-
|ALTER TABLE t ADD CONSTRAINT c1 CHECK (i > rand(0))
37+
s"""
38+
|ALTER TABLE $t ADD CONSTRAINT c1 CHECK (i > rand(0))
3739
|""".stripMargin
3840
val error = intercept[AnalysisException] {
3941
sql(query)
@@ -45,8 +47,8 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
4547
parameters = Map("checkCondition" -> "i > rand(0)"),
4648
context = ExpectedContext(
4749
fragment = "i > rand(0)",
48-
start = 40,
49-
stop = 50
50+
start = 67,
51+
stop = 77
5052
)
5153
)
5254
}
@@ -77,27 +79,31 @@ class CheckConstraintSuite extends QueryTest with CommandSuiteBase with DDLComma
7779
}
7880

7981
test("Expression referring a column of another table -- alter table") {
80-
withTable("t", "t2") {
81-
sql("CREATE TABLE t(i DOUBLE) USING parquet")
82-
sql("CREATE TABLE t2(j STRING) USING parquet")
83-
val query =
84-
"""
85-
|ALTER TABLE t ADD CONSTRAINT c1 CHECK (len(t2.j) > 0)
86-
|""".stripMargin
87-
val error = intercept[AnalysisException] {
88-
sql(query)
89-
}
90-
checkError(
91-
exception = error,
92-
condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
93-
sqlState = "42703",
94-
parameters = Map("objectName" -> "`t2`.`j`", "proposal" -> "`t`.`i`"),
95-
context = ExpectedContext(
96-
fragment = "t2.j",
97-
start = 44,
98-
stop = 47
82+
withNamespaceAndTable("ns", "tbl_1", nonPartitionCatalog) { t1 =>
83+
withNamespaceAndTable("ns", "tbl_2", nonPartitionCatalog) { t2 =>
84+
sql(s"CREATE TABLE $t1(i DOUBLE) $defaultUsing")
85+
sql(s"CREATE TABLE $t2(j STRING) $defaultUsing")
86+
val query =
87+
s"""
88+
|ALTER TABLE $t1 ADD CONSTRAINT c1 CHECK (len($t2.j) > 0)
89+
|""".stripMargin
90+
val error = intercept[AnalysisException] {
91+
sql(query)
92+
}
93+
checkError(
94+
exception = error,
95+
condition = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
96+
sqlState = "42703",
97+
parameters = Map(
98+
"objectName" -> quoteNameParts(AttributeNameParser.parseAttributeName(s"$t2.j")),
99+
"proposal" -> quoteNameParts(AttributeNameParser.parseAttributeName(s"$t1.i"))),
100+
context = ExpectedContext(
101+
fragment = s"$t2.j",
102+
start = 73,
103+
stop = 104
104+
)
99105
)
100-
)
106+
}
101107
}
102108
}
103109

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/DescribeTableSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -217,10 +217,10 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
217217

218218
test("desc table constraints") {
219219
withNamespaceAndTable("ns", "pk_table", nonPartitionCatalog) { tbl =>
220-
withTable("fk_table") {
220+
withNamespaceAndTable("ns", "fk_table", nonPartitionCatalog) { fkTable =>
221221
sql(
222222
s"""
223-
|CREATE TABLE fk_table (id INT PRIMARY KEY) USING parquet
223+
|CREATE TABLE $fkTable (id INT PRIMARY KEY) $defaultUsing
224224
""".stripMargin)
225225
sql(
226226
s"""
@@ -230,7 +230,7 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
230230
| b STRING,
231231
| c STRING,
232232
| PRIMARY KEY (id),
233-
| CONSTRAINT fk_a FOREIGN KEY (a) REFERENCES fk_table(id) RELY,
233+
| CONSTRAINT fk_a FOREIGN KEY (a) REFERENCES $fkTable(id) RELY,
234234
| CONSTRAINT uk_b UNIQUE (b),
235235
| CONSTRAINT uk_a_c UNIQUE (a, c),
236236
| CONSTRAINT c1 CHECK (c IS NOT NULL),
@@ -243,7 +243,7 @@ class DescribeTableSuite extends command.DescribeTableSuiteBase
243243
var expectedConstraintsDdl = Array(
244244
"# Constraints,,",
245245
"pk_table_pk,PRIMARY KEY (id) NOT ENFORCED,",
246-
"fk_a,FOREIGN KEY (a) REFERENCES fk_table (id) NOT ENFORCED RELY,",
246+
s"fk_a,FOREIGN KEY (a) REFERENCES $fkTable (id) NOT ENFORCED RELY,",
247247
"uk_b,UNIQUE (b) NOT ENFORCED,",
248248
"uk_a_c,UNIQUE (a, c) NOT ENFORCED,",
249249
"c1,CHECK (c IS NOT NULL) ENFORCED,",

sql/core/src/test/scala/org/apache/spark/sql/execution/command/v2/ShowCreateTableSuite.scala

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -184,13 +184,13 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
184184

185185
test("show table constraints") {
186186
withNamespaceAndTable("ns", "tbl", nonPartitionCatalog) { t =>
187-
withTable("other_table") {
187+
withNamespaceAndTable("ns", "other_table", nonPartitionCatalog) { otherTable =>
188188
sql(
189189
s"""
190-
|CREATE TABLE other_table (
190+
|CREATE TABLE $otherTable (
191191
| id STRING PRIMARY KEY
192192
|)
193-
|USING parquet
193+
|$defaultUsing
194194
""".stripMargin)
195195
sql(
196196
s"""
@@ -200,7 +200,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
200200
| c STRING,
201201
| PRIMARY KEY (a),
202202
| CONSTRAINT uk_b UNIQUE (b),
203-
| CONSTRAINT fk_c FOREIGN KEY (c) REFERENCES other_table(id) RELY,
203+
| CONSTRAINT fk_c FOREIGN KEY (c) REFERENCES $otherTable(id) RELY,
204204
| CONSTRAINT c1 CHECK (c IS NOT NULL),
205205
| CONSTRAINT c2 CHECK (a > 0)
206206
|)
@@ -214,7 +214,7 @@ class ShowCreateTableSuite extends command.ShowCreateTableSuiteBase with Command
214214
"c STRING,",
215215
"CONSTRAINT tbl_pk PRIMARY KEY (a) NOT ENFORCED NORELY,",
216216
"CONSTRAINT uk_b UNIQUE (b) NOT ENFORCED NORELY,",
217-
"CONSTRAINT fk_c FOREIGN KEY (c) REFERENCES other_table (id) NOT ENFORCED RELY,",
217+
s"CONSTRAINT fk_c FOREIGN KEY (c) REFERENCES $otherTable (id) NOT ENFORCED RELY,",
218218
"CONSTRAINT c1 CHECK (c IS NOT NULL) ENFORCED NORELY,"
219219
)
220220
assert(showDDL === expectedDDLPrefix ++ Array(
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.sql.hive.execution.command
19+
20+
import org.apache.spark.sql.execution.command.v1
21+
22+
/**
23+
* The class contains tests for table constraints to check V1 Hive external table catalog.
24+
*/
25+
class TableConstraintSuite extends v1.TableConstraintSuiteBase with CommandSuiteBase
26+

0 commit comments

Comments
 (0)