From 00ec5875d213c7f97876d2a54482aad8201fa606 Mon Sep 17 00:00:00 2001 From: Kaijie Chen Date: Fri, 7 Mar 2025 22:45:22 +0800 Subject: [PATCH] [fix](load) add lock for runtime_state->tablet_commit_infos (#48709) Lock before read/write tablet_commit_infos vector, to fix use-after-free. --- be/src/io/fs/multi_table_pipe.cpp | 4 +-- be/src/runtime/fragment_mgr.cpp | 27 +++++++------------ be/src/runtime/runtime_state.h | 27 ++++++++++++++----- .../stream_load/stream_load_executor.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer.cpp | 21 ++++++++------- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 5 +--- 6 files changed, 45 insertions(+), 41 deletions(-) diff --git a/be/src/io/fs/multi_table_pipe.cpp b/be/src/io/fs/multi_table_pipe.cpp index b3af2531f15dd7..eb601c5d6f5a15 100644 --- a/be/src/io/fs/multi_table_pipe.cpp +++ b/be/src/io/fs/multi_table_pipe.cpp @@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector para { std::lock_guard l(_tablet_commit_infos_lock); + auto commit_infos = state->tablet_commit_infos(); _tablet_commit_infos.insert(_tablet_commit_infos.end(), - state->tablet_commit_infos().begin(), - state->tablet_commit_infos().end()); + commit_infos.begin(), commit_infos.end()); } _number_total_rows += state->num_rows_load_total(); _number_loaded_rows += state->num_rows_load_success(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 4d795df55f969c..6b9b02438a3a3c 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -492,35 +492,26 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) { } } } - if (!req.runtime_state->tablet_commit_infos().empty()) { + if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) { params.__isset.commitInfos = true; - params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size()); - for (auto& info : req.runtime_state->tablet_commit_infos()) { - params.commitInfos.push_back(info); - } + params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->tablet_commit_infos().empty()) { + if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) { params.__isset.commitInfos = true; - params.commitInfos.insert(params.commitInfos.end(), - rs->tablet_commit_infos().begin(), - rs->tablet_commit_infos().end()); + params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end()); } } } - if (!req.runtime_state->error_tablet_infos().empty()) { + if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) { params.__isset.errorTabletInfos = true; - params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size()); - for (auto& info : req.runtime_state->error_tablet_infos()) { - params.errorTabletInfos.push_back(info); - } + params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end()); } else if (!req.runtime_states.empty()) { for (auto* rs : req.runtime_states) { - if (!rs->error_tablet_infos().empty()) { + if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) { params.__isset.errorTabletInfos = true; - params.errorTabletInfos.insert(params.errorTabletInfos.end(), - rs->error_tablet_infos().begin(), - rs->error_tablet_infos().end()); + params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(), + rs_eti.end()); } } } diff --git a/be/src/runtime/runtime_state.h b/be/src/runtime/runtime_state.h index aa486d3b8b63fc..7d1f0bc9faf221 100644 --- a/be/src/runtime/runtime_state.h +++ b/be/src/runtime/runtime_state.h @@ -424,19 +424,33 @@ class RuntimeState { bool enable_page_cache() const; - const std::vector& tablet_commit_infos() const { + std::vector tablet_commit_infos() const { + std::lock_guard lock(_tablet_infos_mutex); return _tablet_commit_infos; } - std::vector& tablet_commit_infos() { return _tablet_commit_infos; } + void add_tablet_commit_infos(std::vector& commit_infos) { + std::lock_guard lock(_tablet_infos_mutex); + _tablet_commit_infos.insert(_tablet_commit_infos.end(), + std::make_move_iterator(commit_infos.begin()), + std::make_move_iterator(commit_infos.end())); + } - std::vector& hive_partition_updates() { return _hive_partition_updates; } + std::vector error_tablet_infos() const { + std::lock_guard lock(_tablet_infos_mutex); + return _error_tablet_infos; + } - std::vector& iceberg_commit_datas() { return _iceberg_commit_datas; } + void add_error_tablet_infos(std::vector& tablet_infos) { + std::lock_guard lock(_tablet_infos_mutex); + _error_tablet_infos.insert(_error_tablet_infos.end(), + std::make_move_iterator(tablet_infos.begin()), + std::make_move_iterator(tablet_infos.end())); + } - const std::vector& error_tablet_infos() const { return _error_tablet_infos; } + std::vector& hive_partition_updates() { return _hive_partition_updates; } - std::vector& error_tablet_infos() { return _error_tablet_infos; } + std::vector& iceberg_commit_datas() { return _iceberg_commit_datas; } // local runtime filter mgr, the runtime filter do not have remote target or // not need local merge should regist here. the instance exec finish, the local @@ -752,6 +766,7 @@ class RuntimeState { int64_t _error_row_number; std::string _error_log_file_path; std::unique_ptr _error_log_file; // error file path, absolute path + mutable std::mutex _tablet_infos_mutex; std::vector _tablet_commit_infos; std::vector _error_tablet_infos; int _max_operator_id = 0; diff --git a/be/src/runtime/stream_load/stream_load_executor.cpp b/be/src/runtime/stream_load/stream_load_executor.cpp index e14378bb1fddf4..48682a21677d5d 100644 --- a/be/src/runtime/stream_load/stream_load_executor.cpp +++ b/be/src/runtime/stream_load/stream_load_executor.cpp @@ -78,7 +78,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptrtxn_id = state->wal_id(); } ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id); - ctx->commit_infos = std::move(state->tablet_commit_infos()); + ctx->commit_infos = state->tablet_commit_infos(); ctx->number_total_rows = state->num_rows_load_total(); ctx->number_loaded_rows = state->num_rows_load_success(); ctx->number_filtered_rows = state->num_rows_load_filtered(); diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index e04ef16072672d..7f66308839c78b 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -195,15 +195,18 @@ Status IndexChannel::check_intolerable_failure() { } void IndexChannel::set_error_tablet_in_state(RuntimeState* state) { - std::vector& error_tablet_infos = state->error_tablet_infos(); + std::vector error_tablet_infos; - std::lock_guard l(_fail_lock); - for (const auto& it : _failed_channels_msgs) { - TErrorTabletInfo error_info; - error_info.__set_tabletId(it.first); - error_info.__set_msg(it.second); - error_tablet_infos.emplace_back(error_info); + { + std::lock_guard l(_fail_lock); + for (const auto& it : _failed_channels_msgs) { + TErrorTabletInfo error_info; + error_info.__set_tabletId(it.first); + error_info.__set_msg(it.second); + error_tablet_infos.emplace_back(error_info); + } } + state->add_error_tablet_infos(error_tablet_infos); } void IndexChannel::set_tablets_received_rows( @@ -968,9 +971,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) { if (_add_batches_finished) { _close_check(); - state->tablet_commit_infos().insert(state->tablet_commit_infos().end(), - std::make_move_iterator(_tablet_commit_infos.begin()), - std::make_move_iterator(_tablet_commit_infos.end())); + _state->add_tablet_commit_infos(_tablet_commit_infos); _index_channel->set_error_tablet_in_state(state); _index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id); diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index b3baf89867fc5c..8652127d88f768 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -682,10 +682,7 @@ Status VTabletWriterV2::close(Status exec_status) { std::vector tablet_commit_infos; RETURN_IF_ERROR( _create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas)); - _state->tablet_commit_infos().insert( - _state->tablet_commit_infos().end(), - std::make_move_iterator(tablet_commit_infos.begin()), - std::make_move_iterator(tablet_commit_infos.end())); + _state->add_tablet_commit_infos(tablet_commit_infos); } // _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node