@@ -73,7 +73,7 @@ func TestSnapshotThreadSafety(t *testing.T) {
73
73
// TestFreshSnapshotCaching tests that fresh snapshots are cached and not recomputed.
74
74
func TestFreshSnapshotCaching (t * testing.T ) {
75
75
mockMeter := & MockCPUPowerMeter {}
76
- pkg := new ( MockEnergyZone )
76
+ pkg := & MockEnergyZone {}
77
77
pkg .On ("Name" ).Return ("package" )
78
78
pkg .On ("MaxEnergy" ).Return (Energy (1_000_000 ))
79
79
@@ -182,9 +182,9 @@ func TestStaleSnapshotRefreshing(t *testing.T) {
182
182
// TestSingleflightSnapshot tests that concurrent requests for stale data
183
183
// result in only one computation.
184
184
func TestSingleflightSnapshot (t * testing.T ) {
185
- mockMeter := new ( MockCPUPowerMeter )
185
+ mockMeter := & MockCPUPowerMeter {}
186
186
// only needs Name and Energy & Max for computation
187
- pkg := new ( MockEnergyZone )
187
+ pkg := & MockEnergyZone {}
188
188
pkg .On ("Name" ).Return ("package" )
189
189
190
190
var energyCallCount atomic.Int32
@@ -193,6 +193,7 @@ func TestSingleflightSnapshot(t *testing.T) {
193
193
time .Sleep (20 * time .Millisecond )
194
194
energyCallCount .Add (1 )
195
195
}).Return (Energy (100_000 ), nil )
196
+
196
197
pkg .On ("MaxEnergy" ).Return (Energy (1_000_000 ))
197
198
198
199
energyZones := []device.EnergyZone {pkg }
@@ -424,3 +425,173 @@ func TestSnapshot_ConcurrentAfterError(t *testing.T) {
424
425
assert .NotEqual (t , s1 .Timestamp , lastSnapshot .Timestamp ,
425
426
"New snapshot should have a different timestamp" )
426
427
}
428
+
429
+ func TestPowerMonitor_ConcurrentCollection (t * testing.T ) {
430
+ t .Run ("snapshots while collection should not cause race conditions" , func (t * testing.T ) {
431
+ mockMeter := & MockCPUPowerMeter {}
432
+
433
+ // Set up pkg mock
434
+ pkg := & MockEnergyZone {}
435
+ pkg .On ("Name" ).Return ("package" )
436
+ pkg .On ("MaxEnergy" ).Return (Energy (1000 * Joule ))
437
+
438
+ // Energy reads will return increasing values with artificial delay
439
+ var energyVal atomic.Uint64
440
+ energyVal .Store (1000 )
441
+
442
+ var computationCount atomic.Int32
443
+ pkg .On ("Energy" ).Run (func (args mock.Arguments ) {
444
+ computationCount .Add (1 )
445
+ // Add small delay to simulate slow I/O and increase chance of concurrent access
446
+ time .Sleep (8 * time .Millisecond )
447
+ // Increment by 10 each time
448
+ energyVal .Add (10 )
449
+ }).Return (Energy (energyVal .Load ()), nil ).Maybe ()
450
+
451
+ mockMeter .On ("Init" , mock .Anything ).Return (nil ).Once ()
452
+ mockMeter .On ("Zones" ).Return ([]EnergyZone {pkg }, nil )
453
+
454
+ fakeClock := testingclock .NewFakeClock (time .Now ())
455
+
456
+ monitor := NewPowerMonitor (
457
+ mockMeter ,
458
+ WithInterval (50 * time .Millisecond ),
459
+ WithClock (fakeClock ),
460
+ WithMaxStaleness (30 * time .Millisecond ), // Short staleness to force recalculation
461
+ )
462
+
463
+ // Initialize monitor
464
+ ctx , cancel := context .WithCancel (context .Background ())
465
+ defer cancel ()
466
+
467
+ err := monitor .Init (ctx )
468
+ require .NoError (t , err )
469
+
470
+ // run in backgroundn
471
+ go monitor .Run (ctx )
472
+
473
+ // wait for monitor to startt
474
+ time .Sleep (10 * time .Millisecond )
475
+
476
+ // request snapshots concurrently and advance time
477
+ var wg sync.WaitGroup
478
+ numGoroutines := runtime .NumCPU () * 3
479
+ const numIterations = 5
480
+
481
+ // Track any errors
482
+ var encounteredErr atomic.Bool
483
+
484
+ for i := range numGoroutines {
485
+ wg .Add (1 )
486
+ go func (id int ) {
487
+ defer wg .Done ()
488
+
489
+ for j := range numIterations {
490
+
491
+ // Each goroutine requests a snapshot
492
+ snapshot , err := monitor .Snapshot ()
493
+ if err != nil {
494
+ t .Logf ("Goroutine %d iteration %d: Error getting snapshot: %v" , id , j , err )
495
+ encounteredErr .Store (true )
496
+ continue
497
+ }
498
+
499
+ // Verify snapshot is valid
500
+ if snapshot == nil {
501
+ t .Logf ("Goroutine %d iteration %d: Got nil snapshot" , id , j )
502
+ encounteredErr .Store (true )
503
+ continue
504
+ }
505
+
506
+ // Verify snapshot has zone data
507
+ if _ , ok := snapshot .Node .Zones [pkg ]; ! ok {
508
+ t .Logf ("Goroutine %d iteration %d: Missing zone data" , id , j )
509
+ encounteredErr .Store (true )
510
+ }
511
+
512
+ // sleep a bit to increase chance of concurrent access
513
+ time .Sleep (time .Duration (id ) * time .Millisecond )
514
+ }
515
+ }(i )
516
+ }
517
+
518
+ // keep clock ticking while snapshots are requested
519
+ go func () {
520
+ for range numIterations {
521
+ fakeClock .Step (50 * time .Millisecond )
522
+ time .Sleep (10 * time .Millisecond )
523
+ }
524
+ }()
525
+
526
+ wg .Wait ()
527
+
528
+ // verify no errors were encountered
529
+ assert .False (t , encounteredErr .Load (), "Some goroutines encountered errors" )
530
+
531
+ mockMeter .AssertExpectations (t )
532
+ pkg .AssertExpectations (t )
533
+ })
534
+
535
+ t .Run ("Snapshot handles stale data with concurrent requests" , func (t * testing.T ) {
536
+ mockMeter := & MockCPUPowerMeter {}
537
+
538
+ pkg := & MockEnergyZone {}
539
+ pkg .On ("Name" ).Return ("package" )
540
+
541
+ // Track the number of collections by tracking energy reads
542
+ var computeCount atomic.Int32
543
+ pkg .On ("Energy" ).Run (func (args mock.Arguments ) {
544
+ // Add delay to increase chance of concurrent access
545
+ time .Sleep (10 * time .Millisecond )
546
+ computeCount .Add (1 )
547
+ }).Return (Energy (100 * Joule ), nil ).Maybe ()
548
+
549
+ mockMeter .On ("Init" , mock .Anything ).Return (nil ).Once ()
550
+ mockMeter .On ("Zones" ).Return ([]EnergyZone {pkg }, nil )
551
+
552
+ fakeClock := testingclock .NewFakeClock (time .Now ())
553
+
554
+ monitor := NewPowerMonitor (
555
+ mockMeter ,
556
+ WithClock (fakeClock ),
557
+ WithMaxStaleness (50 * time .Millisecond ),
558
+ )
559
+
560
+ ctx , cancel := context .WithCancel (context .Background ())
561
+ defer cancel ()
562
+
563
+ err := monitor .Init (ctx )
564
+ require .NoError (t , err )
565
+
566
+ // Init should not collect
567
+ assert .Equal (t , int32 (0 ), computeCount .Load ())
568
+
569
+ // Launch multiple concurrent requests for the stale snapshot
570
+ var wg sync.WaitGroup
571
+ numGoroutines := runtime .NumCPU () * 3
572
+
573
+ for range numGoroutines {
574
+ wg .Add (1 )
575
+ go func () {
576
+ defer wg .Done ()
577
+
578
+ snapshot , err := monitor .Snapshot ()
579
+ assert .NoError (t , err )
580
+ assert .NotNil (t , snapshot )
581
+
582
+ // Verify snapshot has zone data
583
+ assert .Contains (t , snapshot .Node .Zones , pkg )
584
+ }()
585
+ }
586
+
587
+ wg .Wait ()
588
+
589
+ // Despite having may concurrent requests for a stale snapshot,
590
+ // we should have computed power only once due to singleflight
591
+ assert .Equal (t , int32 (1 ), computeCount .Load (),
592
+ "Computation should happen exactly once despite concurrent requests" )
593
+
594
+ mockMeter .AssertExpectations (t )
595
+ pkg .AssertExpectations (t )
596
+ })
597
+ }
0 commit comments