Storage Filter Pushdown
Typically you use "filter push-down" to figure out which partitions to scan:
SELECT * FROM myPlugin.timeSeries
WHERE eventTime BETWEEN '2019-11-15 13:00:00' AND '2019-11-15 14:00:00'
You use the WHERE clause filter to identify the time range, then use plugin-specific logic to understand that, say, eventTime is a partition column, and to work out the partitions. For file-based storage, the directory structure drives partitions. For external systems, the partitioning is likely to be very specific to that system.
A "filter" is a condition in our query, typically in the WHERE
clause. Filter "push-down" is where we move the filter out of the query and into our scan implementation. There are two use cases:
- Optional: the filter can be executed in Drill, but we can improve performance by pushing the filter.
- Mandatory: the filter cannot be executed in Drill and must be pushed into the scan.
Suppose you are given a query such as the following:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
Here, myudf is a user-defined function (UDF) known only to Drill.
Let's also assume that our data source can handle simple equality clauses. Perhaps we have a partitioned directory structure:
myTable
|- a=1
|- a=2
...
|- a=10
If so, then we can restrict our scan to just the a=10 subdirectory. This is called a "filter push-down": we push the a = 10 filter out of Drill into our scan. In this case, we don't have to do the push-down: we could scan all directories and let Drill throw away the unwanted data. Thus filter push-down increases performance, but does not change query results.
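To make this concrete, here is a minimal sketch of how a plugin might map an equality predicate on a partition column to the single subdirectory it needs to scan. The class and method are illustrative only, not part of any Drill API:
import java.nio.file.Path;
public class PartitionPruner {
  // Map an equality predicate on a partition column to the one
  // subdirectory that can contain matching rows. For example,
  // a = 10 under table root myTable resolves to myTable/a=10.
  public static Path partitionDir(Path tableRoot, String partitionCol, Object value) {
    return tableRoot.resolve(partitionCol + "=" + value);
  }
}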
Note that the myudf(b, 3) = 'foo' clause cannot be pushed down: it refers to a function known only to Drill. (Though if myudf were, in fact, a function known to your data source, you could push that function as well. That, however, would be a fairly obscure use case.)
Or, perhaps we are using a data source such as REST where we must pass certain parameters to the target service:
SELECT * FROM example.timeseries
WHERE startTime = '2018-11-30T10:00:00'
AND endTime = '2018-11-30T11:00:00'
In this case, we require the filter, and we want to remove the filter from the query because the startTime and endTime fields will not appear in each record. (Instead, if we leave the filters in the query, Drill will create a null value for those fields, which will never evaluate as equal to the given dates, and so the query will return no results.)
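As a rough illustration, a REST-based plugin might translate such pushed-down equality filters into request parameters for the target service. The following is a minimal sketch; the class and method names are invented for illustration and are not part of any Drill API (real code would also URL-encode the values):
import java.util.Map;
import java.util.StringJoiner;
public class RestParamBuilder {
  // Turn pushed-down equality filters into the query string the
  // service expects, e.g. {startTime=..., endTime=...} becomes
  // ?startTime=...&endTime=...
  public static String toQueryString(Map<String, String> pushedFilters) {
    StringJoiner params = new StringJoiner("&", "?", "");
    for (Map.Entry<String, String> e : pushedFilters.entrySet()) {
      params.add(e.getKey() + "=" + e.getValue());
    }
    return params.toString();
  }
}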
Or, perhaps we are using a data source such as JDBC where we can "push" part of the WHERE clause to the data source.
In either case, we want to do three things:
- Capture the simple equality clause, a = 10, so that our storage plugin can handle it, while leaving the Drill-specific predicate, myudf(b, 3) = 'foo', for Drill to execute.
- Rewrite the query to remove the pushed-down predicates: SELECT a, b, c FROM example.myTable WHERE myudf(b, 3) = 'foo'
- Execute logic to implement the predicate so that the query returns the same result as if Drill were to execute the same query (but, presumably, do so faster because we do not ship unneeded data to Drill).
Database theory speaks of "conjunctive normal form" (CNF), which is just a fancy way of describing a set of conditions joined by AND:
a = 10 AND myudf(b, 3) = 'foo'
CNF is important because conditions can be applied independently. The following are equivalent:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo'
SELECT a, b, c FROM (
SELECT a, b, c FROM example.myTable WHERE a = 10)
WHERE myudf(b, 3) = 'foo'
Note that the above is not true if our WHERE clause contains top-level OR statements:
SELECT a, b, c FROM example.myTable WHERE a = 10 OR myudf(b, 3) = 'foo'
The fancy term for a series of OR statements is "disjunctive normal form" (DNF).
The conclusion is that we can push filter predicates into our storage plugin if they appear anywhere in a top-level AND clause, but not if they appear in an OR clause. (Except for a special case we discuss later.)
Calcite presents the WHERE clause to us as a parse tree. Given the following:
SELECT a, b, c FROM example.myTable WHERE a = 10 AND myudf(b, 3) = 'foo' AND c = 30
Calcite produces a binary tree of expressions:
expression(AND)
- a = 10
- expression(AND)
- myudf(b, 3) = 'foo'
- c = 30
So, our goal is to walk the tree, find all the conjunctive terms, decide which we can rewrite, and create a new tree that includes all the terms we cannot rewrite (which could be the entire tree, part of the tree, or nothing).
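Calcite ships utilities that handle most of this tree-walking. The sketch below uses the real RelOptUtil.conjunctions() and RexUtil.composeConjunction() helpers; the canPushToScan() method is a stand-in for your plugin-specific logic:
import java.util.ArrayList;
import java.util.List;
import org.apache.calcite.plan.RelOptUtil;
import org.apache.calcite.rex.RexBuilder;
import org.apache.calcite.rex.RexNode;
import org.apache.calcite.rex.RexUtil;
public class ConjunctSplitter {
  // Stand-in for plugin-specific logic: can our scan handle this term?
  private boolean canPushToScan(RexNode term) {
    return false; // plugin-specific tests go here
  }
  // Split a condition into pushed and retained terms, and rebuild the
  // residual filter from whatever we could not push.
  public RexNode splitFilter(RexBuilder rexBuilder, RexNode condition,
      List<RexNode> pushed) {
    List<RexNode> retained = new ArrayList<>();
    // conjunctions() flattens nested ANDs into a flat list of terms.
    // A top-level OR comes back as a single term, so it is naturally
    // retained rather than pushed.
    for (RexNode term : RelOptUtil.conjunctions(condition)) {
      if (canPushToScan(term)) {
        pushed.add(term);
      } else {
        retained.add(term);
      }
    }
    // Rebuild the residual filter. If every term was pushed, this
    // returns a TRUE literal and the Filter node can be dropped.
    return RexUtil.composeConjunction(rexBuilder, retained, false);
  }
}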
Drill's Calcite-based planner converts the above query to an executable plan in multiple steps. It is important that we choose the correct step in which to perform push down. The major forms of the plan are:
- SQL parse tree
- Calcite logical plan
- Drill logical plan
- Drill "physical" (un-parallelized logical) plan
- Drill parallelized operator descriptions
- Drill executable operators
The first five steps occur in the planner; the last step occurs at run time.
Most existing plugins that implement filter push-down do so during physical planning. However, this creates an ordering problem: we don't know how to parallelize until we see the filters, but filter push-down happens after parallelization decisions are made.
To resolve this, the Base framework provides a set of helper classes that perform filter push-down during logical planning, so that we can be sure filter push-down is done before physical planning starts. We do this because, as explained above, we want to use filters to drive query parallelization.
Calcite is a rule-driven, cost-based planner. Each planner phase gathers Calcite rules to apply to the plan tree. Our storage plugin can contribute rules unique to our plugin:
@Override
public Set<? extends StoragePluginOptimizerRule> getOptimizerRules(
OptimizerRulesContext optimizerContext, PlannerPhase phase) {
if (phase.isFilterPushDownPhase()) {
return ExampleFilterPushDownListener.rulesFor(optimizerContext, config);
}
return ImmutableSet.of();
}
If you use the Base filter push-down framework, things should "just work." However, if you need to create your own implementation, you will need to know quite a bit about Calcite planning internals. Here are some of the basics.
The phases are driven by DefaultSqlHandler.convertToRawDrel(). We can insert print statements after each phase like this:
System.out.println(RelOptUtil.toString(pruned, SqlExplainLevel.ALL_ATTRIBUTES));
final RelNode intermediateNode = transform(PlannerType.VOLCANO, PlannerPhase.LOGICAL, pruned, logicalTraits);
...
System.out.println(RelOptUtil.toString(transitiveClosureNode, SqlExplainLevel.ALL_ATTRIBUTES));
intermediateNode2 = transform(PlannerType.HEP_BOTTOM_UP, PlannerPhase.PARTITION_PRUNING, transitiveClosureNode);
And in convertToPrel():
System.out.println(RelOptUtil.toString(drel, SqlExplainLevel.ALL_ATTRIBUTES));
final RelNode relNode = transform(PlannerType.VOLCANO, PlannerPhase.PHYSICAL, drel, traits, false);
If we run our plugin test in the debugger, we see the following output:
DIRECTORY_PRUNING
LogicalProject(a=[$1], b=[$2]): rowcount = 25.0, cumulative cost = {150.0 rows, 251.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 27
LogicalFilter(condition=[OR(=($1, 'bar'), =($1, 'foo'))]): rowcount = 25.0, cumulative cost = {125.0 rows, 201.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 25
EnumerableTableScan(table=[[dummy, myTable]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 3
LOGICAL
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 55
DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47
PARTITION_PRUNING
JOIN_PLANNING
ROWKEYJOIN_CONVERSION
SUM_CONVERSION
DrillScreenRel: rowcount = 2500.0, cumulative cost = {20250.0 rows, 70250.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 69
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): rowcount = 2500.0, cumulative cost = {20000.0 rows, 70000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 67
DrillScanRel(table=[[dummy, myTable]], groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`], scanSpec=DummyScanSpec [table="myTable"]]]): rowcount = 10000.0, cumulative cost = {10000.0 rows, 20000.0 cpu, 0.0 io, 0.0 network, 0.0 memory}, id = 47
PHYSICAL
Looking at this, we see that the plan at the DIRECTORY_PRUNING stage is in pure Calcite form: our GroupScan has not yet been created; we see only our "scan spec." However, at PARTITION_PRUNING, our GroupScan is available to help with the filter push-down work. By the time we get to the PHYSICAL stage, all the plan nodes have been converted to the Drill logical form.
Once the above is done, and if we've left those debugging print statements in place, we can see the effect of a filter push-down. Before filter push-down:
DrillFilterRel(condition=[OR(=($0, 'bar'), =($0, 'foo'))]): ...
DrillScanRel(table=[[dummy2, myTable]],
groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
scanSpec=DummyScanSpec [table="myTable"]]]): ...
After filter push-down:
DrillScanRel(table=[[dummy2, myTable]],
groupscan=[DummyGroupScan [user="dummy", columns=[`a`, `b`],
scanSpec=DummyScanSpec [table="myTable"],
orFilters=DisjunctionFilterSpec [column="a", type=VARCHAR, values=[bar, foo]]]]): ...
Notice that, for this query, the filter node was removed because all predicates were pushed into the scan.
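This is also the special OR case mentioned earlier: an OR over equality tests on a single column (in effect, an IN list) can be pushed as a unit, which the scan spec above represents with a DisjunctionFilterSpec.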
This section discusses how to do filter push-down in the PHYSICAL stage. As explained above, this is probably the wrong stage, as it is too late to affect parallelization. However, since most existing plugins work this way, it's worth understanding this phase. You can skip this section if you've gone the logical route.
Drill uses Calcite to perform planning. Calcite is an extremely complex system with, alas, limited documentation. Calcite works by applying rules. The first step is to create a push-down rule:
public class ExampleFilterPushDownRule extends StoragePluginOptimizerRule {
public static final StoragePluginOptimizerRule INSTANCE = new ExampleFilterPushDownRule();
private ExampleFilterPushDownRule() {
super(RelOptHelper.some(FilterPrel.class, RelOptHelper.any(ScanPrel.class)),
"ExampleFilterPushDown:Filter_On_Scan");
}
@Override
public boolean matches(RelOptRuleCall call) {
if (!super.matches(call)) {
return false;
}
final ScanPrel scan = call.rel(1);
return scan.getGroupScan() instanceof ExampleGroupScan;
}
@Override
public void onMatch(RelOptRuleCall call) {
final ScanPrel scan = call.rel(1);
final FilterPrel filter = call.rel(0);
final RexNode condition = filter.getCondition();
LogicalExpression conditionExp =
DrillOptiq.toDrill(new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())), scan, condition);
}
}
All of the above is boilerplate:
- The rule is stateless, so we create a single global INSTANCE.
- The super constructor call says that this rule applies to a filter (FilterPrel, the "filter physical relational element") that sits directly above a scan (ScanPrel).
- The matches() method further refines the rule application: it says the rule applies only where the scan is for our particular storage plugin.
- The onMatch() call is where we will do our work. Drill's existing storage plugin rules do not actually work directly on the Calcite structures, so the code converts Calcite to Drill structures. ("Optiq" is the former name of Calcite.)
We tell Calcite to use our new rule by adding a method to our storage plugin:
@Override
public Set<StoragePluginOptimizerRule> getPhysicalOptimizerRules(
OptimizerRulesContext optimizerRulesContext) {
return ImmutableSet.of(ExampleFilterPushDownRule.INSTANCE);
}
At this point, Calcite will call our rule (you can set a breakpoint in the methods and run your test), but the rule itself does nothing yet.
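When you are ready to make the rule do real work, onMatch() typically builds a new group scan that carries the pushed condition and asks Calcite to substitute it. The sketch below follows the general shape used by existing Drill plugins (such as the HBase and Kafka push-down rules); cloneWithFilter() is a hypothetical method your ExampleGroupScan would provide, and the exact ScanPrel construction varies somewhat between Drill versions:
@Override
public void onMatch(RelOptRuleCall call) {
  final ScanPrel scan = call.rel(1);
  final FilterPrel filter = call.rel(0);
  final RexNode condition = filter.getCondition();
  // Convert the Calcite expression into a Drill logical expression.
  LogicalExpression conditionExp = DrillOptiq.toDrill(
      new DrillParseContext(PrelUtil.getPlannerSettings(call.getPlanner())),
      scan, condition);
  ExampleGroupScan groupScan = (ExampleGroupScan) scan.getGroupScan();
  // Hypothetical: copy the group scan, attaching the pushed condition.
  ExampleGroupScan newGroupScan = groupScan.cloneWithFilter(conditionExp);
  // Wrap the new group scan in a new scan node.
  ScanPrel newScan = ScanPrel.create(scan, filter.getTraitSet(),
      newGroupScan, scan.getRowType());
  // Keep the filter above the new scan so Drill re-checks the condition;
  // if the scan handles the entire condition exactly, you can instead
  // transform to newScan alone and drop the filter.
  call.transformTo(filter.copy(filter.getTraitSet(),
      ImmutableList.of((RelNode) newScan)));
}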