diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
index e92a392e22b67..9ab5b2445fc3a 100644
--- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ntz.sql.out
@@ -137,3 +137,39 @@ select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00'
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 -- !query analysis
 [Analyzer test output redacted due to nondeterminism]
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query analysis
+CreateDataSourceTableCommand `spark_catalog`.`default`.`a`, false
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query analysis
+InsertIntoHadoopFsRelationCommand file:[not included in comparison]/{warehouse_dir}/a, [a=2018-11-17 13:33:33], false, [a#x], Parquet, [path=file:[not included in comparison]/{warehouse_dir}/a], Append, `spark_catalog`.`default`.`a`, org.apache.spark.sql.execution.datasources.CatalogFileIndex(file:[not included in comparison]/{warehouse_dir}/a), [b, a]
++- Project [b#x, cast(2018-11-17 13:33:33 as timestamp_ntz) AS a#x]
+   +- Project [cast(col1#x as int) AS b#x]
+      +- LocalRelation [col1#x]
+
+
+-- !query
+DESC FORMATTED a
+-- !query analysis
+DescribeTableCommand `spark_catalog`.`default`.`a`, true, [col_name#x, data_type#x, comment#x]
+
+
+-- !query
+SELECT * FROM a
+-- !query analysis
+Project [b#x, a#x]
++- SubqueryAlias spark_catalog.default.a
+   +- Relation spark_catalog.default.a[b#x,a#x] parquet
+
+
+-- !query
+DROP TABLE a
+-- !query analysis
+DropTable false, false
++- ResolvedIdentifier V2SessionCatalog(spark_catalog), default.a
diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
index 07901093cfba8..7996f5879bf7b 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ntz.sql
@@ -31,3 +31,9 @@ select timestamp_ntz'2022-01-01 00:00:00' < date'2022-01-01';
 select timestamp_ntz'2022-01-01 00:00:00' = timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' > timestamp_ltz'2022-01-01 00:00:00';
 select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00';
+
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a);
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1);
+DESC FORMATTED a;
+SELECT * FROM a;
+DROP TABLE a;
diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
index 3a473dad828a9..9e37bf4e9caa5 100644
--- a/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ntz.sql.out
@@ -173,3 +173,59 @@ select timestamp_ntz'2022-01-01 00:00:00' < timestamp_ltz'2022-01-01 00:00:00'
 struct<(TIMESTAMP_NTZ '2022-01-01 00:00:00' < TIMESTAMP '2022-01-01 00:00:00'):boolean>
 -- !query output
 false
+
+
+-- !query
+CREATE TABLE a (a timestamp_ntz, b int) using parquet PARTITIONED BY(a)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+INSERT INTO a PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+DESC FORMATTED a
+-- !query schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query output
+b	int
+a	timestamp_ntz
+# Partition Information
+# col_name	data_type	comment
+a	timestamp_ntz
+
+# Detailed Table Information
+Catalog	spark_catalog
+Database	default
+Table	a
+Created Time	[not included in comparison]
+Last Access	[not included in comparison]
+Created By	[not included in comparison]
+Type	MANAGED
+Provider	parquet
+Location	[not included in comparison]/{warehouse_dir}/a
+Partition Provider	Catalog
+
+
+-- !query
+SELECT * FROM a
+-- !query schema
+struct<b:int,a:timestamp_ntz>
+-- !query output
+1	2018-11-17 13:33:33
+
+
+-- !query
+DROP TABLE a
+-- !query schema
+struct<>
+-- !query output
+
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index d91d762048d29..21bdbd40caa81 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -2418,6 +2418,17 @@ abstract class DDLSuite extends QueryTest with DDLSuiteBase {
       )
     }
   }
+
+  test("SPARK-51418: Partitioned by Hive type incompatible columns") {
+    withTable("t1") {
+      sql("CREATE TABLE t1(a timestamp_ntz, b INTEGER) USING parquet PARTITIONED BY (a)")
+      sql("INSERT INTO t1 PARTITION(a=timestamp_ntz'2018-11-17 13:33:33') VALUES (1)")
+      checkAnswer(sql("SELECT * FROM t1"), sql("select 1, timestamp_ntz'2018-11-17 13:33:33'"))
+      sql("ALTER TABLE t1 ADD COLUMN (c string)")
+      checkAnswer(sql("SELECT * FROM t1"),
+        sql("select 1, null, timestamp_ntz'2018-11-17 13:33:33'"))
+    }
+  }
 }
 
 object FakeLocalFsFileSystem {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 90f8a3a85d70c..57f6f999b6ade 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -468,7 +468,9 @@ private[hive] class HiveClientImpl(
     // Note: Hive separates partition columns and the schema, but for us the
     // partition columns are part of the schema
     val (cols, partCols) = try {
-      (h.getCols.asScala.map(fromHiveColumn), h.getPartCols.asScala.map(fromHiveColumn))
+      (h.getCols.asScala.map(fromHiveColumn),
+        h.getPartCols.asScala.filter(_.getType != INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER)
+          .map(fromHiveColumn))
     } catch {
       case ex: SparkException =>
         throw QueryExecutionErrors.convertHiveTableToCatalogTableError(
@@ -1093,6 +1095,13 @@ private[hive] class HiveClientImpl(
 }
 
 private[hive] object HiveClientImpl extends Logging {
+  // We cannot pass the raw catalogString of Hive-incompatible types to the Hive metastore.
+  // For regular columns, the schema is already emptied and read/written via table properties.
+  // For partition columns, we need to set them on the Hive table while avoiding verification
+  // failures from HMS, so we use the placeholder type below to bypass that verification.
+  // See org.apache.hadoop.hive.metastore.MetaStoreUtils#validateColumnType for more details.
+
+  lazy val INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER = ""
   /** Converts the native StructField to Hive's FieldSchema. */
   def toHiveColumn(c: StructField): FieldSchema = {
     // For Hive Serde, we still need to to restore the raw type for char and varchar type.
@@ -1167,10 +1176,17 @@ private[hive] object HiveClientImpl extends Logging {
       hiveTable.setProperty("EXTERNAL", "TRUE")
     }
     // Note: In Hive the schema and partition columns must be disjoint sets
-    val (partCols, schema) = table.schema.map(toHiveColumn).partition { c =>
-      table.partitionColumnNames.contains(c.getName)
+    val (partSchema, schema) = table.schema.partition { c =>
+      table.partitionColumnNames.contains(c.name)
+    }
+
+    val partCols = partSchema.map {
+      case c if !HiveExternalCatalog.isHiveCompatibleDataType(c.dataType) =>
+        new FieldSchema(c.name, INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER, c.getComment().orNull)
+      case c => toHiveColumn(c)
     }
-    hiveTable.setFields(schema.asJava)
+
+    hiveTable.setFields(schema.map(toHiveColumn).asJava)
     hiveTable.setPartCols(partCols.asJava)
     Option(table.owner).filter(_.nonEmpty).orElse(userName).foreach(hiveTable.setOwner)
     hiveTable.setCreateTime(MILLISECONDS.toSeconds(table.createTime).toInt)
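
For reference, a minimal, self-contained sketch of the round trip this change implements: on the write path (toHiveTable), partition columns whose Spark type has no Hive-compatible representation (such as timestamp_ntz) are sent to the metastore with a placeholder type so that HMS column-type validation does not reject them; on the read path (convertHiveTableToCatalogTable), the placeholder entries are filtered out again. All names below (FieldSchema, isHiveCompatible, the placeholder value) are simplified stand-ins for illustration, not the actual Spark or Hive classes.

object PartitionTypePlaceholderSketch {

  // Stand-in for org.apache.hadoop.hive.metastore.api.FieldSchema.
  final case class FieldSchema(name: String, hiveType: String, comment: String)

  // Stand-in for HiveClientImpl.INCOMPATIBLE_PARTITION_TYPE_PLACEHOLDER (illustrative value only).
  val IncompatiblePartitionTypePlaceholder: String = "__incompatible_partition_type__"

  // Stand-in for HiveExternalCatalog.isHiveCompatibleDataType: timestamp_ntz is an example of
  // a Spark type that Hive's metastore cannot validate.
  def isHiveCompatible(sparkType: String): Boolean = sparkType != "timestamp_ntz"

  // Write path (mirrors the toHiveTable change): incompatible partition columns are sent to HMS
  // with the placeholder type instead of their raw catalog string.
  def toHivePartCols(partSchema: Seq[(String, String)]): Seq[FieldSchema] =
    partSchema.map {
      case (name, sparkType) if !isHiveCompatible(sparkType) =>
        FieldSchema(name, IncompatiblePartitionTypePlaceholder, comment = null)
      case (name, sparkType) =>
        FieldSchema(name, sparkType, comment = null)
    }

  // Read path (mirrors the convertHiveTableToCatalogTable change): placeholder partition columns
  // are dropped here, since the Spark-facing schema is restored from table properties.
  def fromHivePartCols(partCols: Seq[FieldSchema]): Seq[FieldSchema] =
    partCols.filter(_.hiveType != IncompatiblePartitionTypePlaceholder)

  def main(args: Array[String]): Unit = {
    val written = toHivePartCols(Seq("a" -> "timestamp_ntz", "dt" -> "string"))
    println(written)                    // column `a` carries the placeholder, `dt` keeps its type
    println(fromHivePartCols(written))  // the placeholder column is filtered out on the way back
  }
}

Assuming the existing table-properties schema path (mentioned in the new comment in HiveClientImpl) also covers partition columns, dropping the placeholder FieldSchema on read loses no information; the DESC FORMATTED and SELECT results in the golden files above show the timestamp_ntz partition column restored with its real type.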