Skip to content

Commit 7270277

Browse files
authored
resolve projection against ListingTable table_schema incl. partition columns (#106)
1 parent b687ce4 commit 7270277

File tree

2 files changed

+28
-19
lines changed

2 files changed

+28
-19
lines changed

datafusion/proto/src/logical_plan/mod.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -378,16 +378,6 @@ impl AsLogicalPlan for LogicalPlanNode {
378378
LogicalPlanType::ListingScan(scan) => {
379379
let schema: Schema = convert_required!(scan.schema)?;
380380

381-
let mut projection = None;
382-
if let Some(columns) = &scan.projection {
383-
let column_indices = columns
384-
.columns
385-
.iter()
386-
.map(|name| schema.index_of(name))
387-
.collect::<Result<Vec<usize>, _>>()?;
388-
projection = Some(column_indices);
389-
}
390-
391381
let filters =
392382
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;
393383

@@ -492,6 +482,16 @@ impl AsLogicalPlan for LogicalPlanNode {
492482
let table_name =
493483
from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;
494484

485+
let mut projection = None;
486+
if let Some(columns) = &scan.projection {
487+
let column_indices = columns
488+
.columns
489+
.iter()
490+
.map(|name| provider.schema().index_of(name))
491+
.collect::<Result<Vec<usize>, _>>()?;
492+
projection = Some(column_indices);
493+
}
494+
495495
LogicalPlanBuilder::scan_with_filters(
496496
table_name,
497497
provider_as_source(Arc::new(provider)),

datafusion/proto/tests/cases/roundtrip_logical_plan.rs

Lines changed: 18 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ use datafusion::datasource::file_format::arrow::ArrowFormatFactory;
4444
use datafusion::datasource::file_format::csv::CsvFormatFactory;
4545
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
4646
use datafusion::datasource::file_format::{format_as_file_type, DefaultFileType};
47+
use datafusion::datasource::DefaultTableSource;
4748
use datafusion::execution::session_state::SessionStateBuilder;
4849
use datafusion::execution::FunctionRegistry;
4950
use datafusion::functions_aggregate::count::count_udaf;
@@ -75,9 +76,9 @@ use datafusion_expr::expr::{
7576
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
7677
use datafusion_expr::{
7778
Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, ExprSchemable, Literal,
78-
LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, Signature, TryCast, Volatility,
79-
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF,
80-
WindowUDFImpl,
79+
LogicalPlan, LogicalPlanBuilder, Operator, PartitionEvaluator, ScalarUDF, Signature,
80+
TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
81+
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
8182
};
8283
use datafusion_functions_aggregate::average::avg_udaf;
8384
use datafusion_functions_aggregate::expr_fn::{
@@ -2643,16 +2644,24 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
26432644
.infer_schema(&ctx.state())
26442645
.await?;
26452646

2646-
ctx.register_table("hive_style", Arc::new(ListingTable::try_new(config)?))?;
2647+
let listing_table: Arc<dyn TableProvider> = Arc::new(ListingTable::try_new(config)?);
26472648

2648-
let plan = ctx
2649-
.sql("SELECT part, value FROM hive_style LIMIT 1")
2650-
.await?
2651-
.logical_plan()
2652-
.clone();
2649+
let projection = ["part", "value"]
2650+
.iter()
2651+
.map(|field_name| listing_table.schema().index_of(field_name))
2652+
.collect::<Result<Vec<_>, _>>()?;
2653+
2654+
let plan = LogicalPlanBuilder::scan(
2655+
"hive_style",
2656+
Arc::new(DefaultTableSource::new(listing_table)),
2657+
Some(projection),
2658+
)?
2659+
.limit(0, Some(1))?
2660+
.build()?;
26532661

26542662
let bytes = logical_plan_to_bytes(&plan)?;
26552663
let new_plan = logical_plan_from_bytes(&bytes, &ctx)?;
2664+
26562665
assert_eq!(plan, new_plan);
26572666
Ok(())
26582667
}

0 commit comments

Comments
 (0)