@@ -589,7 +589,10 @@ impl TableMetadataBuilder {
589589     /// 
590590     /// Important: Use this method with caution. The builder does not check 
591591     /// if the added schema is compatible with the current schema. 
592-      pub  fn  add_schema ( mut  self ,  schema :  Schema )  -> Self  { 
592+      pub  fn  add_schema ( mut  self ,  schema :  Schema )  -> Result < Self >  { 
593+         // Validate that new schema fields don't conflict with existing partition field names 
594+         self . check_schema_partition_name_conflicts ( & schema) ?; 
595+ 
593596        let  new_schema_id = self . reuse_or_create_new_schema_id ( & schema) ; 
594597        let  schema_found = self . metadata . schemas . contains_key ( & new_schema_id) ; 
595598
@@ -601,7 +604,7 @@ impl TableMetadataBuilder {
601604                self . last_added_schema_id  = Some ( new_schema_id) ; 
602605            } 
603606
604-             return  self ; 
607+             return  Ok ( self ) ; 
605608        } 
606609
607610        // New schemas might contain only old columns. In this case last_column_id should not be 
@@ -623,7 +626,7 @@ impl TableMetadataBuilder {
623626
624627        self . last_added_schema_id  = Some ( new_schema_id) ; 
625628
626-         self 
629+         Ok ( self ) 
627630    } 
628631
629632    /// Set the current schema id. 
@@ -679,7 +682,54 @@ impl TableMetadataBuilder {
679682
680683    /// Add a schema and set it as the current schema. 
681684     pub  fn  add_current_schema ( self ,  schema :  Schema )  -> Result < Self >  { 
682-         self . add_schema ( schema) . set_current_schema ( Self :: LAST_ADDED ) 
685+         self . add_schema ( schema) ?
686+             . set_current_schema ( Self :: LAST_ADDED ) 
687+     } 
688+ 
689+     /// Check if schema field names conflict with existing partition field names. 
690+      /// 
691+      /// This validation prevents introducing new field names that match existing partition field names 
692+      /// that don't correspond to any field in the current schema. Field names that already exist in the 
693+      /// current schema are allowed since partition fields are typically derived from schema fields. 
694+      /// 
695+      /// # Errors 
696+      /// - Schema field name conflicts with existing partition field name that doesn't correspond to any current schema field. 
697+      fn  check_schema_partition_name_conflicts ( & self ,  schema :  & Schema )  -> Result < ( ) >  { 
698+         let  existing_partition_field_names:  HashSet < & str >  = self 
699+             . metadata 
700+             . partition_specs 
701+             . values ( ) 
702+             . flat_map ( |spec| spec. fields ( ) . iter ( ) . map ( |field| field. name . as_str ( ) ) ) 
703+             . collect ( ) ; 
704+ 
705+         let  current_schema_field_names:  HashSet < & str >  = self 
706+             . metadata 
707+             . schemas 
708+             . get ( & self . metadata . current_schema_id ) 
709+             . map ( |current_schema| { 
710+                 current_schema
711+                     . as_struct ( ) 
712+                     . fields ( ) 
713+                     . iter ( ) 
714+                     . map ( |field| field. name . as_str ( ) ) 
715+                     . collect ( ) 
716+             } ) 
717+             . unwrap_or_default ( ) ; 
718+ 
719+         if  let  Some ( conflicting_field)  = schema. as_struct ( ) . fields ( ) . iter ( ) . find ( |field| { 
720+             existing_partition_field_names. contains ( field. name . as_str ( ) ) 
721+                 && !current_schema_field_names. contains ( field. name . as_str ( ) ) 
722+         } )  { 
723+             return  Err ( Error :: new ( 
724+                 ErrorKind :: DataInvalid , 
725+                 format ! ( 
726+                     "Cannot add schema field '{}' because it conflicts with existing partition field name. Schema evolution cannot introduce field names that match existing partition field names." , 
727+                     conflicting_field. name
728+                 ) , 
729+             ) ) ; 
730+         } 
731+ 
732+         Ok ( ( ) ) 
683733    } 
684734
685735    /// Add a partition spec to the table metadata. 
@@ -1859,6 +1909,7 @@ mod tests {
18591909
18601910        let  build_result = builder
18611911            . add_schema ( added_schema. clone ( ) ) 
1912+             . unwrap ( ) 
18621913            . set_current_schema ( 1 ) 
18631914            . unwrap ( ) 
18641915            . build ( ) 
@@ -2496,4 +2547,110 @@ mod tests {
24962547            } ; 
24972548        assert_eq ! ( remove_schema_ids,  & [ 0 ] ) ; 
24982549    } 
2550+ 
2551+     #[ test]  
2552+     fn  test_schema_evolution_now_correctly_validates_partition_field_name_conflicts ( )  { 
2553+         let  initial_schema = Schema :: builder ( ) 
2554+             . with_fields ( vec ! [ 
2555+                 NestedField :: required( 1 ,  "data" ,  Type :: Primitive ( PrimitiveType :: String ) ) . into( ) , 
2556+             ] ) 
2557+             . build ( ) 
2558+             . unwrap ( ) ; 
2559+ 
2560+         let  partition_spec_with_bucket = UnboundPartitionSpec :: builder ( ) 
2561+             . with_spec_id ( 0 ) 
2562+             . add_partition_field ( 1 ,  "bucket_data" ,  Transform :: Bucket ( 16 ) ) 
2563+             . unwrap ( ) 
2564+             . build ( ) ; 
2565+ 
2566+         let  metadata = TableMetadataBuilder :: new ( 
2567+             initial_schema, 
2568+             partition_spec_with_bucket, 
2569+             SortOrder :: unsorted_order ( ) , 
2570+             TEST_LOCATION . to_string ( ) , 
2571+             FormatVersion :: V2 , 
2572+             HashMap :: new ( ) , 
2573+         ) 
2574+         . unwrap ( ) 
2575+         . build ( ) 
2576+         . unwrap ( ) 
2577+         . metadata ; 
2578+ 
2579+         let  partition_field_names:  Vec < String >  = metadata
2580+             . default_partition_spec ( ) 
2581+             . fields ( ) 
2582+             . iter ( ) 
2583+             . map ( |f| f. name . clone ( ) ) 
2584+             . collect ( ) ; 
2585+         assert ! ( partition_field_names. contains( & "bucket_data" . to_string( ) ) ) ; 
2586+ 
2587+         let  evolved_schema = Schema :: builder ( ) 
2588+             . with_fields ( vec ! [ 
2589+                 NestedField :: required( 1 ,  "data" ,  Type :: Primitive ( PrimitiveType :: String ) ) . into( ) , 
2590+                 // Adding a schema field with the same name as an existing partition field 
2591+                 NestedField :: required( 2 ,  "bucket_data" ,  Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) , 
2592+             ] ) 
2593+             . build ( ) 
2594+             . unwrap ( ) ; 
2595+ 
2596+         let  builder = metadata. into_builder ( Some ( 
2597+             "s3://bucket/test/location/metadata/metadata1.json" . to_string ( ) , 
2598+         ) ) ; 
2599+ 
2600+         // Try to add the evolved schema - this should now fail immediately with a clear error 
2601+         let  result = builder. add_current_schema ( evolved_schema) ; 
2602+ 
2603+         assert ! ( result. is_err( ) ) ; 
2604+         let  error = result. unwrap_err ( ) ; 
2605+         let  error_message = error. message ( ) ; 
2606+         assert ! ( error_message. contains( "Cannot add schema field 'bucket_data' because it conflicts with existing partition field name" ) ) ; 
2607+         assert ! ( error_message. contains( "Schema evolution cannot introduce field names that match existing partition field names" ) ) ; 
2608+     } 
2609+ 
2610+     #[ test]  
2611+     fn  test_schema_evolution_should_validate_on_schema_add_not_metadata_build ( )  { 
2612+         let  initial_schema = Schema :: builder ( ) 
2613+             . with_fields ( vec ! [ 
2614+                 NestedField :: required( 1 ,  "data" ,  Type :: Primitive ( PrimitiveType :: String ) ) . into( ) , 
2615+             ] ) 
2616+             . build ( ) 
2617+             . unwrap ( ) ; 
2618+ 
2619+         let  partition_spec = UnboundPartitionSpec :: builder ( ) 
2620+             . with_spec_id ( 0 ) 
2621+             . add_partition_field ( 1 ,  "partition_col" ,  Transform :: Bucket ( 16 ) ) 
2622+             . unwrap ( ) 
2623+             . build ( ) ; 
2624+ 
2625+         let  metadata = TableMetadataBuilder :: new ( 
2626+             initial_schema, 
2627+             partition_spec, 
2628+             SortOrder :: unsorted_order ( ) , 
2629+             TEST_LOCATION . to_string ( ) , 
2630+             FormatVersion :: V2 , 
2631+             HashMap :: new ( ) , 
2632+         ) 
2633+         . unwrap ( ) 
2634+         . build ( ) 
2635+         . unwrap ( ) 
2636+         . metadata ; 
2637+ 
2638+         let  non_conflicting_schema = Schema :: builder ( ) 
2639+             . with_fields ( vec ! [ 
2640+                 NestedField :: required( 1 ,  "data" ,  Type :: Primitive ( PrimitiveType :: String ) ) . into( ) , 
2641+                 NestedField :: required( 2 ,  "new_field" ,  Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) , 
2642+             ] ) 
2643+             . build ( ) 
2644+             . unwrap ( ) ; 
2645+ 
2646+         // This should succeed since there's no name conflict 
2647+         let  result = metadata
2648+             . clone ( ) 
2649+             . into_builder ( Some ( "test_location" . to_string ( ) ) ) 
2650+             . add_current_schema ( non_conflicting_schema) 
2651+             . unwrap ( ) 
2652+             . build ( ) ; 
2653+ 
2654+         assert ! ( result. is_ok( ) ) ; 
2655+     } 
24992656} 
0 commit comments