Storage Filter Pushdown
Suppose you are given a query such as the following:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
Here, myudf is a user-defined function (UDF) known only to Drill.
Let's also assume that our data source can handle simple equality clauses. Perhaps we have a partitioned directory structure:
myTable
|- a=1
|- a=2
...
|- a=10
If so, then we can restrict our scan to just the a=10 subdirectory.
Or, perhaps we are using a data source such as JDBC where we can "push" part of the WHERE clause to the data source.
In either case, we want to do three things:
- Capture the simple equality clause, a = 10, so that our storage plugin can handle it, while leaving the Drill-specific predicate, myudf(b, 3) = 'foo', for Drill to execute.
- Rewrite the query to remove the pushed-down predicates:
SELECT a, b, c FROM example.myTable WHERE myudf(b, 3) = 'foo'
- Execute logic to implement the predicate so that the query returns the same result as if Drill were to execute the same query (but, presumably, faster, because we do not ship unneeded data to Drill).
Database theory speaks of "conjunctive normal form" (CNF), which, for our purposes, is just a fancy way of describing a set of conditions joined by AND:
a = 10 AND myudf(b, 3) = 'foo'
CNF is important because conditions can be applied independently. The following are equivalent:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
SELECT a, b, c FROM (
SELECT a, b, c FROM example.myTable WHERE a = 10)
WHERE myudf(b, 3) = 'foo'
Note that the above is not true if our WHERE clause contains top-level OR statements:
SELECT a, b, c FROM example.myTable WHERE a = 10 OR myudf(b, 3) = 'foo'
The conclusion is that we can push filter predicates into our storage plugin if they appear anywhere in a top-level AND clause, but not if they appear in an OR clause.
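The difference between AND and OR can be checked on a handful of rows. The following is a minimal sketch, not Drill code: rows are plain int pairs {a, b}, and the predicate UDF_IS_FOO is a made-up stand-in for myudf(b, 3) = 'foo'. It compares applying the whole WHERE clause at once with applying a = 10 in a simulated scan first.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

// Illustrative only: each row is int[]{a, b}. Nothing here is a Drill API.
class ConjunctionPushDownDemo {

  static final Predicate<int[]> A_EQUALS_10 = row -> row[0] == 10;
  // Hypothetical stand-in for the Drill-only predicate myudf(b, 3) = 'foo'.
  static final Predicate<int[]> UDF_IS_FOO = row -> row[1] % 2 == 0;

  static List<int[]> table() {
    return Arrays.asList(
        new int[] {10, 2}, new int[] {10, 3}, new int[] {5, 4}, new int[] {5, 7});
  }

  static List<int[]> filter(List<int[]> rows, Predicate<int[]> predicate) {
    List<int[]> out = new ArrayList<>();
    for (int[] row : rows) {
      if (predicate.test(row)) {
        out.add(row);
      }
    }
    return out;
  }

  // Drill evaluates "a = 10 AND udf" over every row.
  static int fullAnd() {
    return filter(table(), A_EQUALS_10.and(UDF_IS_FOO)).size();
  }

  // The simulated scan applies a = 10; Drill then applies only the UDF term.
  static int pushedAnd() {
    return filter(filter(table(), A_EQUALS_10), UDF_IS_FOO).size();
  }

  // Drill evaluates "a = 10 OR udf" over every row.
  static int fullOr() {
    return filter(table(), A_EQUALS_10.or(UDF_IS_FOO)).size();
  }

  // Rows that survive the scan if a = 10 is (wrongly) pushed out of the OR.
  static int rowsAfterPushingOutOfOr() {
    return filter(table(), A_EQUALS_10).size();
  }

  public static void main(String[] args) {
    System.out.println("AND, all in Drill:  " + fullAnd());
    System.out.println("AND, a = 10 pushed: " + pushedAnd());
    System.out.println("OR, all in Drill:   " + fullOr());
    System.out.println("Rows left if a = 10 is pushed out of the OR: "
        + rowsAfterPushingOutOfOr());
  }
}
```

With AND, both paths return the same rows. With OR, the simulated scan has already discarded the row {5, 4}, which the OR query should return, so no later filter in Drill can recover it.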
Calcite presents the WHERE clause to us as a parse tree, so we can handle the following:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo' AND c = 30
Calcite renders this into a binary tree of expressions:
expression(AND)
- a = 10
- expression(AND)
  - myudf(b, 3) = 'foo'
  - c = 30
So, our goal is to walk the tree, find all the conjunctive terms, decide which we can rewrite, and create a new tree that includes all the terms we cannot rewrite (which could be the entire tree, part of the tree, or nothing.)
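The walk described above can be sketched without any Calcite classes. In the sketch below, Expr is a hypothetical stand-in for a Calcite expression node, and the pushable flag stands in for whatever test a plugin uses to decide that it can handle a term; both are assumptions made for illustration. The code flattens nested AND nodes into a list of terms, sets aside the pushable ones, and rebuilds an AND tree from the rest.

```java
import java.util.ArrayList;
import java.util.List;

class ConjunctSplitter {

  /** Tiny stand-in for an expression node (hypothetical, not Calcite's RexNode). */
  static final class Expr {
    final String op;        // "AND", "OR", or null for a leaf
    final String text;      // leaf text, e.g. "a = 10"
    final boolean pushable; // assumed: true if the plugin can evaluate this leaf
    final Expr left;
    final Expr right;

    private Expr(String op, String text, boolean pushable, Expr left, Expr right) {
      this.op = op;
      this.text = text;
      this.pushable = pushable;
      this.left = left;
      this.right = right;
    }

    static Expr leaf(String text, boolean pushable) {
      return new Expr(null, text, pushable, null, null);
    }

    static Expr and(Expr left, Expr right) {
      return new Expr("AND", null, false, left, right);
    }

    static Expr or(Expr left, Expr right) {
      return new Expr("OR", null, false, left, right);
    }

    @Override
    public String toString() {
      return op == null ? text : "(" + left + " " + op + " " + right + ")";
    }
  }

  /** Terms handed to the plugin, plus the tree Drill must still evaluate. */
  static final class Split {
    final List<Expr> pushed = new ArrayList<>();
    Expr remaining; // null when every term was pushed
  }

  /** Descends through AND nodes only; anything else is one indivisible term. */
  static void collectConjuncts(Expr expr, List<Expr> out) {
    if ("AND".equals(expr.op)) {
      collectConjuncts(expr.left, out);
      collectConjuncts(expr.right, out);
    } else {
      out.add(expr);
    }
  }

  static Split split(Expr where) {
    List<Expr> terms = new ArrayList<>();
    collectConjuncts(where, terms);
    Split result = new Split();
    for (Expr term : terms) {
      if (term.pushable) {
        result.pushed.add(term);
      } else {
        // Rebuild an AND tree from the terms we could not push.
        result.remaining =
            result.remaining == null ? term : Expr.and(result.remaining, term);
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Expr where = Expr.and(
        Expr.leaf("a = 10", true),
        Expr.and(Expr.leaf("myudf(b, 3) = 'foo'", false), Expr.leaf("c = 30", true)));
    Split split = split(where);
    System.out.println("pushed:    " + split.pushed);
    System.out.println("remaining: " + split.remaining);
  }
}
```

In this sketch an OR node is never marked pushable, so it always lands in the remaining tree as a single term. A real rule would apply the same walk to Calcite's own expression nodes.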
Drill's Calcite-based planner converts the above query to an executable plan in multiple steps. It is important that we choose the correct step in which to perform push down. The major forms of the plan are:
- SQL parse tree
- Calcite logical plan
- Drill logical plan
- Drill "physical" (un-parallelized logical) plan
- Drill parallelized operator descriptions
- Drill executable operators
The first five steps occur in the planner; the last step occurs at run time.
The other bit of planning internals we should know about is the point at which Drill decides to split our GroupScan into multiple threads of execution (minor fragments). This is done in the ScanPrule class, which checks whether our maximum parallelism (returned from getMaxParallelizationWidth()) is greater than one. If so, it marks the scan as RANDOM_DISTRIBUTED, which results in multiple threads of execution. Since this rule fires in the PHYSICAL phase, we must complete our filter push-down work before then.
Most existing plugins perform filter push-down at the physical stage: operating on Drill's "physical" operators. (The term "physical" is in quotes because this stage of the plan is still an abstract, un-parallelized description; it is a logical plan that simply has more Drill-specific details than the Calcite logical plan.)
However, because the physical approach occurs in the same phase as parallelization, we cannot use the outcome of filter push-down to decide how we want to parallelize; those decisions will have already been made. So, instead, we may prefer to perform filter push-down at an earlier, logical phase.
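To see why the ordering matters, consider the partitioned directory example from the start of this page. The sketch below is purely illustrative: PartitionedScanSketch is a hypothetical class, not a Drill GroupScan, and it assumes one unit of parallelism per partition directory. Its maxParallelizationWidth() method plays the role of getMaxParallelizationWidth().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a scan over a=N partition directories; not a Drill class.
class PartitionedScanSketch {

  private final List<Integer> partitions; // the N of each a=N directory

  PartitionedScanSketch(List<Integer> partitions) {
    this.partitions = new ArrayList<>(partitions);
  }

  // Analogue of pushing "a = value" into the scan: keep only matching directories.
  PartitionedScanSketch withEqualityFilter(int value) {
    List<Integer> kept = new ArrayList<>();
    for (Integer partition : partitions) {
      if (partition == value) {
        kept.add(partition);
      }
    }
    return new PartitionedScanSketch(kept);
  }

  // Assumption for this sketch: one unit of parallelism per remaining directory.
  int maxParallelizationWidth() {
    return Math.max(1, partitions.size());
  }

  // Mirrors the check described above: a width greater than one means distribute.
  boolean wouldBeDistributed() {
    return maxParallelizationWidth() > 1;
  }

  public static void main(String[] args) {
    PartitionedScanSketch scan =
        new PartitionedScanSketch(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
    PartitionedScanSketch pruned = scan.withEqualityFilter(10);
    System.out.println("before push-down: width " + scan.maxParallelizationWidth());
    System.out.println("after push-down:  width " + pruned.maxParallelizationWidth());
  }
}
```

If the filter is pushed only after the distribution decision has been made, the scan is planned for ten-way parallelism even though a single directory remains to be read.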
Each planner phase gathers Calcite rules to apply to the plan tree. Fortunately, Drill asks our plugin for those rules by calling:
@Override
public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(
    OptimizerRulesContext optimizerContext, PlannerPhase phase);
We can insert a print statement to see which phases are available:
@Override
public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(
    OptimizerRulesContext optimizerContext, PlannerPhase phase) {
  // Debugging aid only: print each phase for which Drill requests rules.
  System.out.println(phase);
  return ImmutableSet.of();
}
The phases are driven by DefaultSqlHandler.convertToRawDrel(). We can insert print statements after each phase like this:
System.out.println(RelOptUtil.toString(pruned, SqlExplainLevel.ALL_ATTRIBUTES));
final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
...
System.out.println(RelOptUtil.toString(transitiveClosureNode, SqlExplainLevel.ALL_ATTRIBUTES));
intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
And in convertToPrel():
System.out.println(RelOptUtil.toString(drel, SqlExplainLevel.ALL_ATTRIBUTES));
final RelNode relNode = transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, drel, traits, false);
If we run our plugin test in the debugger, we see the following output:
DIRECTORY_PRUNING
LogicalProject(a=[$1], b=[$2]): rowcount = 25.0, cumulative cost = {150.0 rows, 251.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27
LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))]): rowcount = 25.0, cumulative cost = {125.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25
EnumerableTableScan(table=[[dummy, myTable]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3
LOGICAL
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 55
DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47
PARTITION_PRUNING
JOIN_PLANNING
ROWKEYJOIN_CONVERSION
SUM_CONVERSION
DrillScreenRel: rowcount = 2500.0, cumulative cost = {20250.0 rows, 70250.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 69
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 67
DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47
PHYSICAL
Looking at this, we see that the plan at DIRECTORY_PRUNING is in pure Calcite form: our GroupScan has not yet been created; we only see our "scan spec." However, at PARTITION_PRUNING, our GroupScan is available to help with the filter push-down work. By the time we get to the PHYSICAL stage, all the plan nodes have been converted to the Drill logical form.
Once the above is done, and if we've left those debugging print statements in place, we can see the effect of a filter push-down. Before filter push-down:
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): ...
DrillScanRel(table=[[dummy2, myTable]],
groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
scanSpec=DummyScanSpec [table="myTable"]]]): ...
After filter push-down:
DrillScanRel(table=[[dummy2, myTable]],
groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
scanSpec=DummyScanSpec [table="myTable"],
orFilters=DisjunctionFilterSpec [column="a", type=VARCHAR, values=[bar, foo]]]]): ...
Notice that, for this query, the filter node was removed as all predicates were pushed into the scan.
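The pushed filter in this plan is an OR, which may seem to contradict the earlier rule. It does not: the OR of equalities on a single column is one term of the (here, single-term) conjunction, and the plugin handles that term as a whole, much like an IN list. The following is a minimal sketch of that collapse. The Eq and InList classes are hypothetical stand-ins invented for illustration; they are not the classes that produced the plan output above.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

class DisjunctionCollapseSketch {

  /** One "column = value" term. Hypothetical; not a Drill or Calcite class. */
  static final class Eq {
    final String column;
    final String value;

    Eq(String column, String value) {
      this.column = column;
      this.value = value;
    }
  }

  /** Hypothetical stand-in for a filter spec holding one column and its values. */
  static final class InList {
    final String column;
    final List<String> values;

    InList(String column, List<String> values) {
      this.column = column;
      this.values = values;
    }

    @Override
    public String toString() {
      return "column=" + column + ", values=" + values;
    }
  }

  /** Returns an IN-style list when every OR term tests the same column, else null. */
  static InList collapse(List<Eq> orTerms) {
    if (orTerms.isEmpty()) {
      return null;
    }
    String column = orTerms.get(0).column;
    Set<String> values = new LinkedHashSet<>(); // de-duplicates, keeps first-seen order
    for (Eq term : orTerms) {
      if (!term.column.equals(column)) {
        return null; // mixed columns: leave the OR for Drill to evaluate
      }
      values.add(term.value);
    }
    return new InList(column, new ArrayList<>(values));
  }

  public static void main(String[] args) {
    List<Eq> terms = new ArrayList<>();
    terms.add(new Eq("a", "bar"));
    terms.add(new Eq("a", "foo"));
    System.out.println(collapse(terms));
  }
}
```

An OR that mixes columns, or mixes an equality with a UDF call, returns null in this sketch, meaning the whole term stays in Drill's filter.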
This section discusses how to do filter push-down in the PHYSICAL stage. As explained above, this is probably the wrong stage as it is too late to affect parallelization. However, since most existing plugins work this way, it's worth understanding this phase. You can skip this if you've gone the logical route.
Drill uses Calcite to perform planning. Calcite is an extremely complex system with, alas, limited documentation. Calcite works by applying rules. The first step is to create a push-down rule:
public class ExampleFilterPushDownRule extends StoragePluginOptimizerRule {

  // The rule is stateless, so a single shared instance is enough.
  public static final StoragePluginOptimizerRule INSTANCE = new ExampleFilterPushDownRule();

  private ExampleFilterPushDownRule() {
    // Match a FilterPrel whose input is a ScanPrel.
    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
        "ExampleFilterPushDown:Filter_On_Scan");
  }

  @Override
  public boolean matches(RelOptRuleCall call) {
    if (!super.matches(call)) {
      return false;
    }
    // Apply only to scans that belong to our storage plugin.
    final ScanPrel scan = call.rel(1);
    return scan.getGroupScan() instanceof ExampleGroupScan;
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    final ScanPrel scan = call.rel(1);
    final FilterPrel filter = call.rel(0);
    final RexNode condition = filter.getCondition();

    // Convert the Calcite condition into Drill's expression form.
    LogicalExpression conditionExp =
        DrillOptiq.toDrill(
            new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())),
            scan, condition);
  }
}
All of the above is boilerplate:
- The rule is stateless so we create a single global INSTANCE.
- The super constructor call says that this rule applies to a filter (FilterPrel, which is "filter physical relational element") whose input is a scan (ScanPrel).
- The matches() method further refines the rule application: it says the rule applies only where the scan is for our particular storage plugin.
- The onMatch() call is where we will do our work. Drill's existing storage plugin rules do not actually work directly on the Calcite structures, so the code converts Calcite to Drill structures. ("Optiq" is the former name of Calcite.)
We tell Calcite to use our new rule by adding a method to our storage plugin:
@Override
public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
    OptimizerRulesContext optimizerRulesContext) {
  return ImmutableSet.of(ExampleFilterPushDownRule.INSTANCE);
}
At this point, Calcite will call our rule (you can set a breakpoint in the methods and run your test), but the rule itself does nothing yet.