From e0abc24697fb2d98ddcacce33e587298f037a73d Mon Sep 17 00:00:00 2001 From: GongXun Date: Thu, 21 Aug 2025 16:42:13 +0800 Subject: [PATCH 1/2] feat: pushdown bloom filter to pax table am This optimization pushes down Bloom Filter conditions for runtime filters to the Pax Table AM layer. By applying the filter earlier than the SeqNext() function, it eliminates the overhead of converting data from columnar format to TableTupleSlot, resulting in faster query execution --- .../pax_storage/src/cpp/access/pax_scanner.cc | 2 +- contrib/pax_storage/src/cpp/comm/cbdb_api.h | 1 + .../src/cpp/storage/filter/pax_filter.cc | 10 +- .../src/cpp/storage/filter/pax_filter.h | 3 +- .../src/cpp/storage/filter/pax_row_filter.cc | 73 +++- .../src/cpp/storage/filter/pax_row_filter.h | 35 +- .../cpp/storage/filter/pax_sparse_pg_path.cc | 6 +- .../micro_partition_row_filter_reader.cc | 123 +++++- .../micro_partition_row_filter_reader.h | 28 ++ .../regress/expected/gp_runtime_filter.out | 361 ++++++++++++++++++ .../test/regress/sql/gp_runtime_filter.sql | 188 +++++++++ src/backend/executor/nodeDynamicSeqscan.c | 32 +- src/backend/executor/nodeHashjoin.c | 11 +- src/backend/executor/nodeSeqscan.c | 6 +- 14 files changed, 822 insertions(+), 57 deletions(-) diff --git a/contrib/pax_storage/src/cpp/access/pax_scanner.cc b/contrib/pax_storage/src/cpp/access/pax_scanner.cc index 5a354e6fa0c..7afa8e7931c 100644 --- a/contrib/pax_storage/src/cpp/access/pax_scanner.cc +++ b/contrib/pax_storage/src/cpp/access/pax_scanner.cc @@ -372,7 +372,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns( && !(flags & SO_TYPE_VECTOR) #endif ) { - filter->InitRowFilter(rel, ps, filter->GetColumnProjection()); + filter->InitRowFilter(rel, ps, filter->GetColumnProjection(), key, nkeys); } } return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags, diff --git a/contrib/pax_storage/src/cpp/comm/cbdb_api.h b/contrib/pax_storage/src/cpp/comm/cbdb_api.h index 30db87143a1..9b65c2e1041 100644 --- a/contrib/pax_storage/src/cpp/comm/cbdb_api.h +++ b/contrib/pax_storage/src/cpp/comm/cbdb_api.h @@ -80,6 +80,7 @@ extern "C" { #include "commands/progress.h" #include "commands/tablecmds.h" #include "funcapi.h" +#include "lib/bloomfilter.h" #include "miscadmin.h" #include "nodes/bitmapset.h" #include "nodes/execnodes.h" diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc index 5f19ab58400..d70b02f1a84 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc @@ -43,9 +43,8 @@ namespace pax { PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {} -void PaxFilter::InitSparseFilter(Relation relation, List *quals, - ScanKey key, int nkeys, - bool allow_fallback_to_pg) { +void PaxFilter::InitSparseFilter(Relation relation, List *quals, ScanKey key, + int nkeys, bool allow_fallback_to_pg) { Assert(!sparse_filter_); sparse_filter_ = std::make_shared(relation, allow_fallback_to_pg); @@ -123,10 +122,11 @@ void PaxFilter::SetColumnProjection(const std::vector &cols, int natts) { } void PaxFilter::InitRowFilter(Relation relation, PlanState *ps, - const std::vector &projection) { + const std::vector &projection, ScanKey key, + int nkeys) { Assert(!row_filter_); row_filter_ = std::make_shared(); - if (!row_filter_->Initialize(relation, ps, projection)) { + if (!row_filter_->Initialize(relation, ps, projection, key, nkeys)) { row_filter_ = nullptr; } } diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h index ebc2fff8538..0d5e1c08e14 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h @@ -73,7 +73,8 @@ class PaxFilter final { // The row filter void InitRowFilter(Relation relation, PlanState *ps, - const std::vector &projection); + const std::vector &projection, ScanKey key, + int nkeys); std::shared_ptr GetRowFilter(); void LogStatistics() const; diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc index f653ec7406e..927ab33b4d4 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc @@ -26,6 +26,7 @@ */ #include "storage/filter/pax_row_filter.h" + #include "comm/cbdb_wrappers.h" namespace paxc { @@ -45,7 +46,8 @@ static inline void FindAttrsInQual(Node *qual, bool *proj, int ncol, } static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, - pax::ExecutionFilterContext *ctx) { + pax::ExecutionFilterContext *ctx, + ScanKey key, int nkeys) { List *qual = ps->plan->qual; List **qual_list; ListCell *lc; @@ -53,11 +55,48 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, int *qual_atts; int natts = RelationGetNumberOfAttributes(rel); - if (!qual || !IsA(qual, List)) return false; + int ret = false; + + if (key && nkeys > 0) { + // We don't need to support DynamicSeqScanState here. Even if the plan uses + // DynamicSeqScanNode for partitioned tables, it's always a regular SeqScan + // on a single table. So this will always be a SeqScanState. + if (nodeTag(ps) != T_SeqScanState) { + elog(ERROR, "runtime filter only support seqscan state, but got %d", + nodeTag(ps)); + } + + for (int i = 0; i < nkeys; i++) { + if (key[i].sk_flags & SK_BLOOM_FILTER) { + ctx->runtime_bloom_keys.emplace_back(key[i]); + ret = true; + } + } + + // register bloom filters + for (int i = 0; i < (int)ctx->runtime_bloom_keys.size(); ++i) { + pax::ExecutionFilterContext::FilterNode node; + node.kind = pax::ExecutionFilterContext::FilterKind::kBloom; + node.index = i; + ctx->filter_nodes.emplace_back(node); + } + + if (ps->instrument) { + ps->instrument->prf_work = true; + } + ctx->ps = ps; + + // set filter_in_seqscan to false, so that the filter will not be executed + // in SeqNext(), but will be executed in pax_row_filter + auto seqscan = (SeqScanState *)ps; + seqscan->filter_in_seqscan = false; + } + + if (!qual || !IsA(qual, List)) return ret; if (list_length(qual) == 1 && IsA(linitial(qual), BoolExpr)) { auto boolexpr = (BoolExpr *)linitial(qual); - if (boolexpr->boolop != AND_EXPR) return false; + if (boolexpr->boolop != AND_EXPR) return ret; qual = boolexpr->args; } Assert(IsA(qual, List)); @@ -98,6 +137,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, if (!qual_list[i]) continue; ctx->estates[k] = ExecInitQual(qual_list[i], ps); ctx->attnos[k] = i; + // register expr filter node (by index k) + pax::ExecutionFilterContext::FilterNode node; + node.kind = pax::ExecutionFilterContext::FilterKind::kExpr; + node.index = k; + ctx->filter_nodes.emplace_back(node); list_free(qual_list[i]); k++; } @@ -108,7 +152,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, list_free(qual_list[0]); } - Assert(ctx->size > 0 || ctx->estate_final); + Assert(ctx->size > 0 || ctx->estate_final || + ctx->runtime_bloom_keys.size() > 0); + + // remove qual from plan state, so that the qual will not be executed in + // executor, but will be executed in pax_row_filter ps->qual = nullptr; pfree(proj); @@ -117,20 +165,19 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, return true; } -} // namespace paxc - +} // namespace paxc namespace pax { PaxRowFilter::PaxRowFilter() {} -bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector &projection) { +bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, + const std::vector &projection, ScanKey key, + int nkeys) { bool ok = false; - + CBDB_WRAP_START; - { - ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_); - } + { ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_, key, nkeys); } CBDB_WRAP_END; if (ok) { @@ -140,7 +187,8 @@ bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector &projection) { +void PaxRowFilter::FillRemainingColumns(Relation rel, + const std::vector &projection) { int natts = RelationGetNumberOfAttributes(rel); auto proj_len = projection.size(); std::vector atts(natts); @@ -162,5 +210,4 @@ void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector &p } } - } // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h index d5a74f7619a..f4e3f85166f 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h @@ -42,16 +42,38 @@ struct ExecutionFilterContext { ExprState *estate_final = nullptr; ExprState **estates; AttrNumber *attnos; + PlanState *ps; int size = 0; inline bool HasExecutionFilter() const { return size > 0 || estate_final; } + + // runtime bloom filters pushed down via SeqScanState->filters + // (SK_BLOOM_FILTER) + std::vector runtime_bloom_keys; + + // unified filter nodes (expr + bloom) for execution ordering + enum class FilterKind { kExpr, kBloom }; + struct FilterNode { + FilterKind kind; + int index; // index in estates (for kExpr) or in runtime_bloom_keys (for + // kBloom) + uint64 tested = 0; // number of rows tested during sampling + uint64 passed = 0; // number of rows passed during sampling + double score = 1.0; // pass rate used for ordering (lower is better) + }; + std::vector filter_nodes; + + // sampling control to determine filter order + bool sampling = true; + uint64 sample_target = 65536; // number of rows for sampling phase + uint64 sample_rows = 0; // rows seen in sampling }; class PaxRowFilter final { -public: + public: PaxRowFilter(); bool Initialize(Relation rel, PlanState *ps, - const std::vector &projection); + const std::vector &projection, ScanKey key, int nkeys); inline const ExecutionFilterContext *GetExecutionFilterContext() const { return &efctx_; @@ -60,11 +82,11 @@ class PaxRowFilter final { inline const std::vector &GetRemainingColumns() const { return remaining_attnos_; } - -private: + + private: void FillRemainingColumns(Relation rel, const std::vector &projection); -private: + private: ExecutionFilterContext efctx_; // all selected columns - single row filting columns // before running final cross columns expression filtering, the remaining @@ -72,5 +94,4 @@ class PaxRowFilter final { std::vector remaining_attnos_; }; - -}; \ No newline at end of file +}; // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc index 3a7bc64f389..c979c7a0fce 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc @@ -50,12 +50,14 @@ void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) { // walk scan key and only support min/max filter now for (int i = 0; i < nkeys; i++) { - // TODO: support bloom filter in PaxFilter - // but now just skip it, SeqNext() will check bloom filter in PassByBloomFilter() + // Now just skip bloom filter here, it will be handled in PaxRowFilter. if (key[i].sk_flags & SK_BLOOM_FILTER) { continue; } + // PushdownRuntimeFilter only support BTGreaterEqualStrategyNumber and + // BTLessEqualStrategyNumber. If the strategy is not supported, skip orther + // strategies. if (key[i].sk_strategy != BTGreaterEqualStrategyNumber && key[i].sk_strategy != BTLessEqualStrategyNumber) { continue; diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc index c2daac1ac5d..3309d6763a1 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc +++ b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc @@ -27,12 +27,14 @@ #include "storage/micro_partition_row_filter_reader.h" +#include + #include "comm/guc.h" #include "comm/log.h" #include "comm/pax_memory.h" #include "storage/filter/pax_filter.h" -#include "storage/filter/pax_sparse_filter.h" #include "storage/filter/pax_row_filter.h" +#include "storage/filter/pax_sparse_filter.h" #include "storage/pax_defined.h" #include "storage/pax_itemptr.h" @@ -73,6 +75,109 @@ MicroPartitionRowFilterReader::GetNextGroup(TupleDesc desc) { return group_; } +void MicroPartitionRowFilterReader::LoadExprFilterColumns( + MicroPartitionReader::Group *group, TupleDesc desc, + const ExecutionFilterContext *ctx, size_t row_index, TupleTableSlot *slot) { + // There will not be duplicate attnos here because the attnos in ctx come from + // qual expressions. For each column index, there is at most one corresponding + // attno in ctx->attnos, so no attno appears more than once. As a result, this + // loop does not load the same column multiple times. + for (int i = 0; i < ctx->size; i++) { + auto attno = ctx->attnos[i]; + Assert(attno > 0); + std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = + group->GetColumnValue(desc, attno - 1, row_index); + } +} + +bool MicroPartitionRowFilterReader::EvalBloomNode( + const ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, int bloom_index) { + Assert(bloom_index >= 0 && + (size_t)bloom_index < ctx->runtime_bloom_keys.size()); + const auto &skd = ctx->runtime_bloom_keys[bloom_index]; + const ScanKey sk = const_cast(&skd); + bool isnull = false; + Datum val; + std::tie(val, isnull) = + group->GetColumnValue(desc, sk->sk_attno - 1, row_index); + if (isnull) return true; + bloom_filter *bf = (bloom_filter *)DatumGetPointer(sk->sk_argument); + return !bloom_lacks_element(bf, (unsigned char *)&val, sizeof(Datum)); +} + +bool MicroPartitionRowFilterReader::EvalExprNode( + const ExecutionFilterContext *ctx, TupleTableSlot *slot, int expr_index) { + return TestRowScanInternal(slot, ctx->estates[expr_index], + ctx->attnos[expr_index]); +} + +// Execute a filter node. +// During the sampling phase, updates the filter's pass rate statistics. +bool MicroPartitionRowFilterReader::EvalFilterNode( + ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, TupleTableSlot *slot, + ExecutionFilterContext::FilterNode &node, bool update_stats) { + bool pass = true; + if (node.kind == ExecutionFilterContext::FilterKind::kBloom) { + pass = EvalBloomNode(ctx, group, desc, row_index, node.index); + if (ctx->ps->instrument && !pass) ctx->ps->instrument->nfilteredPRF += 1; + } else { + pass = EvalExprNode(ctx, slot, node.index); + } + if (update_stats) { + node.tested++; + node.passed += pass ? 1 : 0; + } + return pass; +} + +// Applies the row filter nodes to the current tuple in two phases: Sampling and +// Filtering. +// In the sampling phase, pass rates for each filter expression are collected on +// the first 64k rows, then the filters are sorted by effectiveness (lower pass +// rate first) for optimal filtering order. +// In the filtering phase, filters are applied in the determined order with +// short-circuit evaluation; a failure in any filter causes immediate rejection +// of the tuple. +bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling( + ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, TupleTableSlot *slot) { + if (!ctx->sampling) { + for (auto &node : ctx->filter_nodes) { + if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, false)) { + return false; + } + } + return true; + } + + bool all_pass = true; + for (auto &node : ctx->filter_nodes) { + if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, true)) { + all_pass = false; + break; + } + } + ctx->sample_rows++; + if (!all_pass) return false; + + if (ctx->sample_rows >= ctx->sample_target) { + for (auto &node : ctx->filter_nodes) { + node.score = + (node.tested == 0) ? 1.0 : (double)node.passed / (double)node.tested; + } + std::stable_sort(ctx->filter_nodes.begin(), ctx->filter_nodes.end(), + [](const auto &a, const auto &b) { + // Lower pass rate first (better selectivity) + if (a.score != b.score) return a.score < b.score; + return (int)a.kind < (int)b.kind; + }); + ctx->sampling = false; + } + return true; +} + bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) { auto g = group_; Assert(filter_->GetRowFilter()); @@ -108,16 +213,14 @@ bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) { } } - for (int i = 0; i < ctx->size; i++) { - auto attno = ctx->attnos[i]; - Assert(attno > 0); - std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = - g->GetColumnValue(desc, attno - 1, current_group_row_index_); - if (!TestRowScanInternal(slot, ctx->estates[i], attno)) { - current_group_row_index_++; - goto retry_next; - } + LoadExprFilterColumns(g.get(), desc, ctx, current_group_row_index_, slot); + if (!ApplyFiltersWithSampling(const_cast(ctx), + g.get(), desc, current_group_row_index_, + slot)) { + current_group_row_index_++; + goto retry_next; } + for (auto attno : remaining_columns) { std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = g->GetColumnValue(desc, attno - 1, current_group_row_index_); diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h index 0e691182087..f1a3cf448bd 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h +++ b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h @@ -28,6 +28,7 @@ #pragma once #include "storage/micro_partition.h" +#include "storage/filter/pax_row_filter.h" namespace pax { class MicroPartitionRowFilterReader : public MicroPartitionReaderProxy { @@ -50,6 +51,33 @@ class MicroPartitionRowFilterReader : public MicroPartitionReaderProxy { bool TestRowScanInternal(TupleTableSlot *slot, ExprState *estate, AttrNumber attno); + // Ensure all columns needed by expr filters are loaded into the slot + inline void LoadExprFilterColumns(MicroPartitionReader::Group *group, TupleDesc desc, + const ExecutionFilterContext *ctx, + size_t row_index, TupleTableSlot *slot); + + // Evaluate a single bloom filter node + inline bool EvalBloomNode(const ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, TupleDesc desc, + size_t row_index, int bloom_index); + + // Evaluate a single expr filter node + inline bool EvalExprNode(const ExecutionFilterContext *ctx, TupleTableSlot *slot, + int expr_index); + + // Evaluate a unified filter node and optionally update sampling stats + inline bool EvalFilterNode(ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, TupleDesc desc, + size_t row_index, TupleTableSlot *slot, + ExecutionFilterContext::FilterNode &node, + bool update_stats); + + // Apply all filters with sampling and dynamic ordering + bool ApplyFiltersWithSampling(ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, + TupleTableSlot *slot); + // filter is referenced only, the reader doesn't own it. std::shared_ptr filter_; std::shared_ptr group_; diff --git a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out index 6af798604ff..8ffc0086820 100644 --- a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out +++ b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; -- Test Suit 1: runtime filter main case @@ -250,6 +254,363 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +SET enable_parallel TO off; +-- case 1: join on distribution table and replicated table. +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=1 loops=1) + Rows Removed by Pushdown Runtime Filter: 127 + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(10 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=608 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=288 loops=1) + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=320 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(13 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=64 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=48 loops=1) + Rows Removed by Pushdown Runtime Filter: 240 + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 304 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +NOTICE: table "test_tablesample1" does not exist, skipping +CREATE TABLE test_tablesample1 (dist int, id int, name text) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); + QUERY PLAN +-------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> Sample Scan on test_tablesample1 + Sampling: system ('50'::real) REPEATABLE ('2'::double precision) + Optimizer: Postgres query optimizer +(4 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS test_tablesample1; +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +--------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=1) + -> Partial Aggregate (actual rows=1 loops=1) + -> Hash Join (actual rows=340 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg2) Hash chain length 1.0 avg, 1 max, using 340 of 524288 buckets. + -> Seq Scan on t1 (actual rows=340 loops=1) + Rows Removed by Pushdown Runtime Filter: 0 + -> Hash (actual rows=340 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4108kB + -> Seq Scan on t2 (actual rows=340 loops=1) + Optimizer: Postgres query optimizer +(12 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=24 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(10 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 8 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(11 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +DROP TABLE IF EXISTS t3; +NOTICE: table "t3" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=48 loops=1) + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=32 loops=1) + Rows Removed by Pushdown Runtime Filter: 16 + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(16 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +-- case 7: scan partition table with dynamic scan +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +SET optimizer TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=608 loops=1) + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(12 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=64 loops=1) + Rows Removed by Pushdown Runtime Filter: 544 + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(13 rows) + +set pax.enable_row_filter TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=64 loops=1) + Rows Removed by Pushdown Runtime Filter: 544 + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(13 rows) + +RESET pax.enable_row_filter; +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +SET optimizer TO off; +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql index 876575b78c3..3f238bf5d69 100644 --- a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql +++ b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; @@ -75,6 +79,190 @@ SELECT COUNT(*) FROM fact_rf SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; +-- Test bloom filter pushdown +SET enable_parallel TO off; + +-- case 1: join on distribution table and replicated table. +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; + +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); + +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; + +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; + +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +CREATE TABLE test_tablesample1 (dist int, id int, name text) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS test_tablesample1; + +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; + +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; + +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; + +-- case 7: scan partition table with dynamic scan +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +SET optimizer TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + +set pax.enable_row_filter TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + + +RESET pax.enable_row_filter; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +SET optimizer TO off; + +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; diff --git a/src/backend/executor/nodeDynamicSeqscan.c b/src/backend/executor/nodeDynamicSeqscan.c index 3380b907ba3..f74f0bae01e 100644 --- a/src/backend/executor/nodeDynamicSeqscan.c +++ b/src/backend/executor/nodeDynamicSeqscan.c @@ -170,6 +170,22 @@ initNextTableToScan(DynamicSeqScanState *node) node->seqScanState = ExecInitSeqScanForPartition(&plan->seqscan, estate, currentRelation); + + if(estate->es_instrument) + { + node->seqScanState->ss.ps.instrument = GpInstrAlloc(node->seqScanState->ss.ps.plan, estate->es_instrument, node->seqScanState->ss.ps.async_capable); + } + + if (gp_enable_runtime_filter_pushdown && !estate->useMppParallelMode) + { + node->seqScanState->filter_in_seqscan = true; + // copy filters to seqscan state + node->seqScanState->filters = list_concat(node->seqScanState->filters, node->filters); + } + else + { + node->seqScanState->filter_in_seqscan = false; + } return true; } @@ -223,16 +239,7 @@ ExecDynamicSeqScan(PlanState *pstate) slot = ExecProcNode(&node->seqScanState->ss.ps); if (!TupIsNull(slot)) - { - if (gp_enable_runtime_filter_pushdown - && !pstate->state->useMppParallelMode - && node->filters) - { - if (!PassByBloomFilter(&node->ss.ps, node->filters, slot)) - continue; - } break; - } /* No more tuples from this partition. Move to next one. */ CleanupOnePartition(node); @@ -252,6 +259,13 @@ CleanupOnePartition(DynamicSeqScanState *scanState) if (scanState->seqScanState) { + // if runtime filter really work in subpartition, accumulate the statistics + if(scanState->seqScanState->ss.ps.instrument && scanState->seqScanState->ss.ps.instrument->prf_work) + { + scanState->ss.ps.instrument->prf_work = true; + scanState->ss.ps.instrument->nfilteredPRF += scanState->seqScanState->ss.ps.instrument->nfilteredPRF; + } + ExecEndSeqScan(scanState->seqScanState); scanState->seqScanState = NULL; Assert(scanState->ss.ss_currentRelation != NULL); diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9ec70f16e31..e0832649340 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -2191,6 +2191,7 @@ CreateRuntimeFilter(HashJoinState* hjstate) HashState *hstate; AttrFilter *attr_filter; ListCell *lc; + ListCell *lc_targets; List *targets; /* @@ -2236,9 +2237,9 @@ CreateRuntimeFilter(HashJoinState* hjstate) if (lattno == -1 || targets == NULL) continue; - foreach(lc, targets) + foreach(lc_targets, targets) { - PlanState *target = lfirst(lc); + PlanState *target = lfirst(lc_targets); Assert(IsA(target, SeqScanState) || IsA(target, DynamicSeqScanState)); attr_filter = CreateAttrFilter(target, lattno, rattno, @@ -2341,12 +2342,6 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno) if (!IsA(te->expr, Var)) return false; - if (IsA(node, DynamicSeqScanState)) - { - *lattno = te->resno; - return true; - } - /* * seqscan is a special case, it's targetlist is a projection of the * relation's attributes. so we need to find the attribute number of the diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 7ee5a2e94cf..68998dc1f8e 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -77,7 +77,7 @@ SeqNext(SeqScanState *node) /* * Just when gp_enable_runtime_filter_pushdown enabled and - * node->filter_in_seqscan is false means scankey need to be pushed to + * table am support runtime filter, scankey need to be pushed to * AM. */ if (gp_enable_runtime_filter_pushdown && node->filter_in_seqscan && node->filters && @@ -231,6 +231,10 @@ ExecInitSeqScanForPartition(SeqScan *node, EState *estate, { scanstate->filter_in_seqscan = true; } + else + { + scanstate->filter_in_seqscan = false; + } return scanstate; } From 6bc5d05dfe7e32314625b383012e4fba5d701da7 Mon Sep 17 00:00:00 2001 From: GongXun Date: Mon, 20 Oct 2025 17:41:50 +0800 Subject: [PATCH 2/2] performance: eliminate unnecessary null_counts calculations after calculating the null_counts array in advance, there is no need to call GetColumnDatum to continue updating null_counts. We can directly read the datum. --- .../micro_partition_row_filter_reader.cc | 8 ++++---- .../src/cpp/storage/orc/orc_group.cc | 17 +++++++++++------ .../test/regress/expected/gp_runtime_filter.out | 4 ++-- .../src/test/regress/sql/gp_runtime_filter.sql | 4 ++-- 4 files changed, 19 insertions(+), 14 deletions(-) diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc index 3309d6763a1..9eac653aebc 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc +++ b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc @@ -152,15 +152,15 @@ bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling( return true; } + ctx->sample_rows++; bool all_pass = true; + // in the sampling phase, we need to evaluate all filter nodes, if any node + // fails, the tuple is rejected for (auto &node : ctx->filter_nodes) { if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, true)) { all_pass = false; - break; } } - ctx->sample_rows++; - if (!all_pass) return false; if (ctx->sample_rows >= ctx->sample_target) { for (auto &node : ctx->filter_nodes) { @@ -175,7 +175,7 @@ bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling( }); ctx->sampling = false; } - return true; + return all_pass; } bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) { diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc b/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc index d7c8c73d220..0a749c7a8b5 100644 --- a/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc +++ b/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc @@ -293,15 +293,20 @@ std::pair OrcGroup::GetColumnValueNoMissing(size_t column_index, return {0, true}; } - if (column->HasNull() && !nulls_shuffle_[column_index]) { - CalcNullShuffle(column, column_index); - } + if (column->HasNull()) { + const auto &bm = column->GetBitmap(); + Assert(bm); + if (!bm->Test(row_index)) { + return {0, true}; + } - if (nulls_shuffle_[column_index]) { + // if not null value, calculate the null offsets array for each row + if (!nulls_shuffle_[column_index]) { + CalcNullShuffle(column, column_index); + } null_counts = nulls_shuffle_[column_index][row_index]; } - - return GetColumnDatum(column, row_index, &null_counts); + return {column->GetDatum(row_index - null_counts), false}; } void OrcGroup::CalcNullShuffle(PaxColumn *column, size_t column_index) { diff --git a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out index 8ffc0086820..e08b4633391 100644 --- a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out +++ b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out @@ -261,8 +261,8 @@ DROP TABLE IF EXISTS t1; NOTICE: table "t1" does not exist, skipping DROP TABLE IF EXISTS t2; NOTICE: table "t2" does not exist, skipping -CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); -CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) distributed REPLICATED; INSERT INTO t1 VALUES (5,5,5,5,5); INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); INSERT INTO t1 SELECT * FROM t1; diff --git a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql index 3f238bf5d69..818759fdd0d 100644 --- a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql +++ b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql @@ -85,8 +85,8 @@ SET enable_parallel TO off; -- case 1: join on distribution table and replicated table. DROP TABLE IF EXISTS t1; DROP TABLE IF EXISTS t2; -CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed by (c1); -CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) with (appendonly=true, orientation=column) distributed REPLICATED; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) distributed REPLICATED; INSERT INTO t1 VALUES (5,5,5,5,5); INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4);