From b02d08b0c7d2a4e909cf12e835a4b6aed75a1336 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 10 Nov 2025 12:03:24 -0800 Subject: [PATCH 1/8] feat: late materialization of vectors in filtered vector search KNN search is performed when a vector index is not present. When a table is partially covered by a vector index, we perform a union of an ANN search over the indexed data, and a KNN search over the unindexed data. If the table is completely unindexed it is just a KNN search on the data. Prior to this commit, when we would execute the KNN portion of a filtered vector search, we would perform a scan of all columns and remove results that did not match the filter. For large vectors, this amounts to a lot of overfetch from storage. When filters are selective, it is more efficient to read the filter column (typically much smaller than the vector), apply the filter, and then select matching vectors by row ID. This patch implements that strategy as well as an adaptive mechanism for deciding when to apply it. There is a new configuration concept in the scanner for specifying the filter selectivity at which it will be cheaper to do a scan. We will compute a target rowcount based on that threshold and scan the filter column for matches. If we encounter more matches than the target, we will give up and switch to a scan. --- rust/lance-datafusion/src/exec.rs | 15 + rust/lance-tools/src/meta.rs | 2 +- rust/lance/src/dataset.rs | 8 +- rust/lance/src/dataset/scanner.rs | 779 ++++++++++++++++++++++++++---- 4 files changed, 710 insertions(+), 94 deletions(-) diff --git a/rust/lance-datafusion/src/exec.rs b/rust/lance-datafusion/src/exec.rs index 50cdbcd2aa..f4ca16c231 100644 --- a/rust/lance-datafusion/src/exec.rs +++ b/rust/lance-datafusion/src/exec.rs @@ -411,6 +411,21 @@ pub struct ExecutionSummaryCounts { pub all_counts: HashMap, } +impl ExecutionSummaryCounts { + /// Create a new ExecutionSummaryCounts with all values initialized to zero + pub fn new() -> Self { + Self::default() + } + + /// Create a new ExecutionSummaryCounts with only custom counts + pub fn with_counts(counts: impl IntoIterator, usize)>) -> Self { + Self { + all_counts: counts.into_iter().map(|(k, v)| (k.into(), v)).collect(), + ..Default::default() + } + } +} + fn visit_node(node: &dyn ExecutionPlan, counts: &mut ExecutionSummaryCounts) { if let Some(metrics) = node.metrics() { for (metric_name, count) in metrics.iter_counts() { diff --git a/rust/lance-tools/src/meta.rs b/rust/lance-tools/src/meta.rs index d32fa8987f..f45b17dd8f 100644 --- a/rust/lance-tools/src/meta.rs +++ b/rust/lance-tools/src/meta.rs @@ -47,7 +47,7 @@ impl LanceToolFileMetadata { .open_file(&path, &CachedFileSize::unknown()) .await?; let file_metadata = FileReader::read_all_metadata(&file_scheduler).await?; - let lance_tool_file_metadata = LanceToolFileMetadata { file_metadata }; + let lance_tool_file_metadata = Self { file_metadata }; Ok(lance_tool_file_metadata) } } diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index ec0ac823f4..d3fd1bc0a8 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -8873,12 +8873,7 @@ mod tests { } fn make_tx(read_version: u64) -> Transaction { - Transaction::new( - read_version, - Operation::Append { fragments: vec![] }, - None, - None, - ) + Transaction::new(read_version, Operation::Append { fragments: vec![] }, None) } async fn delete_external_tx_file(ds: &Dataset) { @@ -8939,7 +8934,6 @@ mod tests { ds.load_indices().await.unwrap().as_ref().clone(), &tx_file, &ManifestWriteConfig::default(), - None, ) .unwrap(); let location = write_manifest_file( diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index b1f2d07540..8ec685eda0 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: Apache-2.0 // SPDX-FileCopyrightText: Copyright The Lance Authors +use std::collections::BTreeSet; use std::ops::Range; use std::pin::Pin; use std::sync::{Arc, LazyLock}; @@ -128,6 +129,13 @@ pub static DEFAULT_IO_BUFFER_SIZE: LazyLock = LazyLock::new(|| { .unwrap_or(2 * 1024 * 1024 * 1024) }); +/// Selectivity threshold for late materialization in filtered KNN searches. +/// If the filter selects fewer than this percentage of the table's rows, we use late +/// materialization to avoid fetching vector data for rows that will be filtered out. +/// If the filter selects this percentage or more rows, we do a single scan with vectors +/// to avoid the random access overhead of the take operation. +pub const LATE_MATERIALIZE_SELECTIVITY_THRESHOLD: f64 = 0.005; // 0.5% + /// Defines an ordering for a single column /// /// Floats are sorted using the IEEE 754 total ordering @@ -227,6 +235,21 @@ struct PlannedFilteredScan { filter_pushed_down: bool, } +/// Empty index info provider for cases where scalar indices are not used +struct EmptyIndexInfo; + +impl lance_index::scalar::expression::IndexInformationProvider for EmptyIndexInfo { + fn get_index( + &self, + _col: &str, + ) -> Option<( + &arrow_schema::DataType, + &dyn lance_index::scalar::expression::ScalarQueryParser, + )> { + None + } +} + /// Filter for filtering rows #[derive(Debug, Clone)] pub enum LanceFilter { @@ -407,6 +430,9 @@ pub struct Scanner { /// File reader options to use when reading data files. file_reader_options: Option, + /// Selectivity threshold for late materialization in filtered KNN searches. + late_materialize_selectivity_threshold: Option, + // Legacy fields to help migrate some old projection behavior to new behavior // // There are two behaviors we are moving away from: @@ -623,6 +649,7 @@ impl Scanner { scan_stats_callback: None, strict_batch_size: false, file_reader_options, + late_materialize_selectivity_threshold: None, legacy_with_row_addr: false, legacy_with_row_id: false, explicit_projection: false, @@ -856,6 +883,59 @@ impl Scanner { self } + /// Set the selectivity threshold for late materialization in filtered KNN searches. + /// + /// When a filter is present in a KNN search, we execute it first to measure selectivity. + /// If the filter selects fewer than this percentage of rows, we use late materialization + /// (scan scalars first, then take vectors for filtered rows only). If the filter selects + /// this percentage or more rows, we do a single scan with both filter and vector columns to avoid the + /// random access overhead of the take operation. + /// + /// The optimal value depends on your storage medium: + /// - **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%) since + /// random access is very expensive + /// - **Local SSD**: Can use a higher threshold like 0.05 (5%) since random access is cheaper + /// - **NVMe**: Can use even higher thresholds like 0.1 (10%) + /// + /// The default is 0.005 (0.5%), which is conservative for object storage. + /// + /// # Arguments + /// + /// * `threshold` - The selectivity threshold as a fraction (e.g., 0.005 for 0.5%) + /// + /// # Example + /// + /// ```ignore + /// // For local SSD, use a higher threshold + /// scanner.late_materialize_selectivity_threshold(0.05)?; + /// ``` + /// + /// # Errors + /// + /// Returns an error if the threshold is not finite or is outside the range [0.0, 1.0]. + pub fn late_materialize_selectivity_threshold(&mut self, threshold: f64) -> Result<&mut Self> { + if !threshold.is_finite() { + return Err(Error::invalid_input( + format!( + "late_materialize_selectivity_threshold must be a finite value, got {}", + threshold + ), + location!(), + )); + } + if !(0.0..=1.0).contains(&threshold) { + return Err(Error::invalid_input( + format!( + "late_materialize_selectivity_threshold must be between 0.0 and 1.0 (inclusive), got {}", + threshold + ), + location!(), + )); + } + self.late_materialize_selectivity_threshold = Some(threshold); + Ok(self) + } + /// Set the prefetch size. /// Ignored in v2 and newer format pub fn batch_readahead(&mut self, nbatches: usize) -> &mut Self { @@ -2891,33 +2971,51 @@ impl Scanner { Ok(knn_node) } else { // No index found. use flat search. - let mut columns = vec![q.column.clone()]; - if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { - columns.extend(Planner::column_names_in_expr(refine_expr)); - } - let mut vector_scan_projection = self - .dataset - .empty_projection() - .with_row_id() - .union_columns(&columns, OnMissing::Error)?; + let has_filter = filter_plan.full_expr.is_some() || filter_plan.refine_expr.is_some(); + let has_scalar_index = filter_plan.index_query.is_some(); - vector_scan_projection.with_row_addr = - self.projection_plan.physical_projection.with_row_addr; + let plan = if has_filter && !has_scalar_index { + // Use adaptive late materialization for filters without scalar indices. + self.adaptive_late_materialization(filter_plan, None, &q.column, false) + .await? + } else { + // Direct scan when no filter or when scalar index is present. + // Scalar indices use MaterializeIndex which already optimizes I/O. + let mut scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_column(&q.column, OnMissing::Error)?; - let PlannedFilteredScan { mut plan, .. } = self - .filtered_read( - filter_plan, - vector_scan_projection, - /*include_deleted_rows=*/ true, - None, - None, - /*is_prefilter= */ true, - ) - .await?; + // Include columns referenced by refine expression + if let Some(ref refine_expr) = filter_plan.refine_expr { + let refine_cols = Planner::column_names_in_expr(refine_expr); + scan_projection = scan_projection.union_columns(refine_cols, OnMissing::Error)?; + } + + scan_projection.with_row_addr = + self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + filter_plan, + scan_projection, + /*include_deleted_rows=*/ true, + None, + None, + /*is_prefilter= */ true, + ) + .await?; + + // Apply refine filter if present + let plan = if let Some(ref refine_expr) = filter_plan.refine_expr { + Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?) + } else { + plan + }; + plan + }; - if let Some(refine_expr) = &filter_plan.refine_expr { - plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); - } Ok(self.flat_knn(plan, q)?) } } @@ -2957,36 +3055,54 @@ impl Scanner { knn_node = self.take(knn_node, vector_projection)?; } - let mut columns = vec![q.column.clone()]; - if let Some(expr) = filter_plan.full_expr.as_ref() { - let filter_columns = Planner::column_names_in_expr(expr); - columns.extend(filter_columns); - } - let vector_scan_projection = Arc::new(self.dataset.schema().project(&columns).unwrap()); - // Note: we could try and use the scalar indices here to reduce the scope of this scan but the - // most common case is that fragments that are newer than the vector index are going to be newer - // than the scalar indices anyways - let mut scan_node = self.scan_fragments( - true, - false, - false, - false, - false, - vector_scan_projection, - Arc::new(unindexed_fragments), - // Can't pushdown limit/offset in an ANN search - None, - // We are re-ordering anyways, so no need to get data in data - // in a deterministic order. - false, - ); + // Create a filter plan without scalar indices since they don't cover this fragment subset. + let unindexed_filter_plan = if let Some(expr) = filter_plan.full_expr.clone() { + let filter_schema = self.filterable_schema()?; + let planner = Planner::new(Arc::new(filter_schema.as_ref().into())); - if let Some(expr) = filter_plan.full_expr.as_ref() { - // If there is a prefilter we need to manually apply it to the new data - scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?); - } - // first we do flat search on just the new data - let topk_appended = self.flat_knn(scan_node, &q)?; + planner.create_filter_plan(expr, &EmptyIndexInfo, false)? + } else { + FilterPlan::empty() + }; + + let has_filter = unindexed_filter_plan.full_expr.is_some() + || unindexed_filter_plan.refine_expr.is_some(); + + let plan = if has_filter { + // Adaptive late materialization when there's a filter + self.adaptive_late_materialization( + &unindexed_filter_plan, + Some(Arc::new(unindexed_fragments.clone())), + &q.column, + unindexed_filter_plan.skip_recheck, + ) + .await? + } else { + // No filter: read vectors directly in a single scan + let mut scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_column(&q.column, OnMissing::Error)?; + + scan_projection.with_row_addr = + self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + &unindexed_filter_plan, + scan_projection, + /*include_deleted_rows=*/ false, + Some(Arc::new(unindexed_fragments.clone())), + None, + /*is_prefilter= */ true, + ) + .await?; + + plan + }; + + let topk_appended = self.flat_knn(plan, &q)?; // To do a union, we need to make the schemas match. Right now // knn_node: _distance, _rowid, vector @@ -3618,6 +3734,259 @@ impl Scanner { } } + /// Helper: performs a full scan with filter and vector columns together + async fn full_scan_with_filter( + &self, + filter_plan: &FilterPlan, + fragments: Option>>, + vector_column: &str, + filter_columns: Vec, + skip_recheck: bool, + ) -> Result> { + // Deduplicate columns to avoid issues if vector_column is in filter_columns + let mut columns_set: BTreeSet = filter_columns.into_iter().collect(); + columns_set.insert(vector_column.to_string()); + let columns_vec: Vec = columns_set.into_iter().collect(); + + let mut full_scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&columns_vec, OnMissing::Error)?; + + full_scan_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { mut plan, .. } = self + .filtered_read( + filter_plan, + full_scan_projection, + /*include_deleted_rows=*/ false, + fragments, + None, + /*is_prefilter= */ true, + ) + .await?; + + if !skip_recheck { + if let Some(refine_expr) = &filter_plan.refine_expr { + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); + } + } + + Ok(plan) + } + + /// Get the total row count from the given fragments, or fall back to scanner/dataset fragments. + fn get_total_row_count(&self, fragments: Option<&Arc>>) -> usize { + if let Some(frags) = fragments { + frags.iter().filter_map(|f| f.num_rows()).sum() + } else { + let frags = if let Some(scanner_frags) = self.fragments.as_ref() { + scanner_frags + } else { + self.dataset.fragments().as_ref() + }; + frags.iter().filter_map(|f| f.num_rows()).sum() + } + } + + /// Adaptive late materialization for filtered vector scans. + /// + /// When a filter is present, this method: + /// 1. Scans with scalar columns first to check selectivity + /// 2. If selective (< threshold): uses late materialization (collect row IDs, then take vectors) + /// 3. If not selective (>= threshold): does a full scan with both filter and vector columns + /// + /// This avoids expensive random access for non-selective filters while benefiting + /// from late materialization for selective ones. + async fn adaptive_late_materialization( + &self, + filter_plan: &FilterPlan, + fragments: Option>>, + vector_column: &str, + skip_recheck: bool, + ) -> Result> { + let mut filter_columns_set: BTreeSet = BTreeSet::new(); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + filter_columns_set.extend(Planner::column_names_in_expr(refine_expr)); + } + if let Some(full_expr) = filter_plan.full_expr.as_ref() { + filter_columns_set.extend(Planner::column_names_in_expr(full_expr)); + } + + // If filter references the vector column, skip adaptive late materialization + // to avoid loading vectors twice (once for filter, once for distance calc) + if filter_columns_set.contains(vector_column) { + let mut filter_columns: Vec = filter_columns_set.into_iter().collect(); + filter_columns.sort(); + return self + .full_scan_with_filter( + filter_plan, + fragments, + vector_column, + filter_columns, + skip_recheck, + ) + .await; + } + + let mut filter_columns: Vec = filter_columns_set.into_iter().collect(); + filter_columns.sort(); + + let mut scalar_scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_columns(&filter_columns, OnMissing::Error)?; + + scalar_scan_projection.with_row_addr = + self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { mut plan, .. } = self + .filtered_read( + filter_plan, + scalar_scan_projection, + /*include_deleted_rows=*/ false, + fragments.clone(), + None, + /*is_prefilter= */ true, + ) + .await?; + + if !skip_recheck { + if let Some(refine_expr) = &filter_plan.refine_expr { + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); + } + } + + let total_count = self.get_total_row_count(fragments.as_ref()); + + // If we can't determine total count, fall back to full scan + // (conservative for object storage) + if total_count == 0 { + return self + .full_scan_with_filter( + filter_plan, + fragments, + vector_column, + filter_columns, + skip_recheck, + ) + .await; + } + let threshold = self + .late_materialize_selectivity_threshold + .unwrap_or(LATE_MATERIALIZE_SELECTIVITY_THRESHOLD); + + let (row_id_batch, filtered_count) = + self.collect_row_ids(plan, total_count, threshold).await?; + + let result = if let Some(row_id_batch) = row_id_batch { + let selectivity = filtered_count as f64 / total_count as f64; + + if let Some(callback) = &self.scan_stats_callback { + let stats = ExecutionSummaryCounts::with_counts([ + ("late_mat_selected_rows", filtered_count), + ("late_mat_total_rows", total_count), + ("late_mat_selectivity_pct", (selectivity * 100.0) as usize), + ("late_mat_strategy", 1), // 1 = late_materialized + ]); + callback(&stats); + } + + let row_id_plan = Arc::new(OneShotExec::from_batch(row_id_batch)); + let vector_projection = self + .dataset + .empty_projection() + .union_column(vector_column, OnMissing::Error)?; + self.take(row_id_plan, vector_projection)? + } else { + // Threshold exceeded: use full scan, don't report selectivity stats + if let Some(callback) = &self.scan_stats_callback { + let stats = ExecutionSummaryCounts::with_counts([ + ("late_mat_total_rows", total_count), + ("late_mat_strategy", 0), // 0 = full_scan + ("late_mat_threshold_exceeded", 1), + ]); + callback(&stats); + } + + self.full_scan_with_filter( + filter_plan, + fragments, + vector_column, + filter_columns, + skip_recheck, + ) + .await? + }; + + Ok(result) + } + + /// Collects row IDs from a plan and returns them as a batch along with the count. + /// + /// Returns (None, 0) if the filtered count reaches or exceeds the selectivity threshold. + /// The count of 0 is returned because we short-circuit before counting all rows; + /// it does not represent the actual filtered count, which is unknown. + async fn collect_row_ids( + &self, + plan: Arc, + total_count: usize, + threshold: f64, + ) -> Result<(Option, usize)> { + use arrow_array::UInt64Array; + + let mut stream = execute_plan( + plan, + LanceExecutionOptions { + batch_size: self.batch_size, + ..Default::default() + }, + )?; + + let threshold_count = ((total_count as f64) * threshold).ceil() as usize; + let mut row_ids = Vec::new(); + let mut filtered_count = 0; + while let Some(batch) = stream.next().await { + let batch = batch?; + let batch_row_count = batch.num_rows(); + + // Short-circuit: return 0 since we don't have accurate count + if filtered_count + batch_row_count >= threshold_count { + return Ok((None, 0)); + } + + filtered_count += batch_row_count; + let rowid_col = batch + .column_by_name(ROW_ID) + .ok_or_else(|| Error::Internal { + message: "Expected _rowid column in batch".to_string(), + location: location!(), + })?; + let rowid_array = rowid_col + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::Internal { + message: "_rowid column is not UInt64Array".to_string(), + location: location!(), + })?; + + row_ids.extend(rowid_array.values().iter().copied()); + } + + // We stayed below threshold, return the collected row IDs for late materialization + let row_id_array = Arc::new(UInt64Array::from(row_ids)); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + ROW_ID, + DataType::UInt64, + false, + )])); + let batch = RecordBatch::try_new(schema, vec![row_id_array])?; + + Ok((Some(batch), filtered_count)) + } + /// Global offset-limit of the result of the input plan fn limit_node(&self, plan: Arc) -> Arc { Arc::new(GlobalLimitExec::new( @@ -3877,7 +4246,6 @@ pub mod test_dataset { #[cfg(test)] mod test { - use std::collections::BTreeSet; use std::time::{Duration, Instant}; use std::vec; @@ -4053,6 +4421,153 @@ mod test { } } + #[tokio::test] + async fn test_late_materialize_selectivity_threshold() { + use super::test_dataset::TestVectorDataset; + + // Create a test dataset with vectors and a filter column (NO INDICES) + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + + // Use a very selective filter "i < 2" which matches only 2/400 rows (0.5% selectivity) + // This selectivity is exactly at the default threshold (0.5%), so it will NOT trigger late materialization. + let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // Test 1: With default threshold (0.005 = 0.5%), this filter is right at the boundary + // Since selectivity (2/400 = 0.005) == threshold (0.005), it should NOT use late materialization (>= threshold) + let plan_str_default = dataset + .scan() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Test 2: With very high threshold (0.99 = 99%), even this selective filter + // should trigger late materialization since 0.5% < 99% + let plan_str_high = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Test 3: With zero threshold (0.0), late materialization should never be used + // since no selectivity is < 0% + let plan_str_zero = dataset + .scan() + .late_materialize_selectivity_threshold(0.0) + .unwrap() + .nearest("vec", &q, 5) + .unwrap() + .filter("i < 2") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + assert_eq!(plan_str_default, plan_str_zero, + "Default threshold (0.5%) and zero threshold should both skip late materialization when selectivity == threshold"); + + assert_ne!(plan_str_default, plan_str_high, + "High threshold (99%) should produce different plan than default, using late materialization"); + + assert!(plan_str_high.contains("OneShotStream"), + "High threshold should use late materialization with OneShotExec/OneShotStream, but plan was:\n{}", + plan_str_high); + } + + /// Verifies that filters referencing the vector column skip late materialization + #[tokio::test] + async fn test_late_mat_skipped_when_filter_uses_vector() { + use super::test_dataset::TestVectorDataset; + + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // Filter references vector column - should skip late materialization + let plan_vector_filter = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() // High threshold + .nearest("vec", &q, 10) + .unwrap() + .filter("vec[0] > 0.0") // Filter uses vector column + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // Should NOT use late materialization even with high threshold + assert!( + !plan_vector_filter.contains("OneShotStream"), + "Should skip late materialization when filter references vector column" + ); + } + + /// Verifies threshold short-circuiting: low threshold → full scan, high threshold → late mat + #[tokio::test] + async fn test_late_mat_threshold_short_circuit() { + use super::test_dataset::TestVectorDataset; + + let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false) + .await + .unwrap(); + + let dataset = &test_ds.dataset; + let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); + + // 1% threshold with 50% selectivity → exceeds threshold → full scan + let plan_low_threshold = dataset + .scan() + .late_materialize_selectivity_threshold(0.01) + .unwrap() + .nearest("vec", &q, 10) + .unwrap() + .filter("i < 200") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + // 99% threshold with 50% selectivity → doesn't exceed → late materialization + let plan_high_threshold = dataset + .scan() + .late_materialize_selectivity_threshold(0.99) + .unwrap() + .nearest("vec", &q, 10) + .unwrap() + .filter("i < 200") + .unwrap() + .prefilter(true) + .explain_plan(false) + .await + .unwrap(); + + assert_ne!(plan_low_threshold, plan_high_threshold); + assert!(!plan_low_threshold.contains("OneShotStream")); + assert!(plan_high_threshold.contains("OneShotStream")); + } + #[cfg(not(windows))] #[tokio::test] async fn test_local_object_store() { @@ -6878,13 +7393,13 @@ mod test { ) .await?; - // use_index = False -> same plan as KNN + // use_index = False -> same plan as KNN without late materialization (no filter) log::info!("Test case: ANN with index disabled"); let expected = if data_storage_version == LanceFileVersion::Legacy { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 - FilterExec: _distance@... IS NOT NULL + FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=13), expr=... KNNVectorDistance: metric=l2 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None" @@ -6892,7 +7407,7 @@ mod test { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance] Take: columns=\"vec, _rowid, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 - FilterExec: _distance@... IS NOT NULL + FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=13), expr=... KNNVectorDistance: metric=l2 LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \ @@ -6963,24 +7478,45 @@ mod test { dataset.append_new_data().await?; log::info!("Test case: Combined KNN/ANN"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] FilterExec: _distance@... IS NOT NULL - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... KNNVectorDistance: metric=l2 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 - SortExec: TopK(fetch=6), expr=... + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + ANNSubIndex: name=..., k=6, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=-- + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=6), expr=..., preserve_partitioning=... ANNSubIndex: name=..., k=6, deltas=1 - ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1"; + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + }; assert_plan_equals( &dataset.dataset, |scan| scan.nearest("vec", &q, 6), @@ -6990,9 +7526,10 @@ mod test { ) .await?; - // new data and with filter + // new data and with filter (postfilter - no late materialization) log::info!("Test case: Combined KNN/ANN with postfilter"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, i, (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: i@3 > 10 @@ -7012,7 +7549,30 @@ mod test { CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=15), expr=... ANNSubIndex: name=..., k=15, deltas=1 - ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1"; + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, i, (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: i@3 > 10 + Take: columns=\"_rowid, vec, _distance, (i)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=15), expr=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=-- + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=15), expr=... + ANNSubIndex: name=..., k=15, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1" + }; assert_plan_equals( &dataset.dataset, |scan| scan.nearest("vec", &q, 15)?.filter("i > 10"), @@ -7020,7 +7580,7 @@ mod test { ) .await?; - // new data and with prefilter + // new data and with prefilter (filter has 97% selectivity, so no late materialization) log::info!("Test case: Combined KNN/ANN with prefilter"); let expected = if data_storage_version == LanceFileVersion::Legacy { "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] @@ -7031,12 +7591,12 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=5), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=5), expr=... @@ -7053,18 +7613,18 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=5), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=5), expr=... ANNSubIndex: name=..., k=5, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \ + LanceRead: uri=..., projection=[], num_fragments=..., range_before=None, range_after=None, \ row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)" }; assert_plan_equals( @@ -7143,8 +7703,30 @@ mod test { dataset.append_new_data().await?; - log::info!("Test case: Combined KNN/ANN with scalar index"); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + log::info!("Test case: Combined KNN/ANN with scalar index (filter has 97% selectivity, so no late materialization)"); + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=8), expr=... + KNNVectorDistance: metric=l2 + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=8), expr=... + ANNSubIndex: name=..., k=8, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 + ScalarIndexQuery: query=[i > 10]@i_idx" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... IS NOT NULL @@ -7152,18 +7734,19 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=8), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=8), expr=... ANNSubIndex: name=..., k=8, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - ScalarIndexQuery: query=[i > 10]@i_idx"; + ScalarIndexQuery: query=[i > 10]@i_idx" + }; assert_plan_equals( &dataset.dataset, |scan| { @@ -7178,9 +7761,10 @@ mod test { // Update scalar index but not vector index log::info!( - "Test case: Combined KNN/ANN with updated scalar index and outdated vector index" + "Test case: Combined KNN/ANN with updated scalar index and outdated vector index (filter has 97% selectivity, so no late materialization)" ); - let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + let expected = if data_storage_version == LanceFileVersion::Legacy { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] Take: columns=\"_rowid, vec, _distance, (i), (s)\" CoalesceBatchesExec: target_batch_size=8192 FilterExec: _distance@... IS NOT NULL @@ -7188,18 +7772,40 @@ mod test { KNNVectorDistance: metric=l2 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 UnionExec - ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec] + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] FilterExec: _distance@... IS NOT NULL SortExec: TopK(fetch=11), expr=... KNNVectorDistance: metric=l2 - FilterExec: i@1 > 10 - LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None + FilterExec: i@0 > 10 + LanceScan: uri=..., projection=[i, vec], row_id=true, row_addr=false, ordered=false, range=None Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=11), expr=... ANNSubIndex: name=..., k=11, deltas=1 ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 - ScalarIndexQuery: query=[i > 10]@i_idx"; + ScalarIndexQuery: query=[i > 10]@i_idx" + } else { + "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance] + Take: columns=\"_rowid, vec, _distance, (i), (s)\" + CoalesceBatchesExec: target_batch_size=8192 + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2 + UnionExec + ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@1 as vec] + FilterExec: _distance@... IS NOT NULL + SortExec: TopK(fetch=11), expr=... + KNNVectorDistance: metric=l2 + LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) + Take: columns=\"_distance, _rowid, (vec)\" + CoalesceBatchesExec: target_batch_size=8192 + SortExec: TopK(fetch=11), expr=... + ANNSubIndex: name=..., k=11, deltas=1 + ANNIvfPartition: uuid=..., minimum_nprobes=1, maximum_nprobes=None, deltas=1 + ScalarIndexQuery: query=[i > 10]@i_idx" + }; dataset.make_scalar_index().await?; assert_plan_equals( &dataset.dataset, @@ -7576,7 +8182,8 @@ mod test { FilterExec: _distance@2 IS NOT NULL SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]... KNNVectorDistance: metric=l2 - LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None + LanceRead: uri=..., projection=[vec], num_fragments=..., range_before=None, range_after=None, \ + row_id=true, row_addr=false, full_filter=--, refine_filter=-- Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]... From c8e842bffbb6ecd45eadcb9fda3f1f29e4d5d208 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 10 Nov 2025 21:49:20 -0800 Subject: [PATCH 2/8] format --- rust/lance/src/dataset/scanner.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 8ec685eda0..218a453d58 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -2990,7 +2990,8 @@ impl Scanner { // Include columns referenced by refine expression if let Some(ref refine_expr) = filter_plan.refine_expr { let refine_cols = Planner::column_names_in_expr(refine_expr); - scan_projection = scan_projection.union_columns(refine_cols, OnMissing::Error)?; + scan_projection = + scan_projection.union_columns(refine_cols, OnMissing::Error)?; } scan_projection.with_row_addr = From fa6d871c5df27756ce9e1c33aee538a650d9a518 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 10 Nov 2025 22:01:35 -0800 Subject: [PATCH 3/8] clippy --- rust/lance/src/dataset/scanner.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 218a453d58..7cb3b32955 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3009,12 +3009,12 @@ impl Scanner { .await?; // Apply refine filter if present - let plan = if let Some(ref refine_expr) = filter_plan.refine_expr { + + if let Some(ref refine_expr) = filter_plan.refine_expr { Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?) } else { plan - }; - plan + } }; Ok(self.flat_knn(plan, q)?) From 7f080cbd6d151621ecf2f1f0c3b3ce8cf9ce094b Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Mon, 10 Nov 2025 22:03:01 -0800 Subject: [PATCH 4/8] format --- rust/lance/src/dataset/scanner.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 7cb3b32955..05430efa0f 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -3009,7 +3009,7 @@ impl Scanner { .await?; // Apply refine filter if present - + if let Some(ref refine_expr) = filter_plan.refine_expr { Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?) } else { From c2e4d42845de3dba8a443a142facf0fba3997a4a Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 11 Nov 2025 05:45:03 -0800 Subject: [PATCH 5/8] update python test --- python/python/tests/test_scalar_index.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/python/tests/test_scalar_index.py b/python/python/tests/test_scalar_index.py index 32c787ad9f..033ff2f72d 100644 --- a/python/python/tests/test_scalar_index.py +++ b/python/python/tests/test_scalar_index.py @@ -412,7 +412,7 @@ def make_fts_search(ds): assert "ScalarIndexQuery" in plan assert "MaterializeIndex" not in plan assert "KNNVectorDistance" in plan - assert "LanceScan" in plan + assert "LanceRead" in plan assert make_vec_search(ds).to_table().num_rows == 12 plan = make_fts_search(ds).explain_plan() From 63773c0cd01a17be249fd7e961da978d901aa404 Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 11 Nov 2025 10:36:34 -0800 Subject: [PATCH 6/8] python bindings --- python/python/lance/dataset.py | 45 ++++++++++++++++++++++++++++++++++ python/src/dataset.rs | 9 ++++++- 2 files changed, 53 insertions(+), 1 deletion(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index cb67d62d13..9109ac7e4f 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -690,6 +690,7 @@ def scanner( strict_batch_size: Optional[bool] = None, order_by: Optional[List[Union[ColumnOrdering, str]]] = None, disable_scoring_autoprojection: Optional[bool] = None, + late_materialize_selectivity_threshold: Optional[float] = None, ) -> LanceScanner: """Return a Scanner that can support various pushdowns. @@ -870,6 +871,7 @@ def setopt(opt, val): setopt(builder.strict_batch_size, strict_batch_size) setopt(builder.order_by, order_by) setopt(builder.disable_scoring_autoprojection, disable_scoring_autoprojection) + setopt(builder.late_materialize_selectivity_threshold, late_materialize_selectivity_threshold) # columns=None has a special meaning. we can't treat it as "user didn't specify" if self._default_scan_options is None: # No defaults, use user-provided, if any @@ -4207,6 +4209,7 @@ def __init__(self, ds: LanceDataset): self._strict_batch_size = False self._orderings = None self._disable_scoring_autoprojection = False + self._late_materialize_selectivity_threshold: Optional[float] = None def apply_defaults(self, default_opts: Dict[str, Any]) -> ScannerBuilder: for key, value in default_opts.items(): @@ -4588,6 +4591,47 @@ def disable_scoring_autoprojection(self, disable: bool = True) -> ScannerBuilder self._disable_scoring_autoprojection = disable return self + def late_materialize_selectivity_threshold( + self, threshold: float + ) -> ScannerBuilder: + """ + Set the selectivity threshold for late materialization in filtered KNN searches. + + When a filter is present in a KNN search, Lance first executes it to measure selectivity. + If the filter selects fewer than this percentage of rows, Lance uses late materialization + (scan scalars first, then take vectors for filtered rows only). If the filter selects + this percentage or more rows, Lance does a single scan with both filter and vector columns + to avoid the random access overhead of the take operation. + + The optimal value depends on your storage medium: + - **Object storage (S3, GCS, Azure)**: Use a low threshold like 0.005 (0.5%) since + random access is very expensive + - **Local SSD**: Can use a higher threshold like 0.05 (5%) since random access is cheaper + - **NVMe**: Can use even higher thresholds like 0.1 (10%) + + The default is 0.005 (0.5%), which is conservative for object storage. + + Parameters + ---------- + threshold : float + The selectivity threshold as a fraction (e.g., 0.005 for 0.5%) + + Returns + ------- + ScannerBuilder + Returns self for method chaining + """ + if not isinstance(threshold, (int, float)): + raise TypeError( + f"late_materialize_selectivity_threshold must be a number, got {type(threshold)}" + ) + if not (0.0 <= threshold <= 1.0): + raise ValueError( + f"late_materialize_selectivity_threshold must be between 0.0 and 1.0 (inclusive), got {threshold}" + ) + self._late_materialize_selectivity_threshold = float(threshold) + return self + def to_scanner(self) -> LanceScanner: scanner = self.ds._ds.scanner( self._columns, @@ -4616,6 +4660,7 @@ def to_scanner(self) -> LanceScanner: self._strict_batch_size, self._orderings, self._disable_scoring_autoprojection, + self._late_materialize_selectivity_threshold, ) return LanceScanner(scanner, self.ds) diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 06cd596589..bc8a87dc4e 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -742,7 +742,7 @@ impl Dataset { } #[allow(clippy::too_many_arguments)] - #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None))] + #[pyo3(signature=(columns=None, columns_with_transform=None, filter=None, prefilter=None, limit=None, offset=None, nearest=None, batch_size=None, io_buffer_size=None, batch_readahead=None, fragment_readahead=None, scan_in_order=None, fragments=None, with_row_id=None, with_row_address=None, use_stats=None, substrait_filter=None, fast_search=None, full_text_query=None, late_materialization=None, use_scalar_index=None, include_deleted_rows=None, scan_stats_callback=None, strict_batch_size=None, order_by=None, disable_scoring_autoprojection=None, late_materialize_selectivity_threshold=None))] fn scanner( self_: PyRef<'_, Self>, columns: Option>, @@ -771,6 +771,7 @@ impl Dataset { strict_batch_size: Option, order_by: Option>>, disable_scoring_autoprojection: Option, + late_materialize_selectivity_threshold: Option, ) -> PyResult { let mut scanner: LanceScanner = self_.ds.scan(); @@ -958,6 +959,12 @@ impl Dataset { scanner.strict_batch_size(strict_batch_size); } + if let Some(threshold) = late_materialize_selectivity_threshold { + scanner + .late_materialize_selectivity_threshold(threshold) + .map_err(|err| PyValueError::new_err(err.to_string()))?; + } + if let Some(nearest) = nearest { let column = nearest .get_item("column")? From 825a666346982727f6c797f41c4028e9add1119e Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 11 Nov 2025 12:08:13 -0800 Subject: [PATCH 7/8] migrate logic to filtered read node --- rust/lance/src/dataset/scanner.rs | 666 +++++++++++++----------- rust/lance/src/io/exec/filtered_read.rs | 388 +++++++++++++- 2 files changed, 736 insertions(+), 318 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index 05430efa0f..f363607879 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -51,7 +51,11 @@ use lance_core::error::LanceOptionExt; use lance_core::utils::address::RowAddress; use lance_core::utils::mask::{RowIdMask, RowIdTreeMap}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; -use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET}; +use lance_core::{ + ROW_ADDR, ROW_CREATED_AT_VERSION, ROW_ID, ROW_LAST_UPDATED_AT_VERSION, ROW_OFFSET, +}; +#[cfg(feature = "substrait")] +use lance_datafusion::exec::get_session_context; use lance_datafusion::exec::{ analyze_plan, execute_plan, LanceExecutionOptions, OneShotExec, StrictBatchSizeExec, }; @@ -79,7 +83,7 @@ use crate::dataset::row_offsets_to_row_addresses; use crate::dataset::utils::SchemaAdapter; use crate::index::vector::utils::{get_vector_dim, get_vector_type}; use crate::index::DatasetIndexInternalExt; -use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions}; +use crate::io::exec::filtered_read::{AdaptiveColumnConfig, FilteredReadExec, FilteredReadOptions}; use crate::io::exec::fts::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec}; use crate::io::exec::knn::MultivectorScoringExec; use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec}; @@ -298,8 +302,6 @@ impl LanceFilter { } #[cfg(feature = "substrait")] Self::Substrait(expr) => { - use lance_datafusion::exec::{get_session_context, LanceExecutionOptions}; - let ctx = get_session_context(&LanceExecutionOptions::default()); let state = ctx.state(); let schema = Arc::new(ArrowSchema::from(dataset_schema)); @@ -2001,8 +2003,6 @@ impl Scanner { // Check if a filter plan references version columns fn filter_references_version_columns(&self, filter_plan: &FilterPlan) -> bool { - use lance_core::{ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION}; - if let Some(refine_expr) = &filter_plan.refine_expr { let column_names = Planner::column_names_in_expr(refine_expr); for col_name in column_names { @@ -2912,115 +2912,163 @@ impl Scanner { )); }; - // Sanity check let (vector_type, _) = get_vector_type(self.dataset.schema(), &q.column)?; - let column_id = self.dataset.schema().field_id(q.column.as_str())?; + let use_index = self.nearest.as_ref().map(|q| q.use_index).unwrap_or(false); let indices = if use_index { self.dataset.load_indices().await? } else { Arc::new(vec![]) }; - if let Some(index) = indices.iter().find(|i| i.fields.contains(&column_id)) { - log::trace!("index found for vector search"); - // There is an index built for the column. - // We will use the index. - if matches!(q.refine_factor, Some(0)) { - return Err(Error::invalid_input( - "Refine factor cannot be zero".to_string(), - location!(), - )); - } - // Find all deltas with the same index name. - let deltas = self.dataset.load_indices_by_name(&index.name).await?; - let ann_node = match vector_type { - DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?, - DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?, - _ => unreachable!(), - }; + match indices.iter().find(|i| i.fields.contains(&column_id)) { + Some(index) => { + log::trace!("index found for vector search"); + self.vector_search_with_vector_index(q, index, vector_type, filter_plan) + .await + } + None => self.vector_search_flat(q, filter_plan).await, + } + } - let mut knn_node = if q.refine_factor.is_some() { - let vector_projection = self - .dataset - .empty_projection() - .union_column(&q.column, OnMissing::Error) - .unwrap(); - let knn_node_with_vector = self.take(ann_node, vector_projection)?; - // TODO: now we just open an index to get its metric type. - let idx = self - .dataset - .open_vector_index( - q.column.as_str(), - &index.uuid.to_string(), - &NoOpMetricsCollector, - ) - .await?; - let mut q = q.clone(); - q.metric_type = idx.metric_type(); - self.flat_knn(knn_node_with_vector, &q)? - } else { - ann_node - }; // vector, _distance, _rowid + /// Index-based vector search (ANN) + /// + /// Uses a vector index to perform approximate nearest neighbor search. + async fn vector_search_with_vector_index( + &self, + q: &Query, + index: &IndexMetadata, + vector_type: DataType, + filter_plan: &FilterPlan, + ) -> Result> { + if matches!(q.refine_factor, Some(0)) { + return Err(Error::invalid_input( + "Refine factor cannot be zero".to_string(), + location!(), + )); + } - if !self.fast_search { - knn_node = self.knn_combined(q, index, knn_node, filter_plan).await?; - } + // Find all deltas with the same index name + let deltas = self.dataset.load_indices_by_name(&index.name).await?; + let ann_plan = match vector_type { + DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?, + DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?, + _ => unreachable!(), + }; - Ok(knn_node) + let mut plan = if q.refine_factor.is_some() { + self.apply_vector_refinement(ann_plan, q, index).await? } else { - // No index found. use flat search. - let has_filter = filter_plan.full_expr.is_some() || filter_plan.refine_expr.is_some(); - let has_scalar_index = filter_plan.index_query.is_some(); + ann_plan + }; - let plan = if has_filter && !has_scalar_index { - // Use adaptive late materialization for filters without scalar indices. - self.adaptive_late_materialization(filter_plan, None, &q.column, false) - .await? - } else { - // Direct scan when no filter or when scalar index is present. - // Scalar indices use MaterializeIndex which already optimizes I/O. - let mut scan_projection = self - .dataset - .empty_projection() - .with_row_id() - .union_column(&q.column, OnMissing::Error)?; + if !self.fast_search { + plan = self.knn_combined(q, index, plan, filter_plan).await?; + } - // Include columns referenced by refine expression - if let Some(ref refine_expr) = filter_plan.refine_expr { - let refine_cols = Planner::column_names_in_expr(refine_expr); - scan_projection = - scan_projection.union_columns(refine_cols, OnMissing::Error)?; - } + Ok(plan) + } - scan_projection.with_row_addr = - self.projection_plan.physical_projection.with_row_addr; + /// Flat (non-indexed) vector search + /// + /// Performs brute-force KNN search by scanning all data and computing distances. + async fn vector_search_flat( + &self, + q: &Query, + filter_plan: &FilterPlan, + ) -> Result> { + let has_filter = filter_plan.full_expr.is_some() || filter_plan.refine_expr.is_some(); + let has_scalar_index = filter_plan.index_query.is_some(); - let PlannedFilteredScan { plan, .. } = self - .filtered_read( - filter_plan, - scan_projection, - /*include_deleted_rows=*/ true, - None, - None, - /*is_prefilter= */ true, - ) - .await?; + let plan = if has_filter && !has_scalar_index { + // Use adaptive late materialization for filters without scalar indices + self.adaptive_column_scan(filter_plan, None, &q.column, false) + .await? + } else { + // Direct scan when no filter or when scalar index is present + self.build_direct_vector_scan(q, filter_plan, None, /*include_deleted_rows=*/ true) + .await? + }; - // Apply refine filter if present + self.flat_knn(plan, q) + } - if let Some(ref refine_expr) = filter_plan.refine_expr { - Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?) - } else { - plan - } - }; + /// Build a direct scan plan for vector search + /// + /// Performs a direct filtered read when no filter is present or when scalar indices are used. + /// Scalar indices use MaterializeIndex which already optimizes I/O. + async fn build_direct_vector_scan( + &self, + q: &Query, + filter_plan: &FilterPlan, + fragments: Option>>, + include_deleted_rows: bool, + ) -> Result> { + let mut scan_projection = self + .dataset + .empty_projection() + .with_row_id() + .union_column(&q.column, OnMissing::Error)?; - Ok(self.flat_knn(plan, q)?) + // Include columns referenced by refine expression + if let Some(ref refine_expr) = filter_plan.refine_expr { + let refine_cols = Planner::column_names_in_expr(refine_expr); + scan_projection = scan_projection.union_columns(refine_cols, OnMissing::Error)?; + } + + scan_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + + let PlannedFilteredScan { plan, .. } = self + .filtered_read( + filter_plan, + scan_projection, + include_deleted_rows, + fragments, + None, + /*is_prefilter=*/ true, + ) + .await?; + + // Apply refine filter if present + if let Some(ref refine_expr) = filter_plan.refine_expr { + Ok(Arc::new(LanceFilterExec::try_new( + refine_expr.clone(), + plan, + )?)) + } else { + Ok(plan) } } + /// Apply refinement to ANN results by fetching vectors and recomputing distances + async fn apply_vector_refinement( + &self, + ann_node: Arc, + q: &Query, + index: &IndexMetadata, + ) -> Result> { + let vector_projection = self + .dataset + .empty_projection() + .union_column(&q.column, OnMissing::Error)?; + let knn_node_with_vector = self.take(ann_node, vector_projection)?; + + // Open index to get its metric type + let idx = self + .dataset + .open_vector_index( + q.column.as_str(), + &index.uuid.to_string(), + &NoOpMetricsCollector, + ) + .await?; + + let mut q = q.clone(); + q.metric_type = idx.metric_type(); + self.flat_knn(knn_node_with_vector, &q) + } + /// Combine ANN results with KNN results for data appended after index creation async fn knn_combined( &self, @@ -3068,10 +3116,11 @@ impl Scanner { let has_filter = unindexed_filter_plan.full_expr.is_some() || unindexed_filter_plan.refine_expr.is_some(); + let has_scalar_index = unindexed_filter_plan.index_query.is_some(); - let plan = if has_filter { - // Adaptive late materialization when there's a filter - self.adaptive_late_materialization( + let plan = if has_filter && !has_scalar_index { + // Adaptive late materialization when there's a filter without scalar index + self.adaptive_column_scan( &unindexed_filter_plan, Some(Arc::new(unindexed_fragments.clone())), &q.column, @@ -3079,28 +3128,14 @@ impl Scanner { ) .await? } else { - // No filter: read vectors directly in a single scan - let mut scan_projection = self - .dataset - .empty_projection() - .with_row_id() - .union_column(&q.column, OnMissing::Error)?; - - scan_projection.with_row_addr = - self.projection_plan.physical_projection.with_row_addr; - - let PlannedFilteredScan { plan, .. } = self - .filtered_read( - &unindexed_filter_plan, - scan_projection, - /*include_deleted_rows=*/ false, - Some(Arc::new(unindexed_fragments.clone())), - None, - /*is_prefilter= */ true, - ) - .await?; - - plan + // Direct scan when no filter or scalar index is present + self.build_direct_vector_scan( + &q, + &unindexed_filter_plan, + Some(Arc::new(unindexed_fragments.clone())), + /*include_deleted_rows=*/ false, + ) + .await? }; let topk_appended = self.flat_knn(plan, &q)?; @@ -3739,30 +3774,30 @@ impl Scanner { async fn full_scan_with_filter( &self, filter_plan: &FilterPlan, - fragments: Option>>, - vector_column: &str, - filter_columns: Vec, + frags: Option>>, + expensive_col: &str, + filter_cols: Vec, skip_recheck: bool, ) -> Result> { // Deduplicate columns to avoid issues if vector_column is in filter_columns - let mut columns_set: BTreeSet = filter_columns.into_iter().collect(); - columns_set.insert(vector_column.to_string()); - let columns_vec: Vec = columns_set.into_iter().collect(); + let mut cols: BTreeSet = filter_cols.into_iter().collect(); + cols.insert(expensive_col.to_string()); + let cols: Vec = cols.into_iter().collect(); - let mut full_scan_projection = self + let mut projection = self .dataset .empty_projection() .with_row_id() - .union_columns(&columns_vec, OnMissing::Error)?; + .union_columns(&cols, OnMissing::Error)?; - full_scan_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; + projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; - let PlannedFilteredScan { mut plan, .. } = self + let PlannedFilteredScan { plan, .. } = self .filtered_read( filter_plan, - full_scan_projection, + projection, /*include_deleted_rows=*/ false, - fragments, + frags, None, /*is_prefilter= */ true, ) @@ -3770,7 +3805,10 @@ impl Scanner { if !skip_recheck { if let Some(refine_expr) = &filter_plan.refine_expr { - plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); + return Ok(Arc::new(LanceFilterExec::try_new( + refine_expr.clone(), + plan, + )?)); } } @@ -3800,192 +3838,88 @@ impl Scanner { /// /// This avoids expensive random access for non-selective filters while benefiting /// from late materialization for selective ones. - async fn adaptive_late_materialization( + async fn adaptive_column_scan( &self, filter_plan: &FilterPlan, - fragments: Option>>, - vector_column: &str, + frags: Option>>, + take_column: &str, skip_recheck: bool, ) -> Result> { - let mut filter_columns_set: BTreeSet = BTreeSet::new(); - if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { - filter_columns_set.extend(Planner::column_names_in_expr(refine_expr)); - } - if let Some(full_expr) = filter_plan.full_expr.as_ref() { - filter_columns_set.extend(Planner::column_names_in_expr(full_expr)); - } - - // If filter references the vector column, skip adaptive late materialization - // to avoid loading vectors twice (once for filter, once for distance calc) - if filter_columns_set.contains(vector_column) { - let mut filter_columns: Vec = filter_columns_set.into_iter().collect(); - filter_columns.sort(); + // FilteredRead doesn't support v1/legacy files, so fall back for legacy datasets + if self.dataset.is_legacy_storage() { + let mut filter_cols: BTreeSet = BTreeSet::new(); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(refine_expr)); + } + if let Some(full_expr) = filter_plan.full_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(full_expr)); + } + let mut filter_cols: Vec = filter_cols.into_iter().collect(); + filter_cols.sort(); return self - .full_scan_with_filter( - filter_plan, - fragments, - vector_column, - filter_columns, - skip_recheck, - ) + .full_scan_with_filter(filter_plan, frags, take_column, filter_cols, skip_recheck) .await; } - let mut filter_columns: Vec = filter_columns_set.into_iter().collect(); - filter_columns.sort(); - - let mut scalar_scan_projection = self + // Build full projection (filter columns + vector column) + let mut filter_cols: BTreeSet = BTreeSet::new(); + if let Some(refine_expr) = filter_plan.refine_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(refine_expr)); + } + if let Some(full_expr) = filter_plan.full_expr.as_ref() { + filter_cols.extend(Planner::column_names_in_expr(full_expr)); + } + let mut filter_cols: Vec = filter_cols.into_iter().collect(); + filter_cols.sort(); + let mut full_projection = self .dataset .empty_projection() .with_row_id() - .union_columns(&filter_columns, OnMissing::Error)?; - - scalar_scan_projection.with_row_addr = - self.projection_plan.physical_projection.with_row_addr; + .union_columns(&filter_cols, OnMissing::Error)? + .union_column(take_column, OnMissing::Error)?; + full_projection.with_row_addr = self.projection_plan.physical_projection.with_row_addr; - let PlannedFilteredScan { mut plan, .. } = self - .filtered_read( - filter_plan, - scalar_scan_projection, - /*include_deleted_rows=*/ false, - fragments.clone(), - None, - /*is_prefilter= */ true, - ) - .await?; - - if !skip_recheck { - if let Some(refine_expr) = &filter_plan.refine_expr { - plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); - } - } - - let total_count = self.get_total_row_count(fragments.as_ref()); - - // If we can't determine total count, fall back to full scan - // (conservative for object storage) - if total_count == 0 { - return self - .full_scan_with_filter( - filter_plan, - fragments, - vector_column, - filter_columns, - skip_recheck, - ) - .await; - } + // Use new_filtered_read but add adaptive config let threshold = self .late_materialize_selectivity_threshold .unwrap_or(LATE_MATERIALIZE_SELECTIVITY_THRESHOLD); - let (row_id_batch, filtered_count) = - self.collect_row_ids(plan, total_count, threshold).await?; + let total_count = self.get_total_row_count(frags.as_ref()); - let result = if let Some(row_id_batch) = row_id_batch { - let selectivity = filtered_count as f64 / total_count as f64; - - if let Some(callback) = &self.scan_stats_callback { - let stats = ExecutionSummaryCounts::with_counts([ - ("late_mat_selected_rows", filtered_count), - ("late_mat_total_rows", total_count), - ("late_mat_selectivity_pct", (selectivity * 100.0) as usize), - ("late_mat_strategy", 1), // 1 = late_materialized - ]); - callback(&stats); - } - - let row_id_plan = Arc::new(OneShotExec::from_batch(row_id_batch)); - let vector_projection = self - .dataset - .empty_projection() - .union_column(vector_column, OnMissing::Error)?; - self.take(row_id_plan, vector_projection)? - } else { - // Threshold exceeded: use full scan, don't report selectivity stats - if let Some(callback) = &self.scan_stats_callback { - let stats = ExecutionSummaryCounts::with_counts([ - ("late_mat_total_rows", total_count), - ("late_mat_strategy", 0), // 0 = full_scan - ("late_mat_threshold_exceeded", 1), - ]); - callback(&stats); - } - - self.full_scan_with_filter( + let mut plan = self + .new_filtered_read( filter_plan, - fragments, - vector_column, - filter_columns, - skip_recheck, + full_projection, + /*make_deletions_null=*/ false, + frags.clone(), + /*scan_range=*/ None, ) - .await? - }; + .await?; - Ok(result) - } + // Unwrap FilteredReadExec to add adaptive config + if let Some(filtered_exec) = plan.as_any().downcast_ref::() { + let mut opts = filtered_exec.options().clone(); + opts.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: take_column.to_string(), + threshold, + total_row_count: total_count, + }); - /// Collects row IDs from a plan and returns them as a batch along with the count. - /// - /// Returns (None, 0) if the filtered count reaches or exceeds the selectivity threshold. - /// The count of 0 is returned because we short-circuit before counting all rows; - /// it does not represent the actual filtered count, which is unknown. - async fn collect_row_ids( - &self, - plan: Arc, - total_count: usize, - threshold: f64, - ) -> Result<(Option, usize)> { - use arrow_array::UInt64Array; + plan = Arc::new(FilteredReadExec::try_new( + filtered_exec.dataset().clone(), + opts, + filtered_exec.index_input().cloned(), + )?); + } - let mut stream = execute_plan( - plan, - LanceExecutionOptions { - batch_size: self.batch_size, - ..Default::default() - }, - )?; - - let threshold_count = ((total_count as f64) * threshold).ceil() as usize; - let mut row_ids = Vec::new(); - let mut filtered_count = 0; - while let Some(batch) = stream.next().await { - let batch = batch?; - let batch_row_count = batch.num_rows(); - - // Short-circuit: return 0 since we don't have accurate count - if filtered_count + batch_row_count >= threshold_count { - return Ok((None, 0)); + // Apply refine filter if needed + if !skip_recheck { + if let Some(refine_expr) = &filter_plan.refine_expr { + plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?); } - - filtered_count += batch_row_count; - let rowid_col = batch - .column_by_name(ROW_ID) - .ok_or_else(|| Error::Internal { - message: "Expected _rowid column in batch".to_string(), - location: location!(), - })?; - let rowid_array = rowid_col - .as_any() - .downcast_ref::() - .ok_or_else(|| Error::Internal { - message: "_rowid column is not UInt64Array".to_string(), - location: location!(), - })?; - - row_ids.extend(rowid_array.values().iter().copied()); } - // We stayed below threshold, return the collected row IDs for late materialization - let row_id_array = Arc::new(UInt64Array::from(row_ids)); - let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( - ROW_ID, - DataType::UInt64, - false, - )])); - let batch = RecordBatch::try_new(schema, vec![row_id_array])?; - - Ok((Some(batch), filtered_count)) + Ok(plan) } /// Global offset-limit of the result of the input plan @@ -4434,11 +4368,10 @@ mod test { let dataset = &test_ds.dataset; // Use a very selective filter "i < 2" which matches only 2/400 rows (0.5% selectivity) - // This selectivity is exactly at the default threshold (0.5%), so it will NOT trigger late materialization. let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); - // Test 1: With default threshold (0.005 = 0.5%), this filter is right at the boundary - // Since selectivity (2/400 = 0.005) == threshold (0.005), it should NOT use late materialization (>= threshold) + // Test 1: With default threshold (0.005 = 0.5%) + // Adaptive late materialization is enabled internally in FilteredReadExec let plan_str_default = dataset .scan() .nearest("vec", &q, 5) @@ -4450,8 +4383,7 @@ mod test { .await .unwrap(); - // Test 2: With very high threshold (0.99 = 99%), even this selective filter - // should trigger late materialization since 0.5% < 99% + // Test 2: With very high threshold (0.99 = 99%) let plan_str_high = dataset .scan() .late_materialize_selectivity_threshold(0.99) @@ -4465,8 +4397,7 @@ mod test { .await .unwrap(); - // Test 3: With zero threshold (0.0), late materialization should never be used - // since no selectivity is < 0% + // Test 3: With zero threshold (0.0) let plan_str_zero = dataset .scan() .late_materialize_selectivity_threshold(0.0) @@ -4480,15 +4411,25 @@ mod test { .await .unwrap(); - assert_eq!(plan_str_default, plan_str_zero, - "Default threshold (0.5%) and zero threshold should both skip late materialization when selectivity == threshold"); + // All three should successfully build plans with LanceRead + // The adaptive behavior is internal to FilteredReadExec, not visible in the plan + assert!( + plan_str_default.contains("LanceRead"), + "Default threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_default + ); - assert_ne!(plan_str_default, plan_str_high, - "High threshold (99%) should produce different plan than default, using late materialization"); + assert!( + plan_str_high.contains("LanceRead"), + "High threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_high + ); - assert!(plan_str_high.contains("OneShotStream"), - "High threshold should use late materialization with OneShotExec/OneShotStream, but plan was:\n{}", - plan_str_high); + assert!( + plan_str_zero.contains("LanceRead"), + "Zero threshold should build plan with LanceRead, but plan was:\n{}", + plan_str_zero + ); } /// Verifies that filters referencing the vector column skip late materialization @@ -4519,12 +4460,12 @@ mod test { // Should NOT use late materialization even with high threshold assert!( - !plan_vector_filter.contains("OneShotStream"), + !plan_vector_filter.contains("AdaptiveColumnScan"), "Should skip late materialization when filter references vector column" ); } - /// Verifies threshold short-circuiting: low threshold → full scan, high threshold → late mat + /// Verifies threshold configuration: both plans use AdaptiveColumnScanExec with different thresholds #[tokio::test] async fn test_late_mat_threshold_short_circuit() { use super::test_dataset::TestVectorDataset; @@ -4536,7 +4477,7 @@ mod test { let dataset = &test_ds.dataset; let q = Float32Array::from_iter_values((0..32).map(|v| v as f32)); - // 1% threshold with 50% selectivity → exceeds threshold → full scan + // 1% threshold with 50% selectivity → FilteredReadExec will choose full scan at runtime let plan_low_threshold = dataset .scan() .late_materialize_selectivity_threshold(0.01) @@ -4550,7 +4491,7 @@ mod test { .await .unwrap(); - // 99% threshold with 50% selectivity → doesn't exceed → late materialization + // 99% threshold with 50% selectivity → FilteredReadExec will choose late materialization at runtime let plan_high_threshold = dataset .scan() .late_materialize_selectivity_threshold(0.99) @@ -4564,9 +4505,15 @@ mod test { .await .unwrap(); - assert_ne!(plan_low_threshold, plan_high_threshold); - assert!(!plan_low_threshold.contains("OneShotStream")); - assert!(plan_high_threshold.contains("OneShotStream")); + // Both should successfully build plans with LanceRead + assert!(plan_low_threshold.contains("LanceRead")); + assert!(plan_high_threshold.contains("LanceRead")); + + // The plans should show adaptive configuration + assert!(plan_low_threshold.contains("adaptive_column=vec")); + assert!(plan_high_threshold.contains("adaptive_column=vec")); + assert!(plan_low_threshold.contains("threshold=0.01")); + assert!(plan_high_threshold.contains("threshold=0.99")); } #[cfg(not(windows))] @@ -7619,7 +7566,7 @@ mod test { SortExec: TopK(fetch=5), expr=... KNNVectorDistance: metric=l2 LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ - row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=5), expr=... @@ -7740,7 +7687,7 @@ mod test { SortExec: TopK(fetch=8), expr=... KNNVectorDistance: metric=l2 LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ - row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=8), expr=... @@ -7799,7 +7746,7 @@ mod test { SortExec: TopK(fetch=11), expr=... KNNVectorDistance: metric=l2 LanceRead: uri=..., projection=[i, vec], num_fragments=..., range_before=None, range_after=None, \ - row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10) + row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10), adaptive_column=vec, threshold=... Take: columns=\"_distance, _rowid, (vec)\" CoalesceBatchesExec: target_batch_size=8192 SortExec: TopK(fetch=11), expr=... @@ -9007,6 +8954,95 @@ mod test { } } + /// Test vector search with filter on nested struct field to ensure column name handling is robust + #[tokio::test] + async fn test_vector_search_with_nested_struct_filter() { + use lance_arrow::FixedSizeListArrayExt; + + // Create minimal dataset with nested struct + vector + let category = StringArray::from(vec!["urgent", "normal", "urgent", "low"]); + let priority = Int32Array::from(vec![10, 50, 20, 80]); + let metadata = StructArray::from(vec![ + ( + Arc::new(ArrowField::new("category", DataType::Utf8, false)), + Arc::new(category) as ArrayRef, + ), + ( + Arc::new(ArrowField::new("priority", DataType::Int32, false)), + Arc::new(priority) as ArrayRef, + ), + ]); + + let vectors = FixedSizeListArray::try_new_from_values( + Float32Array::from_iter_values((0..32).map(|v| v as f32)), + 8, + ) + .unwrap(); + + let schema = Arc::new(ArrowSchema::new(vec![ + ArrowField::new("metadata", metadata.data_type().clone(), false), + ArrowField::new("vector", vectors.data_type().clone(), false), + ])); + + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(metadata), Arc::new(vectors)], + ) + .unwrap(); + + let tmp_dir = TempStrDir::default(); + let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); + let dataset = Dataset::write(reader, &tmp_dir, None).await.unwrap(); + + let query_vec = Float32Array::from_iter_values((0..8).map(|v| v as f32)); + + // Test 1: Filter on nested string field + let results = dataset + .scan() + .nearest("vector", &query_vec, 10) + .unwrap() + .filter("metadata.category = 'urgent'") + .unwrap() + .prefilter(true) + .try_into_batch() + .await + .unwrap(); + + // Should return 2 rows (indices 0 and 2 have category='urgent') + assert_eq!(results.num_rows(), 2); + let metadata_col = results.column_by_name("metadata").unwrap().as_struct(); + let category_col = metadata_col + .column_by_name("category") + .unwrap() + .as_string::(); + for i in 0..results.num_rows() { + assert_eq!(category_col.value(i), "urgent"); + } + + // Test 2: Filter on nested int field + let results2 = dataset + .scan() + .nearest("vector", &query_vec, 10) + .unwrap() + .filter("metadata.priority >= 50") + .unwrap() + .prefilter(true) + .try_into_batch() + .await + .unwrap(); + + // Should return 2 rows (indices 1 and 3 have priority >= 50) + assert_eq!(results2.num_rows(), 2); + let metadata_col2 = results2.column_by_name("metadata").unwrap().as_struct(); + let priority_col = metadata_col2 + .column_by_name("priority") + .unwrap() + .as_primitive::(); + for i in 0..results2.num_rows() { + assert!(priority_col.value(i) >= 50); + } + } + #[test_log::test(test)] fn test_scan_finishes_all_tasks() { // Need to use multi-threaded runtime otherwise tasks don't run unless someone is polling somewhere diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index e1dd87195a..b8af5d29e1 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -34,7 +34,7 @@ use lance_core::utils::deletion::DeletionVector; use lance_core::utils::futures::FinallyStreamExt; use lance_core::utils::mask::RowIdMask; use lance_core::utils::tokio::get_num_compute_intensive_cpus; -use lance_core::{datatypes::Projection, Error, Result}; +use lance_core::{datatypes::Projection, Error, Result, ROW_ID}; use lance_datafusion::planner::Planner; use lance_datafusion::utils::{ ExecutionPlanMetricsSetExt, FRAGMENTS_SCANNED_METRIC, RANGES_SCANNED_METRIC, @@ -56,6 +56,7 @@ use crate::dataset::scanner::{ get_default_batch_size, BATCH_SIZE_FALLBACK, DEFAULT_FRAGMENT_READAHEAD, }; use crate::Dataset; +use crate::io::exec::TakeExec; use super::utils::IoMetrics; @@ -199,6 +200,10 @@ pub struct FilteredReadGlobalMetrics { ranges_scanned: Count, rows_scanned: Count, io_metrics: IoMetrics, + // Adaptive late materialization metrics + adaptive_used_take: Count, + adaptive_used_full_scan: Count, + adaptive_pass1_rows: Count, } impl FilteredReadGlobalMetrics { @@ -208,6 +213,9 @@ impl FilteredReadGlobalMetrics { ranges_scanned: metrics.new_count(RANGES_SCANNED_METRIC, 0), rows_scanned: metrics.new_count(ROWS_SCANNED_METRIC, 0), io_metrics: IoMetrics::new(metrics, 0), + adaptive_used_take: metrics.new_count("adaptive_used_take", 0), + adaptive_used_full_scan: metrics.new_count("adaptive_used_full_scan", 0), + adaptive_pass1_rows: metrics.new_count("adaptive_pass1_rows", 0), } } } @@ -1185,6 +1193,17 @@ impl FilteredReadStream { } } +/// Configuration for adaptive expensive column handling +#[derive(Debug, Clone)] +pub struct AdaptiveColumnConfig { + /// Column to handle adaptively (will be excluded from initial scan) + pub expensive_column: String, + /// Selectivity threshold (0.0 to 1.0) + pub threshold: f64, + /// Total row count for selectivity calculation + pub total_row_count: usize, +} + /// Options for a filtered read. #[derive(Debug, Clone)] pub struct FilteredReadOptions { @@ -1213,6 +1232,8 @@ pub struct FilteredReadOptions { pub threading_mode: FilteredReadThreadingMode, /// The size of the I/O buffer to use for the scan pub io_buffer_size_bytes: Option, + /// Enable adaptive expensive column handling + pub adaptive_expensive_column: Option, } impl FilteredReadOptions { @@ -1243,6 +1264,7 @@ impl FilteredReadOptions { threading_mode: FilteredReadThreadingMode::OnePartitionMultipleThreads( get_num_compute_intensive_cpus(), ), + adaptive_expensive_column: None, } } @@ -1390,6 +1412,12 @@ impl FilteredReadOptions { self.io_buffer_size_bytes = Some(io_buffer_size); self } + + /// Enable adaptive expensive column handling + pub fn with_adaptive_column(mut self, config: AdaptiveColumnConfig) -> Self { + self.adaptive_expensive_column = Some(config); + self + } } /// A plan node that reads a dataset, applying an optional filter and projection. @@ -1494,6 +1522,11 @@ impl FilteredReadExec { partition: usize, context: Arc, ) -> SendableRecordBatchStream { + // Check if adaptive mode is enabled + if self.options.adaptive_expensive_column.is_some() { + return self.obtain_adaptive_stream(partition, context); + } + // There are two subtleties here: // // First, we need to defer execution until first polled (hence the once/flatten) @@ -1538,6 +1571,262 @@ impl FilteredReadExec { Box::pin(RecordBatchStreamAdapter::new(self.schema(), stream)) } + /// Adaptive stream that decides at runtime whether to use take or full scan + fn obtain_adaptive_stream( + &self, + partition: usize, + context: Arc, + ) -> SendableRecordBatchStream { + use futures::StreamExt; + use arrow_array::UInt64Array; + use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use lance_datafusion::exec::OneShotExec; + + let dataset = self.dataset.clone(); + let options = self.options.clone(); + let metrics = self.metrics.clone(); + let index_input = self.index_input.clone(); + let output_schema = self.schema(); + let output_schema_for_adapter = output_schema.clone(); + + let stream = futures::stream::once(async move { + let config = options.adaptive_expensive_column.as_ref().unwrap().clone(); + + // Check if adaptive materialization is applicable + // If not applicable, fall back to normal stream + + // 1. Check if filter uses the expensive column + // If it does, we'd need to load it anyway for filtering, so no benefit + let filter_columns: Vec = { + let mut cols = Vec::new(); + if let Some(ref full_filter) = options.full_filter { + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr(full_filter)); + } + if let Some(ref refine_filter) = options.refine_filter { + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr(refine_filter)); + } + cols + }; + + if filter_columns.contains(&config.expensive_column) { + // Fall back to normal stream - filter needs the expensive column + let normal_stream = FilteredReadStream::try_new( + dataset.clone(), options, &metrics, None + ).await?; + return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); + } + + // 2. Check if total_row_count is valid + if config.total_row_count == 0 { + let normal_stream = FilteredReadStream::try_new( + dataset.clone(), options, &metrics, None + ).await?; + return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); + } + + // PASS 1: Scan cheap columns (exclude expensive column) + // Build projection without the expensive column + let projection_schema = options.projection.to_bare_schema(); + let cheap_columns: Vec = projection_schema + .fields + .iter() + .map(|f| f.name.clone()) + .filter(|name| name != &config.expensive_column) + .collect(); + + let cheap_column_refs: Vec<&str> = cheap_columns.iter().map(|s| s.as_str()).collect(); + let cheap_projection = dataset + .empty_projection() + .union_columns(cheap_column_refs, OnMissing::Error)? + .with_row_id(); + + let mut cheap_options = options.clone(); + cheap_options.projection = cheap_projection; + cheap_options.adaptive_expensive_column = None; // Disable recursion + + // Evaluate index if present + let mut evaluated_index = None; + if let Some(index_input) = index_input { + let mut index_search = index_input.execute(partition, context.clone()) + .map_err(|e| Error::from(e))?; + let index_search_result = + index_search.next().await.ok_or_else(|| Error::Internal { + message: "Index search did not yield any results".to_string(), + location: location!(), + })? + .map_err(|e| Error::from(e))?; + evaluated_index = Some(Arc::new(EvaluatedIndex::try_from_arrow( + &index_search_result, + )?)); + } + + let cheap_stream = FilteredReadStream::try_new( + dataset.clone(), cheap_options, &metrics, evaluated_index.clone() + ).await?; + + let mut cheap_stream_boxed = cheap_stream.get_stream(&metrics, partition); + + // Collect row IDs with EARLY TERMINATION + let threshold_count = (config.total_row_count as f64 * config.threshold).ceil() as usize; + let mut row_ids = Vec::new(); + let mut cheap_batches = Vec::new(); + let mut filtered_count = 0; + + while let Some(batch_result) = cheap_stream_boxed.next().await { + let batch = batch_result.map_err(|e| Error::from(e))?; + + // EARLY STOP CHECK + if filtered_count + batch.num_rows() >= threshold_count { + // NOT SELECTIVE - switch to full scan + drop(cheap_stream_boxed); // Drop the cheap stream + + // Record metrics: full scan path taken + let global_metrics = FilteredReadGlobalMetrics::new(&metrics); + global_metrics.adaptive_used_full_scan.add(1); + global_metrics.adaptive_pass1_rows.add(filtered_count); + + let full_stream = FilteredReadStream::try_new( + dataset.clone(), options, &metrics, evaluated_index + ).await?; + + return DataFusionResult::Ok(full_stream.get_stream(&metrics, partition)); + } + + filtered_count += batch.num_rows(); + + // Extract row IDs + let rowid_col = batch + .column_by_name(ROW_ID) + .ok_or_else(|| Error::Internal { + message: format!("Expected {} column in batch", ROW_ID), + location: location!(), + })?; + let rowid_array = rowid_col + .as_any() + .downcast_ref::() + .ok_or_else(|| Error::Internal { + message: format!("{} column is not UInt64Array", ROW_ID), + location: location!(), + })?; + + row_ids.extend(rowid_array.values().iter().copied()); + cheap_batches.push(batch); + } + + // SELECTIVE - do Pass 2 (take expensive column) + + // Record metrics: take path taken + let global_metrics = FilteredReadGlobalMetrics::new(&metrics); + global_metrics.adaptive_used_take.add(1); + global_metrics.adaptive_pass1_rows.add(filtered_count); + + // Handle empty result case + if cheap_batches.is_empty() || row_ids.is_empty() { + // No rows matched the filter, return empty stream + let empty_batch = RecordBatch::new_empty(output_schema.clone()); + let stream = futures::stream::iter(vec![Ok(empty_batch)]); + let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); + return Result::::Ok(Box::pin(stream_adapter) as SendableRecordBatchStream); + } + + let expensive_projection = dataset.empty_projection() + .union_column(&config.expensive_column, OnMissing::Error)?; + + // Create row ID batch for TakeExec + let row_id_array = Arc::new(UInt64Array::from(row_ids.clone())); + let row_id_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + ROW_ID, + DataType::UInt64, + false, + )])); + let row_id_batch = RecordBatch::try_new(row_id_schema, vec![row_id_array])?; + let row_id_plan = Arc::new(OneShotExec::from_batch(row_id_batch)); + + let take_exec = TakeExec::try_new( + dataset.clone(), + row_id_plan, + expensive_projection, + )?; + + let take_exec = take_exec.ok_or_else(|| Error::Internal { + message: "TakeExec returned None unexpectedly".to_string(), + location: location!(), + })?; + + let mut expensive_stream = take_exec.execute(partition, context) + .map_err(|e| Error::from(e))?; + + // Concatenate all cheap batches into one + use arrow_select::concat::concat_batches; + let cheap_schema = cheap_batches[0].schema(); + let cheap_batch_combined = concat_batches(&cheap_schema, &cheap_batches) + .map_err(|e| Error::Arrow { + message: format!("Failed to concatenate cheap batches: {}", e), + location: location!(), + })?; + + // Collect all expensive batches + let mut expensive_batches = Vec::new(); + while let Some(batch_result) = expensive_stream.next().await { + let batch = batch_result.map_err(|e| Error::from(e))?; + expensive_batches.push(batch); + } + + // Concatenate all expensive batches into one + if expensive_batches.is_empty() { + return Err(Error::Internal { + message: "TakeExec returned no batches".to_string(), + location: location!(), + }); + } + let expensive_schema = expensive_batches[0].schema(); + let expensive_batch_combined = concat_batches(&expensive_schema, &expensive_batches) + .map_err(|e| Error::Arrow { + message: format!("Failed to concatenate expensive batches: {}", e), + location: location!(), + })?; + + // Merge the two combined batches - build columns in the order of output_schema + let expensive_col_idx = expensive_batch_combined.schema().index_of(&config.expensive_column) + .map_err(|e| Error::Internal { + message: format!("Expected {} column in expensive batch: {}", config.expensive_column, e), + location: location!(), + })?; + + let mut columns = Vec::new(); + for field in output_schema.fields() { + let col_name = field.name(); + if col_name == &config.expensive_column { + // Take from expensive batch + columns.push(expensive_batch_combined.column(expensive_col_idx).clone()); + } else { + // Take from cheap batch + let cheap_col_idx = cheap_batch_combined.schema().index_of(col_name) + .map_err(|e| Error::Internal { + message: format!("Expected {} column in cheap batch: {}", col_name, e), + location: location!(), + })?; + columns.push(cheap_batch_combined.column(cheap_col_idx).clone()); + } + } + + let combined_batch: RecordBatch = RecordBatch::try_new( + output_schema.clone(), + columns, + ).map_err(|e: arrow_schema::ArrowError| Error::from(e))?; + + // Return stream with single combined batch + let stream = futures::stream::iter( + vec![Ok(combined_batch)] + ); + let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); + Result::::Ok(Box::pin(stream_adapter) as SendableRecordBatchStream) + }) + .try_flatten(); + + Box::pin(RecordBatchStreamAdapter::new(output_schema_for_adapter, stream)) + } + pub fn dataset(&self) -> &Arc { &self.dataset } @@ -1562,11 +1851,22 @@ impl DisplayAs for FilteredReadExec { .map(|f| f.name.as_str()) .collect::>() .join(", "); + + let adaptive_info = if let Some(ref config) = self.options.adaptive_expensive_column { + format!( + ", adaptive_column={}, threshold={}", + config.expensive_column, + config.threshold + ) + } else { + String::new() + }; + match t { DisplayFormatType::Default | DisplayFormatType::Verbose => { write!( f, - "LanceRead: uri={}, projection=[{}], num_fragments={}, range_before={:?}, range_after={:?}, row_id={}, row_addr={}, full_filter={}, refine_filter={}", + "LanceRead: uri={}, projection=[{}], num_fragments={}, range_before={:?}, range_after={:?}, row_id={}, row_addr={}, full_filter={}, refine_filter={}{}", self.dataset.data_dir(), columns, self.options.fragments.as_ref().map(|f| f.len()).unwrap_or(self.dataset.fragments().len()), @@ -1576,10 +1876,11 @@ impl DisplayAs for FilteredReadExec { self.options.projection.with_row_addr, self.options.full_filter.as_ref().map(|i| i.to_string()).unwrap_or("--".to_string()), self.options.refine_filter.as_ref().map(|i| i.to_string()).unwrap_or("--".to_string()), + adaptive_info, ) } DisplayFormatType::TreeRender => { - write!(f, "LanceRead\nuri={}\nprojection=[{}]\nnum_fragments={}\nrange_before={:?}\nrange_after={:?}\nrow_id={}\nrow_addr={}\nfull_filter={}\nrefine_filter={}", + write!(f, "LanceRead\nuri={}\nprojection=[{}]\nnum_fragments={}\nrange_before={:?}\nrange_after={:?}\nrow_id={}\nrow_addr={}\nfull_filter={}\nrefine_filter={}{}", self.dataset.data_dir(), columns, self.options.fragments.as_ref().map(|f| f.len()).unwrap_or(self.dataset.fragments().len()), @@ -1589,6 +1890,7 @@ impl DisplayAs for FilteredReadExec { self.options.projection.with_row_addr, self.options.full_filter.as_ref().map(|i| i.to_string()).unwrap_or("true".to_string()), self.options.refine_filter.as_ref().map(|i| i.to_string()).unwrap_or("true".to_string()), + adaptive_info, ) } } @@ -3359,4 +3661,84 @@ mod tests { .unwrap_or(0); assert!(iops > 0, "Should have recorded IO operations"); } + + #[tokio::test] + async fn test_adaptive_metrics() { + // Test that adaptive late materialization metrics are recorded correctly + use super::AdaptiveColumnConfig; + + let fixture = TestFixture::new().await; + + // Test SELECTIVE case - should use take path + // Filter for fully_indexed < 10 matches only 10 rows out of 300 + let mut options = FilteredReadOptions::basic_full_read(&fixture.dataset); + options.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: "vector".to_string(), + threshold: 0.99, // Very high threshold, so it will use take + total_row_count: 300, + }); + + // Add a filter that matches only a few rows (10 out of 300 = 3.3%) + options.full_filter = Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(10u32))); + + let filtered_read = Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); + + let _batches = filtered_read + .execute(0, Arc::new(TaskContext::default())) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let metrics = filtered_read.metrics().unwrap(); + let used_take = metrics + .sum_by_name("adaptive_used_take") + .map(|v| v.as_usize()) + .unwrap_or(0); + let used_full_scan = metrics + .sum_by_name("adaptive_used_full_scan") + .map(|v| v.as_usize()) + .unwrap_or(0); + let pass1_rows = metrics + .sum_by_name("adaptive_pass1_rows") + .map(|v| v.as_usize()) + .unwrap_or(0); + + assert_eq!(used_take, 1, "Should have used take path for selective filter"); + assert_eq!(used_full_scan, 0, "Should not have used full scan path"); + assert!(pass1_rows > 0 && pass1_rows <= 10, "Should have scanned ~10 rows in pass 1"); + + // Test NON-SELECTIVE case - should use full scan path + // Filter for fully_indexed < 290 matches 290 rows out of 300 + let mut options2 = FilteredReadOptions::basic_full_read(&fixture.dataset); + options2.adaptive_expensive_column = Some(AdaptiveColumnConfig { + expensive_column: "vector".to_string(), + threshold: 0.01, // Very low threshold (1%), so it will use full scan + total_row_count: 300, + }); + + // Add a filter that matches many rows (290 out of 300 = 96.7%) + options2.full_filter = Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(290u32))); + + let filtered_read2 = Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options2, None).unwrap()); + + let _batches2 = filtered_read2 + .execute(0, Arc::new(TaskContext::default())) + .unwrap() + .try_collect::>() + .await + .unwrap(); + + let metrics2 = filtered_read2.metrics().unwrap(); + let used_take2 = metrics2 + .sum_by_name("adaptive_used_take") + .map(|v| v.as_usize()) + .unwrap_or(0); + let used_full_scan2 = metrics2 + .sum_by_name("adaptive_used_full_scan") + .map(|v| v.as_usize()) + .unwrap_or(0); + assert_eq!(used_take2, 0, "Should not have used take path for non-selective filter"); + assert_eq!(used_full_scan2, 1, "Should have used full scan path"); + } } From f759623e28df58040c698e35d8a1767efe6ff24d Mon Sep 17 00:00:00 2001 From: Wyatt Alt Date: Tue, 11 Nov 2025 22:08:09 -0800 Subject: [PATCH 8/8] lint fix --- rust/lance/src/dataset/scanner.rs | 8 +- rust/lance/src/io/exec/filtered_read.rs | 154 +++++++++++++++--------- 2 files changed, 99 insertions(+), 63 deletions(-) diff --git a/rust/lance/src/dataset/scanner.rs b/rust/lance/src/dataset/scanner.rs index f363607879..b18c7ebff8 100644 --- a/rust/lance/src/dataset/scanner.rs +++ b/rust/lance/src/dataset/scanner.rs @@ -8984,11 +8984,9 @@ mod test { ArrowField::new("vector", vectors.data_type().clone(), false), ])); - let batch = RecordBatch::try_new( - schema.clone(), - vec![Arc::new(metadata), Arc::new(vectors)], - ) - .unwrap(); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(metadata), Arc::new(vectors)]) + .unwrap(); let tmp_dir = TempStrDir::default(); let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); diff --git a/rust/lance/src/io/exec/filtered_read.rs b/rust/lance/src/io/exec/filtered_read.rs index b8af5d29e1..d99dc7e613 100644 --- a/rust/lance/src/io/exec/filtered_read.rs +++ b/rust/lance/src/io/exec/filtered_read.rs @@ -55,8 +55,8 @@ use crate::dataset::rowids::load_row_id_sequence; use crate::dataset::scanner::{ get_default_batch_size, BATCH_SIZE_FALLBACK, DEFAULT_FRAGMENT_READAHEAD, }; -use crate::Dataset; use crate::io::exec::TakeExec; +use crate::Dataset; use super::utils::IoMetrics; @@ -1577,9 +1577,9 @@ impl FilteredReadExec { partition: usize, context: Arc, ) -> SendableRecordBatchStream { - use futures::StreamExt; use arrow_array::UInt64Array; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + use futures::StreamExt; use lance_datafusion::exec::OneShotExec; let dataset = self.dataset.clone(); @@ -1600,27 +1600,29 @@ impl FilteredReadExec { let filter_columns: Vec = { let mut cols = Vec::new(); if let Some(ref full_filter) = options.full_filter { - cols.extend(lance_datafusion::planner::Planner::column_names_in_expr(full_filter)); + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr( + full_filter, + )); } if let Some(ref refine_filter) = options.refine_filter { - cols.extend(lance_datafusion::planner::Planner::column_names_in_expr(refine_filter)); + cols.extend(lance_datafusion::planner::Planner::column_names_in_expr( + refine_filter, + )); } cols }; if filter_columns.contains(&config.expensive_column) { // Fall back to normal stream - filter needs the expensive column - let normal_stream = FilteredReadStream::try_new( - dataset.clone(), options, &metrics, None - ).await?; + let normal_stream = + FilteredReadStream::try_new(dataset.clone(), options, &metrics, None).await?; return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); } // 2. Check if total_row_count is valid if config.total_row_count == 0 { - let normal_stream = FilteredReadStream::try_new( - dataset.clone(), options, &metrics, None - ).await?; + let normal_stream = + FilteredReadStream::try_new(dataset.clone(), options, &metrics, None).await?; return DataFusionResult::Ok(normal_stream.get_stream(&metrics, partition)); } @@ -1647,33 +1649,41 @@ impl FilteredReadExec { // Evaluate index if present let mut evaluated_index = None; if let Some(index_input) = index_input { - let mut index_search = index_input.execute(partition, context.clone()) - .map_err(|e| Error::from(e))?; - let index_search_result = - index_search.next().await.ok_or_else(|| Error::Internal { + let mut index_search = index_input + .execute(partition, context.clone()) + .map_err(Error::from)?; + let index_search_result = index_search + .next() + .await + .ok_or_else(|| Error::Internal { message: "Index search did not yield any results".to_string(), location: location!(), })? - .map_err(|e| Error::from(e))?; + .map_err(Error::from)?; evaluated_index = Some(Arc::new(EvaluatedIndex::try_from_arrow( &index_search_result, )?)); } let cheap_stream = FilteredReadStream::try_new( - dataset.clone(), cheap_options, &metrics, evaluated_index.clone() - ).await?; + dataset.clone(), + cheap_options, + &metrics, + evaluated_index.clone(), + ) + .await?; let mut cheap_stream_boxed = cheap_stream.get_stream(&metrics, partition); // Collect row IDs with EARLY TERMINATION - let threshold_count = (config.total_row_count as f64 * config.threshold).ceil() as usize; + let threshold_count = + (config.total_row_count as f64 * config.threshold).ceil() as usize; let mut row_ids = Vec::new(); let mut cheap_batches = Vec::new(); let mut filtered_count = 0; while let Some(batch_result) = cheap_stream_boxed.next().await { - let batch = batch_result.map_err(|e| Error::from(e))?; + let batch = batch_result.map_err(Error::from)?; // EARLY STOP CHECK if filtered_count + batch.num_rows() >= threshold_count { @@ -1686,8 +1696,12 @@ impl FilteredReadExec { global_metrics.adaptive_pass1_rows.add(filtered_count); let full_stream = FilteredReadStream::try_new( - dataset.clone(), options, &metrics, evaluated_index - ).await?; + dataset.clone(), + options, + &metrics, + evaluated_index, + ) + .await?; return DataFusionResult::Ok(full_stream.get_stream(&metrics, partition)); } @@ -1726,10 +1740,13 @@ impl FilteredReadExec { let empty_batch = RecordBatch::new_empty(output_schema.clone()); let stream = futures::stream::iter(vec![Ok(empty_batch)]); let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); - return Result::::Ok(Box::pin(stream_adapter) as SendableRecordBatchStream); + return Result::::Ok( + Box::pin(stream_adapter) as SendableRecordBatchStream + ); } - let expensive_projection = dataset.empty_projection() + let expensive_projection = dataset + .empty_projection() .union_column(&config.expensive_column, OnMissing::Error)?; // Create row ID batch for TakeExec @@ -1742,25 +1759,22 @@ impl FilteredReadExec { let row_id_batch = RecordBatch::try_new(row_id_schema, vec![row_id_array])?; let row_id_plan = Arc::new(OneShotExec::from_batch(row_id_batch)); - let take_exec = TakeExec::try_new( - dataset.clone(), - row_id_plan, - expensive_projection, - )?; + let take_exec = TakeExec::try_new(dataset.clone(), row_id_plan, expensive_projection)?; let take_exec = take_exec.ok_or_else(|| Error::Internal { message: "TakeExec returned None unexpectedly".to_string(), location: location!(), })?; - let mut expensive_stream = take_exec.execute(partition, context) - .map_err(|e| Error::from(e))?; + let mut expensive_stream = take_exec + .execute(partition, context) + .map_err(Error::from)?; // Concatenate all cheap batches into one use arrow_select::concat::concat_batches; let cheap_schema = cheap_batches[0].schema(); - let cheap_batch_combined = concat_batches(&cheap_schema, &cheap_batches) - .map_err(|e| Error::Arrow { + let cheap_batch_combined = + concat_batches(&cheap_schema, &cheap_batches).map_err(|e| Error::Arrow { message: format!("Failed to concatenate cheap batches: {}", e), location: location!(), })?; @@ -1768,7 +1782,7 @@ impl FilteredReadExec { // Collect all expensive batches let mut expensive_batches = Vec::new(); while let Some(batch_result) = expensive_stream.next().await { - let batch = batch_result.map_err(|e| Error::from(e))?; + let batch = batch_result.map_err(Error::from)?; expensive_batches.push(batch); } @@ -1787,9 +1801,14 @@ impl FilteredReadExec { })?; // Merge the two combined batches - build columns in the order of output_schema - let expensive_col_idx = expensive_batch_combined.schema().index_of(&config.expensive_column) + let expensive_col_idx = expensive_batch_combined + .schema() + .index_of(&config.expensive_column) .map_err(|e| Error::Internal { - message: format!("Expected {} column in expensive batch: {}", config.expensive_column, e), + message: format!( + "Expected {} column in expensive batch: {}", + config.expensive_column, e + ), location: location!(), })?; @@ -1801,30 +1820,37 @@ impl FilteredReadExec { columns.push(expensive_batch_combined.column(expensive_col_idx).clone()); } else { // Take from cheap batch - let cheap_col_idx = cheap_batch_combined.schema().index_of(col_name) - .map_err(|e| Error::Internal { - message: format!("Expected {} column in cheap batch: {}", col_name, e), - location: location!(), - })?; + let cheap_col_idx = + cheap_batch_combined + .schema() + .index_of(col_name) + .map_err(|e| Error::Internal { + message: format!( + "Expected {} column in cheap batch: {}", + col_name, e + ), + location: location!(), + })?; columns.push(cheap_batch_combined.column(cheap_col_idx).clone()); } } - let combined_batch: RecordBatch = RecordBatch::try_new( - output_schema.clone(), - columns, - ).map_err(|e: arrow_schema::ArrowError| Error::from(e))?; + let combined_batch: RecordBatch = RecordBatch::try_new(output_schema.clone(), columns) + .map_err(|e: arrow_schema::ArrowError| Error::from(e))?; // Return stream with single combined batch - let stream = futures::stream::iter( - vec![Ok(combined_batch)] - ); + let stream = futures::stream::iter(vec![Ok(combined_batch)]); let stream_adapter = RecordBatchStreamAdapter::new(output_schema.clone(), stream); - Result::::Ok(Box::pin(stream_adapter) as SendableRecordBatchStream) + Result::::Ok( + Box::pin(stream_adapter) as SendableRecordBatchStream + ) }) .try_flatten(); - Box::pin(RecordBatchStreamAdapter::new(output_schema_for_adapter, stream)) + Box::pin(RecordBatchStreamAdapter::new( + output_schema_for_adapter, + stream, + )) } pub fn dataset(&self) -> &Arc { @@ -1855,8 +1881,7 @@ impl DisplayAs for FilteredReadExec { let adaptive_info = if let Some(ref config) = self.options.adaptive_expensive_column { format!( ", adaptive_column={}, threshold={}", - config.expensive_column, - config.threshold + config.expensive_column, config.threshold ) } else { String::new() @@ -3679,9 +3704,11 @@ mod tests { }); // Add a filter that matches only a few rows (10 out of 300 = 3.3%) - options.full_filter = Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(10u32))); + options.full_filter = + Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(10u32))); - let filtered_read = Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); + let filtered_read = + Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options, None).unwrap()); let _batches = filtered_read .execute(0, Arc::new(TaskContext::default())) @@ -3704,9 +3731,15 @@ mod tests { .map(|v| v.as_usize()) .unwrap_or(0); - assert_eq!(used_take, 1, "Should have used take path for selective filter"); + assert_eq!( + used_take, 1, + "Should have used take path for selective filter" + ); assert_eq!(used_full_scan, 0, "Should not have used full scan path"); - assert!(pass1_rows > 0 && pass1_rows <= 10, "Should have scanned ~10 rows in pass 1"); + assert!( + pass1_rows > 0 && pass1_rows <= 10, + "Should have scanned ~10 rows in pass 1" + ); // Test NON-SELECTIVE case - should use full scan path // Filter for fully_indexed < 290 matches 290 rows out of 300 @@ -3718,9 +3751,11 @@ mod tests { }); // Add a filter that matches many rows (290 out of 300 = 96.7%) - options2.full_filter = Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(290u32))); + options2.full_filter = + Some(datafusion_expr::col("fully_indexed").lt(datafusion_expr::lit(290u32))); - let filtered_read2 = Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options2, None).unwrap()); + let filtered_read2 = + Arc::new(FilteredReadExec::try_new(fixture.dataset.clone(), options2, None).unwrap()); let _batches2 = filtered_read2 .execute(0, Arc::new(TaskContext::default())) @@ -3738,7 +3773,10 @@ mod tests { .sum_by_name("adaptive_used_full_scan") .map(|v| v.as_usize()) .unwrap_or(0); - assert_eq!(used_take2, 0, "Should not have used take path for non-selective filter"); + assert_eq!( + used_take2, 0, + "Should not have used take path for non-selective filter" + ); assert_eq!(used_full_scan2, 1, "Should have used full scan path"); } }