From 73799e2da146ce85ba81fe4f8bbb7a959a332100 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 17:30:04 +0300 Subject: [PATCH 1/9] fix --- .github/config/muted_ya.txt | 1 - .../schemeshard/schemeshard_export__create.cpp | 15 +++++++++------ .../schemeshard_export_flow_proposals.cpp | 16 ++++++++++++---- .../schemeshard_export_flow_proposals.h | 4 +++- 4 files changed, 24 insertions(+), 12 deletions(-) diff --git a/.github/config/muted_ya.txt b/.github/config/muted_ya.txt index f43771c43956..235790a85df4 100644 --- a/.github/config/muted_ya.txt +++ b/.github/config/muted_ya.txt @@ -48,7 +48,6 @@ ydb/core/quoter/ut QuoterWithKesusTest.KesusRecreation ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient ydb/core/statistics/aggregator/ut AnalyzeColumnshard.AnalyzeRebootColumnShard ydb/core/tx/datashard/ut_incremental_backup IncrementalBackup.ComplexRestoreBackupCollection+WithIncremental -ydb/core/tx/schemeshard/ut_export_reboots_s3 TExportToS3WithRebootsTests.CancelShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_export_reboots_s3 TExportToS3WithRebootsTests.ShouldSucceedOnManyTables ydb/core/tx/schemeshard/ut_login_large TSchemeShardLoginLargeTest.RemoveLogin_Many ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 06a11c1a9d2f..57666a77c93a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -554,6 +554,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id); } + void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx){ + exportInfo->PendingDropItems.push_back(itemIdx); + }); + } + void SubscribeTx(TTxId txId) { Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId))); } @@ -805,7 +811,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable || item.State != EState::Dropping) { continue; } - if (item.WaitTxId == InvalidTxId) { exportInfo->PendingDropItems.push_back(itemIdx); AllocateTxId(exportInfo, itemIdx); @@ -1146,8 +1151,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistExportItemState(db, exportInfo, itemIdx); if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - PrepareDropping(Self, exportInfo, db, true); - AllocateTxId(exportInfo); + PrepareAutoDropping(Self, exportInfo, db); } } else if (exportInfo->State == EState::Cancellation) { item.State = EState::Cancelled; @@ -1347,8 +1351,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - PrepareDropping(Self, exportInfo, db, true); - AllocateTxId(exportInfo); + PrepareAutoDropping(Self, exportInfo, db); } Self->PersistExportItemState(db, exportInfo, itemIdx); @@ -1365,7 +1368,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase item.WaitTxId = InvalidTxId; Self->PersistExportItemState(db, exportInfo, itemIdx); - if (exportInfo->AllItemsAreDropped() || exportInfo->State == EState::AutoDropping) { + if (exportInfo->AllItemsAreDropped()) { AllocateTxId(exportInfo); } } else { diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 22ca33b0d24d..7fda77e264bf 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -328,9 +328,12 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) { return TStringBuilder() << exportPathName << "/" << itemIdx; } -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping) { +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, std::function func) { + Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState)); + exportInfo->WaitTxId = InvalidTxId; - exportInfo->State = isAutoDropping ? TExportInfo::EState::AutoDropping : TExportInfo::EState::Dropping; + exportInfo->State = droppingState; ss->PersistExportState(db, exportInfo); for (ui32 itemIdx : xrange(exportInfo->Items.size())) { @@ -341,8 +344,8 @@ void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNi const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss); if (itemPath.IsResolved() && !itemPath.IsDeleted()) { item.State = TExportInfo::EState::Dropping; - if (isAutoDropping) { - exportInfo->PendingDropItems.push_back(itemIdx); + if (exportInfo->State == TExportInfo::EState::AutoDropping) { + func(itemIdx); } } @@ -350,5 +353,10 @@ void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNi } } +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){}); +} + + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h index c6f7b5aaa342..97f9d18e33e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h @@ -47,7 +47,9 @@ THolder CancelPropose( TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx); TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx); -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, bool isAutoDropping = false); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, std::function func); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db); } // NSchemeShard } // NKikimr From 369f5e74126deb062a2e01079e9b5522447e620a Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 17:35:20 +0300 Subject: [PATCH 2/9] fix blockPartiotion01 --- ydb/core/tx/schemeshard/ut_export/ut_export.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 4534fc6d2546..3974307eda4d 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1264,7 +1264,7 @@ partitioning_settings { TTestEnv env(runtime); ui64 txId = 100; - TBlockEvents blockPartition0(runtime, [](auto&& ev) { + TBlockEvents blockPartition01(runtime, [](auto&& ev) { return ev->Get()->Request.GetKey() == "/data_01.csv"; }); @@ -1300,7 +1300,7 @@ partitioning_settings { )", port)); - runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition0.size() >= 1; }); + runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; }); bool isCompleted = false; while (!isCompleted) { @@ -1318,8 +1318,8 @@ partitioning_settings { } } - blockPartition0.Stop(); - blockPartition0.Unblock(); + blockPartition01.Stop(); + blockPartition01.Unblock(); env.TestWaitNotification(runtime, txId); From 82e22ea4bba610482d1822bb556b5164400d6720 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 19:58:42 +0300 Subject: [PATCH 3/9] fix --- ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 57666a77c93a..ce229e8073b1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -557,6 +557,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx){ exportInfo->PendingDropItems.push_back(itemIdx); + AllocateTxId(exportInfo); }); } From c775b2754ae49b4f94158a7098a523b3e35229a7 Mon Sep 17 00:00:00 2001 From: stanislav_shchetinin Date: Thu, 17 Apr 2025 20:15:01 +0300 Subject: [PATCH 4/9] Apply suggestions from code review Co-authored-by: Ilnaz Nizametdinov --- ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 2 +- .../tx/schemeshard/schemeshard_export_flow_proposals.cpp | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index ce229e8073b1..858b5bbfc585 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -555,7 +555,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { - PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx){ + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) { exportInfo->PendingDropItems.push_back(itemIdx); AllocateTxId(exportInfo); }); diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 7fda77e264bf..b37811304d35 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -328,8 +328,13 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) { return TStringBuilder() << exportPathName << "/" << itemIdx; } -void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, - TExportInfo::EState droppingState, std::function func) { +void PrepareDropping( + TSchemeShard* ss, + TExportInfo::TPtr exportInfo, + NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, + std::function func) +{ Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState)); exportInfo->WaitTxId = InvalidTxId; From edb7ae05eb367ec446211f774c55adfdfa16f7f7 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 20:19:03 +0300 Subject: [PATCH 5/9] extra diff --- .../schemeshard_export__create.cpp | 1 + .../compatibility/test_compatibility.py | 25 +++++++++++++++++++ 2 files changed, 26 insertions(+) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 858b5bbfc585..9f07c438fb25 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -812,6 +812,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable || item.State != EState::Dropping) { continue; } + if (item.WaitTxId == InvalidTxId) { exportInfo->PendingDropItems.push_back(itemIdx); AllocateTxId(exportInfo, itemIdx); diff --git a/ydb/tests/functional/compatibility/test_compatibility.py b/ydb/tests/functional/compatibility/test_compatibility.py index dbcf60b8ff15..9e89e2041ff9 100644 --- a/ydb/tests/functional/compatibility/test_compatibility.py +++ b/ydb/tests/functional/compatibility/test_compatibility.py @@ -142,3 +142,28 @@ def test_export(self): ] yatest.common.execute(export_command, wait=True, stdout=self.output_f, stderr=self.output_f) + + s3_resource = boto3.resource("s3", endpoint_url=s3_endpoint, + aws_access_key_id=s3_access_key, + aws_secret_access_key=s3_secret_key) + + bucket = s3_resource.Bucket(s3_bucket) + objects = list(bucket.objects.all()) + print(bucket) + print(s3_resource) + print(objects) + metadata_found = False + data_found = False + scheme_found = False + for obj in objects: + key = obj.key + if key.endswith('metadata.json'): + metadata_found = True + elif key.endswith('data_00.csv'): + data_found = True + elif key.endswith('scheme.pb'): + scheme_found = True + + assert metadata_found, "Export metadata file was not found in S3" + assert data_found, "Export data file was not found in S3" + assert scheme_found, "Export scheme file was not found in S3" From 994ac082134cf9ba704f40fdbf136c607fdeb7f0 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 20:29:41 +0300 Subject: [PATCH 6/9] removed test --- .../compatibility/test_compatibility.py | 22 +------------------ 1 file changed, 1 insertion(+), 21 deletions(-) diff --git a/ydb/tests/functional/compatibility/test_compatibility.py b/ydb/tests/functional/compatibility/test_compatibility.py index 9e89e2041ff9..d0a6d58fe244 100644 --- a/ydb/tests/functional/compatibility/test_compatibility.py +++ b/ydb/tests/functional/compatibility/test_compatibility.py @@ -146,24 +146,4 @@ def test_export(self): s3_resource = boto3.resource("s3", endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key) - - bucket = s3_resource.Bucket(s3_bucket) - objects = list(bucket.objects.all()) - print(bucket) - print(s3_resource) - print(objects) - metadata_found = False - data_found = False - scheme_found = False - for obj in objects: - key = obj.key - if key.endswith('metadata.json'): - metadata_found = True - elif key.endswith('data_00.csv'): - data_found = True - elif key.endswith('scheme.pb'): - scheme_found = True - - assert metadata_found, "Export metadata file was not found in S3" - assert data_found, "Export data file was not found in S3" - assert scheme_found, "Export scheme file was not found in S3" + From 32412494b3281d8de055fe3a3fcf902b1bb9ba3f Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 20:32:31 +0300 Subject: [PATCH 7/9] removed --- ydb/tests/functional/compatibility/test_compatibility.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/ydb/tests/functional/compatibility/test_compatibility.py b/ydb/tests/functional/compatibility/test_compatibility.py index d0a6d58fe244..dbcf60b8ff15 100644 --- a/ydb/tests/functional/compatibility/test_compatibility.py +++ b/ydb/tests/functional/compatibility/test_compatibility.py @@ -142,8 +142,3 @@ def test_export(self): ] yatest.common.execute(export_command, wait=True, stdout=self.output_f, stderr=self.output_f) - - s3_resource = boto3.resource("s3", endpoint_url=s3_endpoint, - aws_access_key_id=s3_access_key, - aws_secret_access_key=s3_secret_key) - From c7cd25f8d241cea15cbd60896ddaca61243415fe Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Thu, 17 Apr 2025 20:35:43 +0300 Subject: [PATCH 8/9] extra --- ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index b37811304d35..33d3c1ff0db1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -362,6 +362,5 @@ void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNi PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){}); } - } // NSchemeShard } // NKikimr From a565453c57375cde77b5d836ae5c2ec3d86e94a5 Mon Sep 17 00:00:00 2001 From: st-shchetinin Date: Fri, 18 Apr 2025 19:02:55 +0300 Subject: [PATCH 9/9] fix view --- ydb/core/tx/schemeshard/schemeshard_export__create.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 9f07c438fb25..0bf789cf4aff 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -555,10 +555,15 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + bool isContinued = false; PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) { exportInfo->PendingDropItems.push_back(itemIdx); + isContinued = true; AllocateTxId(exportInfo); }); + if (!isContinued) { + AllocateTxId(exportInfo); + } } void SubscribeTx(TTxId txId) {