From 70e822461f25cf63a67b82813abb1cf2d2523771 Mon Sep 17 00:00:00 2001 From: Xujian Duan <50550370+DarvenDuan@users.noreply.github.com> Date: Fri, 20 Sep 2024 11:01:44 +0800 Subject: [PATCH] [branch-2.0](schema change) opt cooldown data schema change (#40963) ## Proposed changes This PR solves 3 problems of schema change for cooldown data: 1. Schema change builds a new tablet from the base tablet, and every replica does this separately, so a schema change on cooldown data makes every replica write its new rowsets to the remote storage. But cooldown rowsets only need one master replica to cool them down; the other replicas follow the master. To optimize this, Doris can just write the new rowsets to the local storage, and the tablet will then cool down these rowsets automatically on just the one master replica. 2. In the schema change job and the rollup job, FE generates a createReplicaTask, which specifies the storage policy of the tablets to create. The table's storage policy may be empty if only the partitions' storage policies are set, so we use the partition's storage policy instead of the table's. 3. Check the target tablet inside the data conversion loop, and terminate the data conversion thread if the tablet has been dropped. --- be/src/olap/schema_change.cpp | 16 +++++++++++++++- .../java/org/apache/doris/alter/RollupJobV2.java | 3 ++- .../apache/doris/alter/SchemaChangeJobV2.java | 3 ++- 3 files changed, 19 insertions(+), 3 deletions(-) diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp index 7d867d89460d00..7e6b9a3ff356df 100644 --- a/be/src/olap/schema_change.cpp +++ b/be/src/olap/schema_change.cpp @@ -498,6 +498,13 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader TabletSchemaSPtr base_tablet_schema) { bool eof = false; do { + // tablet may be dropped due to user cancel, schema change thread should fast fail + // and release tablet lock. 
+ if (new_tablet->tablet_state() == TABLET_SHUTDOWN) { + return Status::Error( + "fail to process tablet because it is to be deleted. tablet_id={}", + new_tablet->tablet_id()); + } auto new_block = vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block()); auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block()); @@ -580,6 +587,13 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea bool eof = false; do { + // tablet may be dropped due to user cancel, schema change thread should fast fail + // and release tablet lock. + if (new_tablet->tablet_state() == TABLET_SHUTDOWN) { + return Status::Error( + "fail to process tablet because it is to be deleted. tablet_id={}", + new_tablet->tablet_id()); + } auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block()); auto st = rowset_reader->next_block(ref_block.get()); if (!st) { @@ -1103,7 +1117,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap(); context.tablet_schema = new_tablet->tablet_schema(); context.newest_write_timestamp = rs_reader->newest_write_timestamp(); - context.fs = rs_reader->rowset()->rowset_meta()->fs(); + context.fs = io::global_local_filesystem(); context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE; Status status = new_tablet->create_rowset_writer(context, &rowset_writer); if (!status.ok()) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java index 19d8dc13d75f70..7bb59269a338b5 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/RollupJobV2.java @@ -282,7 +282,8 @@ protected void runPendingJob() throws AlterCancelException { tabletType, null, tbl.getCompressionType(), - tbl.getEnableUniqueKeyMergeOnWrite(), 
tbl.getStoragePolicy(), + tbl.getEnableUniqueKeyMergeOnWrite(), + tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(), tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java index 9b7c127676e58b..2591d3106e03f6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java +++ b/fe/fe-core/src/main/java/org/apache/doris/alter/SchemaChangeJobV2.java @@ -277,7 +277,8 @@ protected void runPendingJob() throws AlterCancelException { tbl.getPartitionInfo().getTabletType(partitionId), null, tbl.getCompressionType(), - tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(), + tbl.getEnableUniqueKeyMergeOnWrite(), + tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(), tbl.disableAutoCompaction(), tbl.enableSingleReplicaCompaction(), tbl.skipWriteIndexOnLoad(),