Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/common/exception/src/exception_code.rs
Original file line number Diff line number Diff line change
Expand Up @@ -515,6 +515,8 @@ build_exceptions! {
UndropDbHasNoHistory(2312),
/// Undrop table with no drop time
UndropTableWithNoDropTime(2313),
/// Undrop table blocked by vacuum retention guard
UndropTableRetentionGuard(2326),
/// Drop table with drop time
DropTableWithDropTime(2314),
/// Drop database with drop time
Expand Down
47 changes: 47 additions & 0 deletions src/meta/api/src/garbage_collection_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::HashSet;
use std::convert::Infallible;
use std::ops::Range;

use chrono::DateTime;
Expand All @@ -27,6 +28,7 @@ use databend_common_meta_app::principal::TenantOwnershipObjectIdent;
use databend_common_meta_app::schema::index_id_ident::IndexIdIdent;
use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent;
use databend_common_meta_app::schema::AutoIncrementStorageIdent;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseId;
Expand All @@ -40,13 +42,15 @@ use databend_common_meta_app::schema::TableCopiedFileNameIdent;
use databend_common_meta_app::schema::TableId;
use databend_common_meta_app::schema::TableIdHistoryIdent;
use databend_common_meta_app::schema::TableIdToName;
use databend_common_meta_app::schema::VacuumWatermark;
use databend_common_meta_app::tenant::Tenant;
use databend_common_meta_kvapi::kvapi;
use databend_common_meta_kvapi::kvapi::DirName;
use databend_common_meta_kvapi::kvapi::Key;
use databend_common_meta_types::txn_op::Request;
use databend_common_meta_types::txn_op_response::Response;
use databend_common_meta_types::MetaError;
use databend_common_meta_types::SeqV;
use databend_common_meta_types::TxnRequest;
use display_more::DisplaySliceExt;
use fastrace::func_name;
Expand All @@ -60,6 +64,7 @@ use log::warn;
use crate::index_api::IndexApi;
use crate::kv_app_error::KVAppError;
use crate::kv_pb_api::KVPbApi;
use crate::kv_pb_crud_api::KVPbCrudApi;
use crate::txn_backoff::txn_backoff;
use crate::txn_condition_util::txn_cond_eq_seq;
use crate::txn_core_util::send_txn;
Expand Down Expand Up @@ -101,6 +106,48 @@ where
}
Ok(num_meta_key_removed)
}

/// Fetch and conditionally set the vacuum retention timestamp.
///
/// This method implements the monotonic timestamp update semantics:
/// - Only updates the timestamp if the new value is greater than the current one
/// - Returns the OLD timestamp value
/// - Ensures atomicity using compare-and-swap operations
#[fastrace::trace]
async fn fetch_set_vacuum_timestamp(
&self,
tenant: &Tenant,
new_timestamp: DateTime<Utc>,
) -> Result<Option<VacuumWatermark>, KVAppError> {
let ident = VacuumWatermarkIdent::new_global(tenant.clone());

// Use crud_upsert_with for atomic compare-and-swap semantics
let transition = self
.crud_upsert_with::<Infallible>(&ident, |current: Option<SeqV<VacuumWatermark>>| {
let current_retention: Option<VacuumWatermark> = current.map(|v| v.data);

// Check if we should update based on monotonic property
let should_update = match current_retention {
None => true, // Never set before, always update
Some(existing) => new_timestamp > existing.time, // Only update if new timestamp is greater
};

if should_update {
let new_retention = VacuumWatermark::new(new_timestamp);
Ok(Some(new_retention))
} else {
// Return None to indicate no update needed
Ok(None)
}
})
.await?
// Safe to unwrap: type of business logic error is `Infallible`
.unwrap();

// Extract the old value to return
let old_retention = transition.prev.map(|v| v.data);
Ok(old_retention)
}
}

#[async_trait::async_trait]
Expand Down
35 changes: 35 additions & 0 deletions src/meta/api/src/schema_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use databend_common_meta_app::app_error::AppError;
use databend_common_meta_app::app_error::DropTableWithDropTime;
use databend_common_meta_app::app_error::UndropTableAlreadyExists;
use databend_common_meta_app::app_error::UndropTableHasNoHistory;
use databend_common_meta_app::app_error::UndropTableRetentionGuard;
use databend_common_meta_app::app_error::UnknownTable;
use databend_common_meta_app::app_error::UnknownTableId;
use databend_common_meta_app::principal::OwnershipObject;
Expand All @@ -29,6 +30,7 @@ use databend_common_meta_app::schema::marked_deleted_index_id::MarkedDeletedInde
use databend_common_meta_app::schema::marked_deleted_index_ident::MarkedDeletedIndexIdIdent;
use databend_common_meta_app::schema::marked_deleted_table_index_id::MarkedDeletedTableIndexId;
use databend_common_meta_app::schema::marked_deleted_table_index_ident::MarkedDeletedTableIndexIdIdent;
use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent;
use databend_common_meta_app::schema::DBIdTableName;
use databend_common_meta_app::schema::DatabaseId;
use databend_common_meta_app::schema::DatabaseMeta;
Expand Down Expand Up @@ -459,10 +461,39 @@ pub async fn handle_undrop_table(
"undrop table"
);

// Check vacuum retention guard before allowing undrop
let drop_marker = *seq_table_meta
.data
.drop_on
.as_ref()
.unwrap_or(&seq_table_meta.data.updated_on);

// Read vacuum timestamp with seq for concurrent safety
let vacuum_ident = VacuumWatermarkIdent::new_global(tenant_dbname_tbname.tenant().clone());
let seq_vacuum_retention = kv_api.get_pb(&vacuum_ident).await?;

// Early retention guard check for fast failure
if let Some(ref sr) = seq_vacuum_retention {
let retention_time = sr.data.time;

if drop_marker <= retention_time {
return Err(KVAppError::AppError(AppError::UndropTableRetentionGuard(
UndropTableRetentionGuard::new(
&tenant_dbname_tbname.table_name,
drop_marker,
retention_time,
),
)));
}
}

{
// reset drop on time
seq_table_meta.drop_on = None;

// Prepare conditions for concurrent safety
let vacuum_seq = seq_vacuum_retention.as_ref().map(|sr| sr.seq).unwrap_or(0);

let txn = TxnRequest::new(
vec![
// db has not to change, i.e., no new table is created.
Expand All @@ -472,6 +503,10 @@ pub async fn handle_undrop_table(
txn_cond_eq_seq(&dbid_tbname, dbid_tbname_seq),
// table is not changed
txn_cond_eq_seq(&tbid, seq_table_meta.seq),
// Concurrent safety: vacuum timestamp seq must not change during undrop
// - If vacuum_retention exists: seq must remain the same (no update by vacuum)
// - If vacuum_retention is None: seq must remain 0 (no creation by vacuum)
txn_cond_eq_seq(&vacuum_ident, vacuum_seq),
],
vec![
// Changing a table in a db has to update the seq of db_meta,
Expand Down
84 changes: 84 additions & 0 deletions src/meta/api/src/schema_api_test_suite.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use databend_common_meta_app::schema::index_id_to_name_ident::IndexIdToNameIdent
use databend_common_meta_app::schema::least_visible_time_ident::LeastVisibleTimeIdent;
use databend_common_meta_app::schema::sequence_storage::SequenceStorageIdent;
use databend_common_meta_app::schema::table_niv::TableNIV;
use databend_common_meta_app::schema::vacuum_watermark_ident::VacuumWatermarkIdent;
use databend_common_meta_app::schema::CatalogMeta;
use databend_common_meta_app::schema::CatalogNameIdent;
use databend_common_meta_app::schema::CatalogOption;
Expand Down Expand Up @@ -351,6 +352,7 @@ impl SchemaApiTestSuite {
suite.gc_dropped_db_after_undrop(&b.build().await).await?;
suite.catalog_create_get_list_drop(&b.build().await).await?;
suite.table_least_visible_time(&b.build().await).await?;
suite.vacuum_retention_timestamp(&b.build().await).await?;
suite
.drop_table_without_tableid_to_name(&b.build().await)
.await?;
Expand Down Expand Up @@ -1485,6 +1487,88 @@ impl SchemaApiTestSuite {
Ok(())
}

#[fastrace::trace]
async fn vacuum_retention_timestamp<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
&self,
mt: &MT,
) -> anyhow::Result<()> {
let tenant_name = "vacuum_retention_timestamp";
let tenant = Tenant::new_or_err(tenant_name, func_name!())?;

// Test basic timestamp operations - monotonic property
let first = DateTime::<Utc>::from_timestamp(1_000, 0).unwrap();
let earlier = DateTime::<Utc>::from_timestamp(500, 0).unwrap();
let later = DateTime::<Utc>::from_timestamp(2_000, 0).unwrap();

// Test fetch_set_vacuum_timestamp with correct return semantics
let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, first).await?;
// Should return None as old value since never set before
assert_eq!(old_retention, None);

// Attempt to set earlier timestamp should return current value (first) unchanged
let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, earlier).await?;
assert_eq!(old_retention.unwrap().time, first); // Should return the PREVIOUS value

// Set later timestamp should work and return previous value (first)
let old_retention = mt.fetch_set_vacuum_timestamp(&tenant, later).await?;
assert_eq!(old_retention.unwrap().time, first); // Should return PREVIOUS value (first)

// Verify current stored value
let vacuum_ident = VacuumWatermarkIdent::new_global(tenant.clone());
let stored = mt.get_pb(&vacuum_ident).await?;
assert_eq!(stored.unwrap().data.time, later);

// Test undrop retention guard behavior
{
let mut util = Util::new(
mt,
tenant_name,
"db_retention_guard",
"tbl_retention_guard",
"FUSE",
);
util.create_db().await?;
let (table_id, _table_meta) = util.create_table().await?;
util.drop_table_by_id().await?;

let table_meta = mt
.get_pb(&TableId::new(table_id))
.await?
.expect("dropped table meta must exist");
let drop_time = table_meta
.data
.drop_on
.expect("dropped table should carry drop_on timestamp");

// Set retention timestamp after drop time to block undrop
let retention_candidate = drop_time + chrono::Duration::seconds(1);
let old_retention = mt
.fetch_set_vacuum_timestamp(&tenant, retention_candidate)
.await?
.unwrap();
// Should return the previous retention time (later)
assert_eq!(old_retention.time, later);

// Undrop should now fail due to retention guard
let undrop_err = mt
.undrop_table(UndropTableReq {
name_ident: TableNameIdent::new(&tenant, util.db_name(), util.tbl_name()),
})
.await
.expect_err("undrop must fail once vacuum retention blocks it");

match undrop_err {
KVAppError::AppError(AppError::UndropTableRetentionGuard(e)) => {
assert_eq!(e.drop_time(), drop_time);
assert_eq!(e.retention(), retention_candidate);
}
other => panic!("unexpected undrop error: {other:?}"),
}
}

Ok(())
}

#[fastrace::trace]
async fn table_create_get_drop<MT: SchemaApi + kvapi::KVApi<Error = MetaError>>(
&self,
Expand Down
43 changes: 43 additions & 0 deletions src/meta/app/src/app_error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

use std::fmt::Display;

use chrono::DateTime;
use chrono::Utc;
use databend_common_exception::ErrorCode;
use databend_common_meta_types::MatchSeq;

Expand Down Expand Up @@ -261,6 +263,36 @@ impl UndropTableHasNoHistory {
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("Cannot undrop table '{table_name}': table was dropped at {drop_time} before vacuum started at {retention}. Data may have been cleaned up.")]
pub struct UndropTableRetentionGuard {
table_name: String,
drop_time: DateTime<Utc>,
retention: DateTime<Utc>,
}

impl UndropTableRetentionGuard {
pub fn new(
table_name: impl Into<String>,
drop_time: DateTime<Utc>,
retention: DateTime<Utc>,
) -> Self {
Self {
table_name: table_name.into(),
drop_time,
retention,
}
}

pub fn drop_time(&self) -> DateTime<Utc> {
self.drop_time
}

pub fn retention(&self) -> DateTime<Utc> {
self.retention
}
}

#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)]
#[error("TableVersionMismatched: {table_id} expect `{expect}` but `{curr}` while `{context}`")]
pub struct TableVersionMismatched {
Expand Down Expand Up @@ -1008,6 +1040,9 @@ pub enum AppError {
#[error(transparent)]
UndropTableHasNoHistory(#[from] UndropTableHasNoHistory),

#[error(transparent)]
UndropTableRetentionGuard(#[from] UndropTableRetentionGuard),

#[error(transparent)]
DatabaseAlreadyExists(#[from] DatabaseAlreadyExists),

Expand Down Expand Up @@ -1461,6 +1496,11 @@ impl AppErrorMessage for UndropTableWithNoDropTime {
}
}

impl AppErrorMessage for UndropTableRetentionGuard {
// Use default implementation that calls self.to_string()
// since there's no sensitive information to strip
}

impl AppErrorMessage for DropTableWithDropTime {
fn message(&self) -> String {
format!("Drop table '{}' with drop_on time", self.table_name)
Expand Down Expand Up @@ -1590,6 +1630,9 @@ impl From<AppError> for ErrorCode {
AppError::UndropTableWithNoDropTime(err) => {
ErrorCode::UndropTableWithNoDropTime(err.message())
}
AppError::UndropTableRetentionGuard(err) => {
ErrorCode::UndropTableRetentionGuard(err.message())
}
AppError::DropTableWithDropTime(err) => ErrorCode::DropTableWithDropTime(err.message()),
AppError::DropDbWithDropTime(err) => ErrorCode::DropDbWithDropTime(err.message()),
AppError::UndropDbWithNoDropTime(err) => {
Expand Down
4 changes: 4 additions & 0 deletions src/meta/app/src/schema/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ pub mod sequence_storage;
mod table;
pub mod table_lock_ident;
pub mod table_niv;
mod vacuum_watermark;
pub mod vacuum_watermark_ident;

pub use auto_increment::GetAutoIncrementNextValueReply;
pub use auto_increment::GetAutoIncrementNextValueReq;
Expand Down Expand Up @@ -154,3 +156,5 @@ pub use table::UpsertTableCopiedFileReq;
pub use table::UpsertTableOptionReply;
pub use table::UpsertTableOptionReq;
pub use table_lock_ident::TableLockIdent;
pub use vacuum_watermark::VacuumWatermark;
pub use vacuum_watermark_ident::VacuumWatermarkIdent;
28 changes: 28 additions & 0 deletions src/meta/app/src/schema/vacuum_watermark.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use chrono::DateTime;
use chrono::Utc;

/// Monotonic timestamp marker indicating when vacuum cleanup started for a tenant.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct VacuumWatermark {
pub time: DateTime<Utc>,
}

impl VacuumWatermark {
pub fn new(time: DateTime<Utc>) -> Self {
Self { time }
}
}
Loading
Loading