Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature](runtime-filter) cancel query when runtime_filter's rpc failed #43627

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,7 @@ class [[nodiscard]] Status {
ERROR_CTOR_NOSTACK(NeedSendAgain, NEED_SEND_AGAIN)
ERROR_CTOR_NOSTACK(CgroupError, CGROUP_ERROR)
ERROR_CTOR_NOSTACK(ObtainLockFailed, OBTAIN_LOCK_FAILED)
ERROR_CTOR_NOSTACK(NetworkError, NETWORK_ERROR)
#undef ERROR_CTOR

template <int code>
Expand Down
23 changes: 15 additions & 8 deletions be/src/exprs/runtime_filter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta
_wrapper->insert_batch(column, start);
}

Status IRuntimeFilter::publish(bool publish_local) {
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'publish' has cognitive complexity of 77 (threshold 50) [readability-function-cognitive-complexity]

Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
                       ^
Additional context

be/src/exprs/runtime_filter.cpp:995: nesting level increased to 1

    auto send_to_remote = [&](IRuntimeFilter* filter) {
                          ^

be/src/exprs/runtime_filter.cpp:998: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:998: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1001: nesting level increased to 1

    auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) {
                         ^

be/src/exprs/runtime_filter.cpp:1003: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1003: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1013: nesting level increased to 1

    auto do_local_merge = [&]() {
                          ^

be/src/exprs/runtime_filter.cpp:1015: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1015: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1018: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1018: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1020: +2, including nesting penalty of 1, nesting level increased to 2

        if (local_merge_filters->merge_time == 0) {
        ^

be/src/exprs/runtime_filter.cpp:1021: +3, including nesting penalty of 2, nesting level increased to 3

            if (_has_local_target) {
            ^

be/src/exprs/runtime_filter.cpp:1022: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
                ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1022: +5, including nesting penalty of 4, nesting level increased to 5

                RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
                ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1023: +1, nesting level increased to 3

            } else {
              ^

be/src/exprs/runtime_filter.cpp:1024: +4, including nesting penalty of 3, nesting level increased to 4

                RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0].get()));
                ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1024: +5, including nesting penalty of 4, nesting level increased to 5

                RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0].get()));
                ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1030: +1, including nesting penalty of 0, nesting level increased to 1

    if (_need_local_merge && _has_local_target) {
    ^

be/src/exprs/runtime_filter.cpp:1030: +1

    if (_need_local_merge && _has_local_target) {
                          ^

be/src/exprs/runtime_filter.cpp:1031: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(do_local_merge());
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1031: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(do_local_merge());
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1032: +1, nesting level increased to 1

    } else if (_has_local_target) {
           ^

be/src/exprs/runtime_filter.cpp:1033: +2, including nesting penalty of 1, nesting level increased to 2

        RETURN_IF_ERROR(send_to_local(_wrapper));
        ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1033: +3, including nesting penalty of 2, nesting level increased to 3

        RETURN_IF_ERROR(send_to_local(_wrapper));
        ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1034: +1, nesting level increased to 1

    } else if (!publish_local) {
           ^

be/src/exprs/runtime_filter.cpp:1035: +2, including nesting penalty of 1, nesting level increased to 2

        if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
        ^

be/src/exprs/runtime_filter.cpp:1035: +1

        if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
                               ^

be/src/exprs/runtime_filter.cpp:1036: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(send_to_remote(this));
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1036: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(send_to_remote(this));
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1037: +1, nesting level increased to 2

        } else {
          ^

be/src/exprs/runtime_filter.cpp:1038: +3, including nesting penalty of 2, nesting level increased to 3

            RETURN_IF_ERROR(do_local_merge());
            ^

be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'

    do {                                \
    ^

be/src/exprs/runtime_filter.cpp:1038: +4, including nesting penalty of 3, nesting level increased to 4

            RETURN_IF_ERROR(do_local_merge());
            ^

be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'

        if (UNLIKELY(!_status_.ok())) { \
        ^

be/src/exprs/runtime_filter.cpp:1040: +1, nesting level increased to 1

    } else {
      ^

DCHECK(is_producer());

auto send_to_remote = [&](IRuntimeFilter* filter) {
TNetworkAddress addr;
DCHECK(_state != nullptr);
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
return filter->push_to_remote(&addr);
return filter->push_to_remote(state, &addr);
};
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) {
std::vector<std::shared_ptr<IRuntimeFilter>> filters;
Expand Down Expand Up @@ -1088,8 +1088,10 @@ class SyncSizeClosure : public AutoReleaseClosure<PSendFilterSizeRequest,
SyncSizeClosure(std::shared_ptr<PSendFilterSizeRequest> req,
std::shared_ptr<DummyBrpcCallback<PSendFilterSizeResponse>> callback,
std::shared_ptr<pipeline::Dependency> dependency,
RuntimeFilterContextSPtr rf_context)
: Base(req, callback), _dependency(std::move(dependency)), _rf_context(rf_context) {}
RuntimeFilterContextSPtr rf_context, std::weak_ptr<QueryContext> context)
: Base(req, callback, context),
_dependency(std::move(dependency)),
_rf_context(rf_context) {}
};

Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filter_size) {
Expand Down Expand Up @@ -1133,8 +1135,10 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
auto callback = DummyBrpcCallback<PSendFilterSizeResponse>::create_shared();
// IRuntimeFilter maybe deconstructed before the rpc finished, so that could not use
// a raw pointer in closure. Has to use the context's shared ptr.
auto closure =
SyncSizeClosure::create_unique(request, callback, _dependency, _wrapper->_context);
auto closure = SyncSizeClosure::create_unique(
request, callback, _dependency, _wrapper->_context,
state->query_options().ignore_runtime_filter_error ? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
auto* pquery_id = request->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
pquery_id->set_lo(_state->query_id.lo());
Expand All @@ -1157,7 +1161,7 @@ Status IRuntimeFilter::send_filter_size(RuntimeState* state, uint64_t local_filt
return Status::OK();
}

Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
Status IRuntimeFilter::push_to_remote(RuntimeState* state, const TNetworkAddress* addr) {
DCHECK(is_producer());
std::shared_ptr<PBackendService_Stub> stub(
_state->exec_env->brpc_internal_client_cache()->get_client(*addr));
Expand All @@ -1170,7 +1174,10 @@ Status IRuntimeFilter::push_to_remote(const TNetworkAddress* addr) {
auto merge_filter_callback = DummyBrpcCallback<PMergeFilterResponse>::create_shared();
auto merge_filter_closure =
AutoReleaseClosure<PMergeFilterRequest, DummyBrpcCallback<PMergeFilterResponse>>::
create_unique(merge_filter_request, merge_filter_callback);
create_unique(merge_filter_request, merge_filter_callback,
state->query_options().ignore_runtime_filter_error
? std::weak_ptr<QueryContext> {}
: state->get_query_ctx_weak());
void* data = nullptr;
int len = 0;

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ class IRuntimeFilter {

// publish filter
// push filter to remote node or push down it to scan_node
Status publish(bool publish_local = false);
Status publish(RuntimeState* state, bool publish_local = false);

Status send_filter_size(RuntimeState* state, uint64_t local_filter_size);

Expand Down Expand Up @@ -293,7 +293,7 @@ class IRuntimeFilter {
bool need_sync_filter_size();

// async push runtimefilter to remote node
Status push_to_remote(const TNetworkAddress* addr);
Status push_to_remote(RuntimeState* state, const TNetworkAddress* addr);

void init_profile(RuntimeProfile* parent_profile);

Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_slots.h
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,10 @@ class VRuntimeFilterSlots {
}

// publish runtime filter
Status publish(bool publish_local) {
Status publish(RuntimeState* state, bool publish_local) {
for (auto& pair : _runtime_filters_map) {
for (auto& filter : pair.second) {
RETURN_IF_ERROR(filter->publish(publish_local));
RETURN_IF_ERROR(filter->publish(state, publish_local));
}
}
return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/exprs/runtime_filter_slots_cross.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,9 @@ class VRuntimeFilterSlotsCross {
return Status::OK();
}

Status publish() {
Status publish(RuntimeState* state) {
for (auto filter : _runtime_filters) {
RETURN_IF_ERROR(filter->publish());
RETURN_IF_ERROR(filter->publish(state));
}
return Status::OK();
}
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/hashjoin_build_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ Status HashJoinBuildSinkLocalState::close(RuntimeState* state, Status exec_statu
}
}
SCOPED_TIMER(_publish_runtime_filter_timer);
RETURN_IF_ERROR(_runtime_filter_slots->publish(!_should_build_hash_table));
RETURN_IF_ERROR(_runtime_filter_slots->publish(state, !_should_build_hash_table));
return Base::close(state, exec_status);
}

Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/nested_loop_join_build_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ struct RuntimeFilterBuild {
}
{
SCOPED_TIMER(_parent->publish_runtime_filter_timer());
RETURN_IF_ERROR(runtime_filter_slots.publish());
RETURN_IF_ERROR(runtime_filter_slots.publish(state));
}

return Status::OK();
Expand Down
4 changes: 2 additions & 2 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {

std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
auto merge_status = filter_controller->send_filter_size(request);
auto merge_status = filter_controller->send_filter_size(query_ctx, request);
return merge_status;
}

Expand Down Expand Up @@ -1245,7 +1245,7 @@ Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
SCOPED_ATTACH_TASK(query_ctx.get());
std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller;
RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller));
auto merge_status = filter_controller->merge(request, attach_data);
auto merge_status = filter_controller->merge(query_ctx, request, attach_data);
return merge_status;
}

Expand Down
6 changes: 6 additions & 0 deletions be/src/runtime/query_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,12 @@ class QueryContext {
return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0;
}

bool ignore_runtime_filter_error() const {
return _query_options.__isset.ignore_runtime_filter_error
? _query_options.ignore_runtime_filter_error
: false;
}

// global runtime filter mgr, the runtime filter have remote target or
// need local merge should regist here. before publish() or push_to_remote()
// the runtime filter should do the local merge work
Expand Down
14 changes: 10 additions & 4 deletions be/src/runtime/runtime_filter_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id,
return Status::OK();
}

Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSizeRequest* request) {
Status RuntimeFilterMergeControllerEntity::send_filter_size(std::weak_ptr<QueryContext> query_ctx,
const PSendFilterSizeRequest* request) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cnt_val;

Expand All @@ -326,6 +327,8 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz

Status st = Status::OK();
if (cnt_val->source_addrs.size() == cnt_val->producer_size) {
auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {}
: query_ctx;
for (auto addr : cnt_val->source_addrs) {
std::shared_ptr<PBackendService_Stub> stub(
ExecEnv::GetInstance()->brpc_internal_client_cache()->get_client(addr));
Expand All @@ -339,7 +342,7 @@ Status RuntimeFilterMergeControllerEntity::send_filter_size(const PSendFilterSiz
auto closure = AutoReleaseClosure<PSyncFilterSizeRequest,
DummyBrpcCallback<PSyncFilterSizeResponse>>::
create_unique(std::make_shared<PSyncFilterSizeRequest>(),
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared());
DummyBrpcCallback<PSyncFilterSizeResponse>::create_shared(), ctx);

auto* pquery_id = closure->request_->mutable_query_id();
pquery_id->set_hi(_state->query_id.hi());
Expand Down Expand Up @@ -377,7 +380,8 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request)
}

// merge data
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'merge' exceeds recommended size/complexity thresholds [readability-function-size]

Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
                                           ^
Additional context

be/src/runtime/runtime_filter_mgr.cpp:381: 118 lines including whitespace and comments (threshold 80)

Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
                                           ^

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: function 'merge' exceeds recommended size/complexity thresholds [readability-function-size]

Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
                                           ^
Additional context

be/src/runtime/runtime_filter_mgr.cpp:382: 119 lines including whitespace and comments (threshold 80)

Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
                                           ^

const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker);
std::shared_ptr<RuntimeFilterCntlVal> cnt_val;
Expand Down Expand Up @@ -444,12 +448,14 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
has_attachment = true;
}

auto ctx = query_ctx.lock()->ignore_runtime_filter_error() ? std::weak_ptr<QueryContext> {}
: query_ctx;
std::vector<TRuntimeFilterTargetParamsV2>& targets = cnt_val->targetv2_info;
for (auto& target : targets) {
auto closure = AutoReleaseClosure<PPublishFilterRequestV2,
DummyBrpcCallback<PPublishFilterResponse>>::
create_unique(std::make_shared<PPublishFilterRequestV2>(apply_request),
DummyBrpcCallback<PPublishFilterResponse>::create_shared());
DummyBrpcCallback<PPublishFilterResponse>::create_shared(), ctx);

closure->request_->set_filter_id(request->filter_id());
closure->request_->set_merge_time(merge_time);
Expand Down
5 changes: 3 additions & 2 deletions be/src/runtime/runtime_filter_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,11 @@ class RuntimeFilterMergeControllerEntity {
const TQueryOptions& query_options);

// handle merge rpc
Status merge(const PMergeFilterRequest* request,
Status merge(std::weak_ptr<QueryContext> query_ctx, const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data);

Status send_filter_size(const PSendFilterSizeRequest* request);
Status send_filter_size(std::weak_ptr<QueryContext> query_ctx,
const PSendFilterSizeRequest* request);

UniqueId query_id() const { return _query_id; }

Expand Down
6 changes: 5 additions & 1 deletion be/src/runtime/runtime_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/load_path_mgr.h"
#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/memory/thread_mem_tracker_mgr.h"
Expand Down Expand Up @@ -129,7 +130,6 @@ RuntimeState::RuntimeState(pipeline::PipelineFragmentContext*, const TUniqueId&
: _profile("Fragment " + print_id(instance_id)),
_load_channel_profile("<unnamed>"),
_obj_pool(new ObjectPool()),
_runtime_filter_mgr(nullptr),
_unreported_error_idx(0),
_query_id(query_id),
_fragment_id(fragment_id),
Expand Down Expand Up @@ -294,6 +294,10 @@ Status RuntimeState::init(const TUniqueId& fragment_instance_id, const TQueryOpt
return Status::OK();
}

std::weak_ptr<QueryContext> RuntimeState::get_query_ctx_weak() {
return _exec_env->fragment_mgr()->get_or_erase_query_ctx_with_lock(_query_ctx->query_id());
}

void RuntimeState::init_mem_trackers(const std::string& name, const TUniqueId& id) {
_query_mem_tracker = MemTrackerLimiter::create_shared(
MemTrackerLimiter::Type::OTHER, fmt::format("{}#Id={}", name, print_id(id)));
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -449,6 +449,8 @@ class RuntimeState {

QueryContext* get_query_ctx() { return _query_ctx; }

std::weak_ptr<QueryContext> get_query_ctx_weak();

void set_query_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& tracker) {
_query_mem_tracker = tracker;
}
Expand Down
24 changes: 19 additions & 5 deletions be/src/util/ref_count_closure.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
#include <google/protobuf/stubs/common.h>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warning: 'google/protobuf/stubs/common.h' file not found [clang-diagnostic-error]

#include <google/protobuf/stubs/common.h>
         ^


#include <atomic>
#include <utility>

#include "runtime/query_context.h"
#include "runtime/thread_context.h"
#include "service/brpc.h"
#include "util/ref_count_closure.h"
Expand Down Expand Up @@ -79,8 +81,9 @@ class AutoReleaseClosure : public google::protobuf::Closure {
ENABLE_FACTORY_CREATOR(AutoReleaseClosure);

public:
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback)
: request_(req), callback_(callback) {
AutoReleaseClosure(std::shared_ptr<Request> req, std::shared_ptr<Callback> callback,
std::weak_ptr<QueryContext> context = {})
: request_(req), callback_(callback), context_(std::move(context)) {
this->cntl_ = callback->cntl_;
this->response_ = callback->response_;
}
Expand Down Expand Up @@ -113,12 +116,22 @@ class AutoReleaseClosure : public google::protobuf::Closure {

protected:
virtual void _process_if_rpc_failed() {
LOG(WARNING) << "RPC meet failed: " << cntl_->ErrorText();
std::string error_msg = "RPC meet failed: " + cntl_->ErrorText();
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(Status::NetworkError(error_msg));
} else {
LOG(WARNING) << error_msg;
}
}

virtual void _process_if_meet_error_status(const Status& status) {
// no need to log END_OF_FILE, reduce the unlessful log
if (!status.is<ErrorCode::END_OF_FILE>()) {
if (status.is<ErrorCode::END_OF_FILE>()) {
// no need to log END_OF_FILE, reduce the unlessful log
return;
}
if (auto ctx = context_.lock(); ctx) {
ctx->cancel(status);
} else {
LOG(WARNING) << "RPC meet error status: " << status;
}
}
Expand All @@ -136,6 +149,7 @@ class AutoReleaseClosure : public google::protobuf::Closure {
// Use a weak ptr to keep the callback, so that the callback can be deleted if the main
// thread is freed.
Weak callback_;
std::weak_ptr<QueryContext> context_;
};

} // namespace doris
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,10 @@ public enum IgnoreSplitType {
"Force the sort algorithm of SortNode to be specified" })
public String forceSortAlgorithm = "";

@VariableMgr.VarAttr(name = "ignore_runtime_filter_error", needForward = true, description = { "在rf遇到错误的时候忽略该rf",
"Ignore the rf when it encounters an error" })
public boolean ignoreRuntimeFilterError = false;

@VariableMgr.VarAttr(name = RUNTIME_FILTER_MODE, needForward = true)
private String runtimeFilterMode = "GLOBAL";

Expand Down Expand Up @@ -3947,6 +3951,7 @@ public TQueryOptions toThrift() {
tResult.setOrcTinyStripeThresholdBytes(orcTinyStripeThresholdBytes);
tResult.setOrcMaxMergeDistanceBytes(orcMaxMergeDistanceBytes);
tResult.setOrcOnceMaxReadBytes(orcOnceMaxReadBytes);
tResult.setIgnoreRuntimeFilterError(ignoreRuntimeFilterError);

return tResult;
}
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/PaloInternalService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,8 @@ struct TQueryOptions {
138: optional i64 orc_tiny_stripe_threshold_bytes = 8388608;
139: optional i64 orc_once_max_read_bytes = 8388608;
140: optional i64 orc_max_merge_distance_bytes = 1048576;

141: optional bool ignore_runtime_filter_error = false;
// For cloud, to control if the content would be written into file cache
// In write path, to control if the content would be written into file cache.
// In read path, read from file cache or remote storage when execute query.
Expand Down
Loading