diff --git a/native/Cargo.toml b/native/Cargo.toml index f244118538..55e96b4851 100644 --- a/native/Cargo.toml +++ b/native/Cargo.toml @@ -39,8 +39,8 @@ arrow-buffer = { version = "53.1.0" } arrow-data = { version = "53.1.0" } arrow-schema = { version = "53.1.0" } parquet = { version = "53.1.0", default-features = false, features = ["experimental"] } -datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2" } -datafusion = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet"] } +datafusion-common = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", features = ["backtrace"] } +datafusion = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", default-features = false, features = ["unicode_expressions", "crypto_expressions", "parquet", "backtrace"] } datafusion-functions = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", features = ["crypto_expressions"] } datafusion-functions-nested = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", default-features = false } datafusion-expr = { git = "https://github.com/apache/datafusion.git", rev = "3b6aac2", default-features = false } @@ -69,3 +69,6 @@ overflow-checks = false lto = "thin" codegen-units = 1 strip = "debuginfo" + +[profile.bench] +lto = false diff --git a/native/core/Cargo.toml b/native/core/Cargo.toml index daa0837c82..898183c606 100644 --- a/native/core/Cargo.toml +++ b/native/core/Cargo.toml @@ -130,3 +130,7 @@ harness = false [[bench]] name = "bloom_filter_agg" harness = false + +[[bench]] +name = "spatial_range_query" +harness = false diff --git a/native/core/benches/spatial_range_query.rs b/native/core/benches/spatial_range_query.rs new file mode 100644 index 0000000000..25731b6996 --- /dev/null +++ b/native/core/benches/spatial_range_query.rs @@ -0,0 +1,82 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use arrow_schema::DataType::Boolean; +use arrow_schema::{DataType, Fields}; +use std::sync::Arc; + +use criterion::{criterion_group, criterion_main, Criterion}; + +use comet::execution::datafusion::expressions::comet_scalar_funcs::create_comet_physical_fun; +use datafusion::prelude::ParquetReadOptions; +use datafusion::prelude::SessionContext; +use datafusion_expr::registry::FunctionRegistry; +use tokio::runtime::Runtime; + +fn register_spatial_udfs(ctx: &SessionContext) -> datafusion::error::Result<()> { + let state_ref = ctx.state_ref(); + let mut state = state_ref.write(); + + let udf_st_intersects_wkb = create_comet_physical_fun("st_intersects_wkb", Boolean, ctx)?; + state.register_udf(Arc::clone(&udf_st_intersects_wkb))?; + + let polygon_type = datafusion_comet_spark_expr::scalar_funcs::geometry_helpers::get_geometry_fields_polygon( + datafusion_comet_spark_expr::scalar_funcs::geometry_helpers::get_coordinate_fields()); + let polygon_struct = DataType::Struct(Fields::from(polygon_type)); + let udf_st_geomfromwkt = create_comet_physical_fun("st_geomfromwkt", polygon_struct, ctx)?; + state.register_udf(Arc::clone(&udf_st_geomfromwkt))?; + + Ok(()) +} + +async fn run_query() -> datafusion::error::Result<()> { + let ctx = SessionContext::new(); + let table_path = "/Users/bopeng/workspace/wherobots/engine-v2/test-data/spark-warehouse/overture-buildings-sampled-coalesce-20"; + ctx.register_parquet("test_table", table_path, ParquetReadOptions::default()).await?; + register_spatial_udfs(&ctx)?; + + // let wkt = "POLYGON ((-78.75 5.965754, -76.289063 12.554564, -67.148438 15.961329, -63.632813 27.683528, -75.234375 25.165173, -75.9375 32.842674, -69.609375 39.368279, -59.414063 43.580391, -47.109375 47.040182, -55.898438 56.944974, -61.523438 62.431074, -78.398438 64.320872, -80.15625 66.93006, -78.046875 69.900118, -92.460937 70.728979, -107.226563 70.844673, -121.289063 71.074056, -139.21875 71.413177, -156.09375 71.746432, -167.34375 71.856229, -170.859375 69.037142, -169.453125 66.652977, -169.453125 63.704722, -169.101562 58.995311, -175.429688 54.977614, -184.21875 52.268157, -181.40625 47.754098, -166.640625 51.618017, -148.359375 55.37911, -140.625 54.775346, -134.648438 45.828799, -131.835938 36.315125, -124.101563 21.289374, -111.09375 15.623037, -94.570313 10.141932, -78.75 5.965754))"; + let wkt = "POLYGON ((-124.4009 41.9983,-123.6237 42.0024,-123.1526 42.0126,-122.0073 42.0075,-121.2369 41.9962,-119.9982 41.9983,-120.0037 39.0021,-117.9575 37.5555,-116.3699 36.3594,-114.6368 35.0075,-114.6382 34.9659,-114.6286 34.9107,-114.6382 34.8758,-114.5970 34.8454,-114.5682 34.7890,-114.4968 34.7269,-114.4501 34.6648,-114.4597 34.6581,-114.4322 34.5869,-114.3787 34.5235,-114.3869 34.4601,-114.3361 34.4500,-114.3031 34.4375,-114.2674 34.4024,-114.1864 34.3559,-114.1383 34.3049,-114.1315 34.2561,-114.1651 34.2595,-114.2249 34.2044,-114.2221 34.1914,-114.2908 34.1720,-114.3237 34.1368,-114.3622 34.1186,-114.4089 34.1118,-114.4363 34.0856,-114.4336 34.0276,-114.4652 34.0117,-114.5119 33.9582,-114.5366 33.9308,-114.5091 33.9058,-114.5256 33.8613,-114.5215 33.8248,-114.5050 33.7597,-114.4940 33.7083,-114.5284 33.6832,-114.5242 33.6363,-114.5393 33.5895,-114.5242 33.5528,-114.5586 33.5311,-114.5778 33.5070,-114.6245 33.4418,-114.6506 33.4142,-114.7055 33.4039,-114.6973 33.3546,-114.7302 33.3041,-114.7206 33.2858,-114.6808 33.2754,-114.6698 33.2582,-114.6904 33.2467,-114.6794 33.1720,-114.7083 33.0904,-114.6918 33.0858,-114.6629 33.0328,-114.6451 33.0501,-114.6286 33.0305,-114.5888 33.0282,-114.5750 33.0351,-114.5174 33.0328,-114.4913 32.9718,-114.4775 32.9764,-114.4844 32.9372,-114.4679 32.8427,-114.5091 32.8161,-114.5311 32.7850,-114.5284 32.7573,-114.5641 32.7503,-114.6162 32.7353,-114.6986 32.7480,-114.7220 32.7191,-115.1944 32.6868,-117.3395 32.5121,-117.4823 32.7838,-117.5977 33.0501,-117.6814 33.2341,-118.0591 33.4578,-118.6290 33.5403,-118.7073 33.7928,-119.3706 33.9582,-120.0050 34.1925,-120.7164 34.2561,-120.9128 34.5360,-120.8427 34.9749,-121.1325 35.2131,-121.3220 35.5255,-121.8013 35.9691,-122.1446 36.2808,-122.1721 36.7268,-122.6871 37.2227,-122.8903 37.7783,-123.2378 37.8965,-123.3202 38.3449,-123.8338 38.7423,-123.9793 38.9946,-124.0329 39.3088,-124.0823 39.7642,-124.5314 40.1663,-124.6509 40.4658,-124.3144 41.0110,-124.3419 41.2386,-124.4545 41.7170,-124.4009 41.9983))"; + let df = ctx.sql(&format!("select count(1) from test_table where st_intersects_wkb(geometry, st_geomfromwkt('{}'))", wkt)).await?; + df.show().await?; + Ok(()) +} + +fn criterion_benchmark(c: &mut Criterion) { + let mut group = c.benchmark_group("spatial_range_query"); + group.bench_function("spatial_range_query", |b| { + b.iter(|| { + let start = std::time::Instant::now(); + let rt = Runtime::new().unwrap(); + criterion::black_box(rt.block_on(run_query()).unwrap()); + let duration = start.elapsed(); + println!("Time elapsed in run_query() is: {:?}", duration); + }); + }); +} + + +fn config() -> Criterion { + Criterion::default().sample_size(10) +} + +criterion_group! { + name = benches; + config = config(); + targets = criterion_benchmark +} +criterion_main!(benches); diff --git a/native/spark-expr/src/scalar_funcs.rs b/native/spark-expr/src/scalar_funcs.rs index 3d2cd166ae..578ce87fea 100644 --- a/native/spark-expr/src/scalar_funcs.rs +++ b/native/spark-expr/src/scalar_funcs.rs @@ -52,7 +52,7 @@ pub use chr::SparkChrFunc; pub mod hash_expressions; mod st; -mod geometry_helpers; +pub mod geometry_helpers; mod geo_helpers; mod wkb; diff --git a/native/spark-expr/src/scalar_funcs/st.rs b/native/spark-expr/src/scalar_funcs/st.rs index 6815e26ae0..f08b890345 100644 --- a/native/spark-expr/src/scalar_funcs/st.rs +++ b/native/spark-expr/src/scalar_funcs/st.rs @@ -41,6 +41,7 @@ use crate::scalar_funcs::geometry_helpers::{ }; use crate::scalar_funcs::geo_helpers::{arrow_to_geo, arrow_to_geo_scalar, geo_to_arrow, geo_to_arrow_scalar}; use crate::scalar_funcs::wkb::read_wkb; +use std::sync::OnceLock; pub fn spark_st_point( args: &[ColumnarValue], @@ -386,6 +387,30 @@ pub fn spark_st_polygon( Ok(ColumnarValue::Array(Arc::new(geometry_array))) } +const SIN_COS_TABLE_SIZE: usize = 10000; +static SIN_TABLE: OnceLock<[f64; SIN_COS_TABLE_SIZE]> = OnceLock::new(); +static COS_TABLE: OnceLock<[f64; SIN_COS_TABLE_SIZE]> = OnceLock::new(); + +fn get_sin_table() -> &'static [f64; SIN_COS_TABLE_SIZE] { + SIN_TABLE.get_or_init(|| { + let mut table = [0.0; SIN_COS_TABLE_SIZE]; + for i in 0..SIN_COS_TABLE_SIZE { + table[i] = (i as f64 * 2.0 * std::f64::consts::PI / SIN_COS_TABLE_SIZE as f64).sin(); + } + table + }) +} + +fn get_cos_table() -> &'static [f64; SIN_COS_TABLE_SIZE] { + COS_TABLE.get_or_init(|| { + let mut table = [0.0; SIN_COS_TABLE_SIZE]; + for i in 0..SIN_COS_TABLE_SIZE { + table[i] = (i as f64 * 2.0 * std::f64::consts::PI / SIN_COS_TABLE_SIZE as f64).cos(); + } + table + }) +} + pub fn spark_st_random_polygon( args: &[ColumnarValue], _data_type: &DataType, @@ -438,11 +463,13 @@ pub fn spark_st_random_polygon( } }; - // Create the geometry builder + let sin_table = get_sin_table(); + let cos_table = get_cos_table(); + let mut geometry_builder = create_geometry_builder_polygon(); // Initialize vectors to hold angles, x_coords, and y_coords for reuse - let mut angles = Vec::new(); + let mut angle_indices = Vec::new(); let mut x_coords = Vec::new(); let mut y_coords = Vec::new(); @@ -458,12 +485,12 @@ pub fn spark_st_random_polygon( let mut random = XORShiftRandom::new(seed); // Generate random angles - angles.clear(); - angles.reserve(num_segments as usize); + angle_indices.clear(); + angle_indices.reserve(num_segments as usize); for _ in 0..num_segments { - angles.push(random.next_f64() * 2.0 * std::f64::consts::PI); + angle_indices.push(random.next_int(SIN_COS_TABLE_SIZE as i32)); } - angles.sort_by(|a, b| a.partial_cmp(b).unwrap()); + angle_indices.sort(); // Generate coordinates x_coords.clear(); @@ -472,10 +499,10 @@ pub fn spark_st_random_polygon( y_coords.reserve((num_segments + 1) as usize); for k in 0..num_segments { - let angle = angles[k as usize]; + let angle_idx = angle_indices[k as usize] as usize; let distance = random.next_f64() * (max_size / 2.0); - let x = center_x + distance * angle.cos(); - let y = center_y + distance * angle.sin(); + let x = center_x + distance * cos_table[angle_idx]; + let y = center_y + distance * sin_table[angle_idx]; x_coords.push(x); y_coords.push(y); } @@ -613,12 +640,35 @@ impl XORShiftRandom { fn next(&mut self, bits: i32) -> i32 { let mut next_seed = self.seed ^ (self.seed << 21); - next_seed ^= next_seed >> 35; + next_seed ^= ((next_seed as u64) >> 35) as i64; next_seed ^= next_seed << 4; self.seed = next_seed; (next_seed & ((1i64 << bits) - 1)) as i32 } + fn next_int(&mut self, bound: i32) -> i32 { + if bound <= 0 { + panic!("bound must be positive"); + } + + let mut r = self.next(31); + let m = bound - 1; + + if (bound & m) == 0 { // bound is a power of 2 + r = ((bound as i64 * r as i64) >> 31) as i32 + } else { + loop { + let u = r; + r = u % bound; + if u - r + m >= 0 { + break; + } + r = self.next(31); + } + } + r + } + fn next_f64(&mut self) -> f64 { // Combine two calls to next() to create a 53-bit precision float let val = ((self.next(26) as i64) << 27) + (self.next(27) as i64); @@ -975,7 +1025,9 @@ pub fn spark_st_within( // the zip function is used to iterate over both geometry arrays simultaneously, and the intersects function is applied to each pair of geometries. // This approach leverages vectorization by processing the arrays in a batch-oriented manner, which can be more efficient than processing each element individually. for (g1, g2) in geos_geom_array1.iter().zip(geos_geom_array2.iter()) { - let within = g1.is_within(g2); + let g1_bounds = g1.bounding_rect().unwrap(); + let g2_bounds = g2.bounding_rect().unwrap(); + let within = g1_bounds.is_within(&g2_bounds) && g1.is_within(g2); boolean_builder.append_value(within); } @@ -1009,7 +1061,9 @@ pub fn spark_st_contains( // the zip function is used to iterate over both geometry arrays simultaneously, and the intersects function is applied to each pair of geometries. // This approach leverages vectorization by processing the arrays in a batch-oriented manner, which can be more efficient than processing each element individually. for (g1, g2) in geos_geom_array1.iter().zip(geos_geom_array2.iter()) { - let contains = g1.contains(g2); + let g1_bounds = g1.bounding_rect().unwrap(); + let g2_bounds = g2.bounding_rect().unwrap(); + let contains = g1_bounds.contains(&g2_bounds) && g1.contains(g2); boolean_builder.append_value(contains); } @@ -1025,6 +1079,7 @@ mod tests { use arrow::datatypes::{DataType, Field}; use datafusion::physical_plan::ColumnarValue; use arrow_array::{ArrayRef, BooleanArray, StringArray}; + use wkt::ToWkt; use crate::scalar_funcs::geometry_helpers::{append_linestring, create_geometry_builder}; #[test] @@ -1376,18 +1431,19 @@ mod tests { // Create sample x1, y1, x2, and y2 coordinates as Float64Array let xs = vec![10.0, 20.0, 30.0]; let ys = vec![40.0, 50.0, 60.0]; + let seed = vec![123, 456, 789]; let x_coords = Float64Array::from(xs.clone()); let y_coords = Float64Array::from(ys.clone()); let max_size = ScalarValue::Float64(Some(0.1)); - let num_segments = ScalarValue::Int32(Some(3)); - let seed = ScalarValue::Int64(Some(123)); + let num_segments = ScalarValue::Int32(Some(10)); + let seed = Int64Array::from(seed.clone()); // Convert to ColumnarValue let x_value = ColumnarValue::Array(Arc::new(x_coords)); let y_value = ColumnarValue::Array(Arc::new(y_coords)); let max_size_value = ColumnarValue::Scalar(max_size); let num_segments_value = ColumnarValue::Scalar(num_segments); - let seed_value = ColumnarValue::Scalar(seed); + let seed_value = ColumnarValue::Array(Arc::new(seed)); // Call the spark_st_polygon function with x1, y1, x2, and y2 arguments let result = spark_st_random_polygon(&[x_value, y_value, max_size_value, num_segments_value, seed_value], &DataType::Null).unwrap(); @@ -1400,6 +1456,7 @@ mod tests { assert_eq!(polygons.len(), 3); for k in 0..3 { let polygon = &polygons[k]; + println!("polygon: {}", polygon.wkt_string()); let centroid = polygon.centroid().unwrap(); assert!((centroid.x() - xs[k]).abs() < 0.5); assert!((centroid.y() - ys[k]).abs() < 0.5); @@ -1415,18 +1472,19 @@ mod tests { // Create sample x1, y1, x2, and y2 coordinates as Float64Array let xs = vec![10.0, 20.0, 30.0]; let ys = vec![40.0, 50.0, 60.0]; + let seed = vec![123, 456, 789]; let x_coords = Float64Array::from(xs.clone()); let y_coords = Float64Array::from(ys.clone()); let max_segment_size = ScalarValue::Float64(Some(0.1)); let num_segments = ScalarValue::Int32(Some(5)); - let seed = ScalarValue::Int64(Some(123)); + let seed = Int64Array::from(seed.clone()); // Convert to ColumnarValue let x_value = ColumnarValue::Array(Arc::new(x_coords)); let y_value = ColumnarValue::Array(Arc::new(y_coords)); let max_segment_size_value = ColumnarValue::Scalar(max_segment_size); let num_segments_value = ColumnarValue::Scalar(num_segments); - let seed_value = ColumnarValue::Scalar(seed); + let seed_value = ColumnarValue::Array(Arc::new(seed)); // Call the spark_st_random_linestring function with x1, y1, x2, and y2 arguments let result = spark_st_random_linestring(&[x_value, y_value, max_segment_size_value, num_segments_value, seed_value], &DataType::Null).unwrap(); @@ -1439,6 +1497,7 @@ mod tests { assert_eq!(linestrings.len(), 3); for k in 0..3 { let linestring = &linestrings[k]; + println!("linestring: {}", linestring.wkt_string()); assert_eq!(6, linestring.coords_count()); for coord in linestring.coords_iter() { assert!((coord.x - xs[k]).abs() < 0.5); @@ -1449,4 +1508,12 @@ mod tests { panic!("Expected geometry to be linestring"); } } + + #[test] + fn test_xorshift_random() { + let mut random = XORShiftRandom::new(123); + for _ in 0..10 { + println!("{}", random.next_f64()); + } + } } diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/udf/CometUDF.scala b/spark/src/main/scala/org/apache/spark/sql/comet/udf/CometUDF.scala index be45044875..18567ee8e4 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/udf/CometUDF.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/udf/CometUDF.scala @@ -261,7 +261,7 @@ class CometUDF { val st_randomlinestring: UserDefinedFunction = udf( new UDF5[Row, Row, Row, Row, Row, Row] { override def call(x: Row, y: Row, sz: Row, nSeg: Row, seed: Row): Row = Row.empty }, - DataTypes.createStructType(GEOMETRY_POLYGON)) + DataTypes.createStructType(GEOMETRY_LINESTRING)) /** * Registers all UDFs defined in this class with the given SparkSession.