diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index f0a43bff8f1fb..9075e396df3e9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -1124,18 +1124,21 @@ object SQLConf {
     .createWithDefault(false)
 
   val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
-    .doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
-      "Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
-      "nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
-      "provide compatibility with these systems.")
+    .doc("This flag tells Spark SQL to interpret INT96 data as a timestamp " +
+      "to provide compatibility with older versions of applications. " +
+      "When this flag is enabled, Spark will also store Timestamp as INT96 " +
+      "to avoid losing precision in the nanoseconds field. " +
+      "The default value is false " +
+      "since INT96 has been deprecated in Parquet (see PARQUET-323).")
     .version("1.3.0")
     .booleanConf
-    .createWithDefault(true)
+    .createWithDefault(false)
 
   val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
     .doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
-      "converting to timestamps, for data written by Impala. This is necessary because Impala " +
-      "stores INT96 data with a different timezone offset than Hive & Spark.")
+      "converting to timestamps, for data written by older versions of applications. " +
+      "This flag is kept to ensure compatibility with older applications " +
+      "that store INT96 data with a different timezone offset than Hive & Spark.")
     .version("2.3.0")
     .booleanConf
     .createWithDefault(false)
@@ -1146,15 +1149,17 @@ object SQLConf {
 
   val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
     .doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
-      "INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
-      "is a standard timestamp type in Parquet, which stores number of microseconds from the " +
-      "Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
-      "means Spark has to truncate the microsecond portion of its timestamp value.")
+      "INT96 is a legacy, non-standard timestamp type, " +
+      "which has been deprecated in Parquet (see PARQUET-323). " +
+      "TIMESTAMP_MICROS (stored as INT64) is a standard timestamp type in Parquet, " +
+      "which stores the number of microseconds from the Unix epoch. " +
+      "TIMESTAMP_MILLIS is also standard, but with millisecond precision, " +
+      "which means Spark has to truncate the microsecond portion of its timestamp value.")
     .version("2.3.0")
     .stringConf
     .transform(_.toUpperCase(Locale.ROOT))
     .checkValues(ParquetOutputTimestampType.values.map(_.toString))
-    .createWithDefault(ParquetOutputTimestampType.INT96.toString)
+    .createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)
 
   val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
     .doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
index daeb8e88a924b..9e015a544efc4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
@@ -504,7 +504,7 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96,
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
     useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
index d3aad531ed7a1..dd6543674d551 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
@@ -88,7 +88,7 @@ class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
       parquetSchema: String): Unit = {
     val converter = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = false,
-      outputTimestampType = SQLConf.ParquetOutputTimestampType.INT96,
+      outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
       useFieldId = true)
 
     test(s"sql => parquet: $testName") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 0acb21f3e6fba..05b6fce5b69c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -102,7 +102,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       parquetSchema: String,
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-        SQLConf.ParquetOutputTimestampType.INT96,
+        SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
       inferTimestampNTZ: Boolean = true): Unit = {
     val converter = new SparkToParquetSchemaConverter(
       writeLegacyParquetFormat = writeLegacyParquetFormat,
@@ -124,7 +124,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
       int96AsTimestamp: Boolean,
       writeLegacyParquetFormat: Boolean,
       outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-        SQLConf.ParquetOutputTimestampType.INT96,
+        SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
       expectedParquetColumn: Option[ParquetColumn] = None,
       nanosAsLong: Boolean = false): Unit = {
 
@@ -2305,7 +2305,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       """.stripMargin,
     binaryAsString = true,
     int96AsTimestamp = true,
-    writeLegacyParquetFormat = true)
+    writeLegacyParquetFormat = true,
+    outputTimestampType = SQLConf.ParquetOutputTimestampType.INT96)
 
   testSchema(
     "Timestamp written and read as INT64 with TIMESTAMP_MILLIS",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
index 0a50f07a1b2bc..8a8ae668cdb6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/internal/SQLConfSuite.scala
@@ -349,11 +349,14 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {
 
     // check default value
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
-      SQLConf.ParquetOutputTimestampType.INT96)
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
 
     sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
+    sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_millis")
+    assert(spark.sessionState.conf.parquetOutputTimestampType ==
+      SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
    sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
     assert(spark.sessionState.conf.parquetOutputTimestampType ==
       SQLConf.ParquetOutputTimestampType.INT96)
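
A minimal usage sketch (not part of the patch) of what the changed defaults mean for users. The session setup, output paths, and object name are illustrative; the config keys are the ones this diff touches, and INT96 remains available as an opt-in for compatibility with legacy readers.

```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession

object TimestampTypeExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("parquet-timestamp-type-sketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val df = Seq(Timestamp.valueOf("2024-01-01 12:34:56.123456")).toDF("ts")

    // With this patch, spark.sql.parquet.outputTimestampType defaults to
    // TIMESTAMP_MICROS, so a plain write produces an INT64 timestamp column.
    df.write.mode("overwrite").parquet("/tmp/ts_micros")

    // Writers that must stay byte-compatible with legacy INT96 readers
    // can opt back into the old behavior explicitly:
    spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")
    df.write.mode("overwrite").parquet("/tmp/ts_int96")

    // spark.sql.parquet.int96AsTimestamp now also defaults to false, so
    // re-enable it to read existing INT96 data back as timestamps:
    spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")
    spark.read.parquet("/tmp/ts_int96").show(truncate = false)

    spark.stop()
  }
}
```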