Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ impl TableUpdate {
pub fn apply(self, builder: TableMetadataBuilder) -> Result<TableMetadataBuilder> {
match self {
TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)),
TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)),
TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?),
TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id),
TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec),
TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id),
Expand Down
229 changes: 229 additions & 0 deletions crates/iceberg/src/spec/table_metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,22 @@ impl TableMetadata {
TableMetadataBuilder::new_from_metadata(self, current_file_location)
}

/// Check if a partition field name exists in any partition spec.
#[inline]
pub(crate) fn partition_name_exists(&self, name: &str) -> bool {
self.partition_specs
.values()
.any(|spec| spec.fields().iter().any(|pf| pf.name == name))
}

/// Check if a field name exists in any schema.
#[inline]
pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool {
self.schemas
.values()
.any(|schema| schema.field_by_name(name).is_some())
}

/// Returns format version of this metadata.
#[inline]
pub fn format_version(&self) -> FormatVersion {
Expand Down Expand Up @@ -3133,4 +3149,217 @@ mod tests {
// Verify it returns an error
assert!(result.is_err());
}

#[test]
fn test_partition_name_exists() {
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int))
.into(),
])
.build()
.unwrap();

let spec1 = PartitionSpec::builder(schema.clone())
.with_spec_id(1)
.add_partition_field("data", "data_partition", Transform::Identity)
.unwrap()
.build()
.unwrap();

let spec2 = PartitionSpec::builder(schema.clone())
.with_spec_id(2)
.add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16))
.unwrap()
.build()
.unwrap();

// Build metadata with these specs
let metadata = TableMetadataBuilder::new(
schema,
spec1.clone().into_unbound(),
SortOrder::unsorted_order(),
"s3://test/location".to_string(),
FormatVersion::V2,
HashMap::new(),
)
.unwrap()
.add_partition_spec(spec2.into_unbound())
.unwrap()
.build()
.unwrap()
.metadata;

assert!(metadata.partition_name_exists("data_partition"));
assert!(metadata.partition_name_exists("partition_bucket"));

assert!(!metadata.partition_name_exists("nonexistent_field"));
assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name
assert!(!metadata.partition_name_exists(""));
}

#[test]
fn test_partition_name_exists_empty_specs() {
// Create metadata with no partition specs (unpartitioned table)
let schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.unwrap();

let metadata = TableMetadataBuilder::new(
schema,
PartitionSpec::unpartition_spec().into_unbound(),
SortOrder::unsorted_order(),
"s3://test/location".to_string(),
FormatVersion::V2,
HashMap::new(),
)
.unwrap()
.build()
.unwrap()
.metadata;

assert!(!metadata.partition_name_exists("any_field"));
assert!(!metadata.partition_name_exists("data"));
}

#[test]
fn test_name_exists_in_any_schema() {
// Create multiple schemas with different fields
let schema1 = Schema::builder()
.with_schema_id(1)
.with_fields(vec![
NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(),
])
.build()
.unwrap();

let schema2 = Schema::builder()
.with_schema_id(2)
.with_fields(vec![
NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(),
])
.build()
.unwrap();

let metadata = TableMetadataBuilder::new(
schema1,
PartitionSpec::unpartition_spec().into_unbound(),
SortOrder::unsorted_order(),
"s3://test/location".to_string(),
FormatVersion::V2,
HashMap::new(),
)
.unwrap()
.add_current_schema(schema2)
.unwrap()
.build()
.unwrap()
.metadata;

assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas
assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical)
assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current)

assert!(!metadata.name_exists_in_any_schema("nonexistent_field"));
assert!(!metadata.name_exists_in_any_schema("field4"));
assert!(!metadata.name_exists_in_any_schema(""));
}

#[test]
fn test_name_exists_in_any_schema_empty_schemas() {
let schema = Schema::builder().with_fields(vec![]).build().unwrap();

let metadata = TableMetadataBuilder::new(
schema,
PartitionSpec::unpartition_spec().into_unbound(),
SortOrder::unsorted_order(),
"s3://test/location".to_string(),
FormatVersion::V2,
HashMap::new(),
)
.unwrap()
.build()
.unwrap()
.metadata;

assert!(!metadata.name_exists_in_any_schema("any_field"));
}

#[test]
fn test_helper_methods_multi_version_scenario() {
// Test a realistic multi-version scenario
let initial_schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(
3,
"deprecated_field",
Type::Primitive(PrimitiveType::String),
)
.into(),
])
.build()
.unwrap();

let metadata = TableMetadataBuilder::new(
initial_schema,
PartitionSpec::unpartition_spec().into_unbound(),
SortOrder::unsorted_order(),
"s3://test/location".to_string(),
FormatVersion::V2,
HashMap::new(),
)
.unwrap();

let evolved_schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(
3,
"deprecated_field",
Type::Primitive(PrimitiveType::String),
)
.into(),
NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
.into(),
])
.build()
.unwrap();

// Then add a third schema that removes the deprecated field
let _final_schema = Schema::builder()
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double))
.into(),
NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean))
.into(),
])
.build()
.unwrap();

let final_metadata = metadata
.add_current_schema(evolved_schema)
.unwrap()
.build()
.unwrap()
.metadata;

assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table

assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas
assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas
assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas
assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema
assert!(!final_metadata.name_exists_in_any_schema("never_existed"));
}
}
Loading
Loading