Skip to content

Commit

Permalink
[fix](load) add lock for runtime_state->tablet_commit_infos (#48709)
Browse files Browse the repository at this point in the history
Take a lock before reading or writing the tablet_commit_infos vector, to fix a
use-after-free.
  • Loading branch information
kaijchen authored Mar 7, 2025
1 parent 63a601f commit 00ec587
Show file tree
Hide file tree
Showing 6 changed files with 45 additions and 41 deletions.
4 changes: 2 additions & 2 deletions be/src/io/fs/multi_table_pipe.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -263,9 +263,9 @@ Status MultiTablePipe::exec_plans(ExecEnv* exec_env, std::vector<ExecParam> para

{
std::lock_guard<std::mutex> l(_tablet_commit_infos_lock);
auto commit_infos = state->tablet_commit_infos();
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
state->tablet_commit_infos().begin(),
state->tablet_commit_infos().end());
commit_infos.begin(), commit_infos.end());
}
_number_total_rows += state->num_rows_load_total();
_number_loaded_rows += state->num_rows_load_success();
Expand Down
27 changes: 9 additions & 18 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -492,35 +492,26 @@ void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
}
}
}
if (!req.runtime_state->tablet_commit_infos().empty()) {
if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.reserve(req.runtime_state->tablet_commit_infos().size());
for (auto& info : req.runtime_state->tablet_commit_infos()) {
params.commitInfos.push_back(info);
}
params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->tablet_commit_infos().empty()) {
if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(),
rs->tablet_commit_infos().begin(),
rs->tablet_commit_infos().end());
params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
}
}
}
if (!req.runtime_state->error_tablet_infos().empty()) {
if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.reserve(req.runtime_state->error_tablet_infos().size());
for (auto& info : req.runtime_state->error_tablet_infos()) {
params.errorTabletInfos.push_back(info);
}
params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->error_tablet_infos().empty()) {
if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(),
rs->error_tablet_infos().begin(),
rs->error_tablet_infos().end());
params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
rs_eti.end());
}
}
}
Expand Down
27 changes: 21 additions & 6 deletions be/src/runtime/runtime_state.h
Original file line number Diff line number Diff line change
Expand Up @@ -424,19 +424,33 @@ class RuntimeState {

bool enable_page_cache() const;

const std::vector<TTabletCommitInfo>& tablet_commit_infos() const {
// Returns a copy of _tablet_commit_infos, made while holding
// _tablet_infos_mutex. The result is returned by value (not by reference),
// so the caller works on its own snapshot after the lock is released.
std::vector<TTabletCommitInfo> tablet_commit_infos() const {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
return _tablet_commit_infos;
}

std::vector<TTabletCommitInfo>& tablet_commit_infos() { return _tablet_commit_infos; }
// Appends every element of commit_infos to the end of _tablet_commit_infos
// while holding _tablet_infos_mutex.
// The elements are transferred through move iterators, so after the call
// commit_infos still has the same size but its elements are moved-from;
// this function does not clear it.
void add_tablet_commit_infos(std::vector<TTabletCommitInfo>& commit_infos) {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
_tablet_commit_infos.insert(_tablet_commit_infos.end(),
std::make_move_iterator(commit_infos.begin()),
std::make_move_iterator(commit_infos.end()));
}

std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }
// Returns a copy of _error_tablet_infos, made while holding
// _tablet_infos_mutex (the same mutex that guards _tablet_commit_infos).
// The result is returned by value, so the caller works on its own snapshot.
std::vector<TErrorTabletInfo> error_tablet_infos() const {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
return _error_tablet_infos;
}

std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }
// Appends every element of tablet_infos to the end of _error_tablet_infos
// while holding _tablet_infos_mutex.
// The elements are transferred through move iterators, so after the call
// tablet_infos still has the same size but its elements are moved-from;
// this function does not clear it.
void add_error_tablet_infos(std::vector<TErrorTabletInfo>& tablet_infos) {
std::lock_guard<std::mutex> lock(_tablet_infos_mutex);
_error_tablet_infos.insert(_error_tablet_infos.end(),
std::make_move_iterator(tablet_infos.begin()),
std::make_move_iterator(tablet_infos.end()));
}

const std::vector<TErrorTabletInfo>& error_tablet_infos() const { return _error_tablet_infos; }
std::vector<THivePartitionUpdate>& hive_partition_updates() { return _hive_partition_updates; }

std::vector<TErrorTabletInfo>& error_tablet_infos() { return _error_tablet_infos; }
std::vector<TIcebergCommitData>& iceberg_commit_datas() { return _iceberg_commit_datas; }

// local runtime filter mgr, the runtime filter do not have remote target or
// not need local merge should regist here. the instance exec finish, the local
Expand Down Expand Up @@ -752,6 +766,7 @@ class RuntimeState {
int64_t _error_row_number;
std::string _error_log_file_path;
std::unique_ptr<std::ofstream> _error_log_file; // error file path, absolute path
mutable std::mutex _tablet_infos_mutex;
std::vector<TTabletCommitInfo> _tablet_commit_infos;
std::vector<TErrorTabletInfo> _error_tablet_infos;
int _max_operator_id = 0;
Expand Down
2 changes: 1 addition & 1 deletion be/src/runtime/stream_load/stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ Status StreamLoadExecutor::execute_plan_fragment(std::shared_ptr<StreamLoadConte
ctx->txn_id = state->wal_id();
}
ctx->exec_env()->new_load_stream_mgr()->remove(ctx->id);
ctx->commit_infos = std::move(state->tablet_commit_infos());
ctx->commit_infos = state->tablet_commit_infos();
ctx->number_total_rows = state->num_rows_load_total();
ctx->number_loaded_rows = state->num_rows_load_success();
ctx->number_filtered_rows = state->num_rows_load_filtered();
Expand Down
21 changes: 11 additions & 10 deletions be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -195,15 +195,18 @@ Status IndexChannel::check_intolerable_failure() {
}

void IndexChannel::set_error_tablet_in_state(RuntimeState* state) {
std::vector<TErrorTabletInfo>& error_tablet_infos = state->error_tablet_infos();
std::vector<TErrorTabletInfo> error_tablet_infos;

std::lock_guard<doris::SpinLock> l(_fail_lock);
for (const auto& it : _failed_channels_msgs) {
TErrorTabletInfo error_info;
error_info.__set_tabletId(it.first);
error_info.__set_msg(it.second);
error_tablet_infos.emplace_back(error_info);
{
std::lock_guard<doris::SpinLock> l(_fail_lock);
for (const auto& it : _failed_channels_msgs) {
TErrorTabletInfo error_info;
error_info.__set_tabletId(it.first);
error_info.__set_msg(it.second);
error_tablet_infos.emplace_back(error_info);
}
}
state->add_error_tablet_infos(error_tablet_infos);
}

void IndexChannel::set_tablets_received_rows(
Expand Down Expand Up @@ -968,9 +971,7 @@ Status VNodeChannel::close_wait(RuntimeState* state) {

if (_add_batches_finished) {
_close_check();
state->tablet_commit_infos().insert(state->tablet_commit_infos().end(),
std::make_move_iterator(_tablet_commit_infos.begin()),
std::make_move_iterator(_tablet_commit_infos.end()));
_state->add_tablet_commit_infos(_tablet_commit_infos);

_index_channel->set_error_tablet_in_state(state);
_index_channel->set_tablets_received_rows(_tablets_received_rows, _node_id);
Expand Down
5 changes: 1 addition & 4 deletions be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,10 +682,7 @@ Status VTabletWriterV2::close(Status exec_status) {
std::vector<TTabletCommitInfo> tablet_commit_infos;
RETURN_IF_ERROR(
_create_commit_info(tablet_commit_infos, _load_stream_map, _num_replicas));
_state->tablet_commit_infos().insert(
_state->tablet_commit_infos().end(),
std::make_move_iterator(tablet_commit_infos.begin()),
std::make_move_iterator(tablet_commit_infos.end()));
_state->add_tablet_commit_infos(tablet_commit_infos);
}

// _number_input_rows don't contain num_rows_load_filtered and num_rows_load_unselected in scan node
Expand Down

0 comments on commit 00ec587

Please sign in to comment.