Skip to content

Commit

Permalink
[ut](spill) add UT for partitioned hash join (apache#48432)
Browse files: browse the repository at this point in the history
  • Loading branch information
mrhhsg authored Mar 6, 2025
1 parent 6fbe9d4 commit df5d8c1
Show file tree
Hide file tree
Showing 16 changed files with 2,334 additions and 20 deletions.
9 changes: 5 additions & 4 deletions be/src/pipeline/exec/hashjoin_build_sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#pragma once

#include "common/be_mock_util.h"
#include "exprs/runtime_filter_slots.h"
#include "join_build_sink_operator.h"
#include "operator.h"
Expand All @@ -25,7 +26,7 @@ namespace doris::pipeline {
#include "common/compile_check_begin.h"
class HashJoinBuildSinkOperatorX;

class HashJoinBuildSinkLocalState final
class HashJoinBuildSinkLocalState MOCK_REMOVE(final)
: public JoinBuildSinkLocalState<HashJoinSharedState, HashJoinBuildSinkLocalState> {
public:
ENABLE_FACTORY_CREATOR(HashJoinBuildSinkLocalState);
Expand Down Expand Up @@ -56,7 +57,7 @@ class HashJoinBuildSinkLocalState final

Status disable_runtime_filters(RuntimeState* state);

[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
[[nodiscard]] MOCK_FUNCTION size_t get_reserve_mem_size(RuntimeState* state, bool eos);

protected:
Status _hash_table_init(RuntimeState* state);
Expand Down Expand Up @@ -109,7 +110,7 @@ class HashJoinBuildSinkLocalState final
RuntimeProfile::Counter* _runtime_filter_init_timer = nullptr;
};

class HashJoinBuildSinkOperatorX final
class HashJoinBuildSinkOperatorX MOCK_REMOVE(final)
: public JoinBuildSinkOperatorX<HashJoinBuildSinkLocalState> {
public:
HashJoinBuildSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id,
Expand All @@ -129,7 +130,7 @@ class HashJoinBuildSinkOperatorX final

[[nodiscard]] size_t get_memory_usage(RuntimeState* state) const;

std::string get_memory_usage_debug_str(RuntimeState* state) const;
MOCK_FUNCTION std::string get_memory_usage_debug_str(RuntimeState* state) const;

bool should_dry_run(RuntimeState* state) override {
return _is_broadcast_join && !state->get_sink_local_state()
Expand Down
6 changes: 4 additions & 2 deletions be/src/pipeline/exec/hashjoin_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <stdint.h>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/join_probe_operator.h"
Expand All @@ -43,7 +44,7 @@ using HashTableCtxVariants =
ProcessHashTableProbe<TJoinOp::NULL_AWARE_LEFT_SEMI_JOIN>>;

class HashJoinProbeOperatorX;
class HashJoinProbeLocalState final
class HashJoinProbeLocalState MOCK_REMOVE(final)
: public JoinProbeLocalState<HashJoinSharedState, HashJoinProbeLocalState> {
public:
using Parent = HashJoinProbeOperatorX;
Expand Down Expand Up @@ -124,7 +125,8 @@ class HashJoinProbeLocalState final
RuntimeProfile::Counter* _non_equal_join_conjuncts_timer = nullptr;
};

class HashJoinProbeOperatorX final : public JoinProbeOperatorX<HashJoinProbeLocalState> {
class HashJoinProbeOperatorX MOCK_REMOVE(final)
: public JoinProbeOperatorX<HashJoinProbeLocalState> {
public:
HashJoinProbeOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
const DescriptorTbl& descs);
Expand Down
8 changes: 6 additions & 2 deletions be/src/pipeline/exec/operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,6 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
Status init(RuntimeState* state, LocalStateInfo& info) override {
RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info));

_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);
init_spill_read_counters();

return Status::OK();
Expand Down Expand Up @@ -317,6 +316,8 @@ class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
}

void init_spill_read_counters() {
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);

// Spill read counters
_spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1);

Expand Down Expand Up @@ -669,7 +670,11 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA

Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
RETURN_IF_ERROR(Base::init(state, info));
init_spill_counters();
return Status::OK();
}

void init_spill_counters() {
_spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1);

_spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1);
Expand All @@ -696,7 +701,6 @@ class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateA
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1);
_spill_min_rows_of_partition =
ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMinRowsOfPartition", TUnit::UNIT, 1);
return Status::OK();
}

std::vector<Dependency*> dependencies() const override {
Expand Down
10 changes: 7 additions & 3 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
"HashJoinProbeSpillDependency", true);
state->get_task()->add_spill_dependency(_spill_dependency.get());

init_counters();
return Status::OK();
}

void PartitionedHashJoinProbeLocalState::init_counters() {
_partition_timer = ADD_TIMER(profile(), "SpillPartitionTime");
_partition_shuffle_timer = ADD_TIMER(profile(), "SpillPartitionShuffleTime");
_spill_build_rows = ADD_COUNTER(profile(), "SpillBuildRows", TUnit::UNIT);
Expand All @@ -78,7 +83,6 @@ Status PartitionedHashJoinProbeLocalState::init(RuntimeState* state, LocalStateI
ADD_COUNTER_WITH_LEVEL(profile(), "ProbeBloksBytesInMem", TUnit::BYTES, 1);
_memory_usage_reserved =
ADD_COUNTER_WITH_LEVEL(profile(), "MemoryUsageReserved", TUnit::BYTES, 1);
return Status::OK();
}

#define UPDATE_COUNTER_FROM_INNER(name) \
Expand Down Expand Up @@ -417,7 +421,7 @@ Status PartitionedHashJoinProbeLocalState::recover_probe_blocks_from_disk(Runtim
st = spilled_stream->read_next_block_sync(&block, &eos);
if (!st.ok()) {
break;
} else {
} else if (!block.empty()) {
COUNTER_UPDATE(_recovery_probe_rows, block.rows());
COUNTER_UPDATE(_recovery_probe_blocks, 1);
read_size += block.allocated_bytes();
Expand Down Expand Up @@ -786,7 +790,7 @@ size_t PartitionedHashJoinProbeOperatorX::_revocable_mem_size(RuntimeState* stat
size_t PartitionedHashJoinProbeOperatorX::get_reserve_mem_size(RuntimeState* state) {
auto& local_state = get_local_state(state);
const auto need_to_spill = local_state._shared_state->need_to_spill;
if (!need_to_spill || !local_state._child_eos) {
if (!need_to_spill || local_state._child_eos) {
return Base::get_reserve_mem_size(state);
}

Expand Down
7 changes: 5 additions & 2 deletions be/src/pipeline/exec/partitioned_hash_join_probe_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <cstdint>
#include <memory>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/dependency.h"
Expand All @@ -36,7 +37,7 @@ namespace pipeline {

class PartitionedHashJoinProbeOperatorX;

class PartitionedHashJoinProbeLocalState final
class PartitionedHashJoinProbeLocalState MOCK_REMOVE(final)
: public PipelineXSpillLocalState<PartitionedHashJoinSharedState> {
public:
using Parent = PartitionedHashJoinProbeOperatorX;
Expand Down Expand Up @@ -65,7 +66,9 @@ class PartitionedHashJoinProbeLocalState final

std::string debug_string(int indentation_level = 0) const override;

void update_profile_from_inner();
MOCK_FUNCTION void update_profile_from_inner();

void init_counters();

friend class PartitionedHashJoinProbeOperatorX;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ Status PartitionedHashJoinSinkLocalState::_revoke_unpartitioned_block(
-(inner_sink_state->_hash_table_memory_usage->value() +
inner_sink_state->_build_arena_memory_usage->value()));
}
auto row_desc = p._child->row_desc();
const auto& row_desc = p._child->row_desc();
const auto num_slots = row_desc.num_slots();
vectorized::Block build_block;
int64_t block_old_mem = 0;
Expand Down
6 changes: 5 additions & 1 deletion be/src/pipeline/exec/partitioned_hash_join_sink_operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

#include <atomic>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
Expand Down Expand Up @@ -51,7 +52,7 @@ class PartitionedHashJoinSinkLocalState
size_t revocable_mem_size(RuntimeState* state) const;
[[nodiscard]] size_t get_reserve_mem_size(RuntimeState* state, bool eos);
void update_memory_usage();
void update_profile_from_inner();
MOCK_FUNCTION void update_profile_from_inner();

Dependency* finishdependency() override;

Expand Down Expand Up @@ -143,6 +144,9 @@ class PartitionedHashJoinSinkOperatorX

private:
friend class PartitionedHashJoinSinkLocalState;
#ifdef BE_TEST
friend class PartitionedHashJoinSinkOperatorTest;
#endif

const TJoinDistributionType::type _join_distribution;

Expand Down
11 changes: 7 additions & 4 deletions be/src/pipeline/exec/spill_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,16 @@ class SpillRunnable : public Runnable {
return;
}

Defer set_ready_defer([&] {
if (_spill_dependency) {
_spill_dependency->set_ready();
}
});

if (_state->is_cancelled()) {
return;
}

auto status = _spill_exec_func();
if (!status.ok()) {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status);
Expand All @@ -153,10 +160,6 @@ class SpillRunnable : public Runnable {
ExecEnv::GetInstance()->fragment_mgr()->cancel_query(_state->query_id(), status2);
}
}

if (_spill_dependency) {
_spill_dependency->set_ready();
}
}

protected:
Expand Down
3 changes: 2 additions & 1 deletion be/src/runtime/fragment_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <unordered_map>
#include <vector>

#include "common/be_mock_util.h"
#include "common/status.h"
#include "gutil/ref_counted.h"
#include "http/rest_monitor_iface.h"
Expand Down Expand Up @@ -140,7 +141,7 @@ class FragmentMgr : public RestMonitorIface {
std::shared_ptr<pipeline::PipelineFragmentContext>&&);

// Can be used in both versions.
void cancel_query(const TUniqueId query_id, const Status reason);
MOCK_FUNCTION void cancel_query(const TUniqueId query_id, const Status reason);

void cancel_worker();

Expand Down
Loading

0 comments on commit df5d8c1

Please sign in to comment.