Skip to content

25-1: Auto-remove temporary export tables #17734

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 8 commits into
base: stable-25-1
Choose a base branch
from
3 changes: 2 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,8 @@ void TSchemeShard::FromXxportInfo(NKikimrExport::TExport& exprt, const TExportIn
case TExportInfo::EState::CopyTables:
exprt.SetProgress(Ydb::Export::ExportProgress::PROGRESS_PREPARING);
break;


case TExportInfo::EState::AutoDropping:
case TExportInfo::EState::Transferring:
case TExportInfo::EState::Done:
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
Expand Down
28 changes: 24 additions & 4 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->TxAllocatorClient, new TEvTxAllocatorClient::TEvAllocate(), 0, exportInfo->Id);
}

// Moves the export into the AutoDropping state and starts removal of the
// export items that still exist on the scheme.
//
// PrepareDropping invokes the callback once per item whose path is resolved
// and not deleted; each such item is queued in PendingDropItems and gets its
// own AllocateTxId call. When no item needed dropping, AllocateTxId is called
// once for the export itself instead.
void PrepareAutoDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
    bool anyItemQueued = false;

    const auto queueItemDrop = [&](ui64 itemIdx) {
        exportInfo->PendingDropItems.push_back(itemIdx);
        anyItemQueued = true;
        AllocateTxId(exportInfo, itemIdx);
    };
    PrepareDropping(ss, exportInfo, db, TExportInfo::EState::AutoDropping, queueItemDrop);

    if (anyItemQueued) {
        return;
    }

    // Nothing left to drop: request a tx id at the export level.
    AllocateTxId(exportInfo);
}

// Sends a TEvNotifyTxCompletion request for txId to this schemeshard's own actor id.
void SubscribeTx(TTxId txId) {
    const ui64 rawTxId = ui64(txId);
    Send(Self->SelfId(), new TEvSchemeShard::TEvNotifyTxCompletion(rawTxId));
}
Expand Down Expand Up @@ -617,7 +629,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
const auto& item = exportInfo->Items.at(itemIdx);

if (item.WaitTxId == InvalidTxId) {
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable && item.State <= EState::Transferring) {
pendingTables.emplace_back(itemIdx);
} else {
UploadScheme(exportInfo, itemIdx, ctx);
Expand Down Expand Up @@ -665,6 +677,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
SendNotificationsIfFinished(exportInfo);
break;

case EState::AutoDropping:
case EState::Dropping:
if (!exportInfo->AllItemsAreDropped()) {
for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
Expand Down Expand Up @@ -749,6 +762,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
break;

case EState::AutoDropping:
case EState::Dropping:
if (exportInfo->PendingDropItems) {
itemIdx = PopFront(exportInfo->PendingDropItems);
Expand Down Expand Up @@ -821,6 +835,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
break;

case EState::AutoDropping:
case EState::Dropping:
if (isMultipleMods || isNotExist) {
if (record.GetPathDropTxId()) {
Expand Down Expand Up @@ -925,6 +940,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Self->PersistExportItemState(db, exportInfo, itemIdx);
break;

case EState::AutoDropping:
case EState::Dropping:
if (!exportInfo->AllItemsAreDropped()) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
Expand Down Expand Up @@ -1011,7 +1027,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
EndExport(exportInfo, EState::Done, db);
PrepareAutoDropping(Self, exportInfo, db);
}
} else if (exportInfo->State == EState::Cancellation) {
item.State = EState::Cancelled;
Expand Down Expand Up @@ -1140,14 +1156,14 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
PrepareAutoDropping(Self, exportInfo, db);
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
break;
}

case EState::AutoDropping:
case EState::Dropping:
if (!exportInfo->AllItemsAreDropped()) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
Expand All @@ -1163,6 +1179,10 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
} else {
SendNotificationsIfFinished(exportInfo, true); // for tests

if (exportInfo->State == EState::AutoDropping) {
return EndExport(exportInfo, EState::Done, db);
}

if (exportInfo->Uid) {
Self->ExportsByUid.erase(exportInfo->Uid);
}
Expand Down
19 changes: 2 additions & 17 deletions ydb/core/tx/schemeshard/schemeshard_export__forget.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,23 +77,8 @@ struct TSchemeShard::TExport::TTxForget: public TSchemeShard::TXxport::TTxBase {
LOG_D("TExport::TTxForget, dropping export tables"
<< ", info: " << exportInfo->ToString()
);
exportInfo->WaitTxId = InvalidTxId;
exportInfo->State = TExportInfo::EState::Dropping;
Self->PersistExportState(db, exportInfo);

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
auto& item = exportInfo->Items.at(itemIdx);

item.WaitTxId = InvalidTxId;
item.State = TExportInfo::EState::Dropped;

const TPath itemPath = TPath::Resolve(ExportItemPathName(Self, exportInfo, itemIdx), Self);
if (itemPath.IsResolved() && !itemPath.IsDeleted()) {
item.State = TExportInfo::EState::Dropping;
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
}

PrepareDropping(Self, exportInfo, db);

Progress = true;
}
Expand Down
34 changes: 34 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -319,5 +319,39 @@ TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx) {
return TStringBuilder() << exportPathName << "/" << itemIdx;
}

// Puts the export and all of its items into a dropping state and persists
// every change.
//
// droppingState must be either EState::Dropping or EState::AutoDropping
// (aborts otherwise). Each item is first marked Dropped; if its export path
// is still resolved and not deleted it is marked Dropping instead. For such
// items func(itemIdx) is called, but only when the export state is
// AutoDropping. The item state is persisted after the callback has run.
void PrepareDropping(
    TSchemeShard* ss,
    TExportInfo::TPtr exportInfo,
    NIceDb::TNiceDb& db,
    TExportInfo::EState droppingState,
    std::function<void(ui64)> func)
{
    using EState = TExportInfo::EState;

    Y_ABORT_UNLESS(IsIn({EState::AutoDropping, EState::Dropping}, droppingState));

    // Export-level bookkeeping: no pending transaction, new state, persisted.
    exportInfo->WaitTxId = InvalidTxId;
    exportInfo->State = droppingState;
    ss->PersistExportState(db, exportInfo);

    for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
        auto& item = exportInfo->Items.at(itemIdx);

        item.WaitTxId = InvalidTxId;
        item.State = EState::Dropped;

        const TPath itemPath = TPath::Resolve(ExportItemPathName(ss, exportInfo, itemIdx), ss);
        const bool pathStillExists = itemPath.IsResolved() && !itemPath.IsDeleted();

        if (pathStillExists) {
            item.State = EState::Dropping;

            const bool autoDropping = exportInfo->State == EState::AutoDropping;
            if (autoDropping) {
                func(itemIdx);
            }
        }

        ss->PersistExportItemState(db, exportInfo, itemIdx);
    }
}

// Convenience overload: prepares dropping in the plain Dropping state with a
// callback that does nothing.
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db) {
    const auto ignoreItem = [](ui64) {};
    PrepareDropping(ss, exportInfo, db, TExportInfo::EState::Dropping, ignoreItem);
}

} // NSchemeShard
} // NKikimr
4 changes: 4 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,5 +47,9 @@ THolder<TEvSchemeShard::TEvCancelTx> CancelPropose(
TString ExportItemPathName(TSchemeShard* ss, const TExportInfo::TPtr exportInfo, ui32 itemIdx);
TString ExportItemPathName(const TString& exportPathName, ui32 itemIdx);

void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db,
TExportInfo::EState droppingState, std::function<void(ui64)> func);
void PrepareDropping(TSchemeShard* ss, TExportInfo::TPtr exportInfo, NIceDb::TNiceDb& db);

} // NSchemeShard
} // NKikimr
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_info_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -2668,6 +2668,7 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
Done = 240,
Dropping = 241,
Dropped = 242,
AutoDropping = 243,
Cancellation = 250,
Cancelled = 251,
};
Expand Down Expand Up @@ -2782,12 +2783,16 @@ struct TExportInfo: public TSimpleRefCount<TExportInfo> {
return State == EState::Dropping;
}

// True when the export state is EState::AutoDropping.
bool IsAutoDropping() const {
return State == EState::AutoDropping;
}

// True when the export state is EState::Cancellation.
bool IsCancelling() const {
return State == EState::Cancellation;
}

// True while the export is in any non-terminal phase: preparing, working,
// dropping, auto-dropping or cancelling.
// AutoDropping must be included here; the stale return that preceded this
// one omitted it and made the AutoDropping-aware expression unreachable.
bool IsInProgress() const {
    return IsPreparing() || IsWorking() || IsDropping() || IsAutoDropping() || IsCancelling();
}

bool IsDone() const {
Expand Down
95 changes: 86 additions & 9 deletions ydb/core/tx/schemeshard/ut_export/ut_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1179,10 +1179,23 @@ partitioning_settings {
}
)", port));
const ui64 exportId = txId;
::NKikimrSubDomains::TDiskSpaceUsage afterExport;

TTestActorRuntime::TEventObserver prevObserverFunc;
prevObserverFunc = runtime.SetObserverFunc([&](TAutoPtr<IEventHandle>& event) {
if (auto* p = event->CastAsLocal<TEvSchemeShard::TEvModifySchemeTransaction>()) {
auto& record = p->Record;
if (record.TransactionSize() >= 1 &&
record.GetTransaction(0).GetOperationType() == NKikimrSchemeOp::ESchemeOpDropTable) {
afterExport = waitForStats(2);
}
}
return prevObserverFunc(event);
});

env.TestWaitNotification(runtime, exportId);

TestGetExport(runtime, exportId, "/MyRoot", Ydb::StatusIds::SUCCESS);
const auto afterExport = waitForStats(2);
UNIT_ASSERT_STRINGS_EQUAL(expected.DebugString(), afterExport.DebugString());

TestForgetExport(runtime, ++txId, "/MyRoot", exportId);
Expand All @@ -1198,13 +1211,23 @@ partitioning_settings {
TTestEnv env(runtime);
ui64 txId = 100;

TBlockEvents<NKikimr::NWrappers::NExternalStorage::TEvPutObjectRequest> blockPartition01(runtime, [](auto&& ev) {
return ev->Get()->Request.GetKey() == "/data_01.csv";
});

TestCreateTable(runtime, ++txId, "/MyRoot", R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "key" Type: "Uint32" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
SplitBoundary { KeyPrefix { Tuple { Optional { Uint32: 10 } }}}
)");
env.TestWaitNotification(runtime, txId);

WriteRow(runtime, ++txId, "/MyRoot/Table", 0, 1, "v1");
env.TestWaitNotification(runtime, txId);
WriteRow(runtime, ++txId, "/MyRoot/Table", 1, 100, "v100");
env.TestWaitNotification(runtime, txId);

TPortManager portManager;
const ui16 port = portManager.GetPort();
Expand All @@ -1222,17 +1245,34 @@ partitioning_settings {
}
}
)", port));

runtime.WaitFor("put object request from 01 partition", [&]{ return blockPartition01.size() >= 1; });
bool isCompleted = false;

while (!isCompleted) {
const auto desc = TestGetExport(runtime, txId, "/MyRoot");
const auto entry = desc.GetResponse().GetEntry();
const auto& item = entry.GetItemsProgress(0);

if (item.parts_completed() > 0) {
isCompleted = true;
UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 2);
UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1);
UNIT_ASSERT(item.has_start_time());
} else {
runtime.SimulateSleep(TDuration::Seconds(1));
}
}

blockPartition01.Stop();
blockPartition01.Unblock();

env.TestWaitNotification(runtime, txId);

const auto desc = TestGetExport(runtime, txId, "/MyRoot");
const auto& entry = desc.GetResponse().GetEntry();
UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1);
const auto entry = desc.GetResponse().GetEntry();

const auto& item = entry.GetItemsProgress(0);
UNIT_ASSERT_VALUES_EQUAL(item.parts_total(), 1);
UNIT_ASSERT_VALUES_EQUAL(item.parts_completed(), 1);
UNIT_ASSERT(item.has_start_time());
UNIT_ASSERT(item.has_end_time());
UNIT_ASSERT_VALUES_EQUAL(entry.ItemsProgressSize(), 1);
}

Y_UNIT_TEST(ShouldRestartOnScanErrors) {
Expand Down Expand Up @@ -2645,4 +2685,41 @@ attributes {

gen.Check();
}

// Checks that after an export to S3 the only child of /MyRoot is the source
// table, i.e. nothing else created during the export is left behind.
Y_UNIT_TEST(AutoDropping) {
TTestBasicRuntime runtime;

TPortManager portManager;
const ui16 port = portManager.GetPort();

// Local S3 mock the export is pointed at; it must be started before the request is run.
TS3Mock s3Mock({}, TS3Mock::TSettings(port));
UNIT_ASSERT(s3Mock.Start());


// Export request for a single item, /MyRoot/Table, sent to the mock endpoint.
auto request = Sprintf(R"(
ExportToS3Settings {
endpoint: "localhost:%d"
scheme: HTTP
items {
source_path: "/MyRoot/Table"
destination_prefix: ""
}
}
)", port);

TTestEnv env(runtime);

// NOTE(review): Run is defined outside this view; presumably it creates the
// listed tables, issues the export request and waits for the expected status.
Run(runtime, env, TVector<TString>{
R"(
Name: "Table"
Columns { Name: "key" Type: "Utf8" }
Columns { Name: "value" Type: "Utf8" }
KeyColumnNames: ["key"]
)",
}, request, Ydb::StatusIds::SUCCESS, "/MyRoot");

// /MyRoot must contain exactly one child, and it must be the source table.
auto desc = DescribePath(runtime, "/MyRoot");
UNIT_ASSERT_EQUAL(desc.GetPathDescription().ChildrenSize(), 1);
UNIT_ASSERT_EQUAL(desc.GetPathDescription().GetChildren(0).GetName(), "Table");
}
}
Loading
Loading