/**
 * Exercises Scala UDFs whose parameter and/or result is a `java.time.LocalTime`:
 * a regular value, a `null` result, a `null` argument, and a UDF whose body throws.
 * The three non-failing cases also check that the result column is typed `TimeType()`.
 */
test("SPARK-51402: Test TimeType in UDF") {
  // Local import keeps the lambdas readable without touching the suite-level imports.
  import java.time.LocalTime

  // Shared fixture: a single-row DataFrame built from one LocalTime value.
  val mockTimeStr = "00:00:00.000000"
  val input = Seq(LocalTime.parse(mockTimeStr)).toDF("currentTime")

  // Regular case: the UDF takes a LocalTime and returns a LocalTime.
  val plusHour = udf((l: LocalTime) => l.plusHours(1))
  val result = input.select(plusHour($"currentTime").as("newTime"))
  checkAnswer(result, Row(LocalTime.parse("01:00:00.000000")) :: Nil)
  assert(result.schema === new StructType().add("newTime", TimeType()))

  // UDF produces `null`: the declared result type is still LocalTime.
  val nullFunc = udf((_: LocalTime) => null.asInstanceOf[LocalTime])
  val nullResult = input.select(nullFunc($"currentTime").as("nullTime"))
  checkAnswer(nullResult, Row(null) :: Nil)
  assert(nullResult.schema === new StructType().add("nullTime", TimeType()))

  // Input parameter of UDF is null: the lambda ignores its argument and returns a constant.
  val nullInput = Seq(null.asInstanceOf[LocalTime]).toDF("nullTime")
  val constTime = udf((_: LocalTime) => LocalTime.parse(mockTimeStr))
  val constResult = nullInput.select(constTime($"nullTime").as("zeroHour"))
  checkAnswer(constResult, Row(LocalTime.parse(mockTimeStr)) :: Nil)
  assert(constResult.schema === new StructType().add("zeroHour", TimeType()))

  // UDF body throws: "Zero".toLong raises NumberFormatException inside the lambda, before
  // any result exists. The query is expected to fail with a SparkException that carries
  // the original exception as its cause.
  val invalidFunc = udf((_: LocalTime) => "Zero".toLong)
  val e = intercept[SparkException] {
    input.select(invalidFunc($"currentTime")).collect()
  }
  // Pin the reported error condition as well as the cause, as the first revision of this
  // test did, so that a different failure path cannot satisfy the test by accident.
  assert(e.getCondition == "FAILED_EXECUTE_UDF")
  assert(e.getCause.isInstanceOf[java.lang.NumberFormatException])
}