From 1a3e50fcc0f4d79043da363b56a13bff50ae8293 Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Mon, 11 Nov 2024 18:20:23 +0800 Subject: [PATCH 1/2] cancel query when runtime_filter's rpc failed --- be/src/common/status.h | 1 + be/src/exprs/runtime_filter.cpp | 19 ++++++++------- be/src/exprs/runtime_filter.h | 4 ++-- be/src/exprs/runtime_filter_slots.h | 4 ++-- be/src/exprs/runtime_filter_slots_cross.h | 4 ++-- be/src/pipeline/exec/hashjoin_build_sink.cpp | 2 +- .../exec/nested_loop_join_build_operator.cpp | 2 +- be/src/runtime/fragment_mgr.cpp | 4 ++-- be/src/runtime/runtime_filter_mgr.cpp | 12 ++++++---- be/src/runtime/runtime_filter_mgr.h | 5 ++-- be/src/runtime/runtime_state.cpp | 6 ++++- be/src/runtime/runtime_state.h | 2 ++ be/src/util/ref_count_closure.h | 24 +++++++++++++++---- 13 files changed, 59 insertions(+), 30 deletions(-) diff --git a/be/src/common/status.h b/be/src/common/status.h index 91386a5887e63e..e3291688dd234f 100644 --- a/be/src/common/status.h +++ b/be/src/common/status.h @@ -489,6 +489,7 @@ class [[nodiscard]] Status { ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN) ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR) ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED) + ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR) #undef ERROR_CTOR template diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 85f1c535c7038b..2318a0acdd487d 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta _wrapper->insert_batch(column, start); } -Status IRuntimeFilter::publish(bool publish_local) { +Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { DCHECK(is_producer()); auto send_to_remote = [&](IRuntimeFilter* filter) { TNetworkAddress addr; DCHECK(_state != nullptr); RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr)); - return filter->push_to_remote(&addr); + return filter->push_to_remote(state, &addr); }; auto send_to_local = [&](std::shared_ptr wrapper) { std::vector> filters; @@ -1088,8 +1088,10 @@ class SyncSizeClosure : public AutoReleaseClosure req, std::shared_ptr> callback, std::shared_ptr dependency, - RuntimeFilterContextSPtr rf_context) - : Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {} + RuntimeFilterContextSPtr rf_context, std::weak_ptr context) + : Base(req, callback, context), + _dependency(std::move(dependency)), + _rf_context(rf_context) {} }; Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) { @@ -1133,8 +1135,8 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = - SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context); + auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, + _wrapper->_context, state->get_query_ctx_weak()); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); @@ -1157,7 +1159,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt return Status::OK(); } -Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { +Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) { DCHECK(is_producer()); std::shared_ptr stub( _state->exec_env->brpc_internal_client_cache()->get_client(*addr)); @@ -1170,7 +1172,8 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) { auto merge_filter_callback = DummyBrpcCallback::create_shared(); auto merge_filter_closure = AutoReleaseClosure>:: - create_unique(merge_filter_request, merge_filter_callback); + create_unique(merge_filter_request, merge_filter_callback, + state->get_query_ctx_weak()); void* data = nullptr; int len = 0; diff --git a/be/src/exprs/runtime_filter.h b/be/src/exprs/runtime_filter.h index f5a069d9e55f85..84a7f36c8a808c 100644 --- a/be/src/exprs/runtime_filter.h +++ b/be/src/exprs/runtime_filter.h @@ -225,7 +225,7 @@ class IRuntimeFilter { // publish filter // push filter to remote node or push down it to scan_node - Status publish(bool publish_local = false); + Status publish(RuntimeState* state, bool publish_local = false); Status send_filter_size(RuntimeState* state, uint64_t local_filter_size); @@ -293,7 +293,7 @@ class IRuntimeFilter { bool need_sync_filter_size(); // async push runtimefilter to remote node - Status push_to_remote(const TNetworkAddress* addr); + Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr); void init_profile(RuntimeProfile* parent_profile); diff --git a/be/src/exprs/runtime_filter_slots.h b/be/src/exprs/runtime_filter_slots.h index 42c5f598633ad9..3c18735e4e82ce 100644 --- a/be/src/exprs/runtime_filter_slots.h +++ b/be/src/exprs/runtime_filter_slots.h @@ -149,10 +149,10 @@ class VRuntimeFilterSlots { } // publish runtime filter - Status publish(bool publish_local) { + Status publish(RuntimeState* state, bool publish_local) { for (auto& pair : _runtime_filters_map) { for (auto& filter : pair.second) { - RETURN_IF_ERROR(filter->publish(publish_local)); + RETURN_IF_ERROR(filter->publish(state, publish_local)); } } return Status::OK(); diff --git a/be/src/exprs/runtime_filter_slots_cross.h b/be/src/exprs/runtime_filter_slots_cross.h index 01ae21a75992de..a49f2928f842a9 100644 --- a/be/src/exprs/runtime_filter_slots_cross.h +++ b/be/src/exprs/runtime_filter_slots_cross.h @@ -72,9 +72,9 @@ class VRuntimeFilterSlotsCross { return Status::OK(); } - Status publish() { + Status publish(RuntimeState* state) { for (auto filter : _runtime_filters) { - RETURN_IF_ERROR(filter->publish()); + RETURN_IF_ERROR(filter->publish(state)); } return Status::OK(); } diff --git a/be/src/pipeline/exec/hashjoin_build_sink.cpp b/be/src/pipeline/exec/hashjoin_build_sink.cpp index 37de9ac93d839f..74db3a5c06caf0 100644 --- a/be/src/pipeline/exec/hashjoin_build_sink.cpp +++ b/be/src/pipeline/exec/hashjoin_build_sink.cpp @@ -157,7 +157,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu } } SCOPED_TIMER(_publish_runtime_filter_timer); - RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table)); + RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table)); return Base::close(state, exec_status); } diff --git a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp index 83b378e792c3fa..41cd8068dd771e 100644 --- a/be/src/pipeline/exec/nested_loop_join_build_operator.cpp +++ b/be/src/pipeline/exec/nested_loop_join_build_operator.cpp @@ -43,7 +43,7 @@ struct RuntimeFilterBuild { } { SCOPED_TIMER(_parent->publish_runtime_filter_timer()); - RETURN_IF_ERROR(runtime_filter_slots.publish()); + RETURN_IF_ERROR(runtime_filter_slots.publish(state)); } return Status::OK(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 18aacb452a6477..cba7468bf15c3b 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -1203,7 +1203,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) { std::shared_ptr filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto merge_status = filter_controller->send_filter_size(request); + auto merge_status = filter_controller->send_filter_size(query_ctx, request); return merge_status; } @@ -1245,7 +1245,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, SCOPED_ATTACH_TASK(query_ctx.get()); std::shared_ptr filter_controller; RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); - auto merge_status = filter_controller->merge(request, attach_data); + auto merge_status = filter_controller->merge(query_ctx, request, attach_data); return merge_status; } diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index 1a238787207b17..ac8af8c7f80a07 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -305,7 +305,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, return Status::OK(); } -Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSizeRequest* request) { +Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr query_ctx, + const PSendFilterSizeRequest* request) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::shared_ptr cnt_val; @@ -339,7 +340,8 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz auto closure = AutoReleaseClosure>:: create_unique(std::make_shared(), - DummyBrpcCallback::create_shared()); + DummyBrpcCallback::create_shared(), + query_ctx); auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); @@ -377,7 +379,8 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) } // merge data -Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, +Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr query_ctx, + const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data) { SCOPED_CONSUME_MEM_TRACKER(_mem_tracker); std::shared_ptr cnt_val; @@ -449,7 +452,8 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ auto closure = AutoReleaseClosure>:: create_unique(std::make_shared(apply_request), - DummyBrpcCallback::create_shared()); + DummyBrpcCallback::create_shared(), + query_ctx); closure->request_->set_filter_id(request->filter_id()); closure->request_->set_merge_time(merge_time); diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h index b0aea7568cff65..bac61d6248a88f 100644 --- a/be/src/runtime/runtime_filter_mgr.h +++ b/be/src/runtime/runtime_filter_mgr.h @@ -156,10 +156,11 @@ class RuntimeFilterMergeControllerEntity { const TQueryOptions& query_options); // handle merge rpc - Status merge(const PMergeFilterRequest* request, + Status merge(std::weak_ptr query_ctx, const PMergeFilterRequest* request, butil::IOBufAsZeroCopyInputStream* attach_data); - Status send_filter_size(const PSendFilterSizeRequest* request); + Status send_filter_size(std::weak_ptr query_ctx, + const PSendFilterSizeRequest* request); UniqueId query_id() const { return _query_id; } diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp index e3f9d075c8ffc2..38522f49dc3e13 100644 --- a/be/src/runtime/runtime_state.cpp +++ b/be/src/runtime/runtime_state.cpp @@ -40,6 +40,7 @@ #include "pipeline/exec/operator.h" #include "pipeline/pipeline_task.h" #include "runtime/exec_env.h" +#include "runtime/fragment_mgr.h" #include "runtime/load_path_mgr.h" #include "runtime/memory/mem_tracker_limiter.h" #include "runtime/memory/thread_mem_tracker_mgr.h" @@ -129,7 +130,6 @@ RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId& : _profile("Fragment " + print_id(instance_id)), _load_channel_profile(""), _obj_pool(new ObjectPool()), - _runtime_filter_mgr(nullptr), _unreported_error_idx(0), _query_id(query_id), _fragment_id(fragment_id), @@ -294,6 +294,10 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt return Status::OK(); } +std::weak_ptr RuntimeState::get_query_ctx_weak() { + return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id()); +} + void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) { _query_mem_tracker = MemTrackerLimiter::create_shared( MemTrackerLimiter::Type::OTHER, fmt::format("{}#Id={}", name, print_id(id))); diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index abc823bc25b291..e627688aafa17b 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -449,6 +449,8 @@ class RuntimeState { QueryContext* get_query_ctx() { return _query_ctx; } + std::weak_ptr get_query_ctx_weak(); + void set_query_mem_tracker(const std::shared_ptr& tracker) { _query_mem_tracker = tracker; } diff --git a/be/src/util/ref_count_closure.h b/be/src/util/ref_count_closure.h index 92772a82373fec..560aebb98ee15e 100644 --- a/be/src/util/ref_count_closure.h +++ b/be/src/util/ref_count_closure.h @@ -20,7 +20,9 @@ #include #include +#include +#include "runtime/query_context.h" #include "runtime/thread_context.h" #include "service/brpc.h" #include "util/ref_count_closure.h" @@ -79,8 +81,9 @@ class AutoReleaseClosure : public google::protobuf::Closure { ENABLE_FACTORY_CREATOR(AutoReleaseClosure); public: - AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback) - : request_(req), callback_(callback) { + AutoReleaseClosure(std::shared_ptr req, std::shared_ptr callback, + std::weak_ptr context = {}) + : request_(req), callback_(callback), context_(std::move(context)) { this->cntl_ = callback->cntl_; this->response_ = callback->response_; } @@ -113,12 +116,22 @@ class AutoReleaseClosure : public google::protobuf::Closure { protected: virtual void _process_if_rpc_failed() { - LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText(); + std::string error_msg = "RPC meet failed: " + cntl_->ErrorText(); + if (auto ctx = context_.lock(); ctx) { + ctx->cancel(Status::NetworkError(error_msg)); + } else { + LOG(WARNING) << error_msg; + } } virtual void _process_if_meet_error_status(const Status& status) { - // no need to log END_OF_FILE, reduce the unlessful log - if (!status.is()) { + if (status.is()) { + // no need to log END_OF_FILE, reduce the unlessful log + return; + } + if (auto ctx = context_.lock(); ctx) { + ctx->cancel(status); + } else { LOG(WARNING) << "RPC meet error status: " << status; } } @@ -136,6 +149,7 @@ class AutoReleaseClosure : public google::protobuf::Closure { // Use a weak ptr to keep the callback, so that the callback can be deleted if the main // thread is freed. Weak callback_; + std::weak_ptr context_; }; } // namespace doris From df9d56bc968d5ada4b2812c0e8f917cac1522d3d Mon Sep 17 00:00:00 2001 From: BiteTheDDDDt Date: Wed, 13 Nov 2024 15:38:47 +0800 Subject: [PATCH 2/2] update --- be/src/exprs/runtime_filter.cpp | 10 +++++++--- be/src/runtime/query_context.h | 6 ++++++ be/src/runtime/runtime_filter_mgr.cpp | 10 ++++++---- .../main/java/org/apache/doris/qe/SessionVariable.java | 5 +++++ gensrc/thrift/PaloInternalService.thrift | 2 ++ 5 files changed, 26 insertions(+), 7 deletions(-) diff --git a/be/src/exprs/runtime_filter.cpp b/be/src/exprs/runtime_filter.cpp index 2318a0acdd487d..d05bb6fa3cfc44 100644 --- a/be/src/exprs/runtime_filter.cpp +++ b/be/src/exprs/runtime_filter.cpp @@ -1135,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt auto callback = DummyBrpcCallback::create_shared(); // IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use // a raw pointer in closure. Has to use the context's shared ptr. - auto closure = SyncSizeClosure::create_unique(request, callback, _dependency, - _wrapper->_context, state->get_query_ctx_weak()); + auto closure = SyncSizeClosure::create_unique( + request, callback, _dependency, _wrapper->_context, + state->query_options().ignore_runtime_filter_error ? std::weak_ptr {} + : state->get_query_ctx_weak()); auto* pquery_id = request->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); pquery_id->set_lo(_state->query_id.lo()); @@ -1173,7 +1175,9 @@ Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress auto merge_filter_closure = AutoReleaseClosure>:: create_unique(merge_filter_request, merge_filter_callback, - state->get_query_ctx_weak()); + state->query_options().ignore_runtime_filter_error + ? std::weak_ptr {} + : state->get_query_ctx_weak()); void* data = nullptr; int len = 0; diff --git a/be/src/runtime/query_context.h b/be/src/runtime/query_context.h index ef753ee62259b4..3852a8e7ce1cd6 100644 --- a/be/src/runtime/query_context.h +++ b/be/src/runtime/query_context.h @@ -164,6 +164,12 @@ class QueryContext { return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0; } + bool ignore_runtime_filter_error() const { + return _query_options.__isset.ignore_runtime_filter_error + ? _query_options.ignore_runtime_filter_error + : false; + } + // global runtime filter mgr, the runtime filter have remote target or // need local merge should regist here. before publish() or push_to_remote() // the runtime filter should do the local merge work diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp index ac8af8c7f80a07..4b4f48801239e3 100644 --- a/be/src/runtime/runtime_filter_mgr.cpp +++ b/be/src/runtime/runtime_filter_mgr.cpp @@ -327,6 +327,8 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptrsource_addrs.size() == cnt_val->producer_size) { + auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? std::weak_ptr {} + : query_ctx; for (auto addr : cnt_val->source_addrs) { std::shared_ptr stub( ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr)); @@ -340,8 +342,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr>:: create_unique(std::make_shared(), - DummyBrpcCallback::create_shared(), - query_ctx); + DummyBrpcCallback::create_shared(), ctx); auto* pquery_id = closure->request_->mutable_query_id(); pquery_id->set_hi(_state->query_id.hi()); @@ -447,13 +448,14 @@ Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr que has_attachment = true; } + auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? std::weak_ptr {} + : query_ctx; std::vector& targets = cnt_val->targetv2_info; for (auto& target : targets) { auto closure = AutoReleaseClosure>:: create_unique(std::make_shared(apply_request), - DummyBrpcCallback::create_shared(), - query_ctx); + DummyBrpcCallback::create_shared(), ctx); closure->request_->set_filter_id(request->filter_id()); closure->request_->set_merge_time(merge_time); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 1d7bd2e89d15fc..391e4cecb552b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -1163,6 +1163,10 @@ public enum IgnoreSplitType { "Force the sort algorithm of SortNode to be specified" }) public String forceSortAlgorithm = ""; + @VariableMgr.VarAttr(name = "ignore_runtime_filter_error", needForward = true, description = { "在rf遇到错误的时候忽略该rf", + "Ignore the rf when it encounters an error" }) + public boolean ignoreRuntimeFilterError = false; + @VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE, needForward = true) private String runtimeFilterMode = "GLOBAL"; @@ -3947,6 +3951,7 @@ public TQueryOptions toThrift() { tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes); tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes); tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes); + tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError); return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 29fecc27539ff4..392aa8658df1d2 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -356,6 +356,8 @@ struct TQueryOptions { 138: optional i64 orc_tiny_stripe_threshold_bytes = 8388608; 139: optional i64 orc_once_max_read_bytes = 8388608; 140: optional i64 orc_max_merge_distance_bytes = 1048576; + + 141: optional bool ignore_runtime_filter_error = false; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query.