diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index f43771c43956..235790a85df4 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -48,7 +48,6 @@ ydb/core/quoter/ut QuoterWithKesusTest.KesusRecreation ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient ydb/core/statistics/aggregator/ut AnalyzeColumnshard.AnalyzeRebootColumnShard ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.ComplexRestoreBackupCollection+WithIncremental -ydb/core/tx/schemeshard/ut_export_reboots_s3 TExportToS3WithRebootsTests.CancelShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_export_reboots_s3 TExportToS3WithRebootsTests.ShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_login_large TSchemeShardLoginLargeTest.RemoveLogin_Many ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 06a11c1a9d2f..0bf789cf4aff 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -554,6 +554,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id); } + void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + bool isContinued = false; + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) { + exportInfo->PendingDropItems.push_back(itemIdx); + isContinued = true; + AllocateTxId(exportInfo); + }); + if (!isContinued) { + AllocateTxId(exportInfo); + } + } + void SubscribeTx(TTxId txId) { Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId))); } @@ -1146,8 +1158,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistExportItemState(db, exportInfo, itemIdx); if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - PrepareDropping(Self, exportInfo, db, true); - AllocateTxId(exportInfo); + PrepareAutoDropping(Self, exportInfo, db); } } else if (exportInfo->State == EState::Cancellation) { item.State = EState::Cancelled; @@ -1347,8 +1358,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - PrepareDropping(Self, exportInfo, db, true); - AllocateTxId(exportInfo); + PrepareAutoDropping(Self, exportInfo, db); } Self->PersistExportItemState(db, exportInfo, itemIdx); @@ -1365,7 +1375,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase item.WaitTxId = InvalidTxId; Self->PersistExportItemState(db, exportInfo, itemIdx); - if (exportInfo->AllItemsAreDropped() || exportInfo->State == EState::AutoDropping) { + if (exportInfo->AllItemsAreDropped()) { AllocateTxId(exportInfo); } } else { diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 22ca33b0d24d..33d3c1ff0db1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -328,9 +328,17 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) { return TStringBuilder() << exportPathName << "/" << itemIdx; } -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping) { +void PrepareDropping( + TSchemeShard* ss, + TExportInfo::TPtr exportInfo, + NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, + std::function func) +{ + Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState)); + exportInfo->WaitTxId = InvalidTxId; - exportInfo->State = isAutoDropping ? TExportInfo::EState::AutoDropping : TExportInfo::EState::Dropping; + exportInfo->State = droppingState; ss->PersistExportState(db, exportInfo); for (ui32 itemIdx : xrange(exportInfo->Items.size())) { @@ -341,8 +349,8 @@ void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNi const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss); if (itemPath.IsResolved() && !itemPath.IsDeleted()) { item.State = TExportInfo::EState::Dropping; - if (isAutoDropping) { - exportInfo->PendingDropItems.push_back(itemIdx); + if (exportInfo->State == TExportInfo::EState::AutoDropping) { + func(itemIdx); } } @@ -350,5 +358,9 @@ void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNi } } +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){}); +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h index c6f7b5aaa342..97f9d18e33e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h @@ -47,7 +47,9 @@ THolder CancelPropose( TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx); TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx); -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping = false); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, std::function func); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db); } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 4534fc6d2546..3974307eda4d 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1264,7 +1264,7 @@ partitioning_settings { TTestEnv env(runtime); ui64 txId = 100; - TBlockEvents blockPartition0(runtime, [](auto&& ev) { + TBlockEvents blockPartition01(runtime, [](auto&& ev) { return ev->Get()->Request.GetKey() == "/data_01.csv"; }); @@ -1300,7 +1300,7 @@ partitioning_settings { )", port)); - runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition0.size() >= 1; }); + runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; }); bool isCompleted = false; while (!isCompleted) { @@ -1318,8 +1318,8 @@ partitioning_settings { } } - blockPartition0.Stop(); - blockPartition0.Unblock(); + blockPartition01.Stop(); + blockPartition01.Unblock(); env.TestWaitNotification(runtime, txId);