diff --git a/contrib/pax_storage/src/cpp/access/pax_scanner.cc b/contrib/pax_storage/src/cpp/access/pax_scanner.cc index 5a354e6fa0c..7afa8e7931c 100644 --- a/contrib/pax_storage/src/cpp/access/pax_scanner.cc +++ b/contrib/pax_storage/src/cpp/access/pax_scanner.cc @@ -372,7 +372,7 @@ TableScanDesc PaxScanDesc::BeginScanExtractColumns( && !(flags & SO_TYPE_VECTOR) #endif ) { - filter->InitRowFilter(rel, ps, filter->GetColumnProjection()); + filter->InitRowFilter(rel, ps, filter->GetColumnProjection(), key, nkeys); } } return BeginScan(rel, snapshot, nkeys, key, parallel_scan, flags, diff --git a/contrib/pax_storage/src/cpp/comm/cbdb_api.h b/contrib/pax_storage/src/cpp/comm/cbdb_api.h index 30db87143a1..9b65c2e1041 100644 --- a/contrib/pax_storage/src/cpp/comm/cbdb_api.h +++ b/contrib/pax_storage/src/cpp/comm/cbdb_api.h @@ -80,6 +80,7 @@ extern "C" { #include "commands/progress.h" #include "commands/tablecmds.h" #include "funcapi.h" +#include "lib/bloomfilter.h" #include "miscadmin.h" #include "nodes/bitmapset.h" #include "nodes/execnodes.h" diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc index 5f19ab58400..d70b02f1a84 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.cc @@ -43,9 +43,8 @@ namespace pax { PaxFilter::PaxFilter() : sparse_filter_(nullptr), row_filter_(nullptr) {} -void PaxFilter::InitSparseFilter(Relation relation, List *quals, - ScanKey key, int nkeys, - bool allow_fallback_to_pg) { +void PaxFilter::InitSparseFilter(Relation relation, List *quals, ScanKey key, + int nkeys, bool allow_fallback_to_pg) { Assert(!sparse_filter_); sparse_filter_ = std::make_shared(relation, allow_fallback_to_pg); @@ -123,10 +122,11 @@ void PaxFilter::SetColumnProjection(const std::vector &cols, int natts) { } void PaxFilter::InitRowFilter(Relation relation, PlanState *ps, - const std::vector &projection) { + const std::vector &projection, ScanKey key, + int nkeys) { Assert(!row_filter_); row_filter_ = std::make_shared(); - if (!row_filter_->Initialize(relation, ps, projection)) { + if (!row_filter_->Initialize(relation, ps, projection, key, nkeys)) { row_filter_ = nullptr; } } diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h index ebc2fff8538..0d5e1c08e14 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_filter.h @@ -73,7 +73,8 @@ class PaxFilter final { // The row filter void InitRowFilter(Relation relation, PlanState *ps, - const std::vector &projection); + const std::vector &projection, ScanKey key, + int nkeys); std::shared_ptr GetRowFilter(); void LogStatistics() const; diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc index f653ec7406e..927ab33b4d4 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.cc @@ -26,6 +26,7 @@ */ #include "storage/filter/pax_row_filter.h" + #include "comm/cbdb_wrappers.h" namespace paxc { @@ -45,7 +46,8 @@ static inline void FindAttrsInQual(Node *qual, bool *proj, int ncol, } static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, - pax::ExecutionFilterContext *ctx) { + pax::ExecutionFilterContext *ctx, + ScanKey key, int nkeys) { List *qual = ps->plan->qual; List **qual_list; ListCell *lc; @@ -53,11 +55,48 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, int *qual_atts; int natts = RelationGetNumberOfAttributes(rel); - if (!qual || !IsA(qual, List)) return false; + int ret = false; + + if (key && nkeys > 0) { + // We don't need to support DynamicSeqScanState here. Even if the plan uses + // DynamicSeqScanNode for partitioned tables, it's always a regular SeqScan + // on a single table. So this will always be a SeqScanState. + if (nodeTag(ps) != T_SeqScanState) { + elog(ERROR, "runtime filter only support seqscan state, but got %d", + nodeTag(ps)); + } + + for (int i = 0; i < nkeys; i++) { + if (key[i].sk_flags & SK_BLOOM_FILTER) { + ctx->runtime_bloom_keys.emplace_back(key[i]); + ret = true; + } + } + + // register bloom filters + for (int i = 0; i < (int)ctx->runtime_bloom_keys.size(); ++i) { + pax::ExecutionFilterContext::FilterNode node; + node.kind = pax::ExecutionFilterContext::FilterKind::kBloom; + node.index = i; + ctx->filter_nodes.emplace_back(node); + } + + if (ps->instrument) { + ps->instrument->prf_work = true; + } + ctx->ps = ps; + + // set filter_in_seqscan to false, so that the filter will not be executed + // in SeqNext(), but will be executed in pax_row_filter + auto seqscan = (SeqScanState *)ps; + seqscan->filter_in_seqscan = false; + } + + if (!qual || !IsA(qual, List)) return ret; if (list_length(qual) == 1 && IsA(linitial(qual), BoolExpr)) { auto boolexpr = (BoolExpr *)linitial(qual); - if (boolexpr->boolop != AND_EXPR) return false; + if (boolexpr->boolop != AND_EXPR) return ret; qual = boolexpr->args; } Assert(IsA(qual, List)); @@ -98,6 +137,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, if (!qual_list[i]) continue; ctx->estates[k] = ExecInitQual(qual_list[i], ps); ctx->attnos[k] = i; + // register expr filter node (by index k) + pax::ExecutionFilterContext::FilterNode node; + node.kind = pax::ExecutionFilterContext::FilterKind::kExpr; + node.index = k; + ctx->filter_nodes.emplace_back(node); list_free(qual_list[i]); k++; } @@ -108,7 +152,11 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, list_free(qual_list[0]); } - Assert(ctx->size > 0 || ctx->estate_final); + Assert(ctx->size > 0 || ctx->estate_final || + ctx->runtime_bloom_keys.size() > 0); + + // remove qual from plan state, so that the qual will not be executed in + // executor, but will be executed in pax_row_filter ps->qual = nullptr; pfree(proj); @@ -117,20 +165,19 @@ static bool BuildExecutionFilterForColumns(Relation rel, PlanState *ps, return true; } -} // namespace paxc - +} // namespace paxc namespace pax { PaxRowFilter::PaxRowFilter() {} -bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector &projection) { +bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, + const std::vector &projection, ScanKey key, + int nkeys) { bool ok = false; - + CBDB_WRAP_START; - { - ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_); - } + { ok = paxc::BuildExecutionFilterForColumns(rel, ps, &efctx_, key, nkeys); } CBDB_WRAP_END; if (ok) { @@ -140,7 +187,8 @@ bool PaxRowFilter::Initialize(Relation rel, PlanState *ps, const std::vector &projection) { +void PaxRowFilter::FillRemainingColumns(Relation rel, + const std::vector &projection) { int natts = RelationGetNumberOfAttributes(rel); auto proj_len = projection.size(); std::vector atts(natts); @@ -162,5 +210,4 @@ void PaxRowFilter::FillRemainingColumns(Relation rel, const std::vector &p } } - } // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h index d5a74f7619a..f4e3f85166f 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_row_filter.h @@ -42,16 +42,38 @@ struct ExecutionFilterContext { ExprState *estate_final = nullptr; ExprState **estates; AttrNumber *attnos; + PlanState *ps; int size = 0; inline bool HasExecutionFilter() const { return size > 0 || estate_final; } + + // runtime bloom filters pushed down via SeqScanState->filters + // (SK_BLOOM_FILTER) + std::vector runtime_bloom_keys; + + // unified filter nodes (expr + bloom) for execution ordering + enum class FilterKind { kExpr, kBloom }; + struct FilterNode { + FilterKind kind; + int index; // index in estates (for kExpr) or in runtime_bloom_keys (for + // kBloom) + uint64 tested = 0; // number of rows tested during sampling + uint64 passed = 0; // number of rows passed during sampling + double score = 1.0; // pass rate used for ordering (lower is better) + }; + std::vector filter_nodes; + + // sampling control to determine filter order + bool sampling = true; + uint64 sample_target = 65536; // number of rows for sampling phase + uint64 sample_rows = 0; // rows seen in sampling }; class PaxRowFilter final { -public: + public: PaxRowFilter(); bool Initialize(Relation rel, PlanState *ps, - const std::vector &projection); + const std::vector &projection, ScanKey key, int nkeys); inline const ExecutionFilterContext *GetExecutionFilterContext() const { return &efctx_; @@ -60,11 +82,11 @@ class PaxRowFilter final { inline const std::vector &GetRemainingColumns() const { return remaining_attnos_; } - -private: + + private: void FillRemainingColumns(Relation rel, const std::vector &projection); -private: + private: ExecutionFilterContext efctx_; // all selected columns - single row filting columns // before running final cross columns expression filtering, the remaining @@ -72,5 +94,4 @@ class PaxRowFilter final { std::vector remaining_attnos_; }; - -}; \ No newline at end of file +}; // namespace pax \ No newline at end of file diff --git a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc index 3a7bc64f389..c979c7a0fce 100644 --- a/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc +++ b/contrib/pax_storage/src/cpp/storage/filter/pax_sparse_pg_path.cc @@ -50,12 +50,14 @@ void PaxSparseFilter::Initialize(List *quals, ScanKey key, int nkeys) { // walk scan key and only support min/max filter now for (int i = 0; i < nkeys; i++) { - // TODO: support bloom filter in PaxFilter - // but now just skip it, SeqNext() will check bloom filter in PassByBloomFilter() + // Now just skip bloom filter here, it will be handled in PaxRowFilter. if (key[i].sk_flags & SK_BLOOM_FILTER) { continue; } + // PushdownRuntimeFilter only support BTGreaterEqualStrategyNumber and + // BTLessEqualStrategyNumber. If the strategy is not supported, skip orther + // strategies. if (key[i].sk_strategy != BTGreaterEqualStrategyNumber && key[i].sk_strategy != BTLessEqualStrategyNumber) { continue; diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc index c2daac1ac5d..9eac653aebc 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc +++ b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.cc @@ -27,12 +27,14 @@ #include "storage/micro_partition_row_filter_reader.h" +#include + #include "comm/guc.h" #include "comm/log.h" #include "comm/pax_memory.h" #include "storage/filter/pax_filter.h" -#include "storage/filter/pax_sparse_filter.h" #include "storage/filter/pax_row_filter.h" +#include "storage/filter/pax_sparse_filter.h" #include "storage/pax_defined.h" #include "storage/pax_itemptr.h" @@ -73,6 +75,109 @@ MicroPartitionRowFilterReader::GetNextGroup(TupleDesc desc) { return group_; } +void MicroPartitionRowFilterReader::LoadExprFilterColumns( + MicroPartitionReader::Group *group, TupleDesc desc, + const ExecutionFilterContext *ctx, size_t row_index, TupleTableSlot *slot) { + // There will not be duplicate attnos here because the attnos in ctx come from + // qual expressions. For each column index, there is at most one corresponding + // attno in ctx->attnos, so no attno appears more than once. As a result, this + // loop does not load the same column multiple times. + for (int i = 0; i < ctx->size; i++) { + auto attno = ctx->attnos[i]; + Assert(attno > 0); + std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = + group->GetColumnValue(desc, attno - 1, row_index); + } +} + +bool MicroPartitionRowFilterReader::EvalBloomNode( + const ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, int bloom_index) { + Assert(bloom_index >= 0 && + (size_t)bloom_index < ctx->runtime_bloom_keys.size()); + const auto &skd = ctx->runtime_bloom_keys[bloom_index]; + const ScanKey sk = const_cast(&skd); + bool isnull = false; + Datum val; + std::tie(val, isnull) = + group->GetColumnValue(desc, sk->sk_attno - 1, row_index); + if (isnull) return true; + bloom_filter *bf = (bloom_filter *)DatumGetPointer(sk->sk_argument); + return !bloom_lacks_element(bf, (unsigned char *)&val, sizeof(Datum)); +} + +bool MicroPartitionRowFilterReader::EvalExprNode( + const ExecutionFilterContext *ctx, TupleTableSlot *slot, int expr_index) { + return TestRowScanInternal(slot, ctx->estates[expr_index], + ctx->attnos[expr_index]); +} + +// Execute a filter node. +// During the sampling phase, updates the filter's pass rate statistics. +bool MicroPartitionRowFilterReader::EvalFilterNode( + ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, TupleTableSlot *slot, + ExecutionFilterContext::FilterNode &node, bool update_stats) { + bool pass = true; + if (node.kind == ExecutionFilterContext::FilterKind::kBloom) { + pass = EvalBloomNode(ctx, group, desc, row_index, node.index); + if (ctx->ps->instrument && !pass) ctx->ps->instrument->nfilteredPRF += 1; + } else { + pass = EvalExprNode(ctx, slot, node.index); + } + if (update_stats) { + node.tested++; + node.passed += pass ? 1 : 0; + } + return pass; +} + +// Applies the row filter nodes to the current tuple in two phases: Sampling and +// Filtering. +// In the sampling phase, pass rates for each filter expression are collected on +// the first 64k rows, then the filters are sorted by effectiveness (lower pass +// rate first) for optimal filtering order. +// In the filtering phase, filters are applied in the determined order with +// short-circuit evaluation; a failure in any filter causes immediate rejection +// of the tuple. +bool MicroPartitionRowFilterReader::ApplyFiltersWithSampling( + ExecutionFilterContext *ctx, MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, TupleTableSlot *slot) { + if (!ctx->sampling) { + for (auto &node : ctx->filter_nodes) { + if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, false)) { + return false; + } + } + return true; + } + + ctx->sample_rows++; + bool all_pass = true; + // in the sampling phase, we need to evaluate all filter nodes, if any node + // fails, the tuple is rejected + for (auto &node : ctx->filter_nodes) { + if (!EvalFilterNode(ctx, group, desc, row_index, slot, node, true)) { + all_pass = false; + } + } + + if (ctx->sample_rows >= ctx->sample_target) { + for (auto &node : ctx->filter_nodes) { + node.score = + (node.tested == 0) ? 1.0 : (double)node.passed / (double)node.tested; + } + std::stable_sort(ctx->filter_nodes.begin(), ctx->filter_nodes.end(), + [](const auto &a, const auto &b) { + // Lower pass rate first (better selectivity) + if (a.score != b.score) return a.score < b.score; + return (int)a.kind < (int)b.kind; + }); + ctx->sampling = false; + } + return all_pass; +} + bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) { auto g = group_; Assert(filter_->GetRowFilter()); @@ -108,16 +213,14 @@ bool MicroPartitionRowFilterReader::ReadTuple(TupleTableSlot *slot) { } } - for (int i = 0; i < ctx->size; i++) { - auto attno = ctx->attnos[i]; - Assert(attno > 0); - std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = - g->GetColumnValue(desc, attno - 1, current_group_row_index_); - if (!TestRowScanInternal(slot, ctx->estates[i], attno)) { - current_group_row_index_++; - goto retry_next; - } + LoadExprFilterColumns(g.get(), desc, ctx, current_group_row_index_, slot); + if (!ApplyFiltersWithSampling(const_cast(ctx), + g.get(), desc, current_group_row_index_, + slot)) { + current_group_row_index_++; + goto retry_next; } + for (auto attno : remaining_columns) { std::tie(slot->tts_values[attno - 1], slot->tts_isnull[attno - 1]) = g->GetColumnValue(desc, attno - 1, current_group_row_index_); diff --git a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h index 0e691182087..f1a3cf448bd 100644 --- a/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h +++ b/contrib/pax_storage/src/cpp/storage/micro_partition_row_filter_reader.h @@ -28,6 +28,7 @@ #pragma once #include "storage/micro_partition.h" +#include "storage/filter/pax_row_filter.h" namespace pax { class MicroPartitionRowFilterReader : public MicroPartitionReaderProxy { @@ -50,6 +51,33 @@ class MicroPartitionRowFilterReader : public MicroPartitionReaderProxy { bool TestRowScanInternal(TupleTableSlot *slot, ExprState *estate, AttrNumber attno); + // Ensure all columns needed by expr filters are loaded into the slot + inline void LoadExprFilterColumns(MicroPartitionReader::Group *group, TupleDesc desc, + const ExecutionFilterContext *ctx, + size_t row_index, TupleTableSlot *slot); + + // Evaluate a single bloom filter node + inline bool EvalBloomNode(const ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, TupleDesc desc, + size_t row_index, int bloom_index); + + // Evaluate a single expr filter node + inline bool EvalExprNode(const ExecutionFilterContext *ctx, TupleTableSlot *slot, + int expr_index); + + // Evaluate a unified filter node and optionally update sampling stats + inline bool EvalFilterNode(ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, TupleDesc desc, + size_t row_index, TupleTableSlot *slot, + ExecutionFilterContext::FilterNode &node, + bool update_stats); + + // Apply all filters with sampling and dynamic ordering + bool ApplyFiltersWithSampling(ExecutionFilterContext *ctx, + MicroPartitionReader::Group *group, + TupleDesc desc, size_t row_index, + TupleTableSlot *slot); + // filter is referenced only, the reader doesn't own it. std::shared_ptr filter_; std::shared_ptr group_; diff --git a/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc b/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc index d7c8c73d220..0a749c7a8b5 100644 --- a/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc +++ b/contrib/pax_storage/src/cpp/storage/orc/orc_group.cc @@ -293,15 +293,20 @@ std::pair OrcGroup::GetColumnValueNoMissing(size_t column_index, return {0, true}; } - if (column->HasNull() && !nulls_shuffle_[column_index]) { - CalcNullShuffle(column, column_index); - } + if (column->HasNull()) { + const auto &bm = column->GetBitmap(); + Assert(bm); + if (!bm->Test(row_index)) { + return {0, true}; + } - if (nulls_shuffle_[column_index]) { + // if not null value, calculate the null offsets array for each row + if (!nulls_shuffle_[column_index]) { + CalcNullShuffle(column, column_index); + } null_counts = nulls_shuffle_[column_index][row_index]; } - - return GetColumnDatum(column, row_index, &null_counts); + return {column->GetDatum(row_index - null_counts), false}; } void OrcGroup::CalcNullShuffle(PaxColumn *column, size_t column_index) { diff --git a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out index 6af798604ff..e08b4633391 100644 --- a/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out +++ b/contrib/pax_storage/src/test/regress/expected/gp_runtime_filter.out @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; -- Test Suit 1: runtime filter main case @@ -250,6 +254,363 @@ SELECT COUNT(*) FROM dim_rf 1600 (1 row) +-- Test bloom filter pushdown +SET enable_parallel TO off; +-- case 1: join on distribution table and replicated table. +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) distributed REPLICATED; +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=128 loops=1) + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(9 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=0 loops=1) + -> Hash Join (actual rows=0 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg2) Hash chain length 8.0 avg, 8 max, using 4 of 524288 buckets. + -> Seq Scan on t1 (actual rows=1 loops=1) + Rows Removed by Pushdown Runtime Filter: 127 + -> Hash (actual rows=32 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4098kB + -> Seq Scan on t2 (actual rows=32 loops=1) + Optimizer: Postgres query optimizer +(10 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=608 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=288 loops=1) + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=320 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(13 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Append (actual rows=64 loops=1) + Partition Selectors: $0 + -> Seq Scan on t1_1_prt_1 t1_1 (actual rows=48 loops=1) + Rows Removed by Pushdown Runtime Filter: 240 + -> Seq Scan on t1_1_prt_2 t1_2 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 304 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +NOTICE: table "test_tablesample1" does not exist, skipping +CREATE TABLE test_tablesample1 (dist int, id int, name text) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); + QUERY PLAN +-------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) + -> Sample Scan on test_tablesample1 + Sampling: system ('50'::real) REPEATABLE ('2'::double precision) + Optimizer: Postgres query optimizer +(4 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS test_tablesample1; +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +--------------------------------------------------------------------------------------------------------- + Finalize Aggregate (actual rows=1 loops=1) + -> Gather Motion 3:1 (slice1; segments: 3) (actual rows=3 loops=1) + -> Partial Aggregate (actual rows=1 loops=1) + -> Hash Join (actual rows=340 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg2) Hash chain length 1.0 avg, 1 max, using 340 of 524288 buckets. + -> Seq Scan on t1 (actual rows=340 loops=1) + Rows Removed by Pushdown Runtime Filter: 0 + -> Hash (actual rows=340 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4108kB + -> Seq Scan on t2 (actual rows=340 loops=1) + Optimizer: Postgres query optimizer +(12 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=24 loops=1) + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(10 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=32 loops=1) + -> Hash Join (actual rows=32 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: (seg0) Hash chain length 2.0 avg, 2 max, using 3 of 524288 buckets. + -> Result (actual rows=16 loops=1) + -> Seq Scan on t1 (actual rows=16 loops=1) + Rows Removed by Pushdown Runtime Filter: 8 + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: Pivotal Optimizer (GPORCA) +(11 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +NOTICE: table "t1" does not exist, skipping +DROP TABLE IF EXISTS t2; +NOTICE: table "t2" does not exist, skipping +DROP TABLE IF EXISTS t3; +NOTICE: table "t3" does not exist, skipping +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=48 loops=1) + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(15 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + QUERY PLAN +--------------------------------------------------------------------------------------- + Gather Motion 1:1 (slice1; segments: 1) (actual rows=256 loops=1) + -> Hash Join (actual rows=256 loops=1) + Hash Cond: (t1.c1 = t2.c1) + Extra Text: Hash chain length 4.0 avg, 4 max, using 4 of 32768 buckets. + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t3.c2) + Extra Text: Hash chain length 2.0 avg, 2 max, using 4 of 32768 buckets. + -> Seq Scan on t1 (actual rows=32 loops=1) + Rows Removed by Pushdown Runtime Filter: 16 + -> Hash (actual rows=8 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 258kB + -> Seq Scan on t3 (actual rows=8 loops=1) + -> Hash (actual rows=16 loops=1) + Buckets: 32768 Batches: 1 Memory Usage: 260kB + -> Seq Scan on t2 (actual rows=16 loops=1) + Optimizer: Postgres query optimizer +(16 rows) + +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +-- case 7: scan partition table with dynamic scan +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; +SET optimizer TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=608 loops=1) + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(12 rows) + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=64 loops=1) + Rows Removed by Pushdown Runtime Filter: 544 + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(13 rows) + +set pax.enable_row_filter TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + QUERY PLAN +------------------------------------------------------------------------------------------- + Gather Motion 3:1 (slice1; segments: 3) (actual rows=96 loops=1) + -> Hash Join (actual rows=64 loops=1) + Hash Cond: (t1.c2 = t2.c2) + Extra Text: (seg0) Hash chain length 1.0 avg, 1 max, using 6 of 524288 buckets. + -> Dynamic Seq Scan on t1 (actual rows=64 loops=1) + Rows Removed by Pushdown Runtime Filter: 544 + Number of partitions to scan: 2 (out of 2) + Partitions scanned: Avg 2.0 x 3 workers. Max 2 parts (seg0). + -> Hash (actual rows=6 loops=1) + Buckets: 524288 Batches: 1 Memory Usage: 4097kB + -> Partition Selector (selector id: $0) (actual rows=6 loops=1) + -> Seq Scan on t2 (actual rows=6 loops=1) + Optimizer: GPORCA +(13 rows) + +RESET pax.enable_row_filter; +RESET gp_enable_runtime_filter_pushdown; +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +SET optimizer TO off; +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; SET optimizer TO default; diff --git a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql index 876575b78c3..818759fdd0d 100644 --- a/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql +++ b/contrib/pax_storage/src/test/regress/sql/gp_runtime_filter.sql @@ -1,3 +1,7 @@ +-- start_matchignore +-- m/^.*Extra Text:.*/ +-- m/^.*Buckets:.*/ +-- end_matchignore -- Disable ORCA SET optimizer TO off; @@ -75,6 +79,190 @@ SELECT COUNT(*) FROM fact_rf SELECT COUNT(*) FROM dim_rf WHERE dim_rf.did IN (SELECT did FROM fact_rf) AND proj_id < 2; +-- Test bloom filter pushdown +SET enable_parallel TO off; + +-- case 1: join on distribution table and replicated table. +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 int, c4 int, c5 int) distributed by (c1); +CREATE TABLE t2(c1 int, c2 int, c3 int, c4 int, c5 int) distributed REPLICATED; + +INSERT INTO t1 VALUES (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); + +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; + +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; + +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c3 FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 2: join on partition table and replicated table. +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT * FROM t1, t2 WHERE t1.c2 = t2.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 3: bug fix with explain +DROP TABLE IF EXISTS test_tablesample1; +CREATE TABLE test_tablesample1 (dist int, id int, name text) DISTRIBUTED BY (dist); +INSERT INTO test_tablesample1 SELECT 0, i, repeat(i::text, 875) FROM generate_series(0, 9) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 3, i, repeat(i::text, 875) FROM generate_series(10, 19) s(i) ORDER BY i; +INSERT INTO test_tablesample1 SELECT 5, i, repeat(i::text, 875) FROM generate_series(20, 29) s(i) ORDER BY i; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (COSTS OFF) SELECT id FROM test_tablesample1 TABLESAMPLE SYSTEM (50) REPEATABLE (2); +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS test_tablesample1; + +-- case 4: show debug info only when gp_enable_runtime_filter_pushdown is on +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int); +CREATE TABLE t2(c1 int, c2 int); +INSERT INTO t1 SELECT GENERATE_SERIES(1, 1000), GENERATE_SERIES(1, 1000); +INSERT INTO t2 SELECT * FROM t1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT count(t1.c2) FROM t1, t2 WHERE t1.c1 = t2.c1; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 5: hashjoin + result + seqsacn +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)); +INSERT INTO t1 VALUES (5,5,5,5,5), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +ANALYZE; + +SET optimizer TO on; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT t1.c3 FROM t1, t2 WHERE t1.c1 = t2.c1; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +-- case 6: hashjoin + hashjoin + seqscan +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; +CREATE TABLE t1(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t2(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +CREATE TABLE t3(c1 int, c2 int, c3 char(50), c4 char(50), c5 char(50)) DISTRIBUTED REPLICATED; +INSERT INTO t1 VALUES (1,1,1,1,1), (2,2,2,2,2), (5,5,5,5,5); +INSERT INTO t2 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t3 VALUES (1,1,1,1,1), (2,2,2,2,2), (3,3,3,3,3), (4,4,4,4,4); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 select * FROM t2; +INSERT INTO t2 select * FROM t2; +INSERT INTO t3 select * FROM t3; +ANALYZE; + +SET optimizer TO off; +SET gp_enable_runtime_filter_pushdown TO off; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +SET gp_enable_runtime_filter_pushdown TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) SELECT * FROM t1, t2, t3 WHERE t1.c1 = t2.c1 AND t1.c2 = t3.c2; + +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; +DROP TABLE IF EXISTS t3; + +-- case 7: scan partition table with dynamic scan +CREATE TABLE t1 (c1 INT, c2 INT) DISTRIBUTED BY (c1) PARTITION BY RANGE (c2) (START (1) END (100) EVERY (50)); +CREATE TABLE t2 (c1 INT, c2 INT) DISTRIBUTED REPLICATED; +INSERT INTO t1 SELECT generate_series(1, 99), generate_series(1, 99); +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t1 SELECT * FROM t1; +INSERT INTO t2 SELECT generate_series(1, 5), generate_series(1, 5); +INSERT INTO t2 SELECT generate_series(51, 51), generate_series(51, 51); +ANALYZE; + +SET optimizer TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + +SET gp_enable_runtime_filter_pushdown TO on; + +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + +set pax.enable_row_filter TO on; +EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF) +SELECT t1.c2 FROM t1, t2 WHERE t1.c2 = t2.c2; + + +RESET pax.enable_row_filter; +RESET gp_enable_runtime_filter_pushdown; + +DROP TABLE IF EXISTS t1; +DROP TABLE IF EXISTS t2; + +SET optimizer TO off; + +RESET enable_parallel; -- Clean up: reset guc SET gp_enable_runtime_filter TO off; diff --git a/src/backend/executor/nodeDynamicSeqscan.c b/src/backend/executor/nodeDynamicSeqscan.c index 3380b907ba3..f74f0bae01e 100644 --- a/src/backend/executor/nodeDynamicSeqscan.c +++ b/src/backend/executor/nodeDynamicSeqscan.c @@ -170,6 +170,22 @@ initNextTableToScan(DynamicSeqScanState *node) node->seqScanState = ExecInitSeqScanForPartition(&plan->seqscan, estate, currentRelation); + + if(estate->es_instrument) + { + node->seqScanState->ss.ps.instrument = GpInstrAlloc(node->seqScanState->ss.ps.plan, estate->es_instrument, node->seqScanState->ss.ps.async_capable); + } + + if (gp_enable_runtime_filter_pushdown && !estate->useMppParallelMode) + { + node->seqScanState->filter_in_seqscan = true; + // copy filters to seqscan state + node->seqScanState->filters = list_concat(node->seqScanState->filters, node->filters); + } + else + { + node->seqScanState->filter_in_seqscan = false; + } return true; } @@ -223,16 +239,7 @@ ExecDynamicSeqScan(PlanState *pstate) slot = ExecProcNode(&node->seqScanState->ss.ps); if (!TupIsNull(slot)) - { - if (gp_enable_runtime_filter_pushdown - && !pstate->state->useMppParallelMode - && node->filters) - { - if (!PassByBloomFilter(&node->ss.ps, node->filters, slot)) - continue; - } break; - } /* No more tuples from this partition. Move to next one. */ CleanupOnePartition(node); @@ -252,6 +259,13 @@ CleanupOnePartition(DynamicSeqScanState *scanState) if (scanState->seqScanState) { + // if runtime filter really work in subpartition, accumulate the statistics + if(scanState->seqScanState->ss.ps.instrument && scanState->seqScanState->ss.ps.instrument->prf_work) + { + scanState->ss.ps.instrument->prf_work = true; + scanState->ss.ps.instrument->nfilteredPRF += scanState->seqScanState->ss.ps.instrument->nfilteredPRF; + } + ExecEndSeqScan(scanState->seqScanState); scanState->seqScanState = NULL; Assert(scanState->ss.ss_currentRelation != NULL); diff --git a/src/backend/executor/nodeHashjoin.c b/src/backend/executor/nodeHashjoin.c index 9ec70f16e31..e0832649340 100644 --- a/src/backend/executor/nodeHashjoin.c +++ b/src/backend/executor/nodeHashjoin.c @@ -2191,6 +2191,7 @@ CreateRuntimeFilter(HashJoinState* hjstate) HashState *hstate; AttrFilter *attr_filter; ListCell *lc; + ListCell *lc_targets; List *targets; /* @@ -2236,9 +2237,9 @@ CreateRuntimeFilter(HashJoinState* hjstate) if (lattno == -1 || targets == NULL) continue; - foreach(lc, targets) + foreach(lc_targets, targets) { - PlanState *target = lfirst(lc); + PlanState *target = lfirst(lc_targets); Assert(IsA(target, SeqScanState) || IsA(target, DynamicSeqScanState)); attr_filter = CreateAttrFilter(target, lattno, rattno, @@ -2341,12 +2342,6 @@ CheckTargetNode(PlanState *node, AttrNumber attno, AttrNumber *lattno) if (!IsA(te->expr, Var)) return false; - if (IsA(node, DynamicSeqScanState)) - { - *lattno = te->resno; - return true; - } - /* * seqscan is a special case, it's targetlist is a projection of the * relation's attributes. so we need to find the attribute number of the diff --git a/src/backend/executor/nodeSeqscan.c b/src/backend/executor/nodeSeqscan.c index 7ee5a2e94cf..68998dc1f8e 100644 --- a/src/backend/executor/nodeSeqscan.c +++ b/src/backend/executor/nodeSeqscan.c @@ -77,7 +77,7 @@ SeqNext(SeqScanState *node) /* * Just when gp_enable_runtime_filter_pushdown enabled and - * node->filter_in_seqscan is false means scankey need to be pushed to + * table am support runtime filter, scankey need to be pushed to * AM. */ if (gp_enable_runtime_filter_pushdown && node->filter_in_seqscan && node->filters && @@ -231,6 +231,10 @@ ExecInitSeqScanForPartition(SeqScan *node, EState *estate, { scanstate->filter_in_seqscan = true; } + else + { + scanstate->filter_in_seqscan = false; + } return scanstate; }