diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 26d65d1c8..f9e17808a 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -561,7 +561,7 @@ impl TableUpdate { pub fn apply(self, builder: TableMetadataBuilder) -> Result { match self { TableUpdate::AssignUuid { uuid } => Ok(builder.assign_uuid(uuid)), - TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)), + TableUpdate::AddSchema { schema, .. } => Ok(builder.add_schema(schema)?), TableUpdate::SetCurrentSchema { schema_id } => builder.set_current_schema(schema_id), TableUpdate::AddSpec { spec } => builder.add_partition_spec(spec), TableUpdate::SetDefaultSpec { spec_id } => builder.set_default_partition_spec(spec_id), diff --git a/crates/iceberg/src/spec/table_metadata.rs b/crates/iceberg/src/spec/table_metadata.rs index 1f62afb31..d7347d50e 100644 --- a/crates/iceberg/src/spec/table_metadata.rs +++ b/crates/iceberg/src/spec/table_metadata.rs @@ -224,6 +224,22 @@ impl TableMetadata { TableMetadataBuilder::new_from_metadata(self, current_file_location) } + /// Check if a partition field name exists in any partition spec. + #[inline] + pub(crate) fn partition_name_exists(&self, name: &str) -> bool { + self.partition_specs + .values() + .any(|spec| spec.fields().iter().any(|pf| pf.name == name)) + } + + /// Check if a field name exists in any schema. + #[inline] + pub(crate) fn name_exists_in_any_schema(&self, name: &str) -> bool { + self.schemas + .values() + .any(|schema| schema.field_by_name(name).is_some()) + } + /// Returns format version of this metadata. #[inline] pub fn format_version(&self) -> FormatVersion { @@ -3133,4 +3149,217 @@ mod tests { // Verify it returns an error assert!(result.is_err()); } + + #[test] + fn test_partition_name_exists() { + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "partition_col", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let spec1 = PartitionSpec::builder(schema.clone()) + .with_spec_id(1) + .add_partition_field("data", "data_partition", Transform::Identity) + .unwrap() + .build() + .unwrap(); + + let spec2 = PartitionSpec::builder(schema.clone()) + .with_spec_id(2) + .add_partition_field("partition_col", "partition_bucket", Transform::Bucket(16)) + .unwrap() + .build() + .unwrap(); + + // Build metadata with these specs + let metadata = TableMetadataBuilder::new( + schema, + spec1.clone().into_unbound(), + SortOrder::unsorted_order(), + "s3://test/location".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .add_partition_spec(spec2.into_unbound()) + .unwrap() + .build() + .unwrap() + .metadata; + + assert!(metadata.partition_name_exists("data_partition")); + assert!(metadata.partition_name_exists("partition_bucket")); + + assert!(!metadata.partition_name_exists("nonexistent_field")); + assert!(!metadata.partition_name_exists("data")); // schema field name, not partition field name + assert!(!metadata.partition_name_exists("")); + } + + #[test] + fn test_partition_name_exists_empty_specs() { + // Create metadata with no partition specs (unpartitioned table) + let schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let metadata = TableMetadataBuilder::new( + schema, + PartitionSpec::unpartition_spec().into_unbound(), + SortOrder::unsorted_order(), + "s3://test/location".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert!(!metadata.partition_name_exists("any_field")); + assert!(!metadata.partition_name_exists("data")); + } + + #[test] + fn test_name_exists_in_any_schema() { + // Create multiple schemas with different fields + let schema1 = Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "field2", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let schema2 = Schema::builder() + .with_schema_id(2) + .with_fields(vec![ + NestedField::required(1, "field1", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "field3", Type::Primitive(PrimitiveType::Long)).into(), + ]) + .build() + .unwrap(); + + let metadata = TableMetadataBuilder::new( + schema1, + PartitionSpec::unpartition_spec().into_unbound(), + SortOrder::unsorted_order(), + "s3://test/location".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .add_current_schema(schema2) + .unwrap() + .build() + .unwrap() + .metadata; + + assert!(metadata.name_exists_in_any_schema("field1")); // exists in both schemas + assert!(metadata.name_exists_in_any_schema("field2")); // exists only in schema1 (historical) + assert!(metadata.name_exists_in_any_schema("field3")); // exists only in schema2 (current) + + assert!(!metadata.name_exists_in_any_schema("nonexistent_field")); + assert!(!metadata.name_exists_in_any_schema("field4")); + assert!(!metadata.name_exists_in_any_schema("")); + } + + #[test] + fn test_name_exists_in_any_schema_empty_schemas() { + let schema = Schema::builder().with_fields(vec![]).build().unwrap(); + + let metadata = TableMetadataBuilder::new( + schema, + PartitionSpec::unpartition_spec().into_unbound(), + SortOrder::unsorted_order(), + "s3://test/location".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + assert!(!metadata.name_exists_in_any_schema("any_field")); + } + + #[test] + fn test_helper_methods_multi_version_scenario() { + // Test a realistic multi-version scenario + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required( + 3, + "deprecated_field", + Type::Primitive(PrimitiveType::String), + ) + .into(), + ]) + .build() + .unwrap(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + PartitionSpec::unpartition_spec().into_unbound(), + SortOrder::unsorted_order(), + "s3://test/location".to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap(); + + let evolved_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required( + 3, + "deprecated_field", + Type::Primitive(PrimitiveType::String), + ) + .into(), + NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double)) + .into(), + ]) + .build() + .unwrap(); + + // Then add a third schema that removes the deprecated field + let _final_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(4, "new_field", Type::Primitive(PrimitiveType::Double)) + .into(), + NestedField::required(5, "latest_field", Type::Primitive(PrimitiveType::Boolean)) + .into(), + ]) + .build() + .unwrap(); + + let final_metadata = metadata + .add_current_schema(evolved_schema) + .unwrap() + .build() + .unwrap() + .metadata; + + assert!(!final_metadata.partition_name_exists("nonexistent_partition")); // unpartitioned table + + assert!(final_metadata.name_exists_in_any_schema("id")); // exists in both schemas + assert!(final_metadata.name_exists_in_any_schema("name")); // exists in both schemas + assert!(final_metadata.name_exists_in_any_schema("deprecated_field")); // exists in both schemas + assert!(final_metadata.name_exists_in_any_schema("new_field")); // only in current schema + assert!(!final_metadata.name_exists_in_any_schema("never_existed")); + } } diff --git a/crates/iceberg/src/spec/table_metadata_builder.rs b/crates/iceberg/src/spec/table_metadata_builder.rs index 1f3f89533..068f2002f 100644 --- a/crates/iceberg/src/spec/table_metadata_builder.rs +++ b/crates/iceberg/src/spec/table_metadata_builder.rs @@ -589,7 +589,10 @@ impl TableMetadataBuilder { /// /// Important: Use this method with caution. The builder does not check /// if the added schema is compatible with the current schema. - pub fn add_schema(mut self, schema: Schema) -> Self { + pub fn add_schema(mut self, schema: Schema) -> Result { + // Validate that new schema fields don't conflict with existing partition field names + self.validate_schema_field_names(&schema)?; + let new_schema_id = self.reuse_or_create_new_schema_id(&schema); let schema_found = self.metadata.schemas.contains_key(&new_schema_id); @@ -601,7 +604,7 @@ impl TableMetadataBuilder { self.last_added_schema_id = Some(new_schema_id); } - return self; + return Ok(self); } // New schemas might contain only old columns. In this case last_column_id should not be @@ -623,7 +626,7 @@ impl TableMetadataBuilder { self.last_added_schema_id = Some(new_schema_id); - self + Ok(self) } /// Set the current schema id. @@ -679,7 +682,97 @@ impl TableMetadataBuilder { /// Add a schema and set it as the current schema. pub fn add_current_schema(self, schema: Schema) -> Result { - self.add_schema(schema).set_current_schema(Self::LAST_ADDED) + self.add_schema(schema)? + .set_current_schema(Self::LAST_ADDED) + } + + /// Validate schema field names against partition field names across all historical schemas. + /// + /// Due to Iceberg's multi-version property, this check ignores existing schema fields + /// that match partition names (schema evolution allows re-adding previously removed fields). + /// Only NEW field names that conflict with partition names are rejected. + /// + /// # Errors + /// - Schema field name conflicts with partition field name but doesn't exist in any historical schema. + fn validate_schema_field_names(&self, schema: &Schema) -> Result<()> { + if self.metadata.schemas.is_empty() { + return Ok(()); + } + + for field_name in schema.field_id_to_name_map().values() { + let has_partition_conflict = self.metadata.partition_name_exists(field_name); + let is_new_field = !self.metadata.name_exists_in_any_schema(field_name); + + if has_partition_conflict && is_new_field { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot add schema field '{}' because it conflicts with existing partition field name. \ + Schema evolution cannot introduce field names that match existing partition field names.", + field_name + ), + )); + } + } + + Ok(()) + } + + /// Validate partition field names against schema field names across all historical schemas. + /// + /// Due to Iceberg's multi-version property, partition fields can share names with schema fields + /// if they meet specific requirements (identity transform + matching source field ID). + /// This validation enforces those rules across all historical schema versions. + /// + /// # Errors + /// - Partition field name conflicts with schema field name but doesn't use identity transform. + /// - Partition field uses identity transform but references wrong source field ID. + fn validate_partition_field_names(&self, unbound_spec: &UnboundPartitionSpec) -> Result<()> { + if self.metadata.schemas.is_empty() { + return Ok(()); + } + + let current_schema = self.get_current_schema()?; + for partition_field in unbound_spec.fields() { + let exists_in_any_schema = self + .metadata + .name_exists_in_any_schema(&partition_field.name); + + // Skip if partition field name doesn't conflict with any schema field + if !exists_in_any_schema { + continue; + } + + // If name exists in schemas, validate against current schema rules + if let Some(schema_field) = current_schema.field_by_name(&partition_field.name) { + let is_identity_transform = + partition_field.transform == crate::spec::Transform::Identity; + let has_matching_source_id = schema_field.id == partition_field.source_id; + + if !is_identity_transform { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create partition with name '{}' that conflicts with schema field and is not an identity transform.", + partition_field.name + ), + )); + } + + if !has_matching_source_id { + return Err(Error::new( + ErrorKind::DataInvalid, + format!( + "Cannot create identity partition sourced from different field in schema. \ + Field name '{}' has id `{}` in schema but partition source id is `{}`", + partition_field.name, schema_field.id, partition_field.source_id + ), + )); + } + } + } + + Ok(()) } /// Add a partition spec to the table metadata. @@ -694,6 +787,10 @@ impl TableMetadataBuilder { /// - The partition spec has non-sequential field ids and the table format version is 1. pub fn add_partition_spec(mut self, unbound_spec: UnboundPartitionSpec) -> Result { let schema = self.get_current_schema()?.clone(); + + // Check if partition field names conflict with schema field names across all schemas + self.validate_partition_field_names(&unbound_spec)?; + let spec = PartitionSpecBuilder::new_from_unbound(unbound_spec.clone(), schema)? .with_last_assigned_field_id(self.metadata.last_partition_id) .build()?; @@ -1859,6 +1956,7 @@ mod tests { let build_result = builder .add_schema(added_schema.clone()) + .unwrap() .set_current_schema(1) .unwrap() .build() @@ -2496,4 +2594,404 @@ mod tests { }; assert_eq!(remove_schema_ids, &[0]); } + + #[test] + fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts() { + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec_with_bucket = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "bucket_data", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec_with_bucket, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + let partition_field_names: Vec = metadata + .default_partition_spec() + .fields() + .iter() + .map(|f| f.name.clone()) + .collect(); + assert!(partition_field_names.contains(&"bucket_data".to_string())); + + let evolved_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + // Adding a schema field with the same name as an existing partition field + NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let builder = metadata.into_builder(Some( + "s3://bucket/test/location/metadata/metadata1.json".to_string(), + )); + + // Try to add the evolved schema - this should now fail immediately with a clear error + let result = builder.add_current_schema(evolved_schema); + + assert!(result.is_err()); + let error = result.unwrap_err(); + let error_message = error.message(); + assert!(error_message.contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name")); + assert!(error_message.contains("Schema evolution cannot introduce field names that match existing partition field names")); + } + + #[test] + fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build() { + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "partition_col", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + let non_conflicting_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "new_field", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + // This should succeed since there's no name conflict + let result = metadata + .clone() + .into_builder(Some("test_location".to_string())) + .add_current_schema(non_conflicting_schema) + .unwrap() + .build(); + + assert!(result.is_ok()); + } + + #[test] + fn test_partition_spec_evolution_validates_schema_field_name_conflicts() { + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "data_bucket", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + let builder = metadata.into_builder(Some( + "s3://bucket/test/location/metadata/metadata1.json".to_string(), + )); + + let conflicting_partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_field(1, "existing_field", Transform::Bucket(8)) + .unwrap() + .build(); + + let result = builder.add_partition_spec(conflicting_partition_spec); + + assert!(result.is_err()); + let error = result.unwrap_err(); + let error_message = error.message(); + // The error comes from our multi-version validation + assert!(error_message.contains( + "Cannot create partition with name 'existing_field' that conflicts with schema field" + )); + assert!(error_message.contains("and is not an identity transform")); + } + + #[test] + fn test_schema_evolution_validates_against_all_historical_schemas() { + // Create a table with an initial schema that has a field "existing_field" + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "bucket_data", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + // Add a second schema that removes the existing_field but keeps the data field + let second_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(); + + let metadata = metadata + .into_builder(Some("test_location".to_string())) + .add_current_schema(second_schema) + .unwrap() + .build() + .unwrap() + .metadata; + + // Now try to add a third schema that reintroduces "existing_field" + // This should succeed because "existing_field" exists in a historical schema, + // even though there's a partition field named "bucket_data" + let third_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let builder = metadata + .clone() + .into_builder(Some("test_location".to_string())); + + // This should succeed because "existing_field" exists in a historical schema + let result = builder.add_current_schema(third_schema); + assert!(result.is_ok()); + + // However, trying to add a schema field that conflicts with the partition field + // and doesn't exist in any historical schema should fail + let conflicting_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String)) + .into(), + NestedField::required(4, "existing_field", Type::Primitive(PrimitiveType::Int)) + .into(), + NestedField::required(5, "bucket_data", Type::Primitive(PrimitiveType::String)) + .into(), // conflicts with partition field + ]) + .build() + .unwrap(); + + let builder2 = metadata.into_builder(Some("test_location".to_string())); + let result2 = builder2.add_current_schema(conflicting_schema); + + // This should fail because "bucket_data" conflicts with partition field name + // and doesn't exist in any historical schema + assert!(result2.is_err()); + let error = result2.unwrap_err(); + assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name")); + } + + #[test] + fn test_schema_evolution_allows_existing_partition_field_if_exists_in_historical_schema() { + // Create initial schema with a field + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(2, "partition_data", Transform::Identity) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + // Add a new schema that still contains the partition_data field + let evolved_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "partition_data", Type::Primitive(PrimitiveType::Int)) + .into(), + NestedField::required(3, "new_field", Type::Primitive(PrimitiveType::String)) + .into(), + ]) + .build() + .unwrap(); + + // This should succeed because partition_data exists in historical schemas + let result = metadata + .into_builder(Some("test_location".to_string())) + .add_current_schema(evolved_schema); + + assert!(result.is_ok()); + } + + #[test] + fn test_schema_evolution_prevents_new_field_conflicting_with_partition_field() { + // Create initial schema WITHOUT the conflicting field + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "bucket_data", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + // Try to add a schema with a field that conflicts with partition field name + let conflicting_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + // This field name conflicts with the partition field "bucket_data" + NestedField::required(2, "bucket_data", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + + let builder = metadata.into_builder(Some("test_location".to_string())); + let result = builder.add_current_schema(conflicting_schema); + + // This should fail because "bucket_data" conflicts with partition field name + // and doesn't exist in any historical schema + assert!(result.is_err()); + let error = result.unwrap_err(); + assert!(error.message().contains("Cannot add schema field 'bucket_data' because it conflicts with existing partition field name")); + } + + #[test] + fn test_partition_spec_evolution_allows_non_conflicting_names() { + let initial_schema = Schema::builder() + .with_fields(vec![ + NestedField::required(1, "data", Type::Primitive(PrimitiveType::String)).into(), + NestedField::required(2, "existing_field", Type::Primitive(PrimitiveType::Int)) + .into(), + ]) + .build() + .unwrap(); + + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + .add_partition_field(1, "data_bucket", Transform::Bucket(16)) + .unwrap() + .build(); + + let metadata = TableMetadataBuilder::new( + initial_schema, + partition_spec, + SortOrder::unsorted_order(), + TEST_LOCATION.to_string(), + FormatVersion::V2, + HashMap::new(), + ) + .unwrap() + .build() + .unwrap() + .metadata; + + let builder = metadata.into_builder(Some( + "s3://bucket/test/location/metadata/metadata1.json".to_string(), + )); + + // Try to add a partition spec with a field name that does NOT conflict with existing schema fields + let non_conflicting_partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(1) + .add_partition_field(2, "new_partition_field", Transform::Bucket(8)) + .unwrap() + .build(); + + let result = builder.add_partition_spec(non_conflicting_partition_spec); + + assert!(result.is_ok()); + } }