@@ -589,7 +589,10 @@ impl TableMetadataBuilder {
589
589
///
590
590
/// Important: Use this method with caution. The builder does not check
591
591
/// if the added schema is compatible with the current schema.
592
- pub fn add_schema ( mut self , schema : Schema ) -> Self {
592
+ pub fn add_schema ( mut self , schema : Schema ) -> Result < Self > {
593
+ // Validate that new schema fields don't conflict with existing partition field names
594
+ self . check_schema_partition_name_conflict ( & schema) ?;
595
+
593
596
let new_schema_id = self . reuse_or_create_new_schema_id ( & schema) ;
594
597
let schema_found = self . metadata . schemas . contains_key ( & new_schema_id) ;
595
598
@@ -601,7 +604,7 @@ impl TableMetadataBuilder {
601
604
self . last_added_schema_id = Some ( new_schema_id) ;
602
605
}
603
606
604
- return self ;
607
+ return Ok ( self ) ;
605
608
}
606
609
607
610
// New schemas might contain only old columns. In this case last_column_id should not be
@@ -623,7 +626,7 @@ impl TableMetadataBuilder {
623
626
624
627
self . last_added_schema_id = Some ( new_schema_id) ;
625
628
626
- self
629
+ Ok ( self )
627
630
}
628
631
629
632
/// Set the current schema id.
@@ -679,7 +682,41 @@ impl TableMetadataBuilder {
679
682
680
683
/// Add a schema and set it as the current schema.
681
684
pub fn add_current_schema ( self , schema : Schema ) -> Result < Self > {
682
- self . add_schema ( schema) . set_current_schema ( Self :: LAST_ADDED )
685
+ self . add_schema ( schema) ?
686
+ . set_current_schema ( Self :: LAST_ADDED )
687
+ }
688
+
689
+ /// Check if schema field names conflict with existing partition field names.
690
+ ///
691
+ /// # Errors
692
+ /// - Schema field name conflicts with existing partition field name.
693
+ fn check_schema_partition_name_conflict ( & self , schema : & Schema ) -> Result < ( ) > {
694
+ let current_schema = match self . get_current_schema ( ) {
695
+ Ok ( schema) => schema,
696
+ Err ( _) => return Ok ( ( ) ) , // No current schema, no conflicts possible
697
+ } ;
698
+
699
+ for field_name in schema. field_id_to_name_map ( ) . values ( ) {
700
+ let conflicts_with_partition_field =
701
+ self . metadata . partition_specs . values ( ) . any ( |spec| {
702
+ spec. fields ( )
703
+ . iter ( )
704
+ . any ( |partition_field| & partition_field. name == field_name)
705
+ } ) ;
706
+
707
+ if conflicts_with_partition_field && current_schema. field_by_name ( field_name) . is_none ( )
708
+ {
709
+ return Err ( Error :: new (
710
+ ErrorKind :: DataInvalid ,
711
+ format ! (
712
+ "Cannot add schema field '{}' because it conflicts with existing partition field name. Schema evolution cannot introduce field names that match existing partition field names." ,
713
+ field_name
714
+ ) ,
715
+ ) ) ;
716
+ }
717
+ }
718
+
719
+ Ok ( ( ) )
683
720
}
684
721
685
722
/// Add a partition spec to the table metadata.
@@ -694,6 +731,7 @@ impl TableMetadataBuilder {
694
731
/// - The partition spec has non-sequential field ids and the table format version is 1.
695
732
pub fn add_partition_spec ( mut self , unbound_spec : UnboundPartitionSpec ) -> Result < Self > {
696
733
let schema = self . get_current_schema ( ) ?. clone ( ) ;
734
+
697
735
let spec = PartitionSpecBuilder :: new_from_unbound ( unbound_spec. clone ( ) , schema) ?
698
736
. with_last_assigned_field_id ( self . metadata . last_partition_id )
699
737
. build ( ) ?;
@@ -1859,6 +1897,7 @@ mod tests {
1859
1897
1860
1898
let build_result = builder
1861
1899
. add_schema ( added_schema. clone ( ) )
1900
+ . unwrap ( )
1862
1901
. set_current_schema ( 1 )
1863
1902
. unwrap ( )
1864
1903
. build ( )
@@ -2496,4 +2535,208 @@ mod tests {
2496
2535
} ;
2497
2536
assert_eq ! ( remove_schema_ids, & [ 0 ] ) ;
2498
2537
}
2538
+
2539
+ #[ test]
2540
+ fn test_schema_evolution_now_correctly_validates_partition_field_name_conflicts ( ) {
2541
+ let initial_schema = Schema :: builder ( )
2542
+ . with_fields ( vec ! [
2543
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2544
+ ] )
2545
+ . build ( )
2546
+ . unwrap ( ) ;
2547
+
2548
+ let partition_spec_with_bucket = UnboundPartitionSpec :: builder ( )
2549
+ . with_spec_id ( 0 )
2550
+ . add_partition_field ( 1 , "bucket_data" , Transform :: Bucket ( 16 ) )
2551
+ . unwrap ( )
2552
+ . build ( ) ;
2553
+
2554
+ let metadata = TableMetadataBuilder :: new (
2555
+ initial_schema,
2556
+ partition_spec_with_bucket,
2557
+ SortOrder :: unsorted_order ( ) ,
2558
+ TEST_LOCATION . to_string ( ) ,
2559
+ FormatVersion :: V2 ,
2560
+ HashMap :: new ( ) ,
2561
+ )
2562
+ . unwrap ( )
2563
+ . build ( )
2564
+ . unwrap ( )
2565
+ . metadata ;
2566
+
2567
+ let partition_field_names: Vec < String > = metadata
2568
+ . default_partition_spec ( )
2569
+ . fields ( )
2570
+ . iter ( )
2571
+ . map ( |f| f. name . clone ( ) )
2572
+ . collect ( ) ;
2573
+ assert ! ( partition_field_names. contains( & "bucket_data" . to_string( ) ) ) ;
2574
+
2575
+ let evolved_schema = Schema :: builder ( )
2576
+ . with_fields ( vec ! [
2577
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2578
+ // Adding a schema field with the same name as an existing partition field
2579
+ NestedField :: required( 2 , "bucket_data" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
2580
+ ] )
2581
+ . build ( )
2582
+ . unwrap ( ) ;
2583
+
2584
+ let builder = metadata. into_builder ( Some (
2585
+ "s3://bucket/test/location/metadata/metadata1.json" . to_string ( ) ,
2586
+ ) ) ;
2587
+
2588
+ // Try to add the evolved schema - this should now fail immediately with a clear error
2589
+ let result = builder. add_current_schema ( evolved_schema) ;
2590
+
2591
+ assert ! ( result. is_err( ) ) ;
2592
+ let error = result. unwrap_err ( ) ;
2593
+ let error_message = error. message ( ) ;
2594
+ assert ! ( error_message. contains( "Cannot add schema field 'bucket_data' because it conflicts with existing partition field name" ) ) ;
2595
+ assert ! ( error_message. contains( "Schema evolution cannot introduce field names that match existing partition field names" ) ) ;
2596
+ }
2597
+
2598
+ #[ test]
2599
+ fn test_schema_evolution_should_validate_on_schema_add_not_metadata_build ( ) {
2600
+ let initial_schema = Schema :: builder ( )
2601
+ . with_fields ( vec ! [
2602
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2603
+ ] )
2604
+ . build ( )
2605
+ . unwrap ( ) ;
2606
+
2607
+ let partition_spec = UnboundPartitionSpec :: builder ( )
2608
+ . with_spec_id ( 0 )
2609
+ . add_partition_field ( 1 , "partition_col" , Transform :: Bucket ( 16 ) )
2610
+ . unwrap ( )
2611
+ . build ( ) ;
2612
+
2613
+ let metadata = TableMetadataBuilder :: new (
2614
+ initial_schema,
2615
+ partition_spec,
2616
+ SortOrder :: unsorted_order ( ) ,
2617
+ TEST_LOCATION . to_string ( ) ,
2618
+ FormatVersion :: V2 ,
2619
+ HashMap :: new ( ) ,
2620
+ )
2621
+ . unwrap ( )
2622
+ . build ( )
2623
+ . unwrap ( )
2624
+ . metadata ;
2625
+
2626
+ let non_conflicting_schema = Schema :: builder ( )
2627
+ . with_fields ( vec ! [
2628
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2629
+ NestedField :: required( 2 , "new_field" , Type :: Primitive ( PrimitiveType :: Int ) ) . into( ) ,
2630
+ ] )
2631
+ . build ( )
2632
+ . unwrap ( ) ;
2633
+
2634
+ // This should succeed since there's no name conflict
2635
+ let result = metadata
2636
+ . clone ( )
2637
+ . into_builder ( Some ( "test_location" . to_string ( ) ) )
2638
+ . add_current_schema ( non_conflicting_schema)
2639
+ . unwrap ( )
2640
+ . build ( ) ;
2641
+
2642
+ assert ! ( result. is_ok( ) ) ;
2643
+ }
2644
+
2645
+ #[ test]
2646
+ fn test_partition_spec_evolution_validates_schema_field_name_conflicts ( ) {
2647
+ let initial_schema = Schema :: builder ( )
2648
+ . with_fields ( vec ! [
2649
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2650
+ NestedField :: required( 2 , "existing_field" , Type :: Primitive ( PrimitiveType :: Int ) )
2651
+ . into( ) ,
2652
+ ] )
2653
+ . build ( )
2654
+ . unwrap ( ) ;
2655
+
2656
+ let partition_spec = UnboundPartitionSpec :: builder ( )
2657
+ . with_spec_id ( 0 )
2658
+ . add_partition_field ( 1 , "data_bucket" , Transform :: Bucket ( 16 ) )
2659
+ . unwrap ( )
2660
+ . build ( ) ;
2661
+
2662
+ let metadata = TableMetadataBuilder :: new (
2663
+ initial_schema,
2664
+ partition_spec,
2665
+ SortOrder :: unsorted_order ( ) ,
2666
+ TEST_LOCATION . to_string ( ) ,
2667
+ FormatVersion :: V2 ,
2668
+ HashMap :: new ( ) ,
2669
+ )
2670
+ . unwrap ( )
2671
+ . build ( )
2672
+ . unwrap ( )
2673
+ . metadata ;
2674
+
2675
+ let builder = metadata. into_builder ( Some (
2676
+ "s3://bucket/test/location/metadata/metadata1.json" . to_string ( ) ,
2677
+ ) ) ;
2678
+
2679
+ let conflicting_partition_spec = UnboundPartitionSpec :: builder ( )
2680
+ . with_spec_id ( 1 )
2681
+ . add_partition_field ( 1 , "existing_field" , Transform :: Bucket ( 8 ) )
2682
+ . unwrap ( )
2683
+ . build ( ) ;
2684
+
2685
+ let result = builder. add_partition_spec ( conflicting_partition_spec) ;
2686
+
2687
+ assert ! ( result. is_err( ) ) ;
2688
+ let error = result. unwrap_err ( ) ;
2689
+ let error_message = error. message ( ) ;
2690
+ // The error comes from existing PartitionSpecBuilder validation
2691
+ assert ! ( error_message. contains(
2692
+ "Cannot create partition with name: 'existing_field' that conflicts with schema field"
2693
+ ) ) ;
2694
+ assert ! ( error_message. contains( "and is not an identity transform" ) ) ;
2695
+ }
2696
+
2697
+ #[ test]
2698
+ fn test_partition_spec_evolution_allows_non_conflicting_names ( ) {
2699
+ let initial_schema = Schema :: builder ( )
2700
+ . with_fields ( vec ! [
2701
+ NestedField :: required( 1 , "data" , Type :: Primitive ( PrimitiveType :: String ) ) . into( ) ,
2702
+ NestedField :: required( 2 , "existing_field" , Type :: Primitive ( PrimitiveType :: Int ) )
2703
+ . into( ) ,
2704
+ ] )
2705
+ . build ( )
2706
+ . unwrap ( ) ;
2707
+
2708
+ let partition_spec = UnboundPartitionSpec :: builder ( )
2709
+ . with_spec_id ( 0 )
2710
+ . add_partition_field ( 1 , "data_bucket" , Transform :: Bucket ( 16 ) )
2711
+ . unwrap ( )
2712
+ . build ( ) ;
2713
+
2714
+ let metadata = TableMetadataBuilder :: new (
2715
+ initial_schema,
2716
+ partition_spec,
2717
+ SortOrder :: unsorted_order ( ) ,
2718
+ TEST_LOCATION . to_string ( ) ,
2719
+ FormatVersion :: V2 ,
2720
+ HashMap :: new ( ) ,
2721
+ )
2722
+ . unwrap ( )
2723
+ . build ( )
2724
+ . unwrap ( )
2725
+ . metadata ;
2726
+
2727
+ let builder = metadata. into_builder ( Some (
2728
+ "s3://bucket/test/location/metadata/metadata1.json" . to_string ( ) ,
2729
+ ) ) ;
2730
+
2731
+ // Try to add a partition spec with a field name that does NOT conflict with existing schema fields
2732
+ let non_conflicting_partition_spec = UnboundPartitionSpec :: builder ( )
2733
+ . with_spec_id ( 1 )
2734
+ . add_partition_field ( 2 , "new_partition_field" , Transform :: Bucket ( 8 ) )
2735
+ . unwrap ( )
2736
+ . build ( ) ;
2737
+
2738
+ let result = builder. add_partition_spec ( non_conflicting_partition_spec) ;
2739
+
2740
+ assert ! ( result. is_ok( ) ) ;
2741
+ }
2499
2742
}
0 commit comments