Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions datafusion/proto/src/logical_plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,16 +379,6 @@ impl AsLogicalPlan for LogicalPlanNode {
LogicalPlanType::ListingScan(scan) => {
let schema: Schema = convert_required!(scan.schema)?;

let mut projection = None;
if let Some(columns) = &scan.projection {
let column_indices = columns
.columns
.iter()
.map(|name| schema.index_of(name))
.collect::<Result<Vec<usize>, _>>()?;
projection = Some(column_indices);
}

let filters =
from_proto::parse_exprs(&scan.filters, ctx, extension_codec)?;

Expand Down Expand Up @@ -496,6 +486,16 @@ impl AsLogicalPlan for LogicalPlanNode {
let table_name =
from_table_reference(scan.table_name.as_ref(), "ListingTableScan")?;

let mut projection = None;
if let Some(columns) = &scan.projection {
let column_indices = columns
.columns
.iter()
.map(|name| provider.schema().index_of(name))
.collect::<Result<Vec<usize>, _>>()?;
projection = Some(column_indices);
}

LogicalPlanBuilder::scan_with_filters(
table_name,
provider_as_source(Arc::new(provider)),
Expand Down
27 changes: 18 additions & 9 deletions datafusion/proto/tests/cases/roundtrip_logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use datafusion::datasource::file_format::arrow::ArrowFormatFactory;
use datafusion::datasource::file_format::csv::CsvFormatFactory;
use datafusion::datasource::file_format::parquet::ParquetFormatFactory;
use datafusion::datasource::file_format::{format_as_file_type, DefaultFileType};
use datafusion::datasource::DefaultTableSource;
use datafusion::execution::session_state::SessionStateBuilder;
use datafusion::execution::FunctionRegistry;
use datafusion::functions_aggregate::count::count_udaf;
Expand Down Expand Up @@ -77,9 +78,9 @@ use datafusion_expr::expr::{
use datafusion_expr::logical_plan::{Extension, UserDefinedLogicalNodeCore};
use datafusion_expr::{
Accumulator, AggregateUDF, ColumnarValue, ExprFunctionExt, ExprSchemable, Literal,
LogicalPlan, Operator, PartitionEvaluator, ScalarUDF, Signature, TryCast, Volatility,
WindowFrame, WindowFrameBound, WindowFrameUnits, WindowFunctionDefinition, WindowUDF,
WindowUDFImpl,
LogicalPlan, LogicalPlanBuilder, Operator, PartitionEvaluator, ScalarUDF, Signature,
TryCast, Volatility, WindowFrame, WindowFrameBound, WindowFrameUnits,
WindowFunctionDefinition, WindowUDF, WindowUDFImpl,
};
use datafusion_functions_aggregate::average::avg_udaf;
use datafusion_functions_aggregate::expr_fn::{
Expand Down Expand Up @@ -2659,16 +2660,24 @@ async fn roundtrip_custom_listing_tables_schema() -> Result<()> {
.infer_schema(&ctx.state())
.await?;

ctx.register_table("hive_style", Arc::new(ListingTable::try_new(config)?))?;
let listing_table: Arc<dyn TableProvider> = Arc::new(ListingTable::try_new(config)?);

let plan = ctx
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please leave the existing test case so it is clear there is no loss of coverage, and add the new plan as a new test case?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original test passes for me, even after cargo clean, on the following main refs (where it should fail):

  • daeb6597a0c7344735460bb2dce13879fd89d7bd
  • 182d5dc5e456322664da921f446018a0549e60bc

At any rate, the old plan was 'supposed' to be the new plan! I am happy to make the change if I got this wrong, just wanted to double check 😄

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I think is best is for the changes to the tests to illustrate the effect of the code changes on the behavior of the code.

If you change the test and the code in the same PR it is harder to evaluate the impact of your change on behavior.

For example, if the original query now fails, perhaps you can change it to

let err = thing_that_errors.unwrap_err().to_string();
assert_contains!(err, <expected message>)

This shows that a test that (incorrectly) used to work no longer does.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a new test alongside the existing one. The difference between the tests is that the new one will have its projection pushed down to the TableScan node whereas the original doesn't, which covers the projection case for this bug.

I could not figure out what kind of assertion change to make to the existing test (it will continue to pass with/without these changes).

.sql("SELECT part, value FROM hive_style LIMIT 1")
.await?
.logical_plan()
.clone();
let projection = ["part", "value"]
.iter()
.map(|field_name| listing_table.schema().index_of(field_name))
.collect::<Result<Vec<_>, _>>()?;

let plan = LogicalPlanBuilder::scan(
"hive_style",
Arc::new(DefaultTableSource::new(listing_table)),
Some(projection),
)?
.limit(0, Some(1))?
.build()?;

let bytes = logical_plan_to_bytes(&plan)?;
let new_plan = logical_plan_from_bytes(&bytes, &ctx)?;

assert_eq!(plan, new_plan);
Ok(())
}
Expand Down