Skip to content

Commit 736fa7c

Browse files
authored
feat(datafusion-spark): implement spark compatible unhex function (#19909)
## Which issue does this PR close? Part of: #15914 ## Rationale for this change Implement the Spark-compatible `unhex` function: https://spark.apache.org/docs/latest/api/sql/index.html#unhex ## What changes are included in this PR? A Spark-compatible `unhex` scalar function, its registration in the math function module, a criterion benchmark, and test coverage. ## Are these changes tested? Yes. Unit tests and SLT (sqllogictest) cases are added. ## Are there any user-facing changes? No.
1 parent 7cf63f1 commit 736fa7c

File tree

5 files changed

+468
-0
lines changed

5 files changed

+468
-0
lines changed

datafusion/spark/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,3 +79,7 @@ name = "slice"
7979
[[bench]]
8080
harness = false
8181
name = "substring"
[[bench]]
harness = false
name = "unhex"

datafusion/spark/benches/unhex.rs

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
extern crate criterion;
19+
20+
use arrow::array::{
21+
Array, LargeStringArray, LargeStringBuilder, StringArray, StringBuilder,
22+
StringViewArray, StringViewBuilder,
23+
};
24+
use arrow::datatypes::{DataType, Field};
25+
use criterion::{Criterion, criterion_group, criterion_main};
26+
use datafusion_common::config::ConfigOptions;
27+
use datafusion_expr::{ColumnarValue, ScalarFunctionArgs, ScalarUDFImpl};
28+
use datafusion_spark::function::math::unhex::SparkUnhex;
29+
use rand::rngs::StdRng;
30+
use rand::{Rng, SeedableRng};
31+
use std::hint::black_box;
32+
use std::sync::Arc;
33+
34+
/// Builds a `StringArray` of `size` rows of random hex text for benchmarking.
///
/// Roughly `null_density` of the rows are NULL; the rest are strings of
/// 2..=100 characters drawn from both lower- and upper-case hex digits.
/// The RNG is seeded with a fixed value so every benchmark run sees the
/// same data.
fn generate_hex_string_data(size: usize, null_density: f32) -> StringArray {
    let mut rng = StdRng::seed_from_u64(42);
    let mut out = StringBuilder::with_capacity(size, 0);
    let digits = b"0123456789abcdefABCDEF";

    for _ in 0..size {
        if rng.random::<f32>() < null_density {
            out.append_null();
            continue;
        }
        let len = rng.random_range::<usize, _>(2..=100);
        let mut value = String::with_capacity(len);
        for _ in 0..len {
            value.push(digits[rng.random_range(0..digits.len())] as char);
        }
        out.append_value(value);
    }

    out.finish()
}
54+
55+
/// Builds a `LargeStringArray` of `size` rows of random hex text for
/// benchmarking.
///
/// Roughly `null_density` of the rows are NULL; the rest are strings of
/// 2..=100 characters drawn from both lower- and upper-case hex digits.
/// The RNG is seeded with a fixed value so every benchmark run sees the
/// same data.
fn generate_hex_large_string_data(size: usize, null_density: f32) -> LargeStringArray {
    let mut rng = StdRng::seed_from_u64(42);
    let mut out = LargeStringBuilder::with_capacity(size, 0);
    let digits = b"0123456789abcdefABCDEF";

    for _ in 0..size {
        if rng.random::<f32>() < null_density {
            out.append_null();
            continue;
        }
        let len = rng.random_range::<usize, _>(2..=100);
        let mut value = String::with_capacity(len);
        for _ in 0..len {
            value.push(digits[rng.random_range(0..digits.len())] as char);
        }
        out.append_value(value);
    }

    out.finish()
}
75+
76+
/// Builds a `StringViewArray` (Utf8View) of `size` rows of random hex text
/// for benchmarking.
///
/// Roughly `null_density` of the rows are NULL; the rest are strings of
/// 2..=100 characters drawn from both lower- and upper-case hex digits.
/// The RNG is seeded with a fixed value so every benchmark run sees the
/// same data.
fn generate_hex_utf8view_data(size: usize, null_density: f32) -> StringViewArray {
    let mut rng = StdRng::seed_from_u64(42);
    let mut out = StringViewBuilder::with_capacity(size);
    let digits = b"0123456789abcdefABCDEF";

    for _ in 0..size {
        if rng.random::<f32>() < null_density {
            out.append_null();
            continue;
        }
        let len = rng.random_range::<usize, _>(2..=100);
        let mut value = String::with_capacity(len);
        for _ in 0..len {
            value.push(digits[rng.random_range(0..digits.len())] as char);
        }
        out.append_value(value);
    }

    out.finish()
}
96+
97+
fn run_benchmark(c: &mut Criterion, name: &str, size: usize, array: Arc<dyn Array>) {
98+
let unhex_func = SparkUnhex::new();
99+
let args = vec![ColumnarValue::Array(array)];
100+
let arg_fields: Vec<_> = args
101+
.iter()
102+
.enumerate()
103+
.map(|(idx, arg)| Field::new(format!("arg_{idx}"), arg.data_type(), true).into())
104+
.collect();
105+
let config_options = Arc::new(ConfigOptions::default());
106+
107+
c.bench_function(&format!("{name}/size={size}"), |b| {
108+
b.iter(|| {
109+
black_box(
110+
unhex_func
111+
.invoke_with_args(ScalarFunctionArgs {
112+
args: args.clone(),
113+
arg_fields: arg_fields.clone(),
114+
number_rows: size,
115+
return_field: Arc::new(Field::new("f", DataType::Binary, true)),
116+
config_options: Arc::clone(&config_options),
117+
})
118+
.unwrap(),
119+
)
120+
})
121+
});
122+
}
123+
124+
fn criterion_benchmark(c: &mut Criterion) {
125+
let sizes = vec![1024, 4096, 8192];
126+
let null_density = 0.1;
127+
128+
// Benchmark with hex string
129+
for &size in &sizes {
130+
let data = generate_hex_string_data(size, null_density);
131+
run_benchmark(c, "unhex_utf8", size, Arc::new(data));
132+
}
133+
134+
// Benchmark with hex large string
135+
for &size in &sizes {
136+
let data = generate_hex_large_string_data(size, null_density);
137+
run_benchmark(c, "unhex_large_utf8", size, Arc::new(data));
138+
}
139+
140+
// Benchmark with hex Utf8View
141+
for &size in &sizes {
142+
let data = generate_hex_utf8view_data(size, null_density);
143+
run_benchmark(c, "unhex_utf8view", size, Arc::new(data));
144+
}
145+
}
146+
147+
criterion_group!(benches, criterion_benchmark);
148+
criterion_main!(benches);

datafusion/spark/src/function/math/mod.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ pub mod hex;
2222
pub mod modulus;
2323
pub mod rint;
2424
pub mod trigonometry;
25+
pub mod unhex;
2526
pub mod width_bucket;
2627

2728
use datafusion_expr::ScalarUDF;
@@ -35,6 +36,7 @@ make_udf_function!(hex::SparkHex, hex);
3536
make_udf_function!(modulus::SparkMod, modulus);
3637
make_udf_function!(modulus::SparkPmod, pmod);
3738
make_udf_function!(rint::SparkRint, rint);
39+
make_udf_function!(unhex::SparkUnhex, unhex);
3840
make_udf_function!(width_bucket::SparkWidthBucket, width_bucket);
3941
make_udf_function!(trigonometry::SparkCsc, csc);
4042
make_udf_function!(trigonometry::SparkSec, sec);
@@ -57,6 +59,7 @@ pub mod expr_fn {
5759
"Returns the double value that is closest in value to the argument and is equal to a mathematical integer.",
5860
arg1
5961
));
62+
export_functions!((unhex, "Converts hexadecimal string to binary.", arg1));
6063
export_functions!((width_bucket, "Returns the bucket number into which the value of this expression would fall after being evaluated.", arg1 arg2 arg3 arg4));
6164
export_functions!((csc, "Returns the cosecant of expr.", arg1));
6265
export_functions!((sec, "Returns the secant of expr.", arg1));
@@ -71,6 +74,7 @@ pub fn functions() -> Vec<Arc<ScalarUDF>> {
7174
modulus(),
7275
pmod(),
7376
rint(),
77+
unhex(),
7478
width_bucket(),
7579
csc(),
7680
sec(),

0 commit comments

Comments
 (0)