Storage Filter Pushdown
Suppose you are given a query such as the following:
```sql
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
```

Here, myudf is a user-defined function (UDF) known only to Drill.
Let's also assume that our data source can handle simple equality clauses. Perhaps we have a partitioned directory structure:
```
myTable
|- a=1
|- a=2
...
|- a=10
```

If so, then we can restrict our scan to just the a=10 subdirectory.
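In the partitioned-directory case, applying a = 10 during planning comes down to comparing each directory name with the predicate. The following is a minimal sketch, assuming directories named column=value; PartitionPruner is a hypothetical helper, not part of Drill.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not a Drill API): prunes "<column>=<value>" partition
// directories against a single equality predicate such as a = 10.
final class PartitionPruner {
  private PartitionPruner() {}

  static List<String> prune(List<String> dirs, String column, String value) {
    List<String> kept = new ArrayList<>();
    for (String dir : dirs) {
      int eq = dir.indexOf('=');
      // Not a partition directory for this column: we cannot rule it out, so keep it.
      if (eq < 0 || !dir.substring(0, eq).equals(column)) {
        kept.add(dir);
        continue;
      }
      // Values are compared as strings, so "10" and "010" are treated as different.
      if (dir.substring(eq + 1).equals(value)) {
        kept.add(dir);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> dirs = List.of("a=1", "a=2", "a=10");
    System.out.println(prune(dirs, "a", "10")); // prints [a=10]
  }
}
```

Directories that do not follow the column=value pattern are kept rather than dropped: when the pruner cannot prove a directory is irrelevant, the only safe choice is to scan it.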
Or, perhaps we are using a data source such as JDBC where we can "push" part of the WHERE clause to the data source.
In either case, we want to do three things:

- Capture the simple equality clause, a = 10, so that our storage plugin can handle it, while leaving the Drill-specific predicate, myudf(b, 3) = 'foo', for Drill to execute.
- Rewrite the query to remove the pushed-down predicates:

  ```sql
  SELECT a, b, c FROM example.myTable WHERE myudf(b, 3) = 'foo'
  ```

- Execute logic to implement the pushed-down predicate so that the query returns the same result as if Drill were to execute the original query (but, presumably, faster, because we do not ship unneeded data to Drill).
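For a JDBC-style source, executing the pushed predicate can mean adding it to the SQL that the plugin sends to the database. A minimal sketch, assuming the captured terms are all simple column = value equalities; JdbcWhereBuilder, Equality, and Rendered are hypothetical names, and a real plugin would also have to quote identifiers in the target database's dialect.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper (not a Drill or JDBC API): turns pushed-down
// "column = value" terms into a parameterized WHERE clause.
final class JdbcWhereBuilder {
  private JdbcWhereBuilder() {}

  record Equality(String column, Object value) {}
  record Rendered(String whereClause, List<Object> params) {}

  static Rendered render(List<Equality> terms) {
    if (terms.isEmpty()) {
      return new Rendered("", List.of()); // nothing was pushed down
    }
    // The terms came from a top-level AND, so they are re-joined with AND.
    StringJoiner where = new StringJoiner(" AND ", "WHERE ", "");
    List<Object> params = new ArrayList<>();
    for (Equality t : terms) {
      where.add(t.column() + " = ?"); // identifiers are NOT quoted in this sketch
      params.add(t.value());
    }
    return new Rendered(where.toString(), params);
  }

  public static void main(String[] args) {
    Rendered r = render(List.of(new Equality("a", 10)));
    System.out.println(r.whereClause() + " " + r.params()); // prints WHERE a = ? [10]
  }
}
```

Binding values as ? parameters (for example through a java.sql.PreparedStatement) rather than splicing literals into the SQL text avoids quoting and injection problems.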
Database theory speaks of "conjunctive normal form" (CNF), which is just a fancy way of describing a set of conditions joined by AND, where each condition may itself be an OR of simpler conditions:

```sql
a = 10 AND myudf(b, 3) = 'foo'
```
CNF is important because conditions can be applied independently. The following are equivalent:
```sql
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
```

```sql
SELECT a, b, c FROM (
  SELECT a, b, c FROM example.myTable WHERE a = 10)
WHERE myudf(b, 3) = 'foo'
```
Note that this equivalence does not hold if our WHERE clause contains a top-level OR:

```sql
SELECT a, b, c FROM example.myTable WHERE a = 10 OR myudf(b, 3) = 'foo'
```

The conclusion is that we can push a filter predicate into our storage plugin if it is one of the terms of a top-level AND, but not if it appears inside an OR.
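The argument can be checked on a few sample rows. In this sketch, myudf is a hypothetical stand-in (it returns the first n characters of its argument), and scanWithPushedEquality simulates a storage plugin that has already applied a = 10 during the scan.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

// Shows on sample rows that a top-level AND can be split into an inner and an
// outer filter, while pushing one side of a top-level OR changes the result.
final class PushdownSafetyDemo {
  record Row(int a, String b) {}

  // Hypothetical stand-in for the Drill-only UDF: the first n characters of s.
  static String myudf(String s, int n) {
    return s.length() <= n ? s : s.substring(0, n);
  }

  static final Predicate<Row> A_IS_10 = r -> r.a() == 10;
  static final Predicate<Row> UDF_IS_FOO = r -> myudf(r.b(), 3).equals("foo");

  static List<Row> where(List<Row> rows, Predicate<Row> p) {
    return rows.stream().filter(p).collect(Collectors.toList());
  }

  // Simulates a storage plugin that applies a = 10 while scanning.
  static List<Row> scanWithPushedEquality(List<Row> rows) {
    return where(rows, A_IS_10);
  }

  public static void main(String[] args) {
    List<Row> rows = List.of(
        new Row(10, "foobar"), new Row(10, "bar"), new Row(5, "food"));

    // AND: filtering all at once equals pushing a = 10 and filtering the rest later.
    List<Row> andAll = where(rows, A_IS_10.and(UDF_IS_FOO));
    List<Row> andSplit = where(scanWithPushedEquality(rows), UDF_IS_FOO);
    System.out.println(andAll.equals(andSplit)); // true

    // OR: Row(5, "food") satisfies myudf(b, 3) = 'foo' but is dropped by the scan.
    List<Row> orAll = where(rows, A_IS_10.or(UDF_IS_FOO));
    List<Row> orAfterPush = where(scanWithPushedEquality(rows), A_IS_10.or(UDF_IS_FOO));
    System.out.println(orAll.size() + " vs " + orAfterPush.size()); // 3 vs 2
  }
}
```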
Calcite presents the WHERE clause to us as a tree of expressions (RexNode objects), so we can pick apart a condition such as the one in this query:

```sql
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo' AND c = 30
```
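Conceptually, the condition becomes a tree of AND expressions (Calcite may flatten a chain of ANDs into a single AND node with more than two operands, but the idea is the same):

```
expression(AND)
- a = 10
- expression(AND)
  - myudf(b, 3) = 'foo'
  - c = 30
```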
So, our goal is to walk the tree, find all the conjunctive terms, decide which ones our storage plugin can handle, and create a new tree that includes all the terms we cannot push down (which could be the entire tree, part of the tree, or nothing).
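The walk-and-split step can be sketched on a toy expression tree. The Expr, And, Or, Eq, and Opaque types and the FilterSplitter class are hypothetical stand-ins for Calcite's RexNode classes, and canPush() assumes a source that accepts only column = literal terms. (Calcite itself provides helpers for this, such as RelOptUtil.conjunctions(), which are worth checking in the Calcite version your Drill build uses.)

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, minimal expression tree for illustration; this is NOT Calcite's RexNode API.
interface Expr {}
record And(Expr left, Expr right) implements Expr {}
record Or(Expr left, Expr right) implements Expr {}
record Eq(String column, Object value) implements Expr {}  // column = literal
record Opaque(String text) implements Expr {}              // anything else, e.g. a UDF call

final class FilterSplitter {
  private FilterSplitter() {}

  // The terms handed to the storage plugin, plus what Drill must still evaluate.
  record Split(List<Expr> pushed, Optional<Expr> residual) {}

  // Collects top-level conjuncts by descending through AND nodes only;
  // an OR (or any other node) is kept whole as a single term.
  static void conjuncts(Expr e, List<Expr> out) {
    if (e instanceof And a) {
      conjuncts(a.left(), out);
      conjuncts(a.right(), out);
    } else {
      out.add(e);
    }
  }

  // Assumption: our data source accepts only simple column = literal terms.
  static boolean canPush(Expr e) {
    return e instanceof Eq;
  }

  static Split split(Expr condition) {
    List<Expr> terms = new ArrayList<>();
    conjuncts(condition, terms);
    List<Expr> pushed = new ArrayList<>();
    List<Expr> kept = new ArrayList<>();
    for (Expr t : terms) {
      if (canPush(t)) {
        pushed.add(t);
      } else {
        kept.add(t);
      }
    }
    // Re-join the remaining terms with AND; empty means no filter is left for Drill.
    Optional<Expr> residual = kept.stream().reduce(And::new);
    return new Split(pushed, residual);
  }

  public static void main(String[] args) {
    // WHERE a = 10 AND myudf(b, 3) = 'foo' AND c = 30, as nested binary ANDs
    Expr where = new And(new Eq("a", 10),
        new And(new Opaque("myudf(b, 3) = 'foo'"), new Eq("c", 30)));
    Split s = split(where);
    System.out.println(s.pushed());   // [Eq[column=a, value=10], Eq[column=c, value=30]]
    System.out.println(s.residual()); // Optional[Opaque[text=myudf(b, 3) = 'foo']]
  }
}
```

The residual is rebuilt as a left-deep chain of ANDs, so its shape may differ from the original tree; because AND is associative and commutative, the filter Drill evaluates keeps the same meaning.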
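Drill uses Calcite to perform planning. Calcite is an extremely complex system with, alas, limited documentation. Calcite works by applying rules: each rule matches a pattern of relational operators in the query plan and can replace it with an equivalent, hopefully cheaper, alternative. The first step is to create a push-down rule: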
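```java
import org.apache.calcite.plan.RelOptRuleCall;
import org.apache.calcite.rex.RexNode;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.planner.logical.DrillOptiq;
import org.apache.drill.exec.planner.logical.DrillParseContext;
import org.apache.drill.exec.planner.logical.RelOptHelper;
import org.apache.drill.exec.planner.physical.FilterPrel;
import org.apache.drill.exec.planner.physical.PrelUtil;
import org.apache.drill.exec.planner.physical.ScanPrel;
import org.apache.drill.exec.store.StoragePluginOptimizerRule;

public class ExampleFilterPushDownRule extends StoragePluginOptimizerRule {

  // The rule holds no state, so a single shared instance suffices.
  public static final StoragePluginOptimizerRule INSTANCE = new ExampleFilterPushDownRule();

  private ExampleFilterPushDownRule() {
    // Match a FilterPrel whose input is a ScanPrel.
    super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
        "ExampleFilterPushDown:Filter_On_Scan");
  }

  @Override
  public boolean matches(RelOptRuleCall call) {
    if (!super.matches(call)) {
      return false;
    }
    // Operand 1 is the scan; fire only for scans of our own plugin.
    final ScanPrel scan = call.rel(1);
    return scan.getGroupScan() instanceof ExampleGroupScan;
  }

  @Override
  public void onMatch(RelOptRuleCall call) {
    final ScanPrel scan = call.rel(1);
    final FilterPrel filter = call.rel(0);
    final RexNode condition = filter.getCondition();
    // Convert the Calcite (RexNode) condition into a Drill LogicalExpression.
    LogicalExpression conditionExp =
        DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
    // Not yet implemented: analyze conditionExp and push terms into the scan.
  }
}
```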
All of the above is boilerplate:

- The rule is stateless, so we create a single global INSTANCE.
- The super constructor call says that this rule applies to a filter (FilterPrel, which is "filter physical relational element") whose input is a scan (ScanPrel).
- The matches() method further refines the rule application: the rule applies only when the scan is for our particular storage plugin.
- The onMatch() method is where we will do our work. Drill's existing storage plugin rules do not work directly on the Calcite structures, so the code converts the Calcite condition (a RexNode) into a Drill LogicalExpression. ("Optiq" is the former name of Calcite.)
We tell Calcite to use our new rule by overriding the following method in our storage plugin:

```java
@Override
public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
    OptimizerRulesContext optimizerRulesContext) {
  return ImmutableSet.of(ExampleFilterPushDownRule.INSTANCE);
}
```
At this point, Calcite will call our rule (you can set a breakpoint in the methods and run your test), but the rule itself does nothing yet.