Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .unreleased/saop-pushdown
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Implements: #9192 Push down scalar array operations into the columnar metadata scan by transforming them into an OR/AND clause.
209 changes: 207 additions & 2 deletions tsl/src/nodes/columnar_scan/qual_pushdown.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ typedef struct QualPushdownContext
bool can_pushdown;
bool needs_recheck;
} QualPushdownContext;

static QualPushdownContext
copy_context(const QualPushdownContext *source)
{
Expand Down Expand Up @@ -622,6 +621,154 @@ pushdown_saop_bloom1(QualPushdownContext *context, ScalarArrayOpExpr *orig_saop)
COERCE_EXPLICIT_CALL);
}

/*
* Deconstruct a Const of array type into a list of the array values.
*/
static List *
deconstruct_array_const(Const *array_const)
{
/*
* No way to represent that as a list (NIL is an empty array), so has to be
* handled by the caller.
*/
Assert(!array_const->constisnull);

Oid array_type = array_const->consttype;
Datum array_datum = array_const->constvalue;

Oid element_type = get_element_type(array_type);
Assert(OidIsValid(element_type));

int16 typlen;
bool typbyval;
char typalign;
get_typlenbyvalalign(element_type, &typlen, &typbyval, &typalign);

int nelems;
Datum *elem_values;
bool *elem_nulls;
deconstruct_array(DatumGetArrayTypeP(array_datum),
element_type,
typlen,
typbyval,
typalign,
&elem_values,
&elem_nulls,
&nelems);

List *const_list = NIL;
for (int i = 0; i < nelems; i++)
{
Const *elem_const = makeConst(element_type,
array_const->consttypmod,
array_const->constcollid,
typlen,
elem_values[i],
elem_nulls[i],
typbyval);
const_list = lappend(const_list, elem_const);
}

return const_list;
}

/*
* Push down the scalar array operation by transforming it into a series of
* OR/AND clauses.
*/
static Expr *
pushdown_saop_boolexpr(QualPushdownContext *context, ScalarArrayOpExpr *saop)
{
void *scalar_arg = linitial(saop->args);
void *array_arg = list_nth(saop->args, 1);
List *array_elements;
if (IsA(array_arg, Const) && !castNode(Const, array_arg)->constisnull)
{
array_elements = deconstruct_array_const(castNode(Const, array_arg));
}
else if (IsA(array_arg, ArrayExpr))
{
array_elements = castNode(ArrayExpr, array_arg)->elements;
}
else
{
/*
* We can encounter an array-type Param here, and maybe something else.
* This function has to deconstruct the array into elements now, so
* these types of array argument are not suitable.
*/
context->can_pushdown = false;
return (Expr *) saop;
}

/*
* This will be the operation on the scalar value and an individual array
* element.
*/
OpExpr *opexpr = makeNode(OpExpr);
opexpr->opno = saop->opno;
opexpr->opfuncid = saop->opfuncid;
opexpr->opresulttype = BOOLOID;
opexpr->inputcollid = saop->inputcollid;

/*
* Try to apply the above operation for each array element.
*/
List *pushed_down_ops = NIL;
ListCell *lc;
foreach (lc, array_elements)
{
opexpr->args = list_make2(scalar_arg, lfirst(lc));

QualPushdownContext tmp_context = copy_context(context);
void *transformed = qual_pushdown_mutator((Node *) opexpr, &tmp_context);

/*
* If the scalar array operation uses AND, it's correct and useful to
* push down the check only for some array elements.
*
* For OR, we must be able to push down the checks for every element.
*/
if (!tmp_context.can_pushdown)
{
if (saop->useOr)
{
context->can_pushdown = false;
return (Expr *) saop;
}

continue;
}
context->needs_recheck |= tmp_context.needs_recheck;
pushed_down_ops = lappend(pushed_down_ops, transformed);
}

/*
* We can have no pushed down clauses if:
* 1) we had an AND scalar array operation, but failed to push down every
* individual clause.
* 2) we had an empty array argument, apparently it's not simplified by
* Postgres' eval_const_expressions().
*/
if (pushed_down_ops == NIL)
{
context->can_pushdown = false;
return (Expr *) saop;
}

if (list_length(pushed_down_ops) == 1)
return linitial(pushed_down_ops);

if (saop->useOr)
{
return make_orclause(pushed_down_ops);
}
else
{
return make_andclause(pushed_down_ops);
}
}

static bool
contain_volatile_functions_checker(Oid func_id, void *context)
{
Expand Down Expand Up @@ -803,17 +950,75 @@ qual_pushdown_mutator(Node *orig_node, QualPushdownContext *context)
}
}

/*
* Generic code for scalar array operation pushdown that transforms
* them into a series of OR/AND clauses.
*/
tmp_context = *context;
pushed_down = pushdown_saop_boolexpr(&tmp_context, saop);
if (tmp_context.can_pushdown)
{
context->needs_recheck |= tmp_context.needs_recheck;
return pushed_down;
}

/*
* No other ways to push it down, so consider it failed.
*/
context->can_pushdown = false;
return orig_node;
}
case T_BoolExpr:
{
BoolExpr *orig_boolexpr = castNode(BoolExpr, orig_node);
List *pushed_down_args = NIL;
ListCell *lc;
foreach (lc, orig_boolexpr->args)
{
QualPushdownContext tmp_context = *context;
void *pushed_down = qual_pushdown_mutator(lfirst(lc), &tmp_context);

/*
* If the bool operation uses AND, it's correct and useful to
* push down only some arguments.
*
* For OR, we must be able to push down every argument.
*/
if (!tmp_context.can_pushdown)
{
if (orig_boolexpr->boolop != AND_EXPR)
{
context->can_pushdown = false;
return orig_node;
}

continue;
}

context->needs_recheck |= tmp_context.needs_recheck;
pushed_down_args = lappend(pushed_down_args, pushed_down);
}

/*
* We might have no pushed down arguments if we had an AND bool
* operation, but failed to push down every individual argument.
*/
if (pushed_down_args == NIL)
{
context->can_pushdown = false;
return orig_node;
}

BoolExpr *boolexpr_copy = makeNode(BoolExpr);
*boolexpr_copy = *orig_boolexpr;
boolexpr_copy->args = pushed_down_args;
return (Node *) boolexpr_copy;
}

/*
* These nodes do not influence the pushdown by themselves, so we
* recurse.
*/
case T_BoolExpr:
case T_FuncExpr:
case T_CoerceViaIO:
case T_RelabelType:
Expand Down
9 changes: 5 additions & 4 deletions tsl/test/expected/compress_bloom_sparse-15.out
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,20 @@ execute p('2345');
deallocate p;
reset plan_cache_mode;
reset timescaledb.enable_chunk_append;
-- Only some scalar array operations are supported.
-- Scalar array operations are supported as well.
explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where x < any(array[1000, 2000]::int[]);
--- QUERY PLAN ---
Aggregate (actual rows=1.00 loops=1)
Output: count(*)
-> Custom Scan (ColumnarScan) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1999.00 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.x < ANY ('{1000,2000}'::integer[]))
Rows Removed by Filter: 8001
Batches Removed by Filter: 8
Rows Removed by Filter: 1
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10.00 loops=1)
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=2.00 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.x, compress_hyper_2_2_chunk.regress-test-bloom_value, compress_hyper_2_2_chunk.value, compress_hyper_2_2_chunk.regress-test-bloom_u, compress_hyper_2_2_chunk.u, compress_hyper_2_2_chunk._ts_meta_v2_min_ts, compress_hyper_2_2_chunk._ts_meta_v2_max_ts, compress_hyper_2_2_chunk.ts
Filter: ((compress_hyper_2_2_chunk._ts_meta_min_1 < 1000) OR (compress_hyper_2_2_chunk._ts_meta_min_1 < 2000))
Rows Removed by Filter: 8

explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where value = any(array[md5('1000'), md5('2000')]);
Expand Down
9 changes: 5 additions & 4 deletions tsl/test/expected/compress_bloom_sparse-16.out
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,20 @@ execute p('2345');
deallocate p;
reset plan_cache_mode;
reset timescaledb.enable_chunk_append;
-- Only some scalar array operations are supported.
-- Scalar array operations are supported as well.
explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where x < any(array[1000, 2000]::int[]);
--- QUERY PLAN ---
Aggregate (actual rows=1.00 loops=1)
Output: count(*)
-> Custom Scan (ColumnarScan) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1999.00 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.x < ANY ('{1000,2000}'::integer[]))
Rows Removed by Filter: 8001
Batches Removed by Filter: 8
Rows Removed by Filter: 1
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10.00 loops=1)
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=2.00 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.x, compress_hyper_2_2_chunk.regress-test-bloom_value, compress_hyper_2_2_chunk.value, compress_hyper_2_2_chunk.regress-test-bloom_u, compress_hyper_2_2_chunk.u, compress_hyper_2_2_chunk._ts_meta_v2_min_ts, compress_hyper_2_2_chunk._ts_meta_v2_max_ts, compress_hyper_2_2_chunk.ts
Filter: ((compress_hyper_2_2_chunk._ts_meta_min_1 < 1000) OR (compress_hyper_2_2_chunk._ts_meta_min_1 < 2000))
Rows Removed by Filter: 8

explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where value = any(array[md5('1000'), md5('2000')]);
Expand Down
9 changes: 5 additions & 4 deletions tsl/test/expected/compress_bloom_sparse-17.out
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,20 @@ execute p('2345');
deallocate p;
reset plan_cache_mode;
reset timescaledb.enable_chunk_append;
-- Only some scalar array operations are supported.
-- Scalar array operations are supported as well.
explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where x < any(array[1000, 2000]::int[]);
--- QUERY PLAN ---
Aggregate (actual rows=1.00 loops=1)
Output: count(*)
-> Custom Scan (ColumnarScan) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1999.00 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.x < ANY ('{1000,2000}'::integer[]))
Rows Removed by Filter: 8001
Batches Removed by Filter: 8
Rows Removed by Filter: 1
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10.00 loops=1)
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=2.00 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.x, compress_hyper_2_2_chunk.regress-test-bloom_value, compress_hyper_2_2_chunk.value, compress_hyper_2_2_chunk.regress-test-bloom_u, compress_hyper_2_2_chunk.u, compress_hyper_2_2_chunk._ts_meta_v2_min_ts, compress_hyper_2_2_chunk._ts_meta_v2_max_ts, compress_hyper_2_2_chunk.ts
Filter: ((compress_hyper_2_2_chunk._ts_meta_min_1 < 1000) OR (compress_hyper_2_2_chunk._ts_meta_min_1 < 2000))
Rows Removed by Filter: 8

explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where value = any(array[md5('1000'), md5('2000')]);
Expand Down
9 changes: 5 additions & 4 deletions tsl/test/expected/compress_bloom_sparse-18.out
Original file line number Diff line number Diff line change
Expand Up @@ -251,19 +251,20 @@ execute p('2345');
deallocate p;
reset plan_cache_mode;
reset timescaledb.enable_chunk_append;
-- Only some scalar array operations are supported.
-- Scalar array operations are supported as well.
explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where x < any(array[1000, 2000]::int[]);
--- QUERY PLAN ---
Aggregate (actual rows=1.00 loops=1)
Output: count(*)
-> Custom Scan (ColumnarScan) on _timescaledb_internal._hyper_1_1_chunk (actual rows=1999.00 loops=1)
Vectorized Filter: (_hyper_1_1_chunk.x < ANY ('{1000,2000}'::integer[]))
Rows Removed by Filter: 8001
Batches Removed by Filter: 8
Rows Removed by Filter: 1
Bulk Decompression: true
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=10.00 loops=1)
-> Seq Scan on _timescaledb_internal.compress_hyper_2_2_chunk (actual rows=2.00 loops=1)
Output: compress_hyper_2_2_chunk._ts_meta_count, compress_hyper_2_2_chunk._ts_meta_min_1, compress_hyper_2_2_chunk._ts_meta_max_1, compress_hyper_2_2_chunk.x, compress_hyper_2_2_chunk.regress-test-bloom_value, compress_hyper_2_2_chunk.value, compress_hyper_2_2_chunk.regress-test-bloom_u, compress_hyper_2_2_chunk.u, compress_hyper_2_2_chunk._ts_meta_v2_min_ts, compress_hyper_2_2_chunk._ts_meta_v2_max_ts, compress_hyper_2_2_chunk.ts
Filter: ((compress_hyper_2_2_chunk._ts_meta_min_1 < 1000) OR (compress_hyper_2_2_chunk._ts_meta_min_1 < 2000))
Rows Removed by Filter: 8

explain (analyze, verbose, buffers off, costs off, timing off, summary off)
select count(*) from bloom where value = any(array[md5('1000'), md5('2000')]);
Expand Down
Loading
Loading