Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 9 additions & 11 deletions datafusion/optimizer/src/push_down_filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -457,9 +457,9 @@ fn push_down_all_join(

// Inferred predicates that cannot be pushed through the join are simply dropped
for predicate in inferred_join_predicates {
if left_preserved && checker.is_left_only(&predicate) {
if checker.is_left_only(&predicate) {
left_push.push(predicate);
} else if right_preserved && checker.is_right_only(&predicate) {
} else if checker.is_right_only(&predicate) {
right_push.push(predicate);
}
Comment on lines 459 to 464
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My reasoning for this is that if this is an inferred predicate (i.e., inferred from the join) and it can be pushed to the left/right relation, it should be pushed regardless of the join type.

}
Expand Down Expand Up @@ -2721,8 +2721,7 @@ mod tests {
)
}

/// post-left-join predicate on a column common to both sides is only pushed to the left side
/// i.e. - not duplicated to the right side
/// post-left-join predicate on a column common to both sides is pushed to both sides
#[test]
fn filter_using_left_join_on_common() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down Expand Up @@ -2750,20 +2749,19 @@ mod tests {
TableScan: test2
",
);
// filter sent to left side of the join, not the right
// filter sent to left side of the join and to the right
assert_optimized_plan_equal!(
plan,
@r"
Left Join: Using test.a = test2.a
TableScan: test, full_filters=[test.a <= Int64(1)]
Projection: test2.a
TableScan: test2
TableScan: test2, full_filters=[test2.a <= Int64(1)]
"
)
}

/// post-right-join predicate on a column common to both sides is only pushed to the right side
/// i.e. - not duplicated to the left side.
/// post-right-join predicate on a column common to both sides is pushed to both sides
#[test]
fn filter_using_right_join_on_common() -> Result<()> {
let table_scan = test_table_scan()?;
Expand Down Expand Up @@ -2791,12 +2789,12 @@ mod tests {
TableScan: test2
",
);
// filter sent to right side of join, not duplicated to the left
// filter sent to right side of join, sent to the left as well
assert_optimized_plan_equal!(
plan,
@r"
Right Join: Using test.a = test2.a
TableScan: test
TableScan: test, full_filters=[test.a <= Int64(1)]
Projection: test2.a
TableScan: test2, full_filters=[test2.a <= Int64(1)]
"
Expand Down Expand Up @@ -2978,7 +2976,7 @@ mod tests {
Projection: test.a, test.b, test.c
TableScan: test
Projection: test2.a, test2.b, test2.c
TableScan: test2, full_filters=[test2.c > UInt32(4)]
TableScan: test2, full_filters=[test2.a > UInt32(1), test2.c > UInt32(4)]
"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,18 +88,22 @@ logical_plan
02)--Projection: t2.a AS a2, t2.b
03)----RightSemi Join: t1.d = t2.d, t1.c = t2.c
04)------SubqueryAlias: t1
05)--------TableScan: annotated_data projection=[c, d]
06)------SubqueryAlias: t2
07)--------Filter: annotated_data.d = Int32(3)
08)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
05)--------Filter: annotated_data.d = Int32(3)
06)----------TableScan: annotated_data projection=[c, d], partial_filters=[annotated_data.d = Int32(3)]
07)------SubqueryAlias: t2
08)--------Filter: annotated_data.d = Int32(3)
09)----------TableScan: annotated_data projection=[a, b, c, d], partial_filters=[annotated_data.d = Int32(3)]
physical_plan
01)SortPreservingMergeExec: [a2@0 ASC NULLS LAST, b@1 ASC NULLS LAST], fetch=10
02)--ProjectionExec: expr=[a@0 as a2, b@1 as b]
03)----HashJoinExec: mode=CollectLeft, join_type=RightSemi, on=[(d@1, d@3), (c@0, c@2)], projection=[a@0, b@1]
04)------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
05)------FilterExec: d@3 = 3
06)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
07)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true
04)------CoalescePartitionsExec
05)--------FilterExec: d@1 = 3
06)----------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1
07)------------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], file_type=csv, has_header=true
08)------FilterExec: d@3 = 3
09)--------RepartitionExec: partitioning=RoundRobinBatch(4), input_partitions=1, maintains_sort_order=true
10)----------DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, b, c, d], output_ordering=[a@0 ASC NULLS LAST, b@1 ASC NULLS LAST, c@2 ASC NULLS LAST], file_type=csv, has_header=true

# preserve_right_semi_join
query II nosort
Expand Down
7 changes: 4 additions & 3 deletions datafusion/sqllogictest/test_files/joins.slt
Original file line number Diff line number Diff line change
Expand Up @@ -5049,9 +5049,10 @@ WHERE k1 < 0
----
physical_plan
01)HashJoinExec: mode=CollectLeft, join_type=RightAnti, on=[(k2@0, k1@0)]
02)--DataSourceExec: partitions=1, partition_sizes=[0]
03)--FilterExec: k1@0 < 0
04)----DataSourceExec: partitions=1, partition_sizes=[10000]
02)--FilterExec: k2@0 < 0
03)----DataSourceExec: partitions=1, partition_sizes=[0]
04)--FilterExec: k1@0 < 0
05)----DataSourceExec: partitions=1, partition_sizes=[10000]

query II
SELECT *
Expand Down
254 changes: 254 additions & 0 deletions datafusion/sqllogictest/test_files/push_down_filter.slt
Original file line number Diff line number Diff line change
Expand Up @@ -488,3 +488,257 @@ physical_plan

statement ok
drop table agg_dyn_test;

statement ok
drop table t1;

statement ok
drop table t2;



# Check LEFT/RIGHT outer and anti joins: a filter on the join key is pushed down to both relations (when possible)

statement ok
create table t1(k int, v int);

statement ok
create table t2(k int, v int);

statement ok
insert into t1 values
(1, 10),
(2, 20),
(3, 30),
(null, 40),
(50, null),
(null, null);

statement ok
insert into t2 values
(1, 11),
(2, 21),
(2, 22),
(null, 41),
(51, null),
(null, null);

statement ok
set datafusion.explain.physical_plan_only = false;

statement ok
set datafusion.explain.logical_plan_only = true;


# left join + filter on join key -> pushed to both sides
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22
3 30 NULL NULL
50 NULL NULL NULL

# left join + filter on a non-key column -> pushed to the left side only, not duplicated to the right
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22
3 30 NULL NULL
NULL 40 NULL NULL

# left join + OR filter mixing the join key and a non-key column -> pushed to the left side only, not duplicated to the right
query TT
explain select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Left Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 left join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30 NULL NULL
50 NULL NULL NULL
NULL 40 NULL NULL


# right join + filter on the left-side join key -> join becomes an inner join, filter pushed to both sides
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 1;
----
2 20 2 21
2 20 2 22

# right join + filter on a non-key left column -> join becomes an inner join, filter pushed to the left side only
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.v > 1;
----
1 10 1 11
2 20 2 21
2 20 2 22

# right join + OR filter mixing the join key and a non-key left column -> join becomes an inner join, filter pushed to the left side only
query TT
explain select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)Inner Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k, v]

query IIII rowsort
select * from t1 right join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----


# left anti join + filter on join key -> pushed to both sides
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k]

query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 1;
----
3 30
50 NULL

# left anti join + filter on a non-key column -> pushed to the left side only, not duplicated to the right
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.v > Int32(1)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]

query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.v > 1;
----
3 30
NULL 40

# left anti join + OR filter mixing the join key and a non-key column -> pushed to the left side only, not duplicated to the right
query TT
explain select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
logical_plan
01)LeftAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(3) OR t1.v > Int32(20)
03)----TableScan: t1 projection=[k, v]
04)--TableScan: t2 projection=[k]

query II rowsort
select * from t1 left anti join t2 on t1.k = t2.k where t1.k > 3 or t1.v > 20;
----
3 30
50 NULL
NULL 40


# right anti join + filter on join key -> pushed to both sides
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--Filter: t1.k > Int32(1)
03)----TableScan: t1 projection=[k]
04)--Filter: t2.k > Int32(1)
05)----TableScan: t2 projection=[k, v]

query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 1;
----
51 NULL

# right anti join + filter on a non-key column -> pushed to the right side only, not duplicated to the left
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.v > Int32(1)
04)----TableScan: t2 projection=[k, v]

query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.v > 1;
----
NULL 41

# right anti join + OR filter mixing the join key and a non-key column -> pushed to the right side only, not duplicated to the left
query TT
explain select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
logical_plan
01)RightAnti Join: t1.k = t2.k
02)--TableScan: t1 projection=[k]
03)--Filter: t2.k > Int32(3) OR t2.v > Int32(20)
04)----TableScan: t2 projection=[k, v]

query II rowsort
select * from t1 right anti join t2 on t1.k = t2.k where t2.k > 3 or t2.v > 20;
----
51 NULL
NULL 41


statement ok
set datafusion.explain.logical_plan_only = false;

statement ok
drop table t1;

statement ok
drop table t2;