@@ -565,205 +565,211 @@ mod tests {
         assert!(result.is_ok());
     }
 
-    #[tokio::test]
-    #[allow(clippy::too_many_lines)]
-    async fn it_should_handle_race_conditions_when_updating_udp_performance_metrics_in_parallel() {
-        // Number of concurrent requests per server
-        const REQUESTS_PER_SERVER: usize = 100;
+    mod race_conditions {
 
-        let repo = Repository::new();
-        let now = CurrentClock::now();
+        use core::f64;
+        use std::time::Duration;
 
-        // Define labels for two different UDP servers
-        let server1_labels = LabelSet::from([
-            ("request_kind", "connect"),
-            ("server_binding_address_ip_family", "inet"),
-            ("server_port", "6868"),
-        ]);
-        let server2_labels = LabelSet::from([
-            ("request_kind", "connect"),
-            ("server_binding_address_ip_family", "inet"),
-            ("server_port", "6969"),
-        ]);
+        use tokio::task::JoinHandle;
+        use torrust_tracker_clock::clock::Time;
+        use torrust_tracker_metrics::metric_name;
 
-        let mut handles = vec![];
+        use super::*;
+        use crate::CurrentClock;
 
-        // Spawn tasks for server 1
-        for i in 0..REQUESTS_PER_SERVER {
-            let repo_clone = repo.clone();
-            let labels = server1_labels.clone();
-            let handle = tokio::spawn(async move {
-                // Simulate varying processing times (1000ns to 5000ns)
-                let processing_time_ns = 1000 + (i % 5) * 1000;
-                let processing_time = Duration::from_nanos(processing_time_ns as u64);
+        #[tokio::test]
+        async fn it_should_handle_race_conditions_when_updating_udp_performance_metrics_in_parallel() {
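+            // Number of concurrent requests per server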
+            const REQUESTS_PER_SERVER: usize = 100;
 
-                repo_clone
-                    .recalculate_udp_avg_processing_time_ns(processing_time, &labels, now)
-                    .await
-            });
-            handles.push(handle);
-        }
+            // ** Set up test data and environment **
 
-        // Spawn tasks for server 2
-        for i in 0..REQUESTS_PER_SERVER {
-            let repo_clone = repo.clone();
-            let labels = server2_labels.clone();
-            let handle = tokio::spawn(async move {
-                // Simulate different processing times (2000ns to 6000ns)
-                let processing_time_ns = 2000 + (i % 5) * 1000;
-                let processing_time = Duration::from_nanos(processing_time_ns as u64);
+            let repo = Repository::new();
+            let now = CurrentClock::now();
 
-                repo_clone
-                    .recalculate_udp_avg_processing_time_ns(processing_time, &labels, now)
-                    .await
-            });
-            handles.push(handle);
-        }
+            let server1_labels = create_server_metric_labels("6868");
+            let server2_labels = create_server_metric_labels("6969");
 
-        // Collect all the results
-        let mut server1_results = Vec::new();
-        let mut server2_results = Vec::new();
+            // ** Execute concurrent metric updates **
 
-        for (i, handle) in handles.into_iter().enumerate() {
-            let result = handle.await.unwrap();
-            if i < REQUESTS_PER_SERVER {
-                server1_results.push(result);
-            } else {
-                server2_results.push(result);
-            }
-        }
+            // Spawn concurrent tasks for server 1 with processing times [1000, 2000, 3000, 4000, 5000] ns
+            let server1_handles = spawn_server_tasks(&repo, &server1_labels, 1000, now, REQUESTS_PER_SERVER);
 
-        // Verify that all tasks completed successfully
-        assert_eq!(server1_results.len(), REQUESTS_PER_SERVER);
-        assert_eq!(server2_results.len(), REQUESTS_PER_SERVER);
+            // Spawn concurrent tasks for server 2 with processing times [2000, 3000, 4000, 5000, 6000] ns
+            let server2_handles = spawn_server_tasks(&repo, &server2_labels, 2000, now, REQUESTS_PER_SERVER);
 
-        // Verify that all results are finite and positive
-        for result in &server1_results {
-            assert!(result.is_finite(), "Server 1 result should be finite: {result}");
-            assert!(*result > 0.0, "Server 1 result should be positive: {result}");
-        }
+            // Wait for both servers' results
+            let (server1_results, server2_results) = tokio::join!(
+                collect_concurrent_task_results(server1_handles),
+                collect_concurrent_task_results(server2_handles)
+            );
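+            // NOTE: `tokio::join!` only awaits the two collector futures; the spawned
+            // tasks themselves already run concurrently on the runtime.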
+
+            // ** Verify results and metrics **
+
+            // Verify correctness of concurrent operations
+            assert_server_results_are_valid(&server1_results, "Server 1", REQUESTS_PER_SERVER);
+            assert_server_results_are_valid(&server2_results, "Server 2", REQUESTS_PER_SERVER);
+
+            let stats = repo.get_stats().await;
 
-        for result in &server2_results {
-            assert!(result.is_finite(), "Server 2 result should be finite: {result}");
-            assert!(*result > 0.0, "Server 2 result should be positive: {result}");
+            // Verify each server's metrics individually
+            let server1_avg = assert_server_metrics_are_correct(&stats, &server1_labels, "Server 1", REQUESTS_PER_SERVER, 3000.0);
+            let server2_avg = assert_server_metrics_are_correct(&stats, &server2_labels, "Server 2", REQUESTS_PER_SERVER, 4000.0);
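+            // Expected averages (each of the 5 cycled processing times occurs 20 times):
+            //   Server 1: (20×1000 + 20×2000 + 20×3000 + 20×4000 + 20×5000) / 100 = 3000 ns
+            //   Server 2: (20×2000 + 20×3000 + 20×4000 + 20×5000 + 20×6000) / 100 = 4000 ns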
+
+            // Verify relationship between servers
+            assert_server_metrics_relationship(server1_avg, server2_avg);
+
+            // Verify each server's result consistency individually
+            assert_server_result_matches_stored_average(&server1_results, &stats, &server1_labels, "Server 1");
+            assert_server_result_matches_stored_average(&server2_results, &stats, &server2_labels, "Server 2");
+
+            // Verify metric collection integrity
+            assert_metric_collection_integrity(&stats);
         }
 
-        // Get final stats and verify metrics integrity
-        let stats = repo.get_stats().await;
+        // Test helper functions to hide implementation details
 
-        // Verify that the processed requests counters are correct for each server
-        #[allow(clippy::cast_sign_loss)]
-        #[allow(clippy::cast_possible_truncation)]
-        let server1_processed = stats
-            .metric_collection
-            .get_counter_value(
-                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
-                &server1_labels,
-            )
-            .unwrap()
-            .value();
+        fn create_server_metric_labels(port: &str) -> LabelSet {
+            LabelSet::from([
+                ("request_kind", "connect"),
+                ("server_binding_address_ip_family", "inet"),
+                ("server_port", port),
+            ])
+        }
 
-        #[allow(clippy::cast_sign_loss)]
-        #[allow(clippy::cast_possible_truncation)]
-        let server2_processed = stats
-            .metric_collection
-            .get_counter_value(
-                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
-                &server2_labels,
-            )
-            .unwrap()
-            .value();
+        fn spawn_server_tasks(
+            repo: &Repository,
+            labels: &LabelSet,
+            base_processing_time_ns: usize,
+            now: DurationSinceUnixEpoch,
+            requests_per_server: usize,
+        ) -> Vec<JoinHandle<f64>> {
+            let mut handles = vec![];
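+            // NOTE: each task works on its own `Repository` clone; cloning is assumed
+            // to be a cheap handle copy, so every task races on the same shared state.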
+
+            for i in 0..requests_per_server {
+                let repo_clone = repo.clone();
+                let labels_clone = labels.clone();
+                let handle = tokio::spawn(async move {
+                    let processing_time_ns = base_processing_time_ns + (i % 5) * 1000;
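+                    // Cycles through 5 values: base, base+1000, ..., base+4000 ns
+                    // (e.g. base 1000 → [1000, 2000, 3000, 4000, 5000] ns).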
+                    let processing_time = Duration::from_nanos(processing_time_ns as u64);
+                    repo_clone
+                        .recalculate_udp_avg_processing_time_ns(processing_time, &labels_clone, now)
+                        .await
+                });
+                handles.push(handle);
+            }
 
-        assert_eq!(
-            server1_processed, REQUESTS_PER_SERVER as u64,
-            "Server 1 should have processed {REQUESTS_PER_SERVER} requests",
-        );
-        assert_eq!(
-            server2_processed, REQUESTS_PER_SERVER as u64,
-            "Server 2 should have processed {REQUESTS_PER_SERVER} requests",
-        );
+            handles
+        }
 
-        // Verify that the final average processing times are reasonable
-        #[allow(clippy::cast_sign_loss)]
-        #[allow(clippy::cast_possible_truncation)]
-        let server1_final_avg = stats
-            .metric_collection
-            .get_gauge_value(
-                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                &server1_labels,
-            )
-            .unwrap()
-            .value();
+        async fn collect_concurrent_task_results(handles: Vec<tokio::task::JoinHandle<f64>>) -> Vec<f64> {
+            let mut server_results = Vec::new();
 
-        #[allow(clippy::cast_sign_loss)]
-        #[allow(clippy::cast_possible_truncation)]
-        let server2_final_avg = stats
-            .metric_collection
-            .get_gauge_value(
-                &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS),
-                &server2_labels,
-            )
-            .unwrap()
-            .value();
+            for handle in handles {
+                let result = handle.await.unwrap();
+                server_results.push(result);
+            }
 
-        // Server 1: 100 requests cycling through [1000, 2000, 3000, 4000, 5000] ns
-        // Expected average: (20×1000 + 20×2000 + 20×3000 + 20×4000 + 20×5000) / 100 = 3000 ns
-        // Note: Moving average with concurrent updates may have small deviations due to order dependency
-        assert!(
-            (server1_final_avg - 3000.0).abs() < 50.0,
-            "Server 1 final average should be close to 3000ns (±50ns), got {server1_final_avg}ns"
-        );
+            server_results
+        }
 
-        // Server 2: 100 requests cycling through [2000, 3000, 4000, 5000, 6000] ns
-        // Expected average: (20×2000 + 20×3000 + 20×4000 + 20×5000 + 20×6000) / 100 = 4000 ns
-        // Note: Moving average with concurrent updates may have small deviations due to order dependency
-        assert!(
-            (server2_final_avg - 4000.0).abs() < 50.0,
-            "Server 2 final average should be close to 4000ns (±50ns), got {server2_final_avg}ns"
-        );
+        fn assert_server_results_are_valid(results: &[f64], server_name: &str, expected_count: usize) {
+            // Verify all tasks completed
+            assert_eq!(
+                results.len(),
+                expected_count,
+                "{server_name} should have {expected_count} results"
+            );
+
+            // Verify all results are valid numbers
+            for result in results {
+                assert!(result.is_finite(), "{server_name} result should be finite: {result}");
+                assert!(*result > 0.0, "{server_name} result should be positive: {result}");
+            }
+        }
 
-        // Verify that the two servers have different averages (they should since they have different processing time ranges)
-        assert!(
-            (server1_final_avg - server2_final_avg).abs() > 950.0,
-            "Server 1 and Server 2 should have different average processing times"
-        );
+        fn assert_server_metrics_are_correct(
+            stats: &Metrics,
+            labels: &LabelSet,
+            server_name: &str,
+            expected_request_count: usize,
+            expected_avg_ns: f64,
+        ) -> f64 {
+            // Verify request count
+            let processed_requests = get_processed_requests_count(stats, labels);
+            assert_eq!(
+                processed_requests, expected_request_count as u64,
+                "{server_name} should have processed {expected_request_count} requests"
+            );
+
+            // Verify average processing time is within expected range
+            let avg_processing_time = get_average_processing_time(stats, labels);
+            assert!(
+                (avg_processing_time - expected_avg_ns).abs() < 50.0,
+                "{server_name} average should be ~{expected_avg_ns}ns (±50ns), got {avg_processing_time}ns"
+            );
+
+            avg_processing_time
+        }
 
-        // Server 2 should generally have higher averages since its processing times are higher
-        assert!(
-            server2_final_avg > server1_final_avg,
-            "Server 2 average ({server2_final_avg}) should be higher than Server 1 average ({server1_final_avg})"
-        );
+        fn assert_server_metrics_relationship(server1_avg: f64, server2_avg: f64) {
+            const MIN_DIFFERENCE_NS: f64 = 950.0;
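+            // The expected gap is 4000 − 3000 = 1000 ns; 950 leaves headroom for the
+            // ±50 ns tolerance allowed on each server's average.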
 
-        // Verify that the moving average calculation maintains consistency
-        // The last result for each server should match the final stored average
-        let server1_last_result = server1_results.last().copied().unwrap();
-        let server2_last_result = server2_results.last().copied().unwrap();
+            assert_averages_are_significantly_different(server1_avg, server2_avg, MIN_DIFFERENCE_NS);
+            assert_server_ordering_is_correct(server1_avg, server2_avg);
+        }
 
-        // Note: Due to race conditions, the last result might not exactly match the final stored average
-        // but it should be in a reasonable range. We'll check that they're in the same ballpark.
-        let server1_diff = (server1_last_result - server1_final_avg).abs();
-        let server2_diff = (server2_last_result - server2_final_avg).abs();
+        fn assert_averages_are_significantly_different(avg1: f64, avg2: f64, min_difference: f64) {
+            let difference = (avg1 - avg2).abs();
+            assert!(
+                difference > min_difference,
+                "Server averages should differ by more than {min_difference}ns, but difference was {difference}ns"
+            );
+        }
 
-        assert!(
-            server1_diff <= 0.0,
-            "Server 1 last result ({server1_last_result}) should be equal to final average ({server1_final_avg}), diff: {server1_diff}",
+        fn assert_server_ordering_is_correct(server1_avg: f64, server2_avg: f64) {
+            // Server 2 should have higher average since it has higher processing times [2000-6000] vs [1000-5000]
+            assert!(
+                server2_avg > server1_avg,
+                "Server 2 average ({server2_avg}ns) should be higher than Server 1 ({server1_avg}ns) due to higher processing time ranges"
             );
+        }
 
-        assert!(
-            server2_diff <= 0.0,
-            "Server 2 last result ({server2_last_result}) should be equal to final average ({server2_final_avg}), diff: {server2_diff}",
-        );
+        fn assert_server_result_matches_stored_average(results: &[f64], stats: &Metrics, labels: &LabelSet, server_name: &str) {
+            let final_avg = get_average_processing_time(stats, labels);
+            let last_result = results.last().copied().unwrap();
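+            // `recalculate_udp_avg_processing_time_ns` returns the average it just
+            // stored, so the last task's return value should equal the gauge read
+            // back afterwards (assuming updates are applied atomically per call).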
 
-        // Verify that the metric collection contains the expected metrics for both servers
-        assert!(stats
-            .metric_collection
-            .contains_gauge(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS)));
-        assert!(stats
-            .metric_collection
-            .contains_counter(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL)));
+            assert!(
+                (last_result - final_avg).abs() <= f64::EPSILON,
+                "{server_name} last result ({last_result}) should match final average ({final_avg}) exactly"
+            );
+        }
 
-        println!(
-            "Race condition test completed successfully:\n Server 1: {server1_processed} requests, final avg: {server1_final_avg}ns\n Server 2: {server2_processed} requests, final avg: {server2_final_avg}ns"
-        );
+        fn assert_metric_collection_integrity(stats: &Metrics) {
+            assert!(stats
+                .metric_collection
+                .contains_gauge(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS)));
+            assert!(stats
+                .metric_collection
+                .contains_counter(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL)));
+        }
+
+        fn get_processed_requests_count(stats: &Metrics, labels: &LabelSet) -> u64 {
+            stats
+                .metric_collection
+                .get_counter_value(
+                    &metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSED_REQUESTS_TOTAL),
+                    labels,
+                )
+                .unwrap()
+                .value()
+        }
+
+        fn get_average_processing_time(stats: &Metrics, labels: &LabelSet) -> f64 {
+            stats
+                .metric_collection
+                .get_gauge_value(&metric_name!(UDP_TRACKER_SERVER_PERFORMANCE_AVG_PROCESSING_TIME_NS), labels)
+                .unwrap()
+                .value()
+        }
     }
 }