Skip to content

Commit

Permalink
[Fix](cloud) Should consider tablet state change whether to skip `syn…
Browse files Browse the repository at this point in the history
…c_rowsets` in publish phase (apache#48400)

Consider the following situation:

1. a heavy SC begins
2. an alter task on tablet X (to tablet Y) is sent to be1
3. be1 shuts down for some reason
4. new loads on the new tablet Y are routed to be2 (which will skip
calculating delete bitmaps in the commit phase and publish phase because the
tablet's state is `NOT_READY`)
5. be1 restarts and resumes the alter task
6. the alter task on be1 finishes and changes the tablet's state to `RUNNING`
in MS
7. some load on tablet Y on be2 skips calculating the delete bitmap because
it doesn't know the tablet's state has changed, which causes a
duplicate key problem

Like apache#37670, this PR lets the meta
service return the tablet states along with the
getDeleteBitmapUpdateLockResponse to FE, and FE sends them to BE so
that the BE knows whether it should call sync_rowsets() because of a tablet
state change on other BEs.
  • Loading branch information
bobhan1 committed Mar 10, 2025
1 parent d036421 commit 5e9a40b
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 14 deletions.
20 changes: 17 additions & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <memory>
#include <random>
#include <thread>
#include <type_traits>

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_tablet.h"
Expand Down Expand Up @@ -75,6 +76,7 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
bool has_compaction_stats = partition.__isset.base_compaction_cnts &&
partition.__isset.cumulative_compaction_cnts &&
partition.__isset.cumulative_points;
bool has_tablet_states = partition.__isset.tablet_states;
for (size_t i = 0; i < partition.tablet_ids.size(); i++) {
auto tablet_id = partition.tablet_ids[i];
auto tablet_calc_delete_bitmap_ptr = std::make_shared<CloudTabletCalcDeleteBitmapTask>(
Expand All @@ -84,6 +86,9 @@ Status CloudEngineCalcDeleteBitmapTask::execute() {
partition.base_compaction_cnts[i], partition.cumulative_compaction_cnts[i],
partition.cumulative_points[i]);
}
if (has_tablet_states) {
tablet_calc_delete_bitmap_ptr->set_tablet_state(partition.tablet_states[i]);
}
auto submit_st = token->submit_func([=]() {
auto st = tablet_calc_delete_bitmap_ptr->handle();
if (!st.ok()) {
Expand Down Expand Up @@ -128,6 +133,9 @@ void CloudTabletCalcDeleteBitmapTask::set_compaction_stats(int64_t ms_base_compa
_ms_cumulative_compaction_cnt = ms_cumulative_compaction_cnt;
_ms_cumulative_point = ms_cumulative_point;
}
// Stores the tablet state handed in by the caller. Until this is called the
// optional member stays empty, and handle() only compares the stored value
// against the local tablet's state when it has a value.
void CloudTabletCalcDeleteBitmapTask::set_tablet_state(int64_t tablet_state) {
    _ms_tablet_state.emplace(tablet_state);
}

Status CloudTabletCalcDeleteBitmapTask::handle() const {
VLOG_DEBUG << "start calculate delete bitmap on tablet " << _tablet_id;
Expand All @@ -146,7 +154,10 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
int64_t max_version = tablet->max_version_unlocked();
int64_t t2 = MonotonicMicros();

auto should_sync_rowsets_produced_by_compaction = [&]() {
auto should_sync_rowsets = [&]() {
if (_version != max_version + 1) {
return true;
}
if (_ms_base_compaction_cnt == -1) {
return true;
}
Expand All @@ -156,9 +167,12 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
std::shared_lock rlock(tablet->get_header_lock());
return _ms_base_compaction_cnt > tablet->base_compaction_cnt() ||
_ms_cumulative_compaction_cnt > tablet->cumulative_compaction_cnt() ||
_ms_cumulative_point > tablet->cumulative_layer_point();
_ms_cumulative_point > tablet->cumulative_layer_point() ||
(_ms_tablet_state.has_value() &&
_ms_tablet_state.value() != // an SC job finished on other BEs during this load job
static_cast<std::underlying_type_t<TabletState>>(tablet->tablet_state()));
};
if (_version != max_version + 1 || should_sync_rowsets_produced_by_compaction()) {
if (should_sync_rowsets()) {
auto sync_st = tablet->sync_rowsets();
if (!sync_st.ok()) {
LOG(WARNING) << "failed to sync rowsets. tablet_id=" << _tablet_id
Expand Down
3 changes: 3 additions & 0 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <memory>
#include <optional>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
Expand All @@ -39,6 +40,7 @@ class CloudTabletCalcDeleteBitmapTask {

void set_compaction_stats(int64_t ms_base_compaction_cnt, int64_t ms_cumulative_compaction_cnt,
int64_t ms_cumulative_point);
void set_tablet_state(int64_t tablet_state);

Status handle() const;

Expand All @@ -53,6 +55,7 @@ class CloudTabletCalcDeleteBitmapTask {
int64_t _ms_base_compaction_cnt {-1};
int64_t _ms_cumulative_compaction_cnt {-1};
int64_t _ms_cumulative_point {-1};
std::optional<int64_t> _ms_tablet_state;
std::shared_ptr<MemTrackerLimiter> _mem_tracker;
};

Expand Down
34 changes: 31 additions & 3 deletions cloud/src/meta-service/meta_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2310,6 +2310,7 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
}

for (const auto& tablet_idx : request->tablet_indexes()) {
// 1. get compaction cnts
TabletStatsPB tablet_stat;
std::string stats_key =
stats_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
Expand Down Expand Up @@ -2343,16 +2344,43 @@ void MetaServiceImpl::get_delete_bitmap_update_lock(google::protobuf::RpcControl
response->add_base_compaction_cnts(tablet_stat.base_compaction_cnt());
response->add_cumulative_compaction_cnts(tablet_stat.cumulative_compaction_cnt());
response->add_cumulative_points(tablet_stat.cumulative_point());

// 2. get tablet states
std::string tablet_meta_key =
meta_tablet_key({instance_id, tablet_idx.table_id(), tablet_idx.index_id(),
tablet_idx.partition_id(), tablet_idx.tablet_id()});
std::string tablet_meta_val;
err = txn->get(tablet_meta_key, &tablet_meta_val);
if (err != TxnErrorCode::TXN_OK) {
ss << "failed to get tablet meta"
<< (err == TxnErrorCode::TXN_KEY_NOT_FOUND ? " (not found)" : "")
<< " instance_id=" << instance_id << " tablet_id=" << tablet_idx.tablet_id()
<< " key=" << hex(tablet_meta_key) << " err=" << err;
msg = ss.str();
code = err == TxnErrorCode::TXN_KEY_NOT_FOUND ? MetaServiceCode::TABLET_NOT_FOUND
: cast_as<ErrCategory::READ>(err);
return;
}
doris::TabletMetaCloudPB tablet_meta;
if (!tablet_meta.ParseFromString(tablet_meta_val)) {
code = MetaServiceCode::PROTOBUF_PARSE_ERR;
msg = "malformed tablet meta";
return;
}
response->add_tablet_states(
static_cast<std::underlying_type_t<TabletStatePB>>(tablet_meta.tablet_state()));
}

read_stats_sw.pause();
LOG(INFO) << fmt::format("tablet_idxes.size()={}, read tablet compaction cnts cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);
LOG(INFO) << fmt::format(
"tablet_idxes.size()={}, read tablet compaction cnts and tablet states cost={} ms",
request->tablet_indexes().size(), read_stats_sw.elapsed_us() / 1000);

DeleteBitmapUpdateLockPB lock_info_tmp;
if (!check_delete_bitmap_lock(code, msg, ss, txn, table_id, request->lock_id(),
request->initiator(), lock_key, lock_info_tmp)) {
LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats, table_id="
LOG(WARNING) << "failed to check delete bitmap lock after get tablet stats and tablet "
"states, table_id="
<< table_id << " request lock_id=" << request->lock_id()
<< " request initiator=" << request->initiator() << " code=" << code
<< " msg=" << msg;
Expand Down
23 changes: 17 additions & 6 deletions cloud/test/meta_service_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ static void insert_rowset(MetaServiceProxy* meta_service, int64_t db_id, const s
commit_txn(meta_service, db_id, txn_id, label);
}

static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instance_id,
static void add_tablet_metas(MetaServiceProxy* meta_service, std::string instance_id,
int64_t table_id, int64_t index_id,
const std::vector<std::array<int64_t, 2>>& tablet_idxes) {
std::unique_ptr<Transaction> txn;
Expand All @@ -293,6 +293,17 @@ static void add_tablet_stats(MetaServiceProxy* meta_service, std::string instanc
stats.set_cumulative_compaction_cnt(20);
stats.set_cumulative_point(30);
txn->put(stats_key, stats.SerializeAsString());

doris::TabletMetaCloudPB tablet_pb;
tablet_pb.set_table_id(table_id);
tablet_pb.set_index_id(index_id);
tablet_pb.set_partition_id(partition_id);
tablet_pb.set_tablet_id(tablet_id);
tablet_pb.set_tablet_state(doris::TabletStatePB::PB_RUNNING);
auto tablet_meta_key =
meta_tablet_key({instance_id, table_id, index_id, partition_id, tablet_id});
auto tablet_meta_val = tablet_pb.SerializeAsString();
txn->put(tablet_meta_key, tablet_meta_val);
}
ASSERT_EQ(txn->commit(), TxnErrorCode::TXN_OK);
}
Expand Down Expand Up @@ -4659,7 +4670,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsNormal) {
// [(partition_id, tablet_id)]
std::vector<std::array<int64_t, 2>> tablet_idxes {{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id, tablet_idxes,
Expand Down Expand Up @@ -4713,7 +4724,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4754,7 +4765,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsLockExpired) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4796,7 +4807,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) {
std::vector<std::array<int64_t, 2>> tablet_idxes {
{70001, 12345}, {80001, 3456}, {90001, 6789}};

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down Expand Up @@ -4841,7 +4852,7 @@ TEST(MetaServiceTest, GetDeleteBitmapUpdateLockTabletStatsError) {
tablet_idxes.push_back({partition_id, tablet_id});
}

add_tablet_stats(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);
add_tablet_metas(meta_service.get(), instance_id, table_id, index_id, tablet_idxes);

GetDeleteBitmapUpdateLockResponse res;
get_delete_bitmap_update_lock(meta_service.get(), res, db_id, table_id, index_id,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -780,17 +780,26 @@ private Map<Long, List<TCalcDeleteBitmapPartitionInfo>> getCalcDeleteBitmapInfo(
if (!lockContext.getBaseCompactionCnts().isEmpty()
&& !lockContext.getCumulativeCompactionCnts().isEmpty()
&& !lockContext.getCumulativePoints().isEmpty()) {
boolean hasTabletStats = !lockContext.getTabletStates().isEmpty();

List<Long> reqBaseCompactionCnts = Lists.newArrayList();
List<Long> reqCumulativeCompactionCnts = Lists.newArrayList();
List<Long> reqCumulativePoints = Lists.newArrayList();
List<Long> reqTabletStates = Lists.newArrayList();
for (long tabletId : tabletList) {
reqBaseCompactionCnts.add(lockContext.getBaseCompactionCnts().get(tabletId));
reqCumulativeCompactionCnts.add(lockContext.getCumulativeCompactionCnts().get(tabletId));
reqCumulativePoints.add(lockContext.getCumulativePoints().get(tabletId));
if (hasTabletStats) {
reqTabletStates.add(lockContext.getTabletStates().get(tabletId));
}
}
partitionInfo.setBaseCompactionCnts(reqBaseCompactionCnts);
partitionInfo.setCumulativeCompactionCnts(reqCumulativeCompactionCnts);
partitionInfo.setCumulativePoints(reqCumulativePoints);
if (hasTabletStats) {
partitionInfo.setTabletStates(reqTabletStates);
}
}
partitionInfos.add(partitionInfo);
}
Expand Down Expand Up @@ -917,19 +926,24 @@ private void getDeleteBitmapUpdateLock(long transactionId, List<OlapTable> mowTa
List<Long> respBaseCompactionCnts = response.getBaseCompactionCntsList();
List<Long> respCumulativeCompactionCnts = response.getCumulativeCompactionCntsList();
List<Long> respCumulativePoints = response.getCumulativePointsList();
List<Long> respTabletStates = response.getTabletStatesList();
int size1 = respBaseCompactionCnts.size();
int size2 = respCumulativeCompactionCnts.size();
int size3 = respCumulativePoints.size();
if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()) {
int size4 = respTabletStates.size();
if (size1 != tabletList.size() || size2 != tabletList.size() || size3 != tabletList.size()
|| (size4 > 0 && size4 != tabletList.size())) {
throw new UserException("The size of returned compaction cnts can't match the size of tabletList, "
+ "tabletList.size()=" + tabletList.size() + ", respBaseCompactionCnts.size()=" + size1
+ ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3);
+ ", respCumulativeCompactionCnts.size()=" + size2 + ", respCumulativePoints.size()=" + size3
+ ", respTabletStates.size()=" + size4);
}
for (int i = 0; i < tabletList.size(); i++) {
long tabletId = tabletList.get(i);
lockContext.getBaseCompactionCnts().put(tabletId, respBaseCompactionCnts.get(i));
lockContext.getCumulativeCompactionCnts().put(tabletId, respCumulativeCompactionCnts.get(i));
lockContext.getCumulativePoints().put(tabletId, respCumulativePoints.get(i));
lockContext.getTabletStates().put(tabletId, respTabletStates.get(i));
}
totalRetryTime += retryTime;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ public class DeleteBitmapUpdateLockContext {
private Map<Long, Long> baseCompactionCnts;
private Map<Long, Long> cumulativeCompactionCnts;
private Map<Long, Long> cumulativePoints;
private Map<Long, Long> tabletStates;
private Map<Long, Set<Long>> tableToPartitions;
private Map<Long, Partition> partitions;
private Map<Long, Map<Long, List<Long>>> backendToPartitionTablets;
Expand All @@ -40,6 +41,7 @@ public DeleteBitmapUpdateLockContext() {
baseCompactionCnts = Maps.newHashMap();
cumulativeCompactionCnts = Maps.newHashMap();
cumulativePoints = Maps.newHashMap();
tabletStates = Maps.newHashMap();
tableToPartitions = Maps.newHashMap();
partitions = Maps.newHashMap();
backendToPartitionTablets = Maps.newHashMap();
Expand All @@ -63,6 +65,10 @@ public Map<Long, Long> getCumulativePoints() {
return cumulativePoints;
}

/**
 * Returns the tablet states held by this lock context, keyed by tablet id.
 *
 * <p>The backing map itself is returned (not a copy), so entries added by the
 * caller are stored in this context.
 */
public Map<Long, Long> getTabletStates() {
    return this.tabletStates;
}

public Map<Long, Map<Long, List<Long>>> getBackendToPartitionTablets() {
return backendToPartitionTablets;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.doris.common.MetaNotFoundException;
import org.apache.doris.common.Pair;
import org.apache.doris.common.util.Daemon;
import org.apache.doris.common.util.DebugPointUtil;
import org.apache.doris.common.util.NetUtils;
import org.apache.doris.common.util.TimeUtils;
import org.apache.doris.cooldown.CooldownConf;
Expand Down Expand Up @@ -593,7 +594,22 @@ public void tabletReport(long backendId, Map<Long, TTablet> backendTablets,
LOG.info("finished to handle tablet report from backend[{}] cost: {} ms", backendId, (end - start));
}

/**
 * Debug hook: while the debug point "ReportHandler.block" is enabled, blocks the
 * calling thread, re-checking the debug point once per second. Returns immediately
 * when the debug point is not enabled.
 *
 * <p>If the thread is interrupted while waiting, the interrupt status is restored and
 * the wait ends early, so an interrupt request is not silently discarded.
 */
private static void debugBlock() {
    if (!DebugPointUtil.isEnable("ReportHandler.block")) {
        return;
    }
    LOG.info("debug point: block at ReportHandler.block");
    while (DebugPointUtil.isEnable("ReportHandler.block")) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            LOG.info("error ", e);
            // Thread.sleep() clears the interrupt status when it throws. Re-assert it so
            // code further up the stack can still observe the interrupt, and stop
            // waiting instead of looping on as if nothing had happened.
            Thread.currentThread().interrupt();
            break;
        }
    }
    LOG.info("debug point: leave ReportHandler.block");
}

private static void taskReport(long backendId, Map<TTaskType, Set<Long>> runningTasks) {
debugBlock();
if (LOG.isDebugEnabled()) {
LOG.debug("begin to handle task report from backend {}", backendId);
}
Expand Down
1 change: 1 addition & 0 deletions gensrc/proto/cloud.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1470,6 +1470,7 @@ message GetDeleteBitmapUpdateLockResponse {
repeated int64 base_compaction_cnts = 2;
repeated int64 cumulative_compaction_cnts = 3;
repeated int64 cumulative_points = 4;
repeated int64 tablet_states = 5;
}

message RemoveDeleteBitmapUpdateLockRequest {
Expand Down
2 changes: 2 additions & 0 deletions gensrc/thrift/AgentService.thrift
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,8 @@ struct TCalcDeleteBitmapPartitionInfo {
4: optional list<i64> base_compaction_cnts
5: optional list<i64> cumulative_compaction_cnts
6: optional list<i64> cumulative_points
7: optional list<i64> sub_txn_ids
8: optional list<i64> tablet_states
}

struct TCalcDeleteBitmapRequest {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
-- This file is automatically generated. You should know what you did if you want to edit this
-- !sql --
1 1 1
2 2 2
3 3 3

-- !sql --
1 1 1
2 2 2
3 3 3
10 88 88

-- !dup_key_count --

-- !sql --
1 \N 99
2 2 2
3 3 3
10 \N 88

Loading

0 comments on commit 5e9a40b

Please sign in to comment.