-
Notifications
You must be signed in to change notification settings - Fork 3.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Feature](runtime-filter) cancel query when runtime_filter's rpc failed #43627
base: master
Are you sure you want to change the base?
Conversation
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -990,14 +990,14 @@ void IRuntimeFilter::insert_batch(const vectorized::ColumnPtr column, size_t sta | |||
_wrapper->insert_batch(column, start); | |||
} | |||
|
|||
Status IRuntimeFilter::publish(bool publish_local) { | |||
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'publish' has cognitive complexity of 77 (threshold 50) [readability-function-cognitive-complexity]
Status IRuntimeFilter::publish(RuntimeState* state, bool publish_local) {
^
Additional context
be/src/exprs/runtime_filter.cpp:995: nesting level increased to 1
auto send_to_remote = [&](IRuntimeFilter* filter) {
^
be/src/exprs/runtime_filter.cpp:998: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:998: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_merge_addr(&addr));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1001: nesting level increased to 1
auto send_to_local = [&](std::shared_ptr<RuntimePredicateWrapper> wrapper) {
^
be/src/exprs/runtime_filter.cpp:1003: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1003: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_consume_filters(_filter_id, filters));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1013: nesting level increased to 1
auto do_local_merge = [&]() {
^
be/src/exprs/runtime_filter.cpp:1015: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1015: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(_state->runtime_filter_mgr->get_local_merge_producer_filters(
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1018: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1018: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(local_merge_filters->filters[0]->merge_from(_wrapper.get()));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1020: +2, including nesting penalty of 1, nesting level increased to 2
if (local_merge_filters->merge_time == 0) {
^
be/src/exprs/runtime_filter.cpp:1021: +3, including nesting penalty of 2, nesting level increased to 3
if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1022: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1022: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(send_to_local(local_merge_filters->filters[0]->_wrapper));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1023: +1, nesting level increased to 3
} else {
^
be/src/exprs/runtime_filter.cpp:1024: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1024: +5, including nesting penalty of 4, nesting level increased to 5
RETURN_IF_ERROR(send_to_remote(local_merge_filters->filters[0].get()));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1030: +1, including nesting penalty of 0, nesting level increased to 1
if (_need_local_merge && _has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1030: +1
if (_need_local_merge && _has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1031: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(do_local_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1031: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_local_merge());
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1032: +1, nesting level increased to 1
} else if (_has_local_target) {
^
be/src/exprs/runtime_filter.cpp:1033: +2, including nesting penalty of 1, nesting level increased to 2
RETURN_IF_ERROR(send_to_local(_wrapper));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1033: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_local(_wrapper));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1034: +1, nesting level increased to 1
} else if (!publish_local) {
^
be/src/exprs/runtime_filter.cpp:1035: +2, including nesting penalty of 1, nesting level increased to 2
if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1035: +1
if (_is_broadcast_join || _state->be_exec_version < USE_NEW_SERDE) {
^
be/src/exprs/runtime_filter.cpp:1036: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(send_to_remote(this));
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1036: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(send_to_remote(this));
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1037: +1, nesting level increased to 2
} else {
^
be/src/exprs/runtime_filter.cpp:1038: +3, including nesting penalty of 2, nesting level increased to 3
RETURN_IF_ERROR(do_local_merge());
^
be/src/common/status.h:632: expanded from macro 'RETURN_IF_ERROR'
do { \
^
be/src/exprs/runtime_filter.cpp:1038: +4, including nesting penalty of 3, nesting level increased to 4
RETURN_IF_ERROR(do_local_merge());
^
be/src/common/status.h:634: expanded from macro 'RETURN_IF_ERROR'
if (UNLIKELY(!_status_.ok())) { \
^
be/src/exprs/runtime_filter.cpp:1040: +1, nesting level increased to 1
} else {
^
@@ -377,7 +379,8 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) | |||
} | |||
|
|||
// merge data | |||
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, | |||
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'merge' exceeds recommended size/complexity thresholds [readability-function-size]
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
^
Additional context
be/src/runtime/runtime_filter_mgr.cpp:381: 118 lines including whitespace and comments (threshold 80)
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
^
@@ -20,7 +20,9 @@ | |||
#include <google/protobuf/stubs/common.h> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: 'google/protobuf/stubs/common.h' file not found [clang-diagnostic-error]
#include <google/protobuf/stubs/common.h>
^
run buildall |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
clang-tidy made some suggestions
@@ -377,7 +380,8 @@ Status RuntimeFilterMgr::sync_filter_size(const PSyncFilterSizeRequest* request) | |||
} | |||
|
|||
// merge data | |||
Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request, | |||
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
warning: function 'merge' exceeds recommended size/complexity thresholds [readability-function-size]
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
^
Additional context
be/src/runtime/runtime_filter_mgr.cpp:382: 119 lines including whitespace and comments (threshold 80)
Status RuntimeFilterMergeControllerEntity::merge(std::weak_ptr<QueryContext> query_ctx,
^
run beut |
What problem does this PR solve?
cancel query when runtime_filter's rpc failed
Problem Summary:
Release note
None
Check List (For Author)
Test
Behavior changed:
Does this need documentation?
Check List (For Reviewer who merge this PR)