Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-51359][CORE][SQL] Set INT64 as the default timestamp type for Parquet files #50215

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1124,18 +1124,21 @@ object SQLConf {
.createWithDefault(false)

val PARQUET_INT96_AS_TIMESTAMP = buildConf("spark.sql.parquet.int96AsTimestamp")
.doc("Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. " +
"Spark would also store Timestamp as INT96 because we need to avoid precision lost of the " +
"nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to " +
"provide compatibility with these systems.")
.doc("This flag tells Spark SQL to interpret INT96 data as a timestamp " +
"to provide compatibility with older versions of applications. " +
"When this flag is enabled, Spark would also store Timestamp as INT96 " +
"because we need to avoid precision loss of the nanoseconds field. " +
"The default value is set to false " +
"since INT96 has been deprecated in Parquet (see PARQUET-323).")
.version("1.3.0")
.booleanConf
.createWithDefault(true)
.createWithDefault(false)

val PARQUET_INT96_TIMESTAMP_CONVERSION = buildConf("spark.sql.parquet.int96TimestampConversion")
.doc("This controls whether timestamp adjustments should be applied to INT96 data when " +
"converting to timestamps, for data written by Impala. This is necessary because Impala " +
"stores INT96 data with a different timezone offset than Hive & Spark.")
"converting to timestamps, for data written by older versions of applications. " +
"This flag is kept to ensure compatibility with older versions of applications " +
"that store INT96 data with a different timezone offset than Hive & Spark.")
.version("2.3.0")
.booleanConf
.createWithDefault(false)
Expand All @@ -1146,15 +1149,17 @@ object SQLConf {

val PARQUET_OUTPUT_TIMESTAMP_TYPE = buildConf("spark.sql.parquet.outputTimestampType")
.doc("Sets which Parquet timestamp type to use when Spark writes data to Parquet files. " +
"INT96 is a non-standard but commonly used timestamp type in Parquet. TIMESTAMP_MICROS " +
"is a standard timestamp type in Parquet, which stores number of microseconds from the " +
"Unix epoch. TIMESTAMP_MILLIS is also standard, but with millisecond precision, which " +
"means Spark has to truncate the microsecond portion of its timestamp value.")
"INT96 is a legacy, non-standard timestamp type, " +
"which has been deprecated in Parquet (see PARQUET-323). " +
"TIMESTAMP_MICROS (equivalent to INT64) is a standard timestamp type in Parquet, " +
"which stores number of microseconds from the Unix epoch. " +
"TIMESTAMP_MILLIS is also standard, but with millisecond precision, " +
"which means Spark has to truncate the microsecond portion of its timestamp value.")
.version("2.3.0")
.stringConf
.transform(_.toUpperCase(Locale.ROOT))
.checkValues(ParquetOutputTimestampType.values.map(_.toString))
.createWithDefault(ParquetOutputTimestampType.INT96.toString)
.createWithDefault(ParquetOutputTimestampType.TIMESTAMP_MICROS.toString)

val PARQUET_COMPRESSION = buildConf("spark.sql.parquet.compression.codec")
.doc("Sets the compression codec used when writing Parquet files. If either `compression` or " +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,7 @@ class ParquetToSparkSchemaConverter(
class SparkToParquetSchemaConverter(
writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {

def this(conf: SQLConf) = this(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
parquetSchema: String): Unit = {
val converter = new SparkToParquetSchemaConverter(
writeLegacyParquetFormat = false,
outputTimestampType = SQLConf.ParquetOutputTimestampType.INT96,
outputTimestampType = SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
useFieldId = true)

test(s"sql => parquet: $testName") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
parquetSchema: String,
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
inferTimestampNTZ: Boolean = true): Unit = {
val converter = new SparkToParquetSchemaConverter(
writeLegacyParquetFormat = writeLegacyParquetFormat,
Expand All @@ -124,7 +124,7 @@ abstract class ParquetSchemaTest extends ParquetTest with SharedSparkSession {
int96AsTimestamp: Boolean,
writeLegacyParquetFormat: Boolean,
outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
SQLConf.ParquetOutputTimestampType.INT96,
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS,
expectedParquetColumn: Option[ParquetColumn] = None,
nanosAsLong: Boolean = false): Unit = {

Expand Down Expand Up @@ -2305,7 +2305,8 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
""".stripMargin,
binaryAsString = true,
int96AsTimestamp = true,
writeLegacyParquetFormat = true)
writeLegacyParquetFormat = true,
outputTimestampType = SQLConf.ParquetOutputTimestampType.INT96)

testSchema(
"Timestamp written and read as INT64 with TIMESTAMP_MILLIS",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -349,11 +349,14 @@ class SQLConfSuite extends QueryTest with SharedSparkSession {

// check default value
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)

sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_micros")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MICROS)
sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "timestamp_millis")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.TIMESTAMP_MILLIS)
sqlConf.setConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE, "int96")
assert(spark.sessionState.conf.parquetOutputTimestampType ==
SQLConf.ParquetOutputTimestampType.INT96)
Expand Down