diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index 9f0b199f12f7..2c2a61f0deda 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -100,7 +100,8 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn case TExportInfo::EState::CopyTables: exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_PREPARING); break; - + + case TExportInfo::EState::AutoDropping: case TExportInfo::EState::Transferring: case TExportInfo::EState::Done: for (ui32 itemIdx : xrange(exportInfo->Items.size())) { diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 7fe783bb8682..cf819508495b 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -432,6 +432,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id); } + void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + bool isContinued = false; + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, [&](ui64 itemIdx) { + exportInfo->PendingDropItems.push_back(itemIdx); + isContinued = true; + AllocateTxId(exportInfo, itemIdx); + }); + if (!isContinued) { + AllocateTxId(exportInfo); + } + } + void SubscribeTx(TTxId txId) { Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(ui64(txId))); } @@ -617,7 +629,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase const auto& item = exportInfo->Items.at(itemIdx); if (item.WaitTxId == InvalidTxId) { - if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable && item.State <= EState::Transferring) { 
pendingTables.emplace_back(itemIdx); } else { UploadScheme(exportInfo, itemIdx, ctx); @@ -665,6 +677,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase SendNotificationsIfFinished(exportInfo); break; + case EState::AutoDropping: case EState::Dropping: if (!exportInfo->AllItemsAreDropped()) { for (ui32 itemIdx : xrange(exportInfo->Items.size())) { @@ -749,6 +762,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } break; + case EState::AutoDropping: case EState::Dropping: if (exportInfo->PendingDropItems) { itemIdx = PopFront(exportInfo->PendingDropItems); @@ -821,6 +835,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } break; + case EState::AutoDropping: case EState::Dropping: if (isMultipleMods || isNotExist) { if (record.GetPathDropTxId()) { @@ -925,6 +940,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistExportItemState(db, exportInfo, itemIdx); break; + case EState::AutoDropping: case EState::Dropping: if (!exportInfo->AllItemsAreDropped()) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); @@ -1011,7 +1027,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Self->PersistExportItemState(db, exportInfo, itemIdx); if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - EndExport(exportInfo, EState::Done, db); + PrepareAutoDropping(Self, exportInfo, db); } } else if (exportInfo->State == EState::Cancellation) { item.State = EState::Cancelled; @@ -1140,14 +1156,14 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - exportInfo->State = EState::Done; - exportInfo->EndTime = TAppData::TimeProvider->Now(); + PrepareAutoDropping(Self, exportInfo, db); } Self->PersistExportItemState(db, exportInfo, itemIdx); break; } + case EState::AutoDropping: case 
EState::Dropping: if (!exportInfo->AllItemsAreDropped()) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); @@ -1163,6 +1179,10 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } else { SendNotificationsIfFinished(exportInfo, true); // for tests + if (exportInfo->State == EState::AutoDropping) { + return EndExport(exportInfo, EState::Done, db); + } + if (exportInfo->Uid) { Self->ExportsByUid.erase(exportInfo->Uid); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp index 2a4fd3c5f186..d4455cc40837 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__forget.cpp @@ -77,23 +77,8 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase { LOG_D("TExport::TTxForget, dropping export tables" << ", info: " << exportInfo->ToString() ); - exportInfo->WaitTxId = InvalidTxId; - exportInfo->State = TExportInfo::EState::Dropping; - Self->PersistExportState(db, exportInfo); - - for (ui32 itemIdx : xrange(exportInfo->Items.size())) { - auto& item = exportInfo->Items.at(itemIdx); - - item.WaitTxId = InvalidTxId; - item.State = TExportInfo::EState::Dropped; - - const TPath itemPath = TPath::Resolve(ExportItemPathName(Self, exportInfo, itemIdx), Self); - if (itemPath.IsResolved() && !itemPath.IsDeleted()) { - item.State = TExportInfo::EState::Dropping; - } - - Self->PersistExportItemState(db, exportInfo, itemIdx); - } + + PrepareDropping(Self, exportInfo, db); Progress = true; } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index 8f152650a29d..f70b17b13d75 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -319,5 +319,39 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) { 
return TStringBuilder() << exportPathName << "/" << itemIdx; } +void PrepareDropping( + TSchemeShard* ss, + TExportInfo::TPtr exportInfo, + NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, + std::function<void(ui64)> func) +{ + Y_ABORT_UNLESS(IsIn({TExportInfo::EState::AutoDropping, TExportInfo::EState::Dropping}, droppingState)); + + exportInfo->WaitTxId = InvalidTxId; + exportInfo->State = droppingState; + ss->PersistExportState(db, exportInfo); + + for (ui32 itemIdx : xrange(exportInfo->Items.size())) { + auto& item = exportInfo->Items.at(itemIdx); + + item.WaitTxId = InvalidTxId; + item.State = TExportInfo::EState::Dropped; + const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss); + if (itemPath.IsResolved() && !itemPath.IsDeleted()) { + item.State = TExportInfo::EState::Dropping; + if (exportInfo->State == TExportInfo::EState::AutoDropping) { + func(itemIdx); + } + } + + ss->PersistExportItemState(db, exportInfo, itemIdx); + } +} + +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) { + PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, [](ui64){}); +} + } // NSchemeShard } // NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h index a84cb925ea08..97f9d18e33e1 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h @@ -47,5 +47,9 @@ THolder CancelPropose( TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx); TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db, + TExportInfo::EState droppingState, std::function<void(ui64)> func); +void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db); + } // NSchemeShard } // NKikimr diff --git 
a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 78322d3b8604..17105f358c22 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2668,6 +2668,7 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> { Done = 240, Dropping = 241, Dropped = 242, + AutoDropping = 243, Cancellation = 250, Cancelled = 251, }; @@ -2782,12 +2783,16 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> { return State == EState::Dropping; } + bool IsAutoDropping() const { + return State == EState::AutoDropping; + } + bool IsCancelling() const { return State == EState::Cancellation; } bool IsInProgress() const { - return IsPreparing() || IsWorking() || IsDropping() || IsCancelling(); + return IsPreparing() || IsWorking() || IsDropping() || IsAutoDropping() || IsCancelling(); } bool IsDone() const { diff --git a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp index 8bcc2c72ca6a..55e111ca004b 100644 --- a/ydb/core/tx/schemeshard/ut_export/ut_export.cpp +++ b/ydb/core/tx/schemeshard/ut_export/ut_export.cpp @@ -1179,10 +1179,23 @@ partitioning_settings { } )", port)); const ui64 exportId = txId; + ::NKikimrSubDomains::TDiskSpaceUsage afterExport; + + TTestActorRuntime::TEventObserver prevObserverFunc; + prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) { + if (auto* p = event->CastAsLocal<TEvSchemeShard::TEvModifySchemeTransaction>()) { + auto& record = p->Record; + if (record.TransactionSize() >= 1 && + record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpDropTable) { + afterExport = waitForStats(2); + } + } + return prevObserverFunc(event); + }); + env.TestWaitNotification(runtime, exportId); TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS); - const auto afterExport = waitForStats(2); UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterExport.DebugString()); TestForgetExport(runtime, ++txId, "/MyRoot", exportId); @@ 
-1198,13 +1211,23 @@ partitioning_settings { TTestEnv env(runtime); ui64 txId = 100; + TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(runtime, [](auto&& ev) { + return ev->Get()->Request.GetKey() == "/data_01.csv"; + }); + TestCreateTable(runtime, ++txId, "/MyRoot", R"( Name: "Table" - Columns { Name: "key" Type: "Utf8" } + Columns { Name: "key" Type: "Uint32" } Columns { Name: "value" Type: "Utf8" } KeyColumnNames: ["key"] + SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 10 } }}} )"); env.TestWaitNotification(runtime, txId); + + WriteRow(runtime, ++txId, "/MyRoot/Table", 0, 1, "v1"); + env.TestWaitNotification(runtime, txId); + WriteRow(runtime, ++txId, "/MyRoot/Table", 1, 100, "v100"); + env.TestWaitNotification(runtime, txId); TPortManager portManager; const ui16 port = portManager.GetPort(); @@ -1222,17 +1245,34 @@ partitioning_settings { } } )", port)); + + runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; }); + bool isCompleted = false; + + while (!isCompleted) { + const auto desc = TestGetExport(runtime, txId, "/MyRoot"); + const auto entry = desc.GetResponse().GetEntry(); + const auto& item = entry.GetItemsProgress(0); + + if (item.parts_completed() > 0) { + isCompleted = true; + UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 2); + UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1); + UNIT_ASSERT(item.has_start_time()); + } else { + runtime.SimulateSleep(TDuration::Seconds(1)); + } + } + + blockPartition01.Stop(); + blockPartition01.Unblock(); + env.TestWaitNotification(runtime, txId); const auto desc = TestGetExport(runtime, txId, "/MyRoot"); - const auto& entry = desc.GetResponse().GetEntry(); - UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1); + const auto entry = desc.GetResponse().GetEntry(); - const auto& item = entry.GetItemsProgress(0); - UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 1); - UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1); - UNIT_ASSERT(item.has_start_time()); - 
UNIT_ASSERT(item.has_end_time()); + UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1); } Y_UNIT_TEST(ShouldRestartOnScanErrors) { @@ -2645,4 +2685,41 @@ attributes { gen.Check(); } + + Y_UNIT_TEST(AutoDropping) { + TTestBasicRuntime runtime; + + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + + auto request = Sprintf(R"( + ExportToS3Settings { + endpoint: "localhost:%d" + scheme: HTTP + items { + source_path: "/MyRoot/Table" + destination_prefix: "" + } + } + )", port); + + TTestEnv env(runtime); + + Run(runtime, env, TVector<TString>{ + R"( + Name: "Table" + Columns { Name: "key" Type: "Utf8" } + Columns { Name: "value" Type: "Utf8" } + KeyColumnNames: ["key"] + )", + }, request, Ydb::StatusIds::SUCCESS, "/MyRoot"); + + auto desc = DescribePath(runtime, "/MyRoot"); + UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1); + UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table"); + } } diff --git a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp index 73823901479f..a3c1b56825da 100644 --- a/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp +++ b/ydb/core/tx/schemeshard/ut_export_reboots_s3/ut_export_reboots_s3.cpp @@ -510,7 +510,7 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { )"); } - class TestData { + class TTestData { public: static const TTypedScheme& Table() { return TableScheme; @@ -531,9 +531,9 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { static const TString RequestString; }; - const char* TestData::TableName = "Table"; + const char* TTestData::TableName = "Table"; - const TTypedScheme TestData::TableScheme = TTypedScheme { + const TTypedScheme TTestData::TableScheme = TTypedScheme { EPathTypeTable, Sprintf(R"( Name: "%s" @@ -543,7 +543,7 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { )", 
TableName) }; - const TTypedScheme TestData::ChangefeedScheme = TTypedScheme { + const TTypedScheme TTestData::ChangefeedScheme = TTypedScheme { EPathTypeCdcStream, Sprintf(R"( TableName: "%s" @@ -556,7 +556,7 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { )", TableName) }; - const TString TestData::RequestString = R"( + const TString TTestData::RequestString = R"( ExportToS3Settings { endpoint: "localhost:%d" scheme: HTTP @@ -569,22 +569,55 @@ Y_UNIT_TEST_SUITE(TExportToS3WithRebootsTests) { Y_UNIT_TEST(ShouldSucceedOnSingleShardTableWithChangefeed) { RunS3({ - TestData::Table(), - TestData::Changefeed() - }, TestData::Request()); + TTestData::Table(), + TTestData::Changefeed() + }, TTestData::Request()); } Y_UNIT_TEST(CancelOnSingleShardTableWithChangefeed) { CancelS3({ - TestData::Table(), - TestData::Changefeed() - }, TestData::Request()); + TTestData::Table(), + TTestData::Changefeed() + }, TTestData::Request()); } Y_UNIT_TEST(ForgetShouldSucceedOnSingleShardTableWithChangefeed) { ForgetS3({ - TestData::Table(), - TestData::Changefeed() - }, TestData::Request()); + TTestData::Table(), + TTestData::Changefeed() + }, TTestData::Request()); + } + + Y_UNIT_TEST(ShouldSucceedAutoDropping) { + TPortManager portManager; + const ui16 port = portManager.GetPort(); + + TTestWithReboots t; + TS3Mock s3Mock({}, TS3Mock::TSettings(port)); + UNIT_ASSERT(s3Mock.Start()); + + t.Run([&](TTestActorRuntime& runtime, bool& activeZone) { + runtime.SetLogPriority(NKikimrServices::EXPORT, NActors::NLog::PRI_TRACE); + { + TInactiveZone inactive(activeZone); + CreateSchemeObjects(t, runtime, { + TTestData::Table() + }); + + TestExport(runtime, ++t.TxId, "/MyRoot", Sprintf(TTestData::Request().data(), port)); + } + + const ui64 exportId = t.TxId; + t.TestEnv->TestWaitNotification(runtime, exportId); + + { + TInactiveZone inactive(activeZone); + TestGetExport(runtime, exportId, "/MyRoot"); + TestRmDir(runtime, ++t.TxId, "/MyRoot", "DirA"); + auto desc = DescribePath(runtime, 
"/MyRoot"); + UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1); + UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table"); + } + }); } } diff --git a/ydb/tests/functional/compatibility/test_compatibility.py b/ydb/tests/functional/compatibility/test_compatibility.py index 348908e7c3b2..77c1d977c5f6 100644 --- a/ydb/tests/functional/compatibility/test_compatibility.py +++ b/ydb/tests/functional/compatibility/test_compatibility.py @@ -1,5 +1,9 @@ # -*- coding: utf-8 -*- +import boto3 +import tempfile import yatest +import os +import json from ydb.tests.library.harness.kikimr_runner import KiKiMR from ydb.tests.library.harness.kikimr_config import KikimrConfigGenerator from ydb.tests.library.harness.param_constants import kikimr_driver_path @@ -24,6 +28,10 @@ def setup_class(cls): ) ) cls.driver.wait() + output_path = yatest.common.test_output_path() + cls.output_f = open(os.path.join(output_path, "out.log"), "w") + + cls.s3_config = cls.setup_s3() @classmethod def teardown_class(cls): @@ -33,9 +41,32 @@ def teardown_class(cls): if hasattr(cls, 'cluster'): cls.cluster.stop(kill=True) # TODO fix + @staticmethod + def setup_s3(): + s3_endpoint = os.getenv("S3_ENDPOINT") + s3_access_key = "minio" + s3_secret_key = "minio123" + s3_bucket = "export_test_bucket" + + resource = boto3.resource("s3", endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key) + + bucket = resource.Bucket(s3_bucket) + bucket.create() + bucket.objects.all().delete() + + return s3_endpoint, s3_access_key, s3_secret_key, s3_bucket + + def _execute_command_and_get_result(self, command): + with tempfile.NamedTemporaryFile(mode='w+', delete=True) as temp_file: + yatest.common.execute(command, wait=True, stdout=temp_file, stderr=temp_file) + temp_file.flush() + temp_file.seek(0) + result = json.load(temp_file) + self.output_f.write(str(result) + "\n") + return result + def test_simple(self): session = 
ydb.retry_operation_sync(lambda: self.driver.table_client.session().create()) - with ydb.SessionPool(self.driver, size=1) as pool: with pool.checkout() as session: session.execute_scheme( @@ -75,3 +106,86 @@ def test_simple(self): result = list(result_sets[0].rows[0].values()) assert len(result) == 1 assert result[0] == upsert_count * iteration_count + + def test_export(self): + s3_endpoint, s3_access_key, s3_secret_key, s3_bucket = self.s3_config + + with ydb.SessionPool(self.driver, size=1) as pool: + with pool.checkout() as session: + for table_num in range(1, 6): + table_name = f"sample_table_{table_num}" + session.execute_scheme( + f"create table `{table_name}` (id Uint64, payload Utf8, PRIMARY KEY(id));" + ) + + query = f"""INSERT INTO `{table_name}` (id, payload) VALUES + (1, 'Payload 1 for table {table_num}'), + (2, 'Payload 2 for table {table_num}'), + (3, 'Payload 3 for table {table_num}'), + (4, 'Payload 4 for table {table_num}'), + (5, 'Payload 5 for table {table_num}');""" + session.transaction().execute( + query, commit_tx=True + ) + + export_command = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--endpoint", + "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port, + "--database=/Root", + "export", + "s3", + "--s3-endpoint", + s3_endpoint, + "--bucket", + s3_bucket, + "--access-key", + s3_access_key, + "--secret-key", + s3_secret_key, + "--item", + "src=/Root,dst=.", + "--format", + "proto-json-base64" + ] + + result_export = self._execute_command_and_get_result(export_command) + + export_id = result_export["id"] + status_export = result_export["status"] + progress_export = result_export["metadata"]["progress"] + + assert status_export == "SUCCESS" + assert progress_export in ["PROGRESS_PREPARING", "PROGRESS_DONE"] + + operation_get_command = [ + yatest.common.binary_path(os.getenv("YDB_CLI_BINARY")), + "--endpoint", + "grpc://localhost:%d" % self.cluster.nodes[1].grpc_port, + "--database=/Root", + "operation", + "get", + "%s" % 
export_id, + "--format", + "proto-json-base64" + ] + + while progress_export != "PROGRESS_DONE": + result_get = self._execute_command_and_get_result(operation_get_command) + progress_export = result_get["metadata"]["progress"] + + s3_resource = boto3.resource("s3", endpoint_url=s3_endpoint, aws_access_key_id=s3_access_key, aws_secret_access_key=s3_secret_key) + + keys_expected = set() + for table_num in range(1, 6): + table_name = f"sample_table_{table_num}" + keys_expected.add(table_name + "/data_00.csv") + keys_expected.add(table_name + "/metadata.json") + keys_expected.add(table_name + "/scheme.pb") + + bucket = s3_resource.Bucket(s3_bucket) + keys = set() + for x in list(bucket.objects.all()): + keys.add(x.key) + + assert keys_expected <= keys diff --git a/ydb/tests/functional/compatibility/ya.make b/ydb/tests/functional/compatibility/ya.make index 41f809b3e175..40873e73dc98 100644 --- a/ydb/tests/functional/compatibility/ya.make +++ b/ydb/tests/functional/compatibility/ya.make @@ -11,6 +11,7 @@ TEST_SRCS( SIZE(LARGE) REQUIREMENTS(cpu:all) INCLUDE(${ARCADIA_ROOT}/ydb/tests/large.inc) +INCLUDE(${ARCADIA_ROOT}/ydb/tests/tools/s3_recipe/recipe.inc) DEPENDS( ydb/apps/ydb @@ -19,6 +20,7 @@ DEPENDS( ) PEERDIR( + contrib/python/boto3 ydb/tests/library ydb/tests/stress/simple_queue/workload )