Skip to content

Commit

Permalink
[branch-2.0](schema change) opt cooldown data schema change (#40963)
Browse files Browse the repository at this point in the history
## Proposed changes

This PR solves 3 problems of schema change for cooldown data:

1. Schema change will build a new tablet for base tablet, and all
replicas do this separately, so the cooldown data SC writes all replicas
to the remote storage, But the cooldown rowsets just need one master
replica to cooldown, the other replicas follow the master. To optimize
this issue, Doris can just write the new rowsets to the local storage,
and the tablet will cooldown this rowsets automatically for just one
mater replica.

3. In schema change job and rollup job, FE generates createReplicaTask,
which specifies the storage policy of the tablets to create, table's
storage policy may be empty if we just set the partitions' storage
policy, so we use partition's storage policy instead of table.

5. checks the target tablets in the loop of data conversion, and
terminates the data conversion thread if the tablet is been dropped.
  • Loading branch information
DarvenDuan committed Sep 20, 2024
1 parent a6be436 commit 70e8224
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 3 deletions.
16 changes: 15 additions & 1 deletion be/src/olap/schema_change.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,13 @@ Status VSchemaChangeDirectly::_inner_process(RowsetReaderSharedPtr rowset_reader
TabletSchemaSPtr base_tablet_schema) {
bool eof = false;
do {
// tablet may be dropped due to user cancel, schema change thread should fast fail
// and release tablet lock.
if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
"fail to process tablet because it is to be deleted. tablet_id={}",
new_tablet->tablet_id());
}
auto new_block =
vectorized::Block::create_unique(new_tablet->tablet_schema()->create_block());
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
Expand Down Expand Up @@ -580,6 +587,13 @@ Status VSchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_rea

bool eof = false;
do {
// tablet may be dropped due to user cancel, schema change thread should fast fail
// and release tablet lock.
if (new_tablet->tablet_state() == TABLET_SHUTDOWN) {
return Status::Error<TABLE_ALREADY_DELETED_ERROR>(
"fail to process tablet because it is to be deleted. tablet_id={}",
new_tablet->tablet_id());
}
auto ref_block = vectorized::Block::create_unique(base_tablet_schema->create_block());
auto st = rowset_reader->next_block(ref_block.get());
if (!st) {
Expand Down Expand Up @@ -1103,7 +1117,7 @@ Status SchemaChangeHandler::_convert_historical_rowsets(const SchemaChangeParams
context.segments_overlap = rs_reader->rowset()->rowset_meta()->segments_overlap();
context.tablet_schema = new_tablet->tablet_schema();
context.newest_write_timestamp = rs_reader->newest_write_timestamp();
context.fs = rs_reader->rowset()->rowset_meta()->fs();
context.fs = io::global_local_filesystem();
context.write_type = DataWriteType::TYPE_SCHEMA_CHANGE;
Status status = new_tablet->create_rowset_writer(context, &rowset_writer);
if (!status.ok()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,8 @@ protected void runPendingJob() throws AlterCancelException {
tabletType,
null,
tbl.getCompressionType(),
tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(),
tbl.getEnableUniqueKeyMergeOnWrite(),
tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(),
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,8 @@ protected void runPendingJob() throws AlterCancelException {
tbl.getPartitionInfo().getTabletType(partitionId),
null,
tbl.getCompressionType(),
tbl.getEnableUniqueKeyMergeOnWrite(), tbl.getStoragePolicy(),
tbl.getEnableUniqueKeyMergeOnWrite(),
tbl.getPartitionInfo().getDataProperty(partitionId).getStoragePolicy(),
tbl.disableAutoCompaction(),
tbl.enableSingleReplicaCompaction(),
tbl.skipWriteIndexOnLoad(),
Expand Down

0 comments on commit 70e8224

Please sign in to comment.