@@ -564,4 +564,206 @@ mod tests {
564564 // Should handle NaN values
565565 assert ! ( result. is_ok( ) ) ;
566566 }
567+
568+ #[ tokio:: test]
569+ #[ allow( clippy:: too_many_lines) ]
570+ async fn it_should_handle_race_conditions_when_updating_udp_performance_metrics_in_parallel ( ) {
571+ // Number of concurrent requests per server
572+ const REQUESTS_PER_SERVER : usize = 100 ;
573+
574+ let repo = Repository :: new ( ) ;
575+ let now = CurrentClock :: now ( ) ;
576+
577+ // Define labels for two different UDP servers
578+ let server1_labels = LabelSet :: from ( [
579+ ( "request_kind" , "connect" ) ,
580+ ( "server_binding_address_ip_family" , "inet" ) ,
581+ ( "server_port" , "6868" ) ,
582+ ] ) ;
583+ let server2_labels = LabelSet :: from ( [
584+ ( "request_kind" , "connect" ) ,
585+ ( "server_binding_address_ip_family" , "inet" ) ,
586+ ( "server_port" , "6969" ) ,
587+ ] ) ;
588+
589+ let mut handles = vec ! [ ] ;
590+
591+ // Spawn tasks for server 1
592+ for i in 0 ..REQUESTS_PER_SERVER {
593+ let repo_clone = repo. clone ( ) ;
594+ let labels = server1_labels. clone ( ) ;
595+ let handle = tokio:: spawn ( async move {
596+ // Simulate varying processing times (1000ns to 5000ns)
597+ let processing_time_ns = 1000 + ( i % 5 ) * 1000 ;
598+ let processing_time = Duration :: from_nanos ( processing_time_ns as u64 ) ;
599+
600+ repo_clone
601+ . recalculate_udp_avg_processing_time_ns ( processing_time, & labels, now)
602+ . await
603+ } ) ;
604+ handles. push ( handle) ;
605+ }
606+
607+ // Spawn tasks for server 2
608+ for i in 0 ..REQUESTS_PER_SERVER {
609+ let repo_clone = repo. clone ( ) ;
610+ let labels = server2_labels. clone ( ) ;
611+ let handle = tokio:: spawn ( async move {
612+ // Simulate different processing times (2000ns to 6000ns)
613+ let processing_time_ns = 2000 + ( i % 5 ) * 1000 ;
614+ let processing_time = Duration :: from_nanos ( processing_time_ns as u64 ) ;
615+
616+ repo_clone
617+ . recalculate_udp_avg_processing_time_ns ( processing_time, & labels, now)
618+ . await
619+ } ) ;
620+ handles. push ( handle) ;
621+ }
622+
623+ // Collect all the results
624+ let mut server1_results = Vec :: new ( ) ;
625+ let mut server2_results = Vec :: new ( ) ;
626+
627+ for ( i, handle) in handles. into_iter ( ) . enumerate ( ) {
628+ let result = handle. await . unwrap ( ) ;
629+ if i < REQUESTS_PER_SERVER {
630+ server1_results. push ( result) ;
631+ } else {
632+ server2_results. push ( result) ;
633+ }
634+ }
635+
636+ // Verify that all tasks completed successfully
637+ assert_eq ! ( server1_results. len( ) , REQUESTS_PER_SERVER ) ;
638+ assert_eq ! ( server2_results. len( ) , REQUESTS_PER_SERVER ) ;
639+
640+ // Verify that all results are finite and positive
641+ for result in & server1_results {
642+ assert ! ( result. is_finite( ) , "Server 1 result should be finite: {result}" ) ;
643+ assert ! ( * result > 0.0 , "Server 1 result should be positive: {result}" ) ;
644+ }
645+
646+ for result in & server2_results {
647+ assert ! ( result. is_finite( ) , "Server 2 result should be finite: {result}" ) ;
648+ assert ! ( * result > 0.0 , "Server 2 result should be positive: {result}" ) ;
649+ }
650+
651+ // Get final stats and verify metrics integrity
652+ let stats = repo. get_stats ( ) . await ;
653+
654+ // Verify that the processed requests counters are correct for each server
655+ #[ allow( clippy:: cast_sign_loss) ]
656+ #[ allow( clippy:: cast_possible_truncation) ]
657+ let server1_processed = stats
658+ . metric_collection
659+ . get_counter_value (
660+ & metric_name ! ( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL ) ,
661+ & server1_labels,
662+ )
663+ . unwrap ( )
664+ . value ( ) ;
665+
666+ #[ allow( clippy:: cast_sign_loss) ]
667+ #[ allow( clippy:: cast_possible_truncation) ]
668+ let server2_processed = stats
669+ . metric_collection
670+ . get_counter_value (
671+ & metric_name ! ( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL ) ,
672+ & server2_labels,
673+ )
674+ . unwrap ( )
675+ . value ( ) ;
676+
677+ assert_eq ! (
678+ server1_processed, REQUESTS_PER_SERVER as u64 ,
679+ "Server 1 should have processed {REQUESTS_PER_SERVER} requests" ,
680+ ) ;
681+ assert_eq ! (
682+ server2_processed, REQUESTS_PER_SERVER as u64 ,
683+ "Server 2 should have processed {REQUESTS_PER_SERVER} requests" ,
684+ ) ;
685+
686+ // Verify that the final average processing times are reasonable
687+ #[ allow( clippy:: cast_sign_loss) ]
688+ #[ allow( clippy:: cast_possible_truncation) ]
689+ let server1_final_avg = stats
690+ . metric_collection
691+ . get_gauge_value (
692+ & metric_name ! ( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS ) ,
693+ & server1_labels,
694+ )
695+ . unwrap ( )
696+ . value ( ) ;
697+
698+ #[ allow( clippy:: cast_sign_loss) ]
699+ #[ allow( clippy:: cast_possible_truncation) ]
700+ let server2_final_avg = stats
701+ . metric_collection
702+ . get_gauge_value (
703+ & metric_name ! ( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS ) ,
704+ & server2_labels,
705+ )
706+ . unwrap ( )
707+ . value ( ) ;
708+
709+ // Server 1: 100 requests cycling through [1000, 2000, 3000, 4000, 5000] ns
710+ // Expected average: (20×1000 + 20×2000 + 20×3000 + 20×4000 + 20×5000) / 100 = 3000 ns
711+ // Note: Moving average with concurrent updates may have small deviations due to order dependency
712+ assert ! (
713+ ( server1_final_avg - 3000.0 ) . abs( ) < 50.0 ,
714+ "Server 1 final average should be close to 3000ns (±50ns), got {server1_final_avg}ns"
715+ ) ;
716+
717+ // Server 2: 100 requests cycling through [2000, 3000, 4000, 5000, 6000] ns
718+ // Expected average: (20×2000 + 20×3000 + 20×4000 + 20×5000 + 20×6000) / 100 = 4000 ns
719+ // Note: Moving average with concurrent updates may have small deviations due to order dependency
720+ assert ! (
721+ ( server2_final_avg - 4000.0 ) . abs( ) < 50.0 ,
722+ "Server 2 final average should be close to 4000ns (±50ns), got {server2_final_avg}ns"
723+ ) ;
724+
725+ // Verify that the two servers have different averages (they should since they have different processing time ranges)
726+ assert ! (
727+ ( server1_final_avg - server2_final_avg) . abs( ) > 950.0 ,
728+ "Server 1 and Server 2 should have different average processing times"
729+ ) ;
730+
731+ // Server 2 should generally have higher averages since its processing times are higher
732+ assert ! (
733+ server2_final_avg > server1_final_avg,
734+ "Server 2 average ({server2_final_avg}) should be higher than Server 1 average ({server1_final_avg})"
735+ ) ;
736+
737+ // Verify that the moving average calculation maintains consistency
738+ // The last result for each server should match the final stored average
739+ let server1_last_result = server1_results. last ( ) . copied ( ) . unwrap ( ) ;
740+ let server2_last_result = server2_results. last ( ) . copied ( ) . unwrap ( ) ;
741+
742+ // Note: Due to race conditions, the last result might not exactly match the final stored average
743+ // but it should be in a reasonable range. We'll check that they're in the same ballpark.
744+ let server1_diff = ( server1_last_result - server1_final_avg) . abs ( ) ;
745+ let server2_diff = ( server2_last_result - server2_final_avg) . abs ( ) ;
746+
747+ assert ! (
748+ server1_diff <= 0.0 ,
749+ "Server 1 last result ({server1_last_result}) should be equal to final average ({server1_final_avg}), diff: {server1_diff}" ,
750+ ) ;
751+
752+ assert ! (
753+ server2_diff <= 0.0 ,
754+ "Server 2 last result ({server2_last_result}) should be equal to final average ({server2_final_avg}), diff: {server2_diff}" ,
755+ ) ;
756+
757+ // Verify that the metric collection contains the expected metrics for both servers
758+ assert ! ( stats
759+ . metric_collection
760+ . contains_gauge( & metric_name!( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS ) ) ) ;
761+ assert ! ( stats
762+ . metric_collection
763+ . contains_counter( & metric_name!( UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL ) ) ) ;
764+
765+ println ! (
766+ "Race condition test completed successfully:\n Server 1: {server1_processed} requests, final avg: {server1_final_avg}ns\n Server 2: {server2_processed} requests, final avg: {server2_final_avg}ns"
767+ ) ;
768+ }
567769}
0 commit comments