From 19e9cac2a6ea615dc3adf39dbffe05d5d257c398 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Sun, 20 Jul 2025 15:12:56 +0800 Subject: [PATCH 1/7] feat(spark): implement Spark datetime function last_day Signed-off-by: Alan Tang --- Cargo.lock | 1 + datafusion/spark/Cargo.toml | 2 + .../spark/src/function/datetime/last_day.rs | 145 ++++++++++++++++++ datafusion/spark/src/function/datetime/mod.rs | 17 +- .../test_files/spark/datetime/last_day.slt | 7 +- 5 files changed, 168 insertions(+), 4 deletions(-) create mode 100644 datafusion/spark/src/function/datetime/last_day.rs diff --git a/Cargo.lock b/Cargo.lock index 45a8333a1e68..deb70134bb38 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2607,6 +2607,7 @@ name = "datafusion-spark" version = "49.0.0" dependencies = [ "arrow", + "chrono", "criterion", "datafusion-catalog", "datafusion-common", diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 9243103fff7d..cc8946bea94d 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -44,10 +44,12 @@ datafusion-expr = { workspace = true } datafusion-functions = { workspace = true, features = ["crypto_expressions"] } datafusion-macros = { workspace = true } log = { workspace = true } +chrono = { workspace = true } [dev-dependencies] criterion = { workspace = true } rand = { workspace = true } +chrono = { workspace = true } [[bench]] harness = false diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs new file mode 100644 index 000000000000..9f5c4b959ad1 --- /dev/null +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -0,0 +1,145 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::any::Any; +use std::sync::Arc; + +use arrow::array::{ArrayRef, AsArray, Date32Array}; +use arrow::datatypes::{DataType, Date32Type}; +use chrono::{Datelike, Duration, NaiveDate}; +use datafusion_common::types::NativeType; +use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result, ScalarValue}; +use datafusion_expr::{ + ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, +}; + +#[derive(Debug)] +pub struct SparkLastDay { + signature: Signature, +} + +impl Default for SparkLastDay { + fn default() -> Self { + Self::new() + } +} + +impl SparkLastDay { + pub fn new() -> Self { + Self { + signature: Signature::user_defined(Volatility::Immutable), + } + } +} + +impl ScalarUDFImpl for SparkLastDay { + fn as_any(&self) -> &dyn Any { + self + } + + fn name(&self) -> &str { + "spark_last_day" + } + + fn signature(&self) -> &Signature { + &self.signature + } + + fn return_type(&self, _arg_types: &[DataType]) -> Result { + Ok(DataType::Date32) + } + + fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { + let ScalarFunctionArgs { args, .. } = args; + let [arg] = args.as_slice() else { + return exec_err!( + "Spark `last_day` function requires 1 argument, got {}", + args.len() + ); + }; + match arg { + ColumnarValue::Scalar(ScalarValue::Date32(days)) => { + if let Some(days) = days { + Ok(ColumnarValue::Scalar(ScalarValue::Date32(Some( + spark_last_day(*days)?, + )))) + } else { + Ok(ColumnarValue::Scalar(ScalarValue::Date32(None))) + } + } + ColumnarValue::Array(array) => { + let result = match array.data_type() { + DataType::Date32 => { + let result: Date32Array = array + .as_primitive::() + .try_unary(spark_last_day)? + .with_data_type(DataType::Date32); + Ok(Arc::new(result) as ArrayRef) + } + other => { + exec_err!("Unsupported data type {other:?} for Spark function `last_day`") + } + }?; + Ok(ColumnarValue::Array(result)) + } + other => exec_err!("Unsupported arg {other:?} for Spark function `last_day"), + } + } + + fn coerce_types(&self, arg_types: &[DataType]) -> Result> { + if arg_types.len() != 1 { + return exec_err!( + "Spark `last_day` function requires 1 argument, got {}", + arg_types.len() + ); + } + + let current_native_type: NativeType = (&arg_types[0]).into(); + if matches!(current_native_type, NativeType::Date) + || matches!(current_native_type, NativeType::String) + || matches!(current_native_type, NativeType::Null) + { + Ok(vec![DataType::Date32]) + } else { + plan_err!( + "The first argument of the Spark `last_day` function can only be a date or string, but got {}", &arg_types[0] + ) + } + } +} + +fn spark_last_day(days: i32) -> Result { + let date = Date32Type::to_naive_date(days); + + let (year, month) = (date.year(), date.month()); + let (next_year, next_month) = if month == 12 { + (year + 1, 1) + } else { + (year, month + 1) + }; + + let first_day_next_month = NaiveDate::from_ymd_opt(next_year, next_month, 1) + .ok_or_else(|| { + exec_datafusion_err!( + "Spark `last_day`: Unable to parse date from {next_year}, {next_month}, 1" + ) + })?; + + Ok(Date32Type::from_naive_date( + first_day_next_month - Duration::days(1), + )) +} diff --git a/datafusion/spark/src/function/datetime/mod.rs b/datafusion/spark/src/function/datetime/mod.rs index a87df9a2c87a..3bde960ae012 100644 --- a/datafusion/spark/src/function/datetime/mod.rs +++ b/datafusion/spark/src/function/datetime/mod.rs @@ -15,11 +15,24 @@ // specific language governing permissions and limitations // under the License. +pub mod last_day; + use datafusion_expr::ScalarUDF; +use datafusion_functions::make_udf_function; use std::sync::Arc; -pub mod expr_fn {} +make_udf_function!(last_day::SparkLastDay, last_day); + +pub mod expr_fn { + use datafusion_functions::export_functions; + + export_functions!(( + last_day, + "Returns the last day of the month which the date belongs to.", + arg1 + )); +} pub fn functions() -> Vec> { - vec![] + vec![last_day()] } diff --git a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt index 29fb9ca11b0e..9c5f3d63b721 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt @@ -23,5 +23,8 @@ ## Original Query: SELECT last_day('2009-01-12'); ## PySpark 3.5.5 Result: {'last_day(2009-01-12)': datetime.date(2009, 1, 31), 'typeof(last_day(2009-01-12))': 'date', 'typeof(2009-01-12)': 'string'} -#query -#SELECT last_day('2009-01-12'::string); + +query D +SELECT last_day('2009-01-12'::string); +---- +2009-01-31 \ No newline at end of file From 92d7d88698cfa7e5b904ae62fea4c1a0651b8696 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Tue, 22 Jul 2025 08:40:44 +0800 Subject: [PATCH 2/7] chore: fix the export function name Signed-off-by: Alan Tang --- datafusion/spark/src/function/datetime/last_day.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs index 9f5c4b959ad1..31fe0f6450ae 100644 --- a/datafusion/spark/src/function/datetime/last_day.rs +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -52,7 +52,7 @@ impl ScalarUDFImpl for SparkLastDay { } fn name(&self) -> &str { - "spark_last_day" + "last_day" } fn signature(&self) -> &Signature { From f30db215112c8b0da99789aa6ae7e4e7756ddef3 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Tue, 22 Jul 2025 08:52:38 +0800 Subject: [PATCH 3/7] chore: Fix Cargo.toml formatting Signed-off-by: Alan Tang --- datafusion/spark/Cargo.toml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index cc8946bea94d..0a3fc0c803cd 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -37,6 +37,7 @@ name = "datafusion_spark" [dependencies] arrow = { workspace = true } +chrono = { workspace = true } datafusion-catalog = { workspace = true } datafusion-common = { workspace = true } datafusion-execution = { workspace = true } @@ -44,12 +45,11 @@ datafusion-expr = { workspace = true } datafusion-functions = { workspace = true, features = ["crypto_expressions"] } datafusion-macros = { workspace = true } log = { workspace = true } -chrono = { workspace = true } [dev-dependencies] +chrono = { workspace = true } criterion = { workspace = true } rand = { workspace = true } -chrono = { workspace = true } [[bench]] harness = false From 108ef718d0772569689b71b4b8836a8814d6ec98 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Tue, 22 Jul 2025 10:18:42 +0800 Subject: [PATCH 4/7] test: add more tests for spark function last_day Signed-off-by: Alan Tang --- .../test_files/spark/datetime/last_day.slt | 78 ++++++++++++++++++- 1 file changed, 74 insertions(+), 4 deletions(-) diff --git a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt index 9c5f3d63b721..ed9eb90fdef1 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt @@ -21,10 +21,80 @@ # For more information, please see: # https://github.com/apache/datafusion/issues/15914 -## Original Query: SELECT last_day('2009-01-12'); -## PySpark 3.5.5 Result: {'last_day(2009-01-12)': datetime.date(2009, 1, 31), 'typeof(last_day(2009-01-12))': 'date', 'typeof(2009-01-12)': 'string'} - query D SELECT last_day('2009-01-12'::string); ---- -2009-01-31 \ No newline at end of file +2009-01-31 + + +query D +SELECT last_day('2015-02-28'::string); +---- +2015-02-28 + +query D +SELECT last_day('2015-03-27'::string); +---- +2015-03-31 + +query D +SELECT last_day('2015-04-26'::string); +---- +2015-04-30 + +query D +SELECT last_day('2015-05-25'::string); +---- +2015-05-31 + +query D +SELECT last_day('2015-06-24'::string); +---- +2015-06-30 + +query D +SELECT last_day('2015-07-23'::string); +---- +2015-07-31 + +query D +SELECT last_day('2015-08-01'::string); +---- +2015-08-31 + +query D +SELECT last_day('2015-09-02'::string); +---- +2015-09-30 + +query D +SELECT last_day('2015-10-03'::string); +---- +2015-10-31 + +query D +SELECT last_day('2015-11-04'::string); +---- +2015-11-30 + +query D +SELECT last_day('2015-12-05'::string); +---- +2015-12-31 + + +query D +SELECT last_day('2016-01-06'::string); +---- +2016-01-31 + +query D +SELECT last_day('2016-02-07'::string); +---- +2016-02-29 + + +query ? +SELECT NULL; +---- +NULL From dedd4940ea605b191384254a02606b8509c7e914 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Thu, 24 Jul 2025 08:55:25 +0800 Subject: [PATCH 5/7] feat(spark): set the signature to be taking exactly one Date32 type Signed-off-by: Alan Tang --- .../spark/src/function/datetime/last_day.rs | 34 ++++--------------- .../test_files/spark/datetime/last_day.slt | 34 +++++++++---------- 2 files changed, 24 insertions(+), 44 deletions(-) diff --git a/datafusion/spark/src/function/datetime/last_day.rs b/datafusion/spark/src/function/datetime/last_day.rs index 31fe0f6450ae..5a748816f40d 100644 --- a/datafusion/spark/src/function/datetime/last_day.rs +++ b/datafusion/spark/src/function/datetime/last_day.rs @@ -21,8 +21,7 @@ use std::sync::Arc; use arrow::array::{ArrayRef, AsArray, Date32Array}; use arrow::datatypes::{DataType, Date32Type}; use chrono::{Datelike, Duration, NaiveDate}; -use datafusion_common::types::NativeType; -use datafusion_common::{exec_datafusion_err, exec_err, plan_err, Result, ScalarValue}; +use datafusion_common::{exec_datafusion_err, internal_err, Result, ScalarValue}; use datafusion_expr::{ ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl, Signature, Volatility, }; @@ -41,7 +40,7 @@ impl Default for SparkLastDay { impl SparkLastDay { pub fn new() -> Self { Self { - signature: Signature::user_defined(Volatility::Immutable), + signature: Signature::exact(vec![DataType::Date32], Volatility::Immutable), } } } @@ -66,7 +65,7 @@ impl ScalarUDFImpl for SparkLastDay { fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result { let ScalarFunctionArgs { args, .. } = args; let [arg] = args.as_slice() else { - return exec_err!( + return internal_err!( "Spark `last_day` function requires 1 argument, got {}", args.len() ); @@ -91,33 +90,14 @@ impl ScalarUDFImpl for SparkLastDay { Ok(Arc::new(result) as ArrayRef) } other => { - exec_err!("Unsupported data type {other:?} for Spark function `last_day`") + internal_err!("Unsupported data type {other:?} for Spark function `last_day`") } }?; Ok(ColumnarValue::Array(result)) } - other => exec_err!("Unsupported arg {other:?} for Spark function `last_day"), - } - } - - fn coerce_types(&self, arg_types: &[DataType]) -> Result> { - if arg_types.len() != 1 { - return exec_err!( - "Spark `last_day` function requires 1 argument, got {}", - arg_types.len() - ); - } - - let current_native_type: NativeType = (&arg_types[0]).into(); - if matches!(current_native_type, NativeType::Date) - || matches!(current_native_type, NativeType::String) - || matches!(current_native_type, NativeType::Null) - { - Ok(vec![DataType::Date32]) - } else { - plan_err!( - "The first argument of the Spark `last_day` function can only be a date or string, but got {}", &arg_types[0] - ) + other => { + internal_err!("Unsupported arg {other:?} for Spark function `last_day") + } } } } diff --git a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt index ed9eb90fdef1..30cc03e07c7b 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt @@ -22,79 +22,79 @@ # https://github.com/apache/datafusion/issues/15914 query D -SELECT last_day('2009-01-12'::string); +SELECT last_day('2009-01-12'::DATE); ---- 2009-01-31 query D -SELECT last_day('2015-02-28'::string); +SELECT last_day('2015-02-28'::DATE); ---- 2015-02-28 query D -SELECT last_day('2015-03-27'::string); +SELECT last_day('2015-03-27'::DATE); ---- 2015-03-31 query D -SELECT last_day('2015-04-26'::string); +SELECT last_day('2015-04-26'::DATE); ---- 2015-04-30 query D -SELECT last_day('2015-05-25'::string); +SELECT last_day('2015-05-25'::DATE); ---- 2015-05-31 query D -SELECT last_day('2015-06-24'::string); +SELECT last_day('2015-06-24'::DATE); ---- 2015-06-30 query D -SELECT last_day('2015-07-23'::string); +SELECT last_day('2015-07-23'::DATE); ---- 2015-07-31 query D -SELECT last_day('2015-08-01'::string); +SELECT last_day('2015-08-01'::DATE); ---- 2015-08-31 query D -SELECT last_day('2015-09-02'::string); +SELECT last_day('2015-09-02'::DATE); ---- 2015-09-30 query D -SELECT last_day('2015-10-03'::string); +SELECT last_day('2015-10-03'::DATE); ---- 2015-10-31 query D -SELECT last_day('2015-11-04'::string); +SELECT last_day('2015-11-04'::DATE); ---- 2015-11-30 query D -SELECT last_day('2015-12-05'::string); +SELECT last_day('2015-12-05'::DATE); ---- 2015-12-31 query D -SELECT last_day('2016-01-06'::string); +SELECT last_day('2016-01-06'::DATE); ---- 2016-01-31 query D -SELECT last_day('2016-02-07'::string); +SELECT last_day('2016-02-07'::DATE); ---- 2016-02-29 -query ? -SELECT NULL; +query D +SELECT last_day(null::DATE); ---- -NULL +NULL \ No newline at end of file From ec4f37a43308f48e2fc83782448f0b449823c5d7 Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Thu, 24 Jul 2025 15:01:07 +0800 Subject: [PATCH 6/7] test(spark): add more bad cases Signed-off-by: Alan Tang --- .../test_files/spark/datetime/last_day.slt | 21 ++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt index 30cc03e07c7b..da3dd9711b94 100644 --- a/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt +++ b/datafusion/sqllogictest/test_files/spark/datetime/last_day.slt @@ -97,4 +97,23 @@ SELECT last_day('2016-02-07'::DATE); query D SELECT last_day(null::DATE); ---- -NULL \ No newline at end of file +NULL + + +statement error Failed to coerce arguments to satisfy a call to 'last_day' function +select last_day('foo'); + + +statement error Failed to coerce arguments to satisfy a call to 'last_day' function +select last_day(123); + + +statement error 'last_day' does not support zero arguments +select last_day(); + +statement error Failed to coerce arguments to satisfy a call to 'last_day' function +select last_day(last_day('2016-02-07'::string, 'foo')); + +statement error Failed to coerce arguments to satisfy a call to 'last_day' function +select last_day(last_day('2016-02-31'::string)); + From 2f66bc89572886e16c7ffff3dec8192187bb3daa Mon Sep 17 00:00:00 2001 From: Alan Tang Date: Thu, 24 Jul 2025 17:39:47 +0800 Subject: [PATCH 7/7] chore: clean up redundant package Signed-off-by: Alan Tang --- datafusion/spark/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/datafusion/spark/Cargo.toml b/datafusion/spark/Cargo.toml index 0a3fc0c803cd..bc7ae380f793 100644 --- a/datafusion/spark/Cargo.toml +++ b/datafusion/spark/Cargo.toml @@ -47,7 +47,6 @@ datafusion-macros = { workspace = true } log = { workspace = true } [dev-dependencies] -chrono = { workspace = true } criterion = { workspace = true } rand = { workspace = true }