From df5d8c1beb79c7e07431ca7e667cc7828eb96932 Mon Sep 17 00:00:00 2001 From: Jerry Hu Date: Thu, 6 Mar 2025 15:51:38 +0800 Subject: [PATCH] [ut](spill) add UT for partitioned hash join (#48432) --- be/src/pipeline/exec/hashjoin_build_sink.h | 9 +- .../pipeline/exec/hashjoin_probe_operator.h | 6 +- be/src/pipeline/exec/operator.h | 8 +- .../partitioned_hash_join_probe_operator.cpp | 10 +- .../partitioned_hash_join_probe_operator.h | 7 +- .../partitioned_hash_join_sink_operator.cpp | 2 +- .../partitioned_hash_join_sink_operator.h | 6 +- be/src/pipeline/exec/spill_utils.h | 11 +- be/src/runtime/fragment_mgr.h | 3 +- ...titioned_hash_join_probe_operator_test.cpp | 1162 +++++++++++++++++ ...rtitioned_hash_join_sink_operator_test.cpp | 398 ++++++ .../partitioned_hash_join_test_helper.cpp | 243 ++++ .../partitioned_hash_join_test_helper.h | 292 +++++ be/test/testutil/creators.h | 109 ++ be/test/testutil/mock/mock_descriptors.h | 24 + be/test/testutil/mock/mock_operators.h | 64 + 16 files changed, 2334 insertions(+), 20 deletions(-) create mode 100644 be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp create mode 100644 be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp create mode 100644 be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp create mode 100644 be/test/pipeline/operator/partitioned_hash_join_test_helper.h create mode 100644 be/test/testutil/creators.h create mode 100644 be/test/testutil/mock/mock_operators.h diff --git a/be/src/pipeline/exec/hashjoin_build_sink.h b/be/src/pipeline/exec/hashjoin_build_sink.h index 62a4be16792094..81100bdbfaef87 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.h +++ b/be/src/pipeline/exec/hashjoin_build_sink.h @@ -17,6 +17,7 @@ #pragma once +#include "common/be_mock_util.h" #include "exprs/runtime_filter_slots.h" #include "join_build_sink_operator.h" #include "operator.h" @@ -25,7 +26,7 @@ namespace doris::pipeline { #include "common/compile_check_begin.h" class 
HashJoinBuildSinkOperatorX; -class HashJoinBuildSinkLocalState final +class HashJoinBuildSinkLocalState MOCK_REMOVE(final) : public JoinBuildSinkLocalState { public: ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState); @@ -56,7 +57,7 @@ class HashJoinBuildSinkLocalState final Status disable_runtime_filters(RuntimeState* state); - [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); + [[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos); protected: Status _hash_table_init(RuntimeState* state); @@ -109,7 +110,7 @@ class HashJoinBuildSinkLocalState final RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr; }; -class HashJoinBuildSinkOperatorX final +class HashJoinBuildSinkOperatorX MOCK_REMOVE(final) : public JoinBuildSinkOperatorX { public: HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, @@ -129,7 +130,7 @@ class HashJoinBuildSinkOperatorX final [[nodiscard]] size_t get_memory_usage(RuntimeState* state) const; - std::string get_memory_usage_debug_str(RuntimeState* state) const; + MOCK_FUNCTION std::string get_memory_usage_debug_str(RuntimeState* state) const; bool should_dry_run(RuntimeState* state) override { return _is_broadcast_join && !state->get_sink_local_state() diff --git a/be/src/pipeline/exec/hashjoin_probe_operator.h b/be/src/pipeline/exec/hashjoin_probe_operator.h index 7928c4bc411a4d..b9774b5bb0d708 100644 --- a/be/src/pipeline/exec/hashjoin_probe_operator.h +++ b/be/src/pipeline/exec/hashjoin_probe_operator.h @@ -18,6 +18,7 @@ #include +#include "common/be_mock_util.h" #include "common/status.h" #include "operator.h" #include "pipeline/exec/join_probe_operator.h" @@ -43,7 +44,7 @@ using HashTableCtxVariants = ProcessHashTableProbe>; class HashJoinProbeOperatorX; -class HashJoinProbeLocalState final +class HashJoinProbeLocalState MOCK_REMOVE(final) : public JoinProbeLocalState { public: using Parent = HashJoinProbeOperatorX; @@ -124,7 +125,8 @@ class 
HashJoinProbeLocalState final RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr; }; -class HashJoinProbeOperatorX final : public JoinProbeOperatorX { +class HashJoinProbeOperatorX MOCK_REMOVE(final) + : public JoinProbeOperatorX { public: HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, const DescriptorTbl& descs); diff --git a/be/src/pipeline/exec/operator.h b/be/src/pipeline/exec/operator.h index 1f46fa7ea61b28..9e177fa0072d8e 100644 --- a/be/src/pipeline/exec/operator.h +++ b/be/src/pipeline/exec/operator.h @@ -284,7 +284,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState { Status init(RuntimeState* state, LocalStateInfo& info) override { RETURN_IF_ERROR(PipelineXLocalState::init(state, info)); - _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); init_spill_read_counters(); return Status::OK(); @@ -317,6 +316,8 @@ class PipelineXSpillLocalState : public PipelineXLocalState { } void init_spill_read_counters() { + _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); + // Spill read counters _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1); @@ -669,7 +670,11 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState dependencies() const override { diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp index af84d515e4a40f..481305c566ffc3 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp @@ -58,6 +58,11 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI "HashJoinProbeSpillDependency", true); state->get_task()->add_spill_dependency(_spill_dependency.get()); + init_counters(); + return Status::OK(); +} + +void PartitionedHashJoinProbeLocalState::init_counters() { _partition_timer = 
ADD_TIMER(profile(), "SpillPartitionTime"); _partition_shuffle_timer = ADD_TIMER(profile(), "SpillPartitionShuffleTime"); _spill_build_rows = ADD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT); @@ -78,7 +83,6 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBloksBytesInMem", TUnit::BYTES, 1); _memory_usage_reserved = ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1); - return Status::OK(); } #define UPDATE_COUNTER_FROM_INNER(name) \ @@ -417,7 +421,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim st = spilled_stream->read_next_block_sync(&block, &eos); if (!st.ok()) { break; - } else { + } else if (!block.empty()) { COUNTER_UPDATE(_recovery_probe_rows, block.rows()); COUNTER_UPDATE(_recovery_probe_blocks, 1); read_size += block.allocated_bytes(); @@ -786,7 +790,7 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* state) { auto& local_state = get_local_state(state); const auto need_to_spill = local_state._shared_state->need_to_spill; - if (!need_to_spill || !local_state._child_eos) { + if (!need_to_spill || local_state._child_eos) { return Base::get_reserve_mem_size(state); } diff --git a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h index 4bbdfa7371c57b..e6e12b381ad6a1 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_probe_operator.h @@ -20,6 +20,7 @@ #include #include +#include "common/be_mock_util.h" #include "common/status.h" #include "operator.h" #include "pipeline/dependency.h" @@ -36,7 +37,7 @@ namespace pipeline { class PartitionedHashJoinProbeOperatorX; -class PartitionedHashJoinProbeLocalState final +class PartitionedHashJoinProbeLocalState MOCK_REMOVE(final) : 
public PipelineXSpillLocalState { public: using Parent = PartitionedHashJoinProbeOperatorX; @@ -65,7 +66,9 @@ class PartitionedHashJoinProbeLocalState final std::string debug_string(int indentation_level = 0) const override; - void update_profile_from_inner(); + MOCK_FUNCTION void update_profile_from_inner(); + + void init_counters(); friend class PartitionedHashJoinProbeOperatorX; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp index 83f094bf92e8fc..aadabff950e68e 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.cpp @@ -188,7 +188,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block( -(inner_sink_state->_hash_table_memory_usage->value() + inner_sink_state->_build_arena_memory_usage->value())); } - auto row_desc = p._child->row_desc(); + const auto& row_desc = p._child->row_desc(); const auto num_slots = row_desc.num_slots(); vectorized::Block build_block; int64_t block_old_mem = 0; diff --git a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h index d073a69516b080..7bca4da8ce23be 100644 --- a/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h +++ b/be/src/pipeline/exec/partitioned_hash_join_sink_operator.h @@ -21,6 +21,7 @@ #include +#include "common/be_mock_util.h" #include "common/status.h" #include "operator.h" #include "pipeline/exec/hashjoin_build_sink.h" @@ -51,7 +52,7 @@ class PartitionedHashJoinSinkLocalState size_t revocable_mem_size(RuntimeState* state) const; [[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos); void update_memory_usage(); - void update_profile_from_inner(); + MOCK_FUNCTION void update_profile_from_inner(); Dependency* finishdependency() override; @@ -143,6 +144,9 @@ class PartitionedHashJoinSinkOperatorX private: friend class 
PartitionedHashJoinSinkLocalState; +#ifdef BE_TEST + friend class PartitionedHashJoinSinkOperatorTest; +#endif const TJoinDistributionType::type _join_distribution; diff --git a/be/src/pipeline/exec/spill_utils.h b/be/src/pipeline/exec/spill_utils.h index ff40c4656dc2a6..89b40ed7745924 100644 --- a/be/src/pipeline/exec/spill_utils.h +++ b/be/src/pipeline/exec/spill_utils.h @@ -138,9 +138,16 @@ class SpillRunnable : public Runnable { return; } + Defer set_ready_defer([&] { + if (_spill_dependency) { + _spill_dependency->set_ready(); + } + }); + if (_state->is_cancelled()) { return; } + auto status = _spill_exec_func(); if (!status.ok()) { ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status); @@ -153,10 +160,6 @@ class SpillRunnable : public Runnable { ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status2); } } - - if (_spill_dependency) { - _spill_dependency->set_ready(); - } } protected: diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h index 9e37b7811f043b..b80272cf010fab 100644 --- a/be/src/runtime/fragment_mgr.h +++ b/be/src/runtime/fragment_mgr.h @@ -31,6 +31,7 @@ #include #include +#include "common/be_mock_util.h" #include "common/status.h" #include "gutil/ref_counted.h" #include "http/rest_monitor_iface.h" @@ -140,7 +141,7 @@ class FragmentMgr : public RestMonitorIface { std::shared_ptr&&); // Can be used in both version. 
- void cancel_query(const TUniqueId query_id, const Status reason); + MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason); void cancel_worker(); diff --git a/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp new file mode 100644 index 00000000000000..67378fe66a81d1 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_hash_join_probe_operator_test.cpp @@ -0,0 +1,1162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "pipeline/exec/partitioned_hash_join_probe_operator.h" + +#include + +#include + +#include "common/config.h" +#include "olap/olap_define.h" +#include "partitioned_hash_join_test_helper.h" +#include "pipeline/exec/hashjoin_build_sink.h" +#include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "testutil/column_helper.h" +#include "testutil/creators.h" +#include "testutil/mock/mock_operators.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/debug_points.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_number.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { +class PartitionedHashJoinProbeOperatorTest : public testing::Test { +public: + void SetUp() override { _helper.SetUp(); } + void TearDown() override { _helper.TearDown(); } + +protected: + PartitionedHashJoinTestHelper _helper; +}; + +TEST_F(PartitionedHashJoinProbeOperatorTest, debug_string) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + auto debug_string = local_state->debug_string(0); + std::cout << "debug string: " << debug_string << std::endl; + + shared_state->need_to_spill = false; + debug_string = local_state->debug_string(0); + std::cout << "debug string: " << debug_string << std::endl; + + ASSERT_TRUE(local_state->profile()->pretty_print().find("ExecTime") != std::string::npos); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, InitAndOpen) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::map, std::shared_ptr>> + le_state_map; + + auto local_state = PartitionedHashJoinProbeLocalState::create_shared( + 
_helper.runtime_state.get(), probe_operator.get()); + + auto shared_state = std::make_shared(); + LocalStateInfo info {.parent_profile = _helper.runtime_profile.get(), + .scan_ranges = {}, + .shared_state = shared_state.get(), + .le_state_map = le_state_map, + .task_idx = 0}; + auto st = local_state->init(_helper.runtime_state.get(), info); + ASSERT_TRUE(st) << "init failed: " << st.to_string(); + + // before opening, should setup probe_operator's partitioner. + const auto& tnode = probe_operator->_tnode; + auto child = std::make_shared(); + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}, {false}); + child->_row_descriptor = row_desc; + + probe_operator->_inner_sink_operator->_child = nullptr; + probe_operator->_inner_probe_operator->_child = nullptr; + probe_operator->_inner_probe_operator->_build_side_child = nullptr; + + st = probe_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st) << "init failed: " << st.to_string(); + + st = probe_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "prepare failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "open failed: " << st.to_string(); + + local_state->_shared_state->inner_shared_state = std::make_shared(); + local_state->_shared_state->inner_runtime_state = std::make_unique(); + local_state->_shared_state->inner_runtime_state->set_desc_tbl( + &(_helper.runtime_state->desc_tbl())); + local_state->_shared_state->inner_runtime_state->resize_op_id_to_local_state(-100); + + auto mock_inner_sink_operator = probe_operator->_inner_sink_operator; + probe_operator->_inner_sink_operator = std::make_shared( + _helper.obj_pool.get(), 0, 0, tnode, _helper.runtime_state->desc_tbl()); + EXPECT_TRUE(probe_operator->_inner_sink_operator->set_child(mock_inner_sink_operator->child())); + + st = probe_operator->_inner_sink_operator->init(tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st) << "init inner sink operator failed: " << 
st.to_string(); + + auto inner_probe_state = std::make_unique( + _helper.runtime_state.get(), probe_operator->_inner_probe_operator.get()); + + st = inner_probe_state->init(local_state->_shared_state->inner_runtime_state.get(), info); + ASSERT_TRUE(st) << "init failed: " << st.to_string(); + + local_state->_shared_state->inner_runtime_state->emplace_local_state( + probe_operator->_inner_probe_operator->operator_id(), std::move(inner_probe_state)); + + auto inner_sink_state = std::make_unique( + probe_operator->_inner_sink_operator.get(), _helper.runtime_state.get()); + + LocalSinkStateInfo sink_info {0, _helper.runtime_profile.get(), + -1, local_state->_shared_state->inner_shared_state.get(), + {}, {}}; + st = probe_operator->_inner_sink_operator->prepare( + local_state->_shared_state->inner_runtime_state.get()); + ASSERT_TRUE(st) << "prepare failed: " << st.to_string(); + + st = inner_sink_state->init(local_state->_shared_state->inner_runtime_state.get(), sink_info); + ASSERT_TRUE(st) << "init failed: " << st.to_string(); + + local_state->_shared_state->inner_runtime_state->emplace_sink_local_state( + 0, std::move(inner_sink_state)); + + local_state->_shared_state->need_to_spill = false; + local_state->update_profile_from_inner(); + + local_state->_shared_state->need_to_spill = true; + local_state->update_profile_from_inner(); + + st = local_state->close(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "close failed: " << st.to_string(); + + st = local_state->close(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "close failed: " << st.to_string(); + + auto debug_string = local_state->debug_string(0); + std::cout << "debug string: " << debug_string << std::endl; + + // close is reentrant. 
+ st = local_state->close(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, spill_probe_blocks) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}, {false}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // create probe blocks + for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) { + if (i % 2 == 0) { + continue; + } + + vectorized::Block block = vectorized::ColumnHelper::create_block( + {1 * i, 2 * i, 3 * i}); + local_state->_probe_blocks[i].emplace_back(std::move(block)); + } + + std::vector large_data(3 * 1024 * 1024); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + std::vector small_data(3 * 1024); + std::iota(small_data.begin(), small_data.end(), 3 * 1024 * 1024); + vectorized::Block small_block = + vectorized::ColumnHelper::create_block(small_data); + + // add a large block to the last partition + local_state->_partitioned_blocks[PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT - 1] = + vectorized::MutableBlock::create_unique(std::move(large_block)); + + // add a small block to the first partition + local_state->_partitioned_blocks[0] = + vectorized::MutableBlock::create_unique(std::move(small_block)); + + local_state->_shared_state->need_to_spill = false; + local_state->update_profile_from_inner(); + + local_state->_shared_state->need_to_spill = true; + auto st = 
local_state->spill_probe_blocks(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "spill probe blocks failed: " << st.to_string(); + + std::cout << "wait for spill dependency ready" << std::endl; + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + std::cout << "spill dependency ready" << std::endl; + + local_state->update_profile_from_inner(); + + std::cout << "profile: " << local_state->profile()->pretty_print() << std::endl; + + for (int32_t i = 0; i != PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; ++i) { + if (!local_state->_probe_spilling_streams[i]) { + continue; + } + ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream( + local_state->_probe_spilling_streams[i]); + local_state->_probe_spilling_streams[i].reset(); + } + + auto* write_rows_counter = local_state->profile()->get_counter("SpillWriteRows"); + ASSERT_EQ(write_rows_counter->value(), + (PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT / 2) * 3 + 3 * 1024 * 1024); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDisk) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create and register a spill stream for testing + const uint32_t test_partition = 0; + auto& spill_stream = local_state->_probe_spilling_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spill_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write some test data to spill stream + { + vectorized::Block block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + 
ASSERT_TRUE(spill_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(spill_stream->spill_eof().ok()); + } + + // Test recovery + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + + // Wait for async recovery to complete + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + std::cout << "profile: " << local_state->profile()->pretty_print() << std::endl; + + // Verify recovered data + auto& probe_blocks = local_state->_probe_blocks[test_partition]; + ASSERT_FALSE(probe_blocks.empty()); + ASSERT_EQ(probe_blocks[0].rows(), 3); + + // Verify counters + auto* recovery_rows_counter = local_state->profile()->get_counter("SpillRecoveryProbeRows"); + ASSERT_EQ(recovery_rows_counter->value(), 3); + auto* recovery_blocks_counter = local_state->profile()->get_counter("SpillReadBlockCount"); + ASSERT_EQ(recovery_blocks_counter->value(), 1); + + // Verify stream cleanup + ASSERT_EQ(local_state->_probe_spilling_streams[test_partition], nullptr); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskLargeData) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create and register a spill stream for testing + const uint32_t test_partition = 0; + auto& spill_stream = local_state->_probe_spilling_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spill_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write 
some test data to spill stream + { + // create block larger than 32MB(4 * (8 * 1024 * 1024 + 10)) + std::vector large_data(8 * 1024 * 1024 + 10); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + ASSERT_TRUE( + spill_stream->spill_block(_helper.runtime_state.get(), large_block, false).ok()); + + vectorized::Block block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_TRUE(spill_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(spill_stream->spill_eof().ok()); + } + + // Test recovery + bool has_data = true; + while (has_data) { + ASSERT_TRUE(local_state + ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + + // Wait for async recovery to complete + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + } + + std::cout << "profile: " << local_state->profile()->pretty_print() << std::endl; + + // Verify recovered data + auto& probe_blocks = local_state->_probe_blocks[test_partition]; + ASSERT_FALSE(probe_blocks.empty()); + ASSERT_EQ(probe_blocks[0].rows(), 8 * 1024 * 1024 + 10); + ASSERT_EQ(probe_blocks[1].rows(), 3); + + // Verify counters + auto* recovery_rows_counter = local_state->profile()->get_counter("SpillRecoveryProbeRows"); + ASSERT_EQ(recovery_rows_counter->value(), 3 + 8 * 1024 * 1024 + 10); + auto* recovery_blocks_counter = local_state->profile()->get_counter("SpillReadBlockCount"); + ASSERT_EQ(recovery_blocks_counter->value(), 2); + + // Verify stream cleanup + ASSERT_EQ(local_state->_probe_spilling_streams[test_partition], nullptr); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskEmpty) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Test multiple cases + const uint32_t test_partition = 0; + + auto& spilled_stream = local_state->_probe_spilling_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(local_state->_probe_blocks[test_partition].empty()) + << "probe blocks not empty: " << local_state->_probe_blocks[test_partition].size(); + + ASSERT_TRUE(spilled_stream == nullptr); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverProbeBlocksFromDiskError) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Test multiple cases + const uint32_t test_partition = 0; + + auto& spilling_stream = local_state->_probe_spilling_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilling_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write some test data to spill stream + { + vectorized::Block block = + 
vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_TRUE(spilling_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(spilling_stream->spill_eof().ok()); + } + + Status spill_status; + ExecEnv::GetInstance()->_fragment_mgr = + new MockFragmentManager(spill_status, ExecEnv::GetInstance()); + + const auto enable_debug_points = config::enable_debug_points; + config::enable_debug_points = true; + DebugPoints::instance()->add("fault_inject::spill_stream::read_next_block"); + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_probe_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ExecEnv::GetInstance()->spill_stream_mgr()->delete_spill_stream(spilling_stream); + spilling_stream.reset(); + + config::enable_debug_points = enable_debug_points; + + ASSERT_FALSE(spill_status.ok()); + ASSERT_TRUE(spill_status.to_string().find("fault_inject spill_stream read_next_block") != + std::string::npos) + << "unexpected error: " << spill_status.to_string(); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDisk) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create and register spill stream with test data + const uint32_t test_partition = 0; + auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_build", + probe_operator->node_id(), std::numeric_limits::max(), + 
std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write test data + { + vectorized::Block block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + } + + // Test recovery + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_build_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + + // Wait for async recovery + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + // Verify recovered data + ASSERT_TRUE(local_state->_recovered_build_block != nullptr); + ASSERT_EQ(local_state->_recovered_build_block->rows(), 3); + + // Verify counters + auto* recovery_rows_counter = local_state->profile()->get_counter("SpillRecoveryBuildRows"); + ASSERT_EQ(recovery_rows_counter->value(), 3); + auto* recovery_blocks_counter = local_state->profile()->get_counter("SpillReadBlockCount"); + ASSERT_EQ(recovery_blocks_counter->value(), 1); + + // Verify stream cleanup + ASSERT_EQ(local_state->_shared_state->spilled_streams[test_partition], nullptr); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskCanceled) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create and register spill stream with test data + const uint32_t test_partition = 0; + auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_build", + 
probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write test data + { + vectorized::Block block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + } + + // Test recovery + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_build_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + + // Wait for async recovery + while (local_state->_spill_dependency->_ready.load() == false) { + _helper.runtime_state->_query_ctx->_exec_status.update(Status::Cancelled("test canceled")); + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(_helper.runtime_state->is_cancelled()); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, need_more_input_data) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + local_state->_shared_state->need_to_spill = true; + local_state->_child_eos = false; + ASSERT_EQ(probe_operator->need_more_input_data(_helper.runtime_state.get()), + !local_state->_child_eos); + + local_state->_child_eos = true; + ASSERT_EQ(probe_operator->need_more_input_data(_helper.runtime_state.get()), + !local_state->_child_eos); + + local_state->_shared_state->need_to_spill = false; + auto inner_operator = std::dynamic_pointer_cast( + probe_operator->_inner_probe_operator); + + inner_operator->need_more_data = true; + ASSERT_EQ(probe_operator->need_more_input_data(_helper.runtime_state.get()), true); + inner_operator->need_more_data = false; + ASSERT_EQ(probe_operator->need_more_input_data(_helper.runtime_state.get()), false); +} + 
+TEST_F(PartitionedHashJoinProbeOperatorTest, revocable_mem_size) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + local_state->_child_eos = true; + ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), 0); + + local_state->_child_eos = false; + auto block1 = vectorized::ColumnHelper::create_block({1, 2, 3}); + local_state->_probe_blocks[0].emplace_back(block1); + ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), + block1.allocated_bytes()); + auto block2 = + vectorized::ColumnHelper::create_block({1, 2, 3, 5, 6, 7}); + local_state->_partitioned_blocks[0] = + vectorized::MutableBlock::create_unique(std::move(block2)); + + // block2 is small, so it should not be counted + ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), + block1.allocated_bytes()); + + // Create large input block (> 32k) + std::vector large_data(9 * 1024); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + const auto large_size = large_block.allocated_bytes(); + local_state->_partitioned_blocks[0] = + vectorized::MutableBlock::create_unique(std::move(large_block)); + ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), + block1.allocated_bytes() + large_size); + + local_state->_child_eos = true; + ASSERT_EQ(probe_operator->revocable_mem_size(_helper.runtime_state.get()), 0); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, get_reserve_mem_size) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = 
_helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + local_state->_shared_state->need_to_spill = true; + local_state->_child_eos = false; + + local_state->_need_to_setup_internal_operators = false; + ASSERT_EQ(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), + vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM); + + local_state->_need_to_setup_internal_operators = true; + ASSERT_GT(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), + vectorized::SpillStream::MAX_SPILL_WRITE_BATCH_MEM); + + const auto default_reserve_size = + _helper.runtime_state->minimum_operator_memory_required_bytes() + + probe_operator->get_child()->get_reserve_mem_size(_helper.runtime_state.get()); + local_state->_shared_state->need_to_spill = false; + ASSERT_EQ(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), + default_reserve_size); + + local_state->_shared_state->need_to_spill = true; + local_state->_child_eos = true; + ASSERT_EQ(probe_operator->get_reserve_mem_size(_helper.runtime_state.get()), + default_reserve_size); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskEmpty) { + // Similar setup as above... 
+ auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Test empty stream + const uint32_t test_partition = 0; + auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_build", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + + bool has_data = false; + ASSERT_TRUE(local_state + ->recover_build_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + ASSERT_TRUE(has_data); + + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(spilled_stream, nullptr); + ASSERT_TRUE(local_state->_recovered_build_block == nullptr); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskLargeData) { + // Similar setup as above... 
+ auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Register the build-side spill stream for the partition under test + const uint32_t test_partition = 0; + auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_build", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + // Write one large block followed by one small block to the spill stream + { + // Create a block larger than 32MB (4 * (8 * 1024 * 1024 + 10) bytes) + std::vector large_data(8 * 1024 * 1024 + 10); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + ASSERT_TRUE( + spilled_stream->spill_block(_helper.runtime_state.get(), large_block, false).ok()); + + vectorized::Block block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_TRUE(spilled_stream->spill_block(_helper.runtime_state.get(), block, false).ok()); + } + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + + bool has_data = false; + do { + ASSERT_TRUE(local_state + ->recover_build_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data) + .ok()); + + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(local_state->_recovered_build_block); + } while (has_data); + + ASSERT_EQ(spilled_stream, nullptr); + + // Verify recovered data + ASSERT_EQ(local_state->_recovered_build_block->rows(), 8 * 1024 * 1024 + 10 + 3); + + // Verify counters + auto* recovery_rows_counter = local_state->profile()->get_counter("SpillRecoveryBuildRows"); + 
ASSERT_EQ(recovery_rows_counter->value(), 8 * 1024 * 1024 + 10 + 3); + auto* recovery_blocks_counter = local_state->profile()->get_counter("SpillReadBlockCount"); + ASSERT_EQ(recovery_blocks_counter->value(), 2); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, RecoverBuildBlocksFromDiskError) { + // Set up the operators and the probe local state. + // The failure in this test comes from an injected debug point, not from the stream contents. + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Register a build-side spill stream that stays empty (only EOF is written) + const uint32_t test_partition = 0; + auto& spilled_stream = local_state->_shared_state->spilled_streams[test_partition]; + ASSERT_TRUE(ExecEnv::GetInstance() + ->spill_stream_mgr() + ->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_build", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()) + .ok()); + + ASSERT_TRUE(spilled_stream->spill_eof().ok()); + + ASSERT_TRUE(local_state->_recovered_build_block == nullptr); + + Status spill_status; + ExecEnv::GetInstance()->_fragment_mgr = + new MockFragmentManager(spill_status, ExecEnv::GetInstance()); + + const auto enable_debug_points = config::enable_debug_points; + config::enable_debug_points = true; + // Inject a fault into build-block recovery; the call itself is expected to return OK and the error is checked on spill_status afterwards + DebugPoints::instance()->add("fault_inject::partitioned_hash_join_probe::recover_build_blocks"); + bool has_data = false; + auto status = local_state->recover_build_blocks_from_disk(_helper.runtime_state.get(), + test_partition, has_data); + + ASSERT_TRUE(status.ok()) << "recover build blocks failed: " << status.to_string(); + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + config::enable_debug_points = enable_debug_points; + 
ASSERT_FALSE(spill_status.ok()); + ASSERT_TRUE(spill_status.to_string().find("fault_inject partitioned_hash_join_probe " + "recover_build_blocks failed") != std::string::npos) + << "incorrect recover build blocks status: " << spill_status.to_string(); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, GetBlockTestNonSpill) { + // Setup operators and pipeline + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Setup child data + auto input_block = vectorized::Block::create_unique(); + input_block->swap(vectorized::ColumnHelper::create_block({1, 2, 3})); + + auto probe_side_source_operator = + std::dynamic_pointer_cast(probe_operator->get_child()); + probe_side_source_operator->set_block(std::move(*input_block)); + + local_state->_shared_state->need_to_spill = false; + + // Test non empty input block path + { + vectorized::Block output_block; + bool eos = false; + + auto st = probe_operator->get_block(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(eos); + } + + // Test empty input block case + { + auto empty_block = vectorized::Block::create_unique(); + probe_side_source_operator->set_block(std::move(*empty_block)); + + vectorized::Block output_block; + bool eos = false; + + auto st = probe_operator->get_block(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()); + ASSERT_FALSE(eos); + ASSERT_EQ(output_block.rows(), 0); + } + + // Test end of stream case + { + probe_side_source_operator->set_eos(); + vectorized::Block output_block; + bool eos = false; + + auto st = probe_operator->get_block(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()) << "get block failed: " << st.to_string(); + ASSERT_TRUE(eos); + } +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PushEmptyBlock) { + 
// Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Setup local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Create empty input block + vectorized::Block empty_block; + + // Test pushing empty block without EOS + auto st = probe_operator->push(_helper.runtime_state.get(), &empty_block, false); + ASSERT_TRUE(st.ok()); + + // Verify no partitioned blocks were created + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + ASSERT_EQ(local_state->_partitioned_blocks[i], nullptr); + } +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PushPartitionData) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + // Setup row descriptor and partitioner + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}, {false}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // Create test input block + vectorized::Block input_block = + vectorized::ColumnHelper::create_block({1, 2, 3, 4, 5}); + + // Test pushing data + auto st = probe_operator->push(_helper.runtime_state.get(), &input_block, false); + ASSERT_TRUE(st.ok()); + + // Verify partitioned blocks + int64_t total_partitioned_rows = 0; + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + if (local_state->_partitioned_blocks[i]) { + total_partitioned_rows += local_state->_partitioned_blocks[i]->rows(); + } + } + ASSERT_EQ(total_partitioned_rows, 5); // All rows should be partitioned +} + 
+TEST_F(PartitionedHashJoinProbeOperatorTest, PushWithEOS) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Setup row descriptor and partitioner + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}, {false}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // Create test data and push with EOS + vectorized::Block input_block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + + auto st = probe_operator->push(_helper.runtime_state.get(), &input_block, false); + ASSERT_TRUE(st.ok()) << "Push failed: " << st.to_string(); + + input_block.clear(); + st = probe_operator->push(_helper.runtime_state.get(), &input_block, true); + ASSERT_TRUE(st.ok()) << "Push failed: " << st.to_string(); + + // Verify all data is moved to probe blocks due to EOS + int64_t total_probe_block_rows = 0; + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + for (const auto& block : local_state->_probe_blocks[i]) { + total_probe_block_rows += block.rows(); + } + } + ASSERT_EQ(total_probe_block_rows, 3); // All rows should be in probe blocks + + // Verify partitioned blocks are cleared + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + ASSERT_EQ(local_state->_partitioned_blocks[i], nullptr); + } +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PushLargeBlock) { + // Setup test environment + auto [probe_operator, sink_operator] = _helper.create_operators(); + + // Initialize local state + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + 
probe_operator.get(), shared_state); + + // Setup row descriptor and partitioner + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {0}, {false}); + const auto& tnode = probe_operator->_tnode; + local_state->_partitioner = create_spill_partitioner( + _helper.runtime_state.get(), PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT, + {tnode.hash_join_node.eq_join_conjuncts[0].left}, row_desc); + + // Create large input block (> 2M rows) + std::vector large_data(3 * 1024 * 1024); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + // Push large block + auto st = probe_operator->push(_helper.runtime_state.get(), &large_block, false); + ASSERT_TRUE(st.ok()); + + // Verify some partitions have blocks moved to probe_blocks due to size threshold + bool found_probe_blocks = false; + size_t partitioned_rows_count = 0; + for (uint32_t i = 0; i < probe_operator->_partition_count; ++i) { + if (!local_state->_probe_blocks[i].empty()) { + for (auto& block : local_state->_probe_blocks[i]) { + if (!block.empty()) { + partitioned_rows_count += block.rows(); + found_probe_blocks = true; + } + } + } + if (local_state->_partitioned_blocks[i] && !local_state->_partitioned_blocks[i]->empty()) { + partitioned_rows_count += local_state->_partitioned_blocks[i]->rows(); + found_probe_blocks = true; + } + } + + ASSERT_EQ(partitioned_rows_count, large_block.rows()); + ASSERT_TRUE(found_probe_blocks); + + // Verify bytes counter + auto* probe_blocks_bytes = local_state->profile()->get_counter("ProbeBloksBytesInMem"); + ASSERT_GT(probe_blocks_bytes->value(), 0); +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PullBasic) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + 
local_state->_need_to_setup_internal_operators = true; + local_state->_partition_cursor = 0; + + vectorized::Block test_block; + bool eos = false; + + auto st = probe_operator->pull(_helper.runtime_state.get(), &test_block, &eos); + ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); + ASSERT_FALSE(eos) << "First pull should not be eos"; + + ASSERT_EQ(1, local_state->_partition_cursor) << "Partition cursor should be 1"; +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PullMultiplePartitions) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + for (uint32_t i = 0; i < PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) { + auto& probe_blocks = local_state->_probe_blocks[i]; + probe_blocks.emplace_back( + vectorized::ColumnHelper::create_block({1, 2, 3})); + } + + vectorized::Block output_block; + bool eos = false; + + for (uint32_t i = 0; i < PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT; i++) { + local_state->_partition_cursor = i; + local_state->_need_to_setup_internal_operators = true; + + auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()) << "Pull failed for partition " << i; + + if (i == PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT - 1) { + ASSERT_TRUE(eos) << "Last partition should be eos"; + } else { + ASSERT_FALSE(eos) << "Non-last partition should not be eos"; + } + } +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithDiskRecovery) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + local_state->_shared_state->need_to_spill = true; + + const uint32_t test_partition = 0; + auto& spilled_stream = 
local_state->_shared_state->spilled_streams[test_partition]; + auto& spilling_stream = local_state->_probe_spilling_streams[test_partition]; + + local_state->_need_to_setup_internal_operators = true; + + auto st = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + _helper.runtime_state.get(), spilled_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe_spilled", + probe_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), local_state->profile()); + + ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string(); + st = ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + _helper.runtime_state.get(), spilling_stream, + print_id(_helper.runtime_state->query_id()), "hash_probe", probe_operator->node_id(), + std::numeric_limits::max(), std::numeric_limits::max(), + local_state->profile()); + + ASSERT_TRUE(st) << "Register spill stream failed: " << st.to_string(); + + vectorized::Block spill_block = + vectorized::ColumnHelper::create_block({1, 2, 3}); + st = spilled_stream->spill_block(_helper.runtime_state.get(), spill_block, true); + ASSERT_TRUE(st) << "Spill block failed: " << st.to_string(); + st = spilling_stream->spill_block(_helper.runtime_state.get(), spill_block, false); + ASSERT_TRUE(st) << "Spill block failed: " << st.to_string(); + + vectorized::Block output_block; + bool eos = false; + + st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); + while (local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(st.ok()) << "Pull failed: " << st.to_string(); + ASSERT_FALSE(eos) << "Should not be eos during 
disk recovery"; + + ASSERT_GT(local_state->_recovery_probe_rows->value(), 0) + << "Should have recovered some rows from disk"; +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, PullWithEmptyPartition) { + auto [probe_operator, sink_operator] = _helper.create_operators(); + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + // Set up an empty partition + local_state->_partition_cursor = 0; + local_state->_need_to_setup_internal_operators = true; + + vectorized::Block output_block; + bool eos = false; + + auto st = probe_operator->pull(_helper.runtime_state.get(), &output_block, &eos); + ASSERT_TRUE(st.ok()) << "Pull failed for empty partition"; + ASSERT_FALSE(eos) << "Should not be eos for first empty partition"; + + // Verify that the partition cursor has advanced + ASSERT_EQ(1, local_state->_partition_cursor) + << "Partition cursor should move to next after empty partition"; +} + +TEST_F(PartitionedHashJoinProbeOperatorTest, Other) { + auto [probe_operator, _] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto local_state = _helper.create_probe_local_state(_helper.runtime_state.get(), + probe_operator.get(), shared_state); + + auto st = probe_operator->_setup_internal_operator_for_non_spill(*local_state, + _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Setup internal operator failed: " << st.to_string(); + + local_state->_shared_state->need_to_spill = true; + ASSERT_FALSE(probe_operator->_should_revoke_memory(_helper.runtime_state.get())); + + st = probe_operator->_revoke_memory(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Revoke memory failed: " << st.to_string(); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp new file mode 100644 index 00000000000000..0514d481d3ad89 --- /dev/null +++ 
b/be/test/pipeline/operator/partitioned_hash_join_sink_operator_test.cpp @@ -0,0 +1,398 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "pipeline/exec/partitioned_hash_join_sink_operator.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "common/config.h" +#include "common/exception.h" +#include "partitioned_hash_join_test_helper.h" +#include "pipeline/common/data_gen_functions/vnumbers_tvf.h" +#include "pipeline/exec/operator.h" +#include "pipeline/exec/partitioned_hash_join_probe_operator.h" +#include "pipeline/pipeline_task.h" +#include "runtime/define_primitive_type.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" +#include "runtime/exec_env.h" +#include "runtime/query_context.h" +#include "runtime/runtime_state.h" +#include "testutil/column_helper.h" +#include "testutil/mock/mock_data_stream_sender.h" +#include "testutil/mock/mock_descriptors.h" +#include "testutil/mock/mock_operators.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type.h" +#include "vec/data_types/data_type_number.h" +#include 
"vec/exprs/vexpr_context.h" +#include "vec/exprs/vexpr_fwd.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { + +class PartitionedHashJoinSinkOperatorTest : public testing::Test { +public: + ~PartitionedHashJoinSinkOperatorTest() override = default; + void SetUp() override { _helper.SetUp(); } + + void TearDown() override { _helper.TearDown(); } + +protected: + PartitionedHashJoinTestHelper _helper; +}; + +TEST_F(PartitionedHashJoinSinkOperatorTest, Init) { + TPlanNode tnode = _helper.create_test_plan_node(); + const DescriptorTbl& desc_tbl = _helper.runtime_state->desc_tbl(); + + ASSERT_EQ(desc_tbl.get_tuple_descs().size(), 2); + + tnode.row_tuples.push_back(desc_tbl.get_tuple_descs().front()->id()); + tnode.nullable_tuples.push_back(false); + + PartitionedHashJoinSinkOperatorX operator_x( + _helper.obj_pool.get(), 0, 0, tnode, desc_tbl, + PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + + auto child = std::make_shared(); + child->_row_descriptor = RowDescriptor(_helper.runtime_state->desc_tbl(), {1}, {false}); + EXPECT_TRUE(operator_x.set_child(child)); + + ASSERT_TRUE(operator_x.init(tnode, _helper.runtime_state.get())); + ASSERT_EQ(operator_x._partition_count, PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + ASSERT_TRUE(operator_x._partitioner != nullptr); + + // ObjectPool* pool, int operator_id, int dest_id, + // const TPlanNode& tnode, const DescriptorTbl& descs + // ObjectPool* pool, const TPlanNode& tnode, int operator_id, + // const DescriptorTbl& descs + operator_x.set_inner_operators( + std::make_shared(_helper.obj_pool.get(), 0, 0, tnode, + _helper.runtime_state->desc_tbl()), + std::make_shared(_helper.obj_pool.get(), tnode, 0, + _helper.runtime_state->desc_tbl())); + + auto st = operator_x.prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Prepare failed: " << st.to_string(); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, InitLocalState) { + auto [_, sink_operator] = 
_helper.create_operators(); + + auto shared_state = std::make_shared(); + auto local_state_uptr = PartitionedHashJoinSinkLocalState::create_unique( + sink_operator.get(), _helper.runtime_state.get()); + auto local_state = local_state_uptr.get(); + shared_state = std::make_shared(); + + _helper.runtime_state->emplace_sink_local_state(sink_operator->operator_id(), + std::move(local_state_uptr)); + + auto st = sink_operator->init(sink_operator->_tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Init failed: " << st.to_string(); + + EXPECT_TRUE(sink_operator->_inner_sink_operator->set_child(nullptr)); + sink_operator->_inner_probe_operator->_build_side_child = nullptr; + sink_operator->_inner_probe_operator->_child = nullptr; + + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Prepare failed: " << st.to_string(); + + RuntimeProfile runtime_profile("test"); + std::map, std::shared_ptr>> + le_state_map; + TDataSink t_sink; + LocalSinkStateInfo info {.parent_profile = &runtime_profile, + .shared_state = shared_state.get(), + .le_state_map = le_state_map, + .tsink = t_sink}; + st = local_state->init(_helper.runtime_state.get(), info); + ASSERT_TRUE(st) << "init failed: " << st.to_string(); + + st = local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "open failed: " << st.to_string(); + + local_state->update_memory_usage(); + + shared_state->need_to_spill = false; + auto reserve_size = local_state->get_reserve_mem_size(_helper.runtime_state.get(), false); + + shared_state->need_to_spill = true; + reserve_size = local_state->get_reserve_mem_size(_helper.runtime_state.get(), false); + ASSERT_EQ(reserve_size, + sink_operator->_partition_count * vectorized::SpillStream::MIN_SPILL_WRITE_BATCH_MEM); + + auto* finish_dep = local_state->finishdependency(); + ASSERT_TRUE(finish_dep != nullptr); + + shared_state->need_to_spill = false; + + st = local_state->close(_helper.runtime_state.get(), Status::OK()); + 
ASSERT_TRUE(st) << "close failed: " << st.to_string(); + + st = local_state->close(_helper.runtime_state.get(), Status::OK()); + ASSERT_TRUE(st) << "close failed: " << st.to_string(); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, InitBuildExprs) { + TPlanNode tnode = _helper.create_test_plan_node(); + // Add several equi-join conditions to exercise build-expression construction + for (int i = 0; i < 3; i++) { + TEqJoinCondition eq_cond; + eq_cond.left = TExpr(); + eq_cond.right = TExpr(); + tnode.hash_join_node.eq_join_conjuncts.push_back(eq_cond); + } + + DescriptorTbl desc_tbl; + PartitionedHashJoinSinkOperatorX operator_x( + _helper.obj_pool.get(), 0, 0, tnode, desc_tbl, + PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + + ASSERT_TRUE(operator_x.init(tnode, _helper.runtime_state.get())); + ASSERT_EQ(operator_x._build_exprs.size(), 4); // 1 initial + 3 added +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, Prepare) { + auto [_, sink_operator] = _helper.create_operators(); + + const auto& tnode = sink_operator->_tnode; + + // Initialize the operator + ASSERT_TRUE(sink_operator->init(tnode, _helper.runtime_state.get())); + + sink_operator->_partitioner = + std::make_unique(PartitionedHashJoinTestHelper::TEST_PARTITION_COUNT); + + sink_operator->_inner_sink_operator->_child = nullptr; + sink_operator->_inner_probe_operator->_build_side_child = nullptr; + sink_operator->_inner_probe_operator->_child = nullptr; + + auto st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st) << "Open failed: " << st.to_string(); + + ASSERT_TRUE(sink_operator->_partitioner != nullptr); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, Sink) { + auto [_, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto* sink_local_state = _helper.create_sink_local_state(_helper.runtime_state.get(), + sink_operator.get(), shared_state); + + auto read_dependency = + Dependency::create_shared(sink_operator->operator_id(), sink_operator->node_id(), + "HashJoinBuildReadDependency", false); + 
sink_local_state->_shared_state->need_to_spill = false; + + shared_state->source_deps.emplace_back(read_dependency); + + vectorized::Block block; + bool eos = true; + + ASSERT_EQ(read_dependency->_ready.load(), false); + auto st = sink_operator->sink(_helper.runtime_state.get(), &block, eos); + ASSERT_TRUE(st.ok()) << "Sink failed: " << st.to_string(); + + ASSERT_EQ(read_dependency->_ready.load(), true); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, SinkEosAndSpill) { + auto [_, sink_operator] = _helper.create_operators(); + + auto shared_state = std::make_shared(); + + std::map, std::shared_ptr>> + le_state_map; + LocalSinkStateInfo sink_info {.task_idx = 0, + .parent_profile = _helper.runtime_profile.get(), + .shared_state = shared_state.get(), + .le_state_map = le_state_map, + .tsink = TDataSink()}; + auto st = sink_operator->setup_local_state(_helper.runtime_state.get(), sink_info); + ASSERT_TRUE(st.ok()) << "Setup local state failed: " << st.to_string(); + + st = sink_operator->init(sink_operator->_tnode, _helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Init failed: " << st.to_string(); + + sink_operator->_inner_sink_operator->_child = nullptr; + sink_operator->_inner_probe_operator->_build_side_child = nullptr; + sink_operator->_inner_probe_operator->_child = nullptr; + st = sink_operator->prepare(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Open failed: " << st.to_string(); + + auto* sink_local_state = reinterpret_cast( + _helper.runtime_state->get_sink_local_state()); + ASSERT_TRUE(sink_local_state != nullptr) << "no sink local state"; + + st = sink_local_state->open(_helper.runtime_state.get()); + ASSERT_TRUE(st.ok()) << "Open failed: " << st.to_string(); + + auto read_dependency = + Dependency::create_shared(sink_operator->operator_id(), sink_operator->node_id(), + "HashJoinBuildReadDependency", false); + shared_state->source_deps.emplace_back(read_dependency); + + vectorized::Block block; + + // sink empty block + 
sink_local_state->_shared_state->need_to_spill = false; + ASSERT_EQ(read_dependency->_ready.load(), false); + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "Sink failed: " << st.to_string(); + + block = vectorized::ColumnHelper::create_block({1, 2, 3}); + + // sink non-empty block + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "Sink failed: " << st.to_string(); + + sink_local_state->_shared_state->need_to_spill = true; + ASSERT_EQ(read_dependency->_ready.load(), false); + st = sink_operator->sink(_helper.runtime_state.get(), &block, false); + ASSERT_TRUE(st.ok()) << "Sink failed: " << st.to_string(); + + while (sink_local_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_EQ(read_dependency->_ready.load(), false); + st = sink_operator->sink(_helper.runtime_state.get(), &block, true); + ASSERT_TRUE(st.ok()) << "Sink failed: " << st.to_string(); + + while (read_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + + ASSERT_TRUE(sink_local_state->_dependency->_ready.load()); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemoryEmpty) { + auto [_, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto* sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(), + sink_operator.get(), shared_state); + + // Expect revoke memory to trigger spilling + auto status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); + ASSERT_TRUE(sink_state->_shared_state->need_to_spill); +} + +TEST_F(PartitionedHashJoinSinkOperatorTest, RevokeMemory) { + auto [_, sink_operator] = _helper.create_operators(); + + std::shared_ptr shared_state; + auto sink_state = _helper.create_sink_local_state(_helper.runtime_state.get(), 
+ sink_operator.get(), shared_state); + + // prepare & set child operator + auto child = std::dynamic_pointer_cast(sink_operator->child()); + + RowDescriptor row_desc(_helper.runtime_state->desc_tbl(), {1}, {false}); + child->_row_descriptor = row_desc; + EXPECT_EQ(child->row_desc().num_slots(), 1); + + const auto& tnode = sink_operator->_tnode; + // prepare and set partitioner + auto partitioner = std::make_unique(sink_operator->_partition_count); + auto status = partitioner->init({tnode.hash_join_node.eq_join_conjuncts[0].right}); + ASSERT_TRUE(status.ok()) << "Init partitioner failed: " << status.to_string(); + status = partitioner->prepare(_helper.runtime_state.get(), sink_operator->_child->row_desc()); + ASSERT_TRUE(status.ok()) << "Prepare partitioner failed: " << status.to_string(); + sink_state->_partitioner = std::move(partitioner); + sink_state->_shared_state->need_to_spill = false; + + DCHECK_GE(sink_operator->_child->row_desc().get_column_id(1), 0); + + for (uint32_t i = 0; i != sink_operator->_partition_count; ++i) { + auto& spilling_stream = sink_state->_shared_state->spilled_streams[i]; + auto st = (ExecEnv::GetInstance()->spill_stream_mgr()->register_spill_stream( + _helper.runtime_state.get(), spilling_stream, + print_id(_helper.runtime_state->query_id()), fmt::format("hash_build_sink_{}", i), + sink_operator->node_id(), std::numeric_limits::max(), + std::numeric_limits::max(), sink_state->profile())); + ASSERT_TRUE(st.ok()) << "Register spill stream failed: " << st.to_string(); + } + + auto& inner_sink = sink_operator->_inner_sink_operator; + + auto inner_sink_local_state = std::make_unique( + inner_sink.get(), sink_state->_shared_state->inner_runtime_state.get()); + inner_sink_local_state->_hash_table_memory_usage = + sink_state->profile()->add_counter("HashTableMemoryUsage", TUnit::BYTES); + inner_sink_local_state->_build_arena_memory_usage = + sink_state->profile()->add_counter("BuildArenaMemoryUsage", TUnit::BYTES); + + auto block = 
vectorized::ColumnHelper::create_block({1, 2, 3}); + ASSERT_EQ(block.rows(), 3); + inner_sink_local_state->_build_side_mutable_block = std::move(block); + + sink_state->_shared_state->inner_runtime_state->emplace_sink_local_state( + 0, std::move(inner_sink_local_state)); + + // Expect revoke memory to trigger spilling + status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); + ASSERT_TRUE(sink_state->_shared_state->need_to_spill); + + std::cout << "wait for spill dependency ready" << std::endl; + while (sink_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + std::cout << "spill dependency ready" << std::endl; + + std::cout << "profile: " << sink_state->profile()->pretty_print() << std::endl; + + auto written_rows_counter = sink_state->profile()->get_counter("SpillWriteRows"); + auto written_rows = written_rows_counter->value(); + ASSERT_EQ(written_rows, 2) << "SpillWriteRows: " << written_rows_counter->value(); + + std::vector large_data(3 * 1024 * 1024); + std::iota(large_data.begin(), large_data.end(), 0); + vectorized::Block large_block = + vectorized::ColumnHelper::create_block(large_data); + + sink_state->_shared_state->partitioned_build_blocks[0] = + vectorized::MutableBlock::create_unique(std::move(large_block)); + status = sink_state->revoke_memory(_helper.runtime_state.get(), nullptr); + ASSERT_TRUE(status.ok()) << "Revoke memory failed: " << status.to_string(); + std::cout << "wait for spill dependency ready" << std::endl; + while (sink_state->_spill_dependency->_ready.load() == false) { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } + std::cout << "spill dependency ready" << std::endl; + + ASSERT_EQ(written_rows + 3 * 1024 * 1024, written_rows_counter->value()); +} + +} // namespace doris::pipeline \ No newline at end of file diff --git 
a/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp new file mode 100644 index 00000000000000..03f9b3f8ae0961 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.cpp @@ -0,0 +1,243 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "partitioned_hash_join_test_helper.h" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +namespace doris::pipeline { +void PartitionedHashJoinTestHelper::SetUp() { + runtime_state = std::make_unique(); + obj_pool = std::make_unique(); + + runtime_profile = std::make_shared("test"); + + query_ctx = generate_one_query(); + + runtime_state->_query_ctx = query_ctx.get(); + runtime_state->_query_id = query_ctx->query_id(); + runtime_state->resize_op_id_to_local_state(-100); + + ADD_TIMER(runtime_profile.get(), "ExecTime"); + runtime_profile->AddHighWaterMarkCounter("MemoryUsed", TUnit::BYTES, "", 0); + + auto desc_table = create_test_table_descriptor(); + auto st = DescriptorTbl::create(obj_pool.get(), desc_table, &desc_tbl); + DCHECK(!desc_table.slotDescriptors.empty()); + EXPECT_TRUE(st.ok()) << "create descriptor table failed: " << st.to_string(); + runtime_state->set_desc_tbl(desc_tbl); + + auto spill_data_dir = std::make_unique("/tmp/partitioned_join_test", + 1024L * 1024 * 4); + st = io::global_local_filesystem()->create_directory(spill_data_dir->path(), false); + EXPECT_TRUE(st.ok()) << "create directory: " << spill_data_dir->path() + << " failed: " << st.to_string(); + std::unordered_map> data_map; + data_map.emplace("test", std::move(spill_data_dir)); + auto* spill_stream_manager = new vectorized::SpillStreamManager(std::move(data_map)); + ExecEnv::GetInstance()->_spill_stream_mgr = spill_stream_manager; + st = spill_stream_manager->init(); + EXPECT_TRUE(st.ok()) << "init spill stream manager failed: " << st.to_string(); +} + +void PartitionedHashJoinTestHelper::TearDown() { + ExecEnv::GetInstance()->spill_stream_mgr()->async_cleanup_query(runtime_state->query_id()); + doris::ExecEnv::GetInstance()->spill_stream_mgr()->get_spill_io_thread_pool()->wait(); + doris::ExecEnv::GetInstance()->spill_stream_mgr()->stop(); + SAFE_DELETE(ExecEnv::GetInstance()->_spill_stream_mgr); +} + +TPlanNode 
PartitionedHashJoinTestHelper::create_test_plan_node() { + TPlanNode tnode; + tnode.node_id = 0; + tnode.node_type = TPlanNodeType::HASH_JOIN_NODE; + tnode.num_children = 2; + tnode.hash_join_node.join_op = TJoinOp::INNER_JOIN; + + TEqJoinCondition eq_cond; + eq_cond.left = TExpr(); + eq_cond.right = TExpr(); + + tnode.row_tuples.push_back(0); + tnode.row_tuples.push_back(1); + tnode.nullable_tuples.push_back(false); + tnode.nullable_tuples.push_back(false); + tnode.node_type = TPlanNodeType::HASH_JOIN_NODE; + tnode.hash_join_node.join_op = TJoinOp::INNER_JOIN; + tnode.__isset.hash_join_node = true; + + tnode.hash_join_node.vintermediate_tuple_id_list.emplace_back(0); + tnode.hash_join_node.__isset.vintermediate_tuple_id_list = true; + + tnode.output_tuple_id = 0; + tnode.__isset.output_tuple_id = true; + + // TEqJoinCondition& eq_cond = tnode.hash_join_node.eq_join_conjuncts[0]; + eq_cond.left.nodes.emplace_back(); + eq_cond.right.nodes.emplace_back(); + eq_cond.left.nodes[0].node_type = TExprNodeType::SLOT_REF; + eq_cond.right.nodes[0].node_type = TExprNodeType::SLOT_REF; + + TTypeNode type_node; + type_node.type = TTypeNodeType::SCALAR; + type_node.scalar_type.type = TPrimitiveType::INT; + type_node.__isset.scalar_type = true; + + eq_cond.left.nodes[0].type.types.emplace_back(type_node); + eq_cond.right.nodes[0].type.types.emplace_back(type_node); + eq_cond.left.nodes[0].num_children = 0; + eq_cond.right.nodes[0].num_children = 0; + eq_cond.left.nodes[0].slot_ref.slot_id = 0; + eq_cond.right.nodes[0].slot_ref.slot_id = 1; + tnode.hash_join_node.eq_join_conjuncts.push_back(eq_cond); + + return tnode; +} + +std::tuple, + std::shared_ptr> +PartitionedHashJoinTestHelper::create_operators() { + TPlanNode tnode = create_test_plan_node(); + auto desc_tbl = runtime_state->desc_tbl(); + + EXPECT_EQ(desc_tbl.get_tuple_descs().size(), 2); + + auto probe_operator = std::make_shared( + obj_pool.get(), tnode, 0, desc_tbl, TEST_PARTITION_COUNT); + auto sink_operator = 
std::make_shared( + obj_pool.get(), 0, 0, tnode, desc_tbl, TEST_PARTITION_COUNT); + + auto child_operator = std::make_shared(); + auto probe_side_source_operator = std::make_shared(); + auto probe_side_sink_operator = std::make_shared(); + auto [probe_pipeline, _] = generate_hash_join_pipeline(probe_operator, child_operator, + probe_side_sink_operator, sink_operator); + + RowDescriptor row_desc(runtime_state->desc_tbl(), {1}, {false}); + child_operator->_row_descriptor = row_desc; + + RowDescriptor row_desc_probe(runtime_state->desc_tbl(), {0}, {false}); + probe_side_source_operator->_row_descriptor = row_desc_probe; + + EXPECT_TRUE(probe_operator->set_child(probe_side_source_operator)); + EXPECT_TRUE(probe_operator->set_child(child_operator)); + EXPECT_TRUE(sink_operator->set_child(child_operator)); + + auto inner_sink_operator = std::make_shared( + obj_pool.get(), 0, 0, tnode, runtime_state->desc_tbl()); + auto inner_probe_operator = std::make_shared( + obj_pool.get(), tnode, 0, runtime_state->desc_tbl()); + + EXPECT_TRUE(inner_sink_operator->set_child(child_operator)); + EXPECT_TRUE(inner_probe_operator->set_child(probe_side_source_operator)); + EXPECT_TRUE(inner_probe_operator->set_child(child_operator)); + + auto st = inner_sink_operator->init(tnode, runtime_state.get()); + EXPECT_TRUE(st.ok()) << "Init inner sink operator failed: " << st.to_string(); + st = inner_probe_operator->init(tnode, runtime_state.get()); + EXPECT_TRUE(st.ok()) << "Init inner probe operator failed: " << st.to_string(); + + probe_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); + + sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); + + // Setup task and state + std::map, std::shared_ptr>> + le_state_map; + pipeline_task = std::make_shared(probe_pipeline, 0, runtime_state.get(), nullptr, + nullptr, le_state_map, 0); + runtime_state->set_task(pipeline_task.get()); + return {probe_operator, sink_operator}; +} + 
+PartitionedHashJoinProbeLocalState* PartitionedHashJoinTestHelper::create_probe_local_state( + RuntimeState* state, PartitionedHashJoinProbeOperatorX* probe_operator, + std::shared_ptr& shared_state) { + auto local_state_uptr = + std::make_unique(state, probe_operator); + auto local_state = local_state_uptr.get(); + shared_state = std::make_shared(); + local_state->init_counters(); + local_state->_shared_state = shared_state.get(); + shared_state->need_to_spill = true; + + ADD_TIMER(local_state->profile(), "ExecTime"); + local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0); + local_state->init_spill_read_counters(); + local_state->init_spill_write_counters(); + local_state->init_counters(); + local_state->_copy_shared_spill_profile = false; + local_state->_internal_runtime_profile = std::make_unique("inner_test"); + + local_state->_partitioned_blocks.resize(probe_operator->_partition_count); + local_state->_probe_spilling_streams.resize(probe_operator->_partition_count); + + local_state->_spill_dependency = + Dependency::create_shared(0, 0, "PartitionedHashJoinProbeOperatorTestSpillDep", true); + shared_state->spilled_streams.resize(probe_operator->_partition_count); + shared_state->partitioned_build_blocks.resize(probe_operator->_partition_count); + + shared_state->inner_runtime_state = std::make_unique(); + shared_state->inner_shared_state = std::make_shared(); + + state->emplace_local_state(probe_operator->operator_id(), std::move(local_state_uptr)); + return local_state; +} + +PartitionedHashJoinSinkLocalState* PartitionedHashJoinTestHelper::create_sink_local_state( + RuntimeState* state, PartitionedHashJoinSinkOperatorX* sink_operator, + std::shared_ptr& shared_state) { + auto local_state_uptr = std::make_unique( + sink_operator, state, obj_pool.get()); + auto local_state = local_state_uptr.get(); + shared_state = std::make_shared(); + local_state->init_spill_counters(); + local_state->_shared_state = shared_state.get(); + 
shared_state->need_to_spill = true; + + ADD_TIMER(local_state->profile(), "ExecTime"); + local_state->profile()->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 0); + local_state->_internal_runtime_profile = std::make_unique("inner_test"); + + local_state->_dependency = shared_state->create_sink_dependency( + sink_operator->dests_id().front(), sink_operator->operator_id(), + "PartitionedHashJoinTestDep"); + + local_state->_spill_dependency = + Dependency::create_shared(0, 0, "PartitionedHashJoinSinkOperatorTestSpillDep", true); + + shared_state->spilled_streams.resize(sink_operator->_partition_count); + shared_state->partitioned_build_blocks.resize(sink_operator->_partition_count); + + shared_state->inner_runtime_state = std::make_unique(); + shared_state->inner_shared_state = std::make_shared(); + shared_state->setup_shared_profile(local_state->profile()); + + state->emplace_sink_local_state(sink_operator->operator_id(), std::move(local_state_uptr)); + return local_state; +} +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/pipeline/operator/partitioned_hash_join_test_helper.h b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h new file mode 100644 index 00000000000000..95865aea21e184 --- /dev/null +++ b/be/test/pipeline/operator/partitioned_hash_join_test_helper.h @@ -0,0 +1,292 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include "common/config.h" +#include "common/object_pool.h" +#include "olap/olap_define.h" +#include "pipeline/exec/partitioned_hash_join_sink_operator.h" +#include "pipeline/exec/spill_utils.h" +#include "pipeline/pipeline_task.h" +#include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" +#include "testutil/column_helper.h" +#include "testutil/creators.h" +#include "testutil/mock/mock_operators.h" +#include "testutil/mock/mock_runtime_state.h" +#include "util/debug_points.h" +#include "util/runtime_profile.h" +#include "vec/core/block.h" +#include "vec/data_types/data_type_number.h" +#include "vec/spill/spill_stream_manager.h" + +namespace doris::pipeline { + +class MockPartitioner : public vectorized::PartitionerBase { +public: + MockPartitioner(size_t partition_count) : PartitionerBase(partition_count) {} + Status init(const std::vector& texprs) override { return Status::OK(); } + + Status prepare(RuntimeState* state, const RowDescriptor& row_desc) override { + return Status::OK(); + } + + Status open(RuntimeState* state) override { return Status::OK(); } + + Status close(RuntimeState* state) override { return Status::OK(); } + + Status do_partitioning(RuntimeState* state, vectorized::Block* block, bool eos, + bool* already_sent) const override { + if (already_sent) { + *already_sent = false; + } + return Status::OK(); + } + + Status clone(RuntimeState* state, std::unique_ptr& partitioner) override { + partitioner = 
std::make_unique(_partition_count); + return Status::OK(); + } + + vectorized::ChannelField get_channel_ids() const override { return {}; } +}; + +class MockExpr : public vectorized::VExpr { +public: + Status prepare(RuntimeState* state, const RowDescriptor& row_desc, + vectorized::VExprContext* context) override { + return Status::OK(); + } + + Status open(RuntimeState* state, vectorized::VExprContext* context, + FunctionContext::FunctionStateScope scope) override { + return Status::OK(); + } +}; + +class MockHashJoinBuildSharedState : public HashJoinSharedState { +public: +}; + +class MockPartitionedHashJoinSharedState : public PartitionedHashJoinSharedState { +public: + MockPartitionedHashJoinSharedState() { + need_to_spill = false; + inner_runtime_state = nullptr; + spilled_streams.clear(); + partitioned_build_blocks.clear(); + } + + // 添加必要的初始化方法 + void init(size_t partition_count) { + spilled_streams.resize(partition_count); + partitioned_build_blocks.resize(partition_count); + } +}; + +class MockHashJoinSharedState : public HashJoinSharedState {}; + +class MockHashJoinBuildSinkLocalState : public HashJoinBuildSinkLocalState { +public: + // DataSinkOperatorXBase* parent, RuntimeState* state + MockHashJoinBuildSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) + : HashJoinBuildSinkLocalState(parent, state) { + _runtime_profile = std::make_unique("test"); + _profile = _runtime_profile.get(); + _memory_used_counter = + _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); + + ADD_TIMER(_profile, "PublishRuntimeFilterTime"); + ADD_TIMER(_profile, "BuildRuntimeFilterTime"); + ADD_TIMER(_profile, "BuildHashTableTime"); + ADD_TIMER(_profile, "MergeBuildBlockTime"); + ADD_TIMER(_profile, "BuildTableInsertTime"); + ADD_TIMER(_profile, "BuildExprCallTime"); + ADD_TIMER(_profile, "RuntimeFilterInitTime"); + ADD_COUNTER(_profile, "MemoryUsageBuildBlocks", TUnit::UNIT); + ADD_COUNTER(_profile, "MemoryUsageHashTable", TUnit::BYTES); + 
ADD_COUNTER(_profile, "MemoryUsageBuildKeyArena", TUnit::BYTES); + } + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override { return Status::OK(); } + Status open(RuntimeState* state) override { return Status::OK(); } + Status close(RuntimeState* state, Status status) override { return Status::OK(); } + size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { return 0; } + +private: + std::unique_ptr _runtime_profile; +}; + +class MockHashJoinBuildOperator : public HashJoinBuildSinkOperatorX { +public: + MockHashJoinBuildOperator(ObjectPool* pool, int operator_id, int dest_id, + const TPlanNode& tnode, const DescriptorTbl& descs) + : HashJoinBuildSinkOperatorX(pool, operator_id, dest_id, tnode, descs) {} + ~MockHashJoinBuildOperator() override = default; + + Status prepare(RuntimeState* state) override { return Status::OK(); } + + Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override { + state->emplace_sink_local_state( + _operator_id, std::make_unique(this, state)); + return Status::OK(); + } + + Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { + return Status::OK(); + } + + std::string get_memory_usage_debug_str(RuntimeState* state) const override { return "mock"; } +}; + +class MockFragmentManager : public FragmentMgr { +public: + MockFragmentManager(Status& status_, ExecEnv* exec_env) + : FragmentMgr(exec_env), status(status_) {} + void cancel_query(const TUniqueId query_id, const Status reason) override { status = reason; } + +private: + Status& status; +}; + +class MockHashJoinProbeLocalState : public HashJoinProbeLocalState { + ENABLE_FACTORY_CREATOR(MockHashJoinProbeLocalState); + +public: + MockHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) + : HashJoinProbeLocalState(state, parent) { + _runtime_profile = std::make_unique("test"); + } + + Status open(RuntimeState* state) override { return Status::OK(); } +}; + +class MockHashJoinProbeOperator : 
public HashJoinProbeOperatorX { +public: + MockHashJoinProbeOperator(ObjectPool* pool, const TPlanNode& tnode, int operator_id, + const DescriptorTbl& descs) + : HashJoinProbeOperatorX(pool, tnode, operator_id, descs) {} + ~MockHashJoinProbeOperator() override = default; + + Status push(RuntimeState* state, vectorized::Block* input_block, bool eos_) const override { + const_cast(this)->block.swap(*input_block); + const_cast(this)->eos = eos_; + const_cast(this)->need_more_data = !eos; + return Status::OK(); + } + + Status pull(doris::RuntimeState* state, vectorized::Block* output_block, + bool* eos_) const override { + output_block->swap(const_cast(this)->block); + *eos_ = eos; + const_cast(this)->block.clear_column_data(); + return Status::OK(); + } + + Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override { + state->emplace_local_state(_operator_id, + std::make_unique(state, this)); + return Status::OK(); + } + + bool need_more_input_data(RuntimeState* state) const override { return need_more_data; } + bool need_more_data = true; + + vectorized::Block block; + bool eos = false; +}; + +class MockPartitionedHashJoinProbeLocalState : public PartitionedHashJoinProbeLocalState { +public: + MockPartitionedHashJoinProbeLocalState(RuntimeState* state, OperatorXBase* parent) + : PartitionedHashJoinProbeLocalState(state, parent) { + _runtime_profile = std::make_unique("test"); + } + + void init_counters() { + PartitionedHashJoinProbeLocalState::init_counters(); + _rows_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); + _blocks_returned_counter = + ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); + _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); + _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1); + _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); + _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, 
"CloseTime", 1); + _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); + _memory_used_counter = + _runtime_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); + } + + void update_profile_from_inner() override {}; +}; + +class MockPartitionedHashJoinSinkLocalState : public PartitionedHashJoinSinkLocalState { +public: + MockPartitionedHashJoinSinkLocalState(PartitionedHashJoinSinkOperatorX* parent, + RuntimeState* state, ObjectPool* pool) + : PartitionedHashJoinSinkLocalState(parent, state) { + _profile = pool->add(new RuntimeProfile("MockPartitionedHashJoinSinkLocalStateProfile")); + + _memory_used_counter = + _profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1); + } + + Status init(RuntimeState* state, LocalSinkStateInfo& info) override { return Status::OK(); } + + void update_profile_from_inner() override {} +}; + +class PartitionedHashJoinTestHelper { +public: + void SetUp(); + void TearDown(); + + TPlanNode create_test_plan_node(); + + PartitionedHashJoinProbeLocalState* create_probe_local_state( + RuntimeState* state, PartitionedHashJoinProbeOperatorX* probe_operator, + std::shared_ptr& shared_state); + + PartitionedHashJoinSinkLocalState* create_sink_local_state( + RuntimeState* state, PartitionedHashJoinSinkOperatorX* sink_operator, + std::shared_ptr& shared_state); + + std::tuple, + std::shared_ptr> + create_operators(); + + std::unique_ptr runtime_state; + std::unique_ptr obj_pool; + std::shared_ptr query_ctx; + std::shared_ptr runtime_profile; + std::shared_ptr pipeline_task; + DescriptorTbl* desc_tbl; + static constexpr uint32_t TEST_PARTITION_COUNT = 8; +}; +} // namespace doris::pipeline \ No newline at end of file diff --git a/be/test/testutil/creators.h b/be/test/testutil/creators.h new file mode 100644 index 00000000000000..91064ade29edc8 --- /dev/null +++ b/be/test/testutil/creators.h @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license 
agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+
+#include "pipeline/exec/operator.h"
+#include "pipeline/exec/spill_utils.h"
+#include "pipeline/pipeline.h"
+#include "runtime/descriptor_helper.h"
+#include "runtime/descriptors.h"
+#include "runtime/query_context.h"
+#include "runtime/types.h"
+#include "util/uid_util.h"
+
+namespace doris {
+using namespace pipeline; // NOTE(review): using-directive in a header; every includer sees pipeline:: names unqualified inside doris.
+inline std::shared_ptr generate_one_query(const TQueryOptions& options) { // Builds a QueryContext from `options` via QueryContext::create_shared.
+    TNetworkAddress fe_address;
+    fe_address.hostname = "127.0.0.1"; // Fixed loopback address/port passed to create_shared below.
+    fe_address.port = 8060;
+    auto query_context = QueryContext::create_shared(generate_uuid(), ExecEnv::GetInstance(), // Fresh id from generate_uuid() on every call.
+                                                     options, TNetworkAddress {}, true, fe_address,
+                                                     QuerySource::INTERNAL_FRONTEND);
+    return query_context;
+}
+// Overload with built-in options: SELECT query type, 128 MiB mem_limit, query_slot_count of 1.
+inline std::shared_ptr generate_one_query() {
+    TQueryOptions query_options;
+    query_options.query_type = TQueryType::SELECT;
+    query_options.mem_limit = 1024L * 1024 * 128;
+    query_options.query_slot_count = 1;
+    return generate_one_query(query_options);
+}
+// Returns a descriptor table fed by two tuple builders, each with one INT slot ("col1", then "col2").
+inline TDescriptorTable create_test_table_descriptor(bool nullable = false) { // `nullable` is applied to both slots.
+    TTupleDescriptorBuilder tuple_builder;
+    tuple_builder.add_slot(TSlotDescriptorBuilder()
+                                   .type(PrimitiveType::TYPE_INT)
+                                   .column_name("col1")
+                                   .column_pos(0)
+                                   .nullable(nullable)
+                                   .build());
+
+    TDescriptorTableBuilder builder;
+
+    tuple_builder.build(&builder); // First tuple builder ("col1") is emitted into `builder`.
+    // Second tuple builder ("col2") is created inline and emitted into the same `builder`.
+    TTupleDescriptorBuilder()
+            .add_slot(TSlotDescriptorBuilder()
+                              .type(TYPE_INT)
+                              .column_name("col2")
+                              .column_pos(0)
+                              .nullable(nullable)
+                              .build())
+            .build(&builder);
+
+    return builder.desc_tbl();
+}
+// Creates two pipelines and returns them as {probe_pipeline, build_pipeline}.
+inline std::pair generate_hash_join_pipeline(
+        std::shared_ptr probe_operator,
+        std::shared_ptr build_side_source,
+        pipeline::DataSinkOperatorPtr probe_side_sink_operator, DataSinkOperatorPtr sink_operator) {
+    auto probe_pipeline = std::make_shared(0, 1, 1);
+    auto build_pipeline = std::make_shared(1, 1, 1);
+    // The results of add_operator()/set_sink() are cast away, so a failure here goes unnoticed.
+    static_cast(probe_pipeline->add_operator(probe_operator, 1)); // Probe side: probe_operator + probe_side_sink_operator.
+    static_cast(probe_pipeline->set_sink(probe_side_sink_operator));
+    static_cast(build_pipeline->add_operator(build_side_source, 1)); // Build side: build_side_source + sink_operator.
+    static_cast(build_pipeline->set_sink(sink_operator));
+
+    return {probe_pipeline, build_pipeline};
+}
+// Constructs a partitioner for `partition_count`, then calls init(exprs) and prepare(state, row_desc) on it.
+inline std::unique_ptr create_spill_partitioner(
+        RuntimeState* state, const int32_t partition_count, const std::vector& exprs,
+        const RowDescriptor& row_desc) {
+    auto partitioner = std::make_unique(partition_count);
+    auto st = partitioner->init(exprs);
+    DCHECK(st.ok()) << "init partitioner failed: " << st.to_string(); // Failure is only surfaced through DCHECK; no Status is returned.
+    st = partitioner->prepare(state, row_desc);
+    DCHECK(st.ok()) << "prepare partitioner failed: " << st.to_string(); // The partitioner is returned regardless of `st`.
+    return partitioner;
+}
+
+} // namespace doris
\ No newline at end of file
diff --git a/be/test/testutil/mock/mock_descriptors.h b/be/test/testutil/mock/mock_descriptors.h
index 67ab1772756217..ffb37edf424d75 100644
--- a/be/test/testutil/mock/mock_descriptors.h
+++ b/be/test/testutil/mock/mock_descriptors.h
@@ -17,6 +17,9 @@
 
 #pragma once
 
+#include
+#include
+
 #include
 
 #include "runtime/descriptors.h"
@@ -45,10 +48,12 @@ class MockRowDescriptor : public RowDescriptor {
             auto* slot = pool->add(new MockSlotDescriptor());
             slot->type = type;
             slots.push_back(slot);
+            _num_slots++; // Counts each slot added; _num_slots is not declared in this hunk (presumably a RowDescriptor member — confirm).
         }
         auto* tuple_desc = pool->add(new MockTupleDescriptor());
         tuple_desc->Slots = slots;
         tuple_desc_map.push_back(tuple_desc);
+        _tuple_desc_map.push_back(tuple_desc); // Also record the tuple in _tuple_desc_map, alongside the mock's own tuple_desc_map.
     }
     const std::vector& tuple_descriptors() const override {
         return tuple_desc_map;
@@ -56,4 +61,23 @@ class MockRowDescriptor : public RowDescriptor {
     std::vector tuple_desc_map;
 };
 
+class MockDescriptorTbl : public DescriptorTbl { // Descriptor-table test double holding one mock tuple built from a list of slot types.
+public:
+    MockDescriptorTbl(std::vector types, ObjectPool* pool) { // Slots and the tuple are registered with `pool` via pool->add().
+        std::vector slots;
+        for (auto type : types) { // One MockSlotDescriptor per entry of `types`, in order.
+            auto* slot = pool->add(new MockSlotDescriptor());
+            slot->type = type;
+            slots.push_back(slot);
+        }
+        auto* tuple_desc = pool->add(new MockTupleDescriptor());
+        tuple_desc->Slots = slots;
+        tuple_descriptors.push_back(tuple_desc); // The single tuple ends up in the public tuple_descriptors member.
+    }
+    // gMock-declared const method; nothing in this hunk connects it to tuple_descriptors.
+    MOCK_METHOD(std::vector, get_tuple_descs, (), (const));
+    // Filled by the constructor with the one tuple descriptor it creates.
+    std::vector tuple_descriptors;
+};
+
 } // namespace doris
\ No newline at end of file
diff --git a/be/test/testutil/mock/mock_operators.h b/be/test/testutil/mock/mock_operators.h
new file mode 100644
index 00000000000000..58b1a738926e8b
--- /dev/null
+++ b/be/test/testutil/mock/mock_operators.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+#include
+
+#include "pipeline/exec/hashjoin_probe_operator.h"
+#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
+#include "testutil/mock/mock_descriptors.h"
+
+namespace doris::pipeline {
+// Child-operator test double: hands out whatever block and EOS flag the test staged beforehand.
+class MockChildOperator : public OperatorXBase {
+public:
+    void set_block(vectorized::Block&& block) { _block = std::move(block); } // Stage the block for the next get_block*() call.
+    void set_eos() { _eos = true; } // One-way switch: nothing in this class resets _eos to false.
+    Status get_block_after_projects(RuntimeState* state, vectorized::Block* block,
+                                    bool* eos) override {
+        block->swap(_block); // Exchange with the staged block: the caller receives it, _block takes the caller's content.
+        *eos = _eos;
+        return Status::OK(); // Always succeeds; `state` is not used.
+    }
+    // Same body as get_block_after_projects(): swap out the staged block and report _eos.
+    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override {
+        block->swap(_block);
+        *eos = _eos;
+        return Status::OK();
+    }
+    Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override { // No-op: no local state is created.
+        return Status::OK();
+    }
+
+private:
+    vectorized::Block _block; // Block staged by set_block().
+    bool _eos = false; // Value copied into *eos by both getters.
+};
+// Sink test double: every entry point succeeds without doing any work.
+class MockSinkOperator final : public DataSinkOperatorXBase {
+public:
+    Status sink(RuntimeState* state, vectorized::Block* block, bool eos) override { // Ignores `block` and `eos`.
+        return Status::OK();
+    }
+    // No-op: reports success without creating any local state.
+    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override {
+        return Status::OK();
+    }
+    // This mock provides no shared state, so callers get nullptr.
+    std::shared_ptr create_shared_state() const override { return nullptr; }
+};
+} // namespace doris::pipeline
\ No newline at end of file