From a52709f344f255499c65c1784f32a7edca587c0a Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Sat, 22 Mar 2025 21:29:31 +0800 Subject: [PATCH 01/11] Move GCC rewrite from interceptor to this repo --- gcc/acknowledgment.go | 38 ++++ gcc/arrival_group_accumulator.go | 48 +++++ gcc/arrival_group_accumulator_test.go | 239 +++++++++++++++++++++++++ gcc/delay_rate_controller.go | 86 +++++++++ gcc/delivery_rate_estimator.go | 87 +++++++++ gcc/delivery_rate_estimator_test.go | 74 ++++++++ gcc/exponential_moving_average.go | 19 ++ gcc/exponential_moving_average_test.go | 122 +++++++++++++ gcc/kalman.go | 92 ++++++++++ gcc/loss_rate_controller.go | 53 ++++++ gcc/loss_rate_controller_test.go | 86 +++++++++ gcc/overuse_detector.go | 84 +++++++++ gcc/overuse_detector_test.go | 186 +++++++++++++++++++ gcc/rate_controller.go | 76 ++++++++ gcc/rate_controller_test.go | 143 +++++++++++++++ gcc/rtt.go | 8 + gcc/send_side_bwe.go | 50 ++++++ gcc/state.go | 59 ++++++ gcc/state_test.go | 27 +++ gcc/usage.go | 24 +++ go.mod | 19 +- go.sum | 11 +- 22 files changed, 1612 insertions(+), 19 deletions(-) create mode 100644 gcc/acknowledgment.go create mode 100644 gcc/arrival_group_accumulator.go create mode 100644 gcc/arrival_group_accumulator_test.go create mode 100644 gcc/delay_rate_controller.go create mode 100644 gcc/delivery_rate_estimator.go create mode 100644 gcc/delivery_rate_estimator_test.go create mode 100644 gcc/exponential_moving_average.go create mode 100644 gcc/exponential_moving_average_test.go create mode 100644 gcc/kalman.go create mode 100644 gcc/loss_rate_controller.go create mode 100644 gcc/loss_rate_controller_test.go create mode 100644 gcc/overuse_detector.go create mode 100644 gcc/overuse_detector_test.go create mode 100644 gcc/rate_controller.go create mode 100644 gcc/rate_controller_test.go create mode 100644 gcc/rtt.go create mode 100644 gcc/send_side_bwe.go create mode 100644 gcc/state.go create mode 100644 gcc/state_test.go create mode 100644 gcc/usage.go diff --git a/gcc/acknowledgment.go b/gcc/acknowledgment.go new file mode 100644 index 0000000..c23347f --- /dev/null +++ b/gcc/acknowledgment.go @@ -0,0 +1,38 @@ +package gcc + +import ( + "fmt" + "time" +) + +type ECN uint8 + +const ( + //nolint:misspell + // ECNNonECT signals Non ECN-Capable Transport, Non-ECT + ECNNonECT ECN = iota // 00 + + //nolint:misspell + // ECNECT1 signals ECN Capable Transport, ECT(0) + ECNECT1 // 01 + + //nolint:misspell + // ECNECT0 signals ECN Capable Transport, ECT(1) + ECNECT0 // 10 + + // ECNCE signals ECN Congestion Encountered, CE + ECNCE // 11 +) + +type Acknowledgment struct { + SeqNr int64 + Size uint16 + Departure time.Time + Arrived bool + Arrival time.Time + ECN ECN +} + +func (a Acknowledgment) String() string { + return fmt.Sprintf("seq=%v, departure=%v, arrival=%v", a.SeqNr, a.Departure, a.Arrival) +} diff --git a/gcc/arrival_group_accumulator.go b/gcc/arrival_group_accumulator.go new file mode 100644 index 0000000..a83e09f --- /dev/null +++ b/gcc/arrival_group_accumulator.go @@ -0,0 +1,48 @@ +package gcc + +import ( + "time" +) + +type arrivalGroup []Acknowledgment + +type arrivalGroupAccumulator struct { + next arrivalGroup + burstInterval time.Duration + maxBurstDuration time.Duration +} + +func newArrivalGroupAccumulator() *arrivalGroupAccumulator { + return &arrivalGroupAccumulator{ + next: make([]Acknowledgment, 0), + burstInterval: 5 * time.Millisecond, + maxBurstDuration: 100 * time.Millisecond, + } +} + +func (a *arrivalGroupAccumulator) onPacketAcked(ack Acknowledgment) arrivalGroup { + if len(a.next) == 0 { + a.next = append(a.next, ack) + return nil + } + + if ack.Departure.Sub(a.next[0].Departure) < a.burstInterval { + a.next = append(a.next, ack) + return nil + } + + sendTimeDelta := ack.Departure.Sub(a.next[0].Departure) + arrivalTimeDeltaLast := ack.Arrival.Sub(a.next[len(a.next)-1].Arrival) + arrivalTimeDeltaFirst := ack.Arrival.Sub(a.next[0].Arrival) + propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta + + if propagationDelta < 0 && arrivalTimeDeltaLast <= a.burstInterval && arrivalTimeDeltaFirst < a.maxBurstDuration { + a.next = append(a.next, ack) + return nil + } + + group := make(arrivalGroup, len(a.next)) + copy(group, a.next) + a.next = arrivalGroup{ack} + return group +} diff --git a/gcc/arrival_group_accumulator_test.go b/gcc/arrival_group_accumulator_test.go new file mode 100644 index 0000000..b40e454 --- /dev/null +++ b/gcc/arrival_group_accumulator_test.go @@ -0,0 +1,239 @@ +package gcc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestArrivalGroupAccumulator(t *testing.T) { + triggerNewGroupElement := Acknowledgment{ + Departure: time.Time{}.Add(time.Second), + Arrival: time.Time{}.Add(time.Second), + } + cases := []struct { + name string + log []Acknowledgment + exp []arrivalGroup + }{ + { + name: "emptyCreatesNoGroups", + log: []Acknowledgment{}, + exp: []arrivalGroup{}, + }, + { + name: "createsSingleElementGroup", + log: []Acknowledgment{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + }, + }, + }, + }, + { + name: "createsTwoElementGroup", + log: []Acknowledgment{ + { + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + }}, + }, + { + name: "createsTwoArrivalGroups1", + log: []Acknowledgment{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(24 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + }, + { + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(24 * time.Millisecond), + }, + }, + }, + }, + { + name: "createsTwoArrivalGroups2", + log: []Acknowledgment{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(20 * time.Millisecond), + }, + }, + { + { + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + }, + }, + }, + { + name: "ignoresOutOfOrderPackets", + log: []Acknowledgment{ + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(34 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(8 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), + }, + }, + { + { + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(34 * time.Millisecond), + }, + { + Departure: time.Time{}.Add(8 * time.Millisecond), + Arrival: time.Time{}.Add(30 * time.Millisecond), + }, + }, + }, + }, + { + name: "newGroupBecauseOfInterDepartureTime", + log: []Acknowledgment{ + { + SeqNr: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SeqNr: 1, + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SeqNr: 2, + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SeqNr: 3, + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + triggerNewGroupElement, + }, + exp: []arrivalGroup{ + { + { + SeqNr: 0, + Departure: time.Time{}, + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + { + SeqNr: 1, + Departure: time.Time{}.Add(3 * time.Millisecond), + Arrival: time.Time{}.Add(4 * time.Millisecond), + }, + }, + { + { + SeqNr: 2, + Departure: time.Time{}.Add(6 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + { + SeqNr: 3, + Departure: time.Time{}.Add(9 * time.Millisecond), + Arrival: time.Time{}.Add(10 * time.Millisecond), + }, + }, + }, + }, + } + + for _, tc := range cases { + tc := tc + t.Run(tc.name, func(t *testing.T) { + aga := newArrivalGroupAccumulator() + received := []arrivalGroup{} + for _, ack := range tc.log { + next := aga.onPacketAcked(ack) + if next != nil { + received = append(received, next) + } + } + assert.Equal(t, tc.exp, received) + }) + } +} diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go new file mode 100644 index 0000000..5c4f7f2 --- /dev/null +++ b/gcc/delay_rate_controller.go @@ -0,0 +1,86 @@ +package gcc + +import ( + "time" + + "github.com/pion/logging" +) + +const maxSamples = 1000 + +type DelayRateController struct { + log logging.LeveledLogger + aga *arrivalGroupAccumulator + last arrivalGroup + kf *kalmanFilter + od *overuseDetector + rc *rateController + latestUsage usage + samples int +} + +func NewDelayRateController(initialRate int) *DelayRateController { + return &DelayRateController{ + log: logging.NewDefaultLoggerFactory().NewLogger("bwe_delay_rate_controller"), + aga: newArrivalGroupAccumulator(), + last: []Acknowledgment{}, + kf: newKalmanFilter(), + od: newOveruseDetector(true), + rc: newRateController(initialRate), + latestUsage: 0, + samples: 0, + } +} + +func (c *DelayRateController) OnPacketAcked(ack Acknowledgment) { + next := c.aga.onPacketAcked(ack) + if next == nil { + return + } + if len(next) == 0 { + // ignore empty groups, should never occur + return + } + if len(c.last) == 0 { + c.last = next + return + } + + prevSize := groupSize(c.last) + nextSize := groupSize(next) + sizeDelta := nextSize - prevSize + + interArrivalTime := next[len(next)-1].Arrival.Sub(c.last[len(c.last)-1].Arrival) + interDepartureTime := next[len(next)-1].Departure.Sub(c.last[len(c.last)-1].Departure) + interGroupDelay := interArrivalTime - interDepartureTime + estimate := c.kf.update(float64(interGroupDelay.Milliseconds()), float64(sizeDelta)) + c.samples++ + c.latestUsage = c.od.update(ack.Arrival, estimate, c.samples) + c.last = next + c.log.Tracef( + "ts=%v.%06d, seq=%v, size=%v, interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v, usage=%v, state=%v", + c.last[0].Departure.UTC().Format("2006/01/02 15:04:05"), + c.last[0].Departure.UTC().Nanosecond()/1e3, + next[0].SeqNr, + nextSize, + interArrivalTime.Microseconds(), + interDepartureTime.Microseconds(), + interGroupDelay.Microseconds(), + estimate, + c.od.delayThreshold, + int(c.latestUsage), + int(c.rc.s), + ) +} + +func (c *DelayRateController) Update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int { + return c.rc.update(ts, c.latestUsage, lastDeliveryRate, rtt) +} + +func groupSize(group arrivalGroup) int { + sum := 0 + for _, ack := range group { + sum += int(ack.Size) + } + return sum +} diff --git a/gcc/delivery_rate_estimator.go b/gcc/delivery_rate_estimator.go new file mode 100644 index 0000000..5bbf547 --- /dev/null +++ b/gcc/delivery_rate_estimator.go @@ -0,0 +1,87 @@ +package gcc + +import ( + "container/heap" + "time" +) + +type deliveryRateHeapItem struct { + arrival time.Time + size int +} + +type deliveryRateHeap []deliveryRateHeapItem + +// Len implements heap.Interface. +func (d deliveryRateHeap) Len() int { + return len(d) +} + +// Less implements heap.Interface. +func (d deliveryRateHeap) Less(i int, j int) bool { + return d[i].arrival.Before(d[j].arrival) +} + +// Pop implements heap.Interface. +func (d *deliveryRateHeap) Pop() any { + old := *d + n := len(old) + x := old[n-1] + *d = old[0 : n-1] + return x +} + +// Push implements heap.Interface. +func (d *deliveryRateHeap) Push(x any) { + *d = append(*d, x.(deliveryRateHeapItem)) +} + +// Swap implements heap.Interface. +func (d deliveryRateHeap) Swap(i int, j int) { + d[i], d[j] = d[j], d[i] +} + +type deliveryRateEstimator struct { + window time.Duration + latestArrival time.Time + history *deliveryRateHeap +} + +func newDeliveryRateEstimator(window time.Duration) *deliveryRateEstimator { + return &deliveryRateEstimator{ + window: window, + latestArrival: time.Time{}, + history: &deliveryRateHeap{}, + } +} + +func (e *deliveryRateEstimator) OnPacketAcked(arrival time.Time, size int) { + if arrival.After(e.latestArrival) { + e.latestArrival = arrival + } + heap.Push(e.history, deliveryRateHeapItem{ + arrival: arrival, + size: size, + }) +} + +func (e *deliveryRateEstimator) GetRate() int { + deadline := e.latestArrival.Add(-e.window) + for len(*e.history) > 0 && (*e.history)[0].arrival.Before(deadline) { + heap.Pop(e.history) + } + earliest := e.latestArrival + sum := 0 + for _, i := range *e.history { + if i.arrival.Before(earliest) { + earliest = i.arrival + } + sum += i.size + } + d := e.latestArrival.Sub(earliest) + if d == 0 { + return 0 + } + rate := 8 * float64(sum) / d.Seconds() + return int(rate) +} diff --git a/gcc/delivery_rate_estimator_test.go b/gcc/delivery_rate_estimator_test.go new file mode 100644 index 0000000..5fd4424 --- /dev/null +++ b/gcc/delivery_rate_estimator_test.go @@ -0,0 +1,74 @@ +package gcc + +import ( + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestDeliveryRateEstimator(t *testing.T) { + type ack struct { + arrival time.Time + size int + } + cases := []struct { + window time.Duration + acks []ack + expectedRate int + }{ + { + window: 0, + acks: []ack{}, + expectedRate: 0, + }, + { + window: time.Second, + acks: []ack{}, + expectedRate: 0, + }, + { + window: time.Second, + acks: []ack{ + {time.Time{}, 1200}, + }, + expectedRate: 0, + }, + { + window: time.Second, + acks: []ack{ + {time.Time{}.Add(time.Millisecond), 1200}, + }, + expectedRate: 0, + }, + { + window: time.Second, + acks: []ack{ + {time.Time{}.Add(time.Second), 1200}, + {time.Time{}.Add(1500 * time.Millisecond), 1200}, + {time.Time{}.Add(2 * time.Second), 1200}, + }, + expectedRate: 28800, + }, + { + window: time.Second, + acks: []ack{ + {time.Time{}.Add(500 * time.Millisecond), 1200}, + {time.Time{}.Add(time.Second), 1200}, + {time.Time{}.Add(1500 * time.Millisecond), 1200}, + {time.Time{}.Add(2 * time.Second), 1200}, + }, + expectedRate: 28800, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + e := newDeliveryRateEstimator(tc.window) + for _, ack := range tc.acks { + e.OnPacketAcked(ack.arrival, ack.size) + } + assert.Equal(t, tc.expectedRate, e.GetRate()) + }) + } +} diff --git a/gcc/exponential_moving_average.go b/gcc/exponential_moving_average.go new file mode 100644 index 0000000..8b568bd --- /dev/null +++ b/gcc/exponential_moving_average.go @@ -0,0 +1,19 @@ +package gcc + +type exponentialMovingAverage struct { + initialized bool + alpha float64 + average float64 + variance float64 +} + +func (a *exponentialMovingAverage) update(sample float64) { + if !a.initialized { + a.average = sample + a.initialized = true + } else { + delta := sample - a.average + a.average = a.alpha*sample + (1-a.alpha)*a.average + a.variance = (1-a.alpha)*a.variance + a.alpha*(1-a.alpha)*(delta*delta) + } +} diff --git a/gcc/exponential_moving_average_test.go b/gcc/exponential_moving_average_test.go new file mode 100644 index 0000000..6ccab62 --- /dev/null +++ b/gcc/exponential_moving_average_test.go @@ -0,0 +1,122 @@ +package gcc + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +// python to generate test cases: +// import numpy as np +// import pandas as pd +// data = np.random.randint(1, 10, size=10) +// df = pd.DataFrame(data) +// expectedAvg = df.ewm(alpha=0.9, adjust=False).mean() +// expectedVar = df.ewm(alpha=0.9, adjust=False).var(bias=True) + +func TestExponentialMovingAverage(t *testing.T) { + cases := []struct { + alpha float64 + updates []float64 + expectedAvg []float64 + expectedVar []float64 + }{ + { + alpha: 0.9, + updates: []float64{}, + expectedAvg: []float64{}, + expectedVar: []float64{}, + }, + { + alpha: 0.9, + updates: []float64{1, 2, 3, 4}, + expectedAvg: []float64{ + 1.000, + 1.900, + 2.890, + 3.889, + }, + expectedVar: []float64{ + 0.000000, + 0.090000, + 0.117900, + 0.122679, + }, + }, + { + alpha: 0.9, + updates: []float64{8, 8, 5, 1, 3, 1, 8, 2, 8, 9}, + expectedAvg: []float64{ + 8.000000, + 8.000000, + 5.300000, + 1.430000, + 2.843000, + 1.184300, + 7.318430, + 2.531843, + 7.453184, + 8.845318, + }, + expectedVar: []float64{ + 0.000000, + 0.000000, + 0.810000, + 1.745100, + 0.396351, + 0.345334, + 4.215372, + 2.967250, + 2.987792, + 0.514117, + }, + }, + { + alpha: 0.9, + updates: []float64{7, 5, 6, 7, 3, 6, 8, 9, 5, 5}, + expectedAvg: []float64{ + 7.000000, + 5.200000, + 5.920000, + 6.892000, + 3.389200, + 5.738920, + 7.773892, + 8.877389, + 5.387739, + 5.038774, + }, + expectedVar: []float64{ + 0.000000, + 0.360000, + 0.093600, + 0.114336, + 1.374723, + 0.750937, + 0.535217, + 0.188822, + 1.371955, + 0.150726, + }, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + a := exponentialMovingAverage{ + alpha: tc.alpha, + average: 0, + variance: 0, + } + avgs := []float64{} + vars := []float64{} + for _, u := range tc.updates { + a.update(u) + avgs = append(avgs, a.average) + vars = append(vars, a.variance) + } + assert.InDeltaSlice(t, tc.expectedAvg, avgs, 0.001) + assert.InDeltaSlice(t, tc.expectedVar, vars, 0.001) + }) + } +} diff --git a/gcc/kalman.go b/gcc/kalman.go new file mode 100644 index 0000000..236f7a9 --- /dev/null +++ b/gcc/kalman.go @@ -0,0 +1,92 @@ +package gcc + +import ( + "math" +) + +type kalmanFilter struct { + state [2]float64 // [slope, offset] + + processNoise [2]float64 + e [2][2]float64 + avgNoise float64 + varNoise float64 +} + +type kalmanOption func(*kalmanFilter) + +func initSlope(e float64) kalmanOption { + return func(k *kalmanFilter) { + k.state[0] = e + } +} + +func newKalmanFilter(opts ...kalmanOption) *kalmanFilter { + kf := &kalmanFilter{ + state: [2]float64{8.0 / 512.0, 0}, + processNoise: [2]float64{1e-13, 1e-3}, + e: [2][2]float64{{100.0, 0}, {0, 1e-1}}, + varNoise: 50.0, + } + for _, opt := range opts { + opt(kf) + } + return kf +} + +func (k *kalmanFilter) update(timeDelta float64, sizeDelta float64) float64 { + k.e[0][0] += k.processNoise[0] + k.e[1][1] += k.processNoise[1] + + h := [2]float64{sizeDelta, 1.0} + Eh := [2]float64{ + k.e[0][0]*h[0] + k.e[0][1]*h[1], + k.e[1][0]*h[0] + k.e[1][1]*h[1], + } + residual := timeDelta - (k.state[0]*h[0] + k.state[1]*h[1]) + + maxResidual := 3.0 * math.Sqrt(k.varNoise) + if math.Abs(residual) < maxResidual { + k.updateNoiseEstimate(residual, timeDelta) + } else { + if residual < 0 { + k.updateNoiseEstimate(-maxResidual, timeDelta) + } else { + k.updateNoiseEstimate(maxResidual, timeDelta) + } + } + + denom := k.varNoise + h[0]*Eh[0] + h[1]*Eh[1] + + K := [2]float64{ + Eh[0] / denom, Eh[1] / denom, + } + + IKh := [2][2]float64{ + {1.0 - K[0]*h[0], -K[0] * h[1]}, + {-K[1] * h[0], 1.0 - K[1]*h[1]}, + } + + e00 := k.e[0][0] + e01 := k.e[0][1] + + k.e[0][0] = e00*IKh[0][0] + k.e[1][0]*IKh[0][1] + k.e[0][1] = e01*IKh[0][0] + k.e[1][1]*IKh[0][1] + k.e[1][0] = e00*IKh[1][0] + k.e[1][0]*IKh[1][1] + k.e[1][1] = e01*IKh[1][0] + k.e[1][1]*IKh[1][1] + + k.state[0] = k.state[0] + K[0]*residual + k.state[1] = k.state[1] + K[1]*residual + + return k.state[1] +} + +func (k *kalmanFilter) updateNoiseEstimate(residual float64, timeDelta float64) { + alpha := 0.002 + beta := math.Pow(1-alpha, timeDelta*30.0/1000.0) + k.avgNoise = beta*k.avgNoise + (1-beta)*residual + k.varNoise = beta*k.varNoise + (1-beta)*(k.avgNoise-residual)*(k.avgNoise-residual) + if k.varNoise < 1 { + k.varNoise = 1 + } +} diff --git a/gcc/loss_rate_controller.go b/gcc/loss_rate_controller.go new file mode 100644 index 0000000..3db8976 --- /dev/null +++ b/gcc/loss_rate_controller.go @@ -0,0 +1,53 @@ +package gcc + +type LossRateController struct { + bitrate int + min, max float64 + + packetsSinceLastUpdate int + arrivedSinceLastUpdate int + lostSinceLastUpdate int +} + +func NewLossRateController(initialRate, minRate, maxRate int) *LossRateController { + return &LossRateController{ + bitrate: initialRate, + min: float64(minRate), + max: float64(maxRate), + packetsSinceLastUpdate: 0, + arrivedSinceLastUpdate: 0, + lostSinceLastUpdate: 0, + } +} + +func (l *LossRateController) OnPacketAcked() { + l.packetsSinceLastUpdate++ + l.arrivedSinceLastUpdate++ +} + +func (l *LossRateController) OnPacketLost() { + l.packetsSinceLastUpdate++ + l.lostSinceLastUpdate++ +} + +func (l *LossRateController) Update(lastDeliveryRate int) int { + lossRate := float64(l.lostSinceLastUpdate) / float64(l.packetsSinceLastUpdate) + var target float64 + if lossRate > 0.1 { + target = float64(l.bitrate) * (1 - 0.5*lossRate) + target = max(target, l.min) + } else if lossRate < 0.02 { + target = float64(l.bitrate) * 1.05 + target = max(min(target, 1.5*float64(lastDeliveryRate)), float64(l.bitrate)) + target = min(target, l.max) + } + if target != 0 { + l.bitrate = int(target) + } + + l.packetsSinceLastUpdate = 0 + l.arrivedSinceLastUpdate = 0 + l.lostSinceLastUpdate = 0 + + return l.bitrate +} diff --git a/gcc/loss_rate_controller_test.go b/gcc/loss_rate_controller_test.go new file mode 100644 index 0000000..3821425 --- /dev/null +++ b/gcc/loss_rate_controller_test.go @@ -0,0 +1,86 @@ +package gcc + +import ( + "fmt" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestLossRateController(t *testing.T) { + cases := []struct { + init, min, max int + acked int + lost int + deliveredRate int + expectedRate int + }{ + {}, // all zeros + { + init: 100_000, + min: 100_000, + max: 1_000_000, + acked: 0, + lost: 0, + deliveredRate: 0, + expectedRate: 100_000, + }, + { + init: 100_000, + min: 100_000, + max: 1_000_000, + acked: 99, + lost: 1, + deliveredRate: 100_000, + expectedRate: 105_000, + }, + { + init: 100_000, + min: 100_000, + max: 1_000_000, + acked: 99, + lost: 1, + deliveredRate: 90_000, + expectedRate: 105_000, + }, + { + init: 100_000, + min: 100_000, + max: 1_000_000, + acked: 95, + lost: 5, + deliveredRate: 99_000, + expectedRate: 100_000, + }, + { + init: 100_000, + min: 50_000, + max: 1_000_000, + acked: 89, + lost: 11, + deliveredRate: 90_000, + expectedRate: 94_500, + }, + { + init: 100_000, + min: 100_000, + max: 1_000_000, + acked: 89, + lost: 11, + deliveredRate: 90_000, + expectedRate: 100_000, + }, + } + for i, tc := range cases { + t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { + lrc := NewLossRateController(tc.init, tc.min, tc.max) + for i := 0; i < tc.acked; i++ { + lrc.OnPacketAcked() + } + for i := 0; i < tc.lost; i++ { + lrc.OnPacketLost() + } + assert.Equal(t, tc.expectedRate, lrc.Update(tc.deliveredRate)) + }) + } +} diff --git a/gcc/overuse_detector.go b/gcc/overuse_detector.go new file mode 100644 index 0000000..61c8217 --- /dev/null +++ b/gcc/overuse_detector.go @@ -0,0 +1,84 @@ +package gcc + +import ( + "math" + "time" +) + +const ( + kU = 0.01 + kD = 0.00018 + + maxNumDeltas = 60 +) + +type overuseDetector struct { + adaptiveThreshold bool + overUseTimeThreshold time.Duration + delayThreshold float64 + lastEstimate time.Duration + lastUpdate time.Time + firstOverUse time.Time + inOveruse bool + lastUsage usage +} + +func newOveruseDetector(adaptive bool) *overuseDetector { + return &overuseDetector{ + adaptiveThreshold: adaptive, + overUseTimeThreshold: 10 * time.Millisecond, + delayThreshold: 12.5, + lastEstimate: 0, + lastUpdate: time.Time{}, + firstOverUse: time.Time{}, + inOveruse: false, + } +} + +func (d *overuseDetector) update(ts time.Time, trend float64, numDeltas int) usage { + if numDeltas < 2 { + return usageNormal + } + modifiedTrend := float64(min(numDeltas, maxNumDeltas)) * trend + + if modifiedTrend > d.delayThreshold { + if d.firstOverUse.IsZero() { + d.firstOverUse = ts + } + if ts.Sub(d.firstOverUse) > d.overUseTimeThreshold { + d.firstOverUse = time.Time{} + d.lastUsage = usageOver + } + } else if modifiedTrend < -d.delayThreshold { + d.firstOverUse = time.Time{} + d.lastUsage = usageUnder + } else { + d.firstOverUse = time.Time{} + d.lastUsage = usageNormal + } + if d.adaptiveThreshold { + d.adaptThreshold(ts, modifiedTrend) + } + return d.lastUsage +} + +func (d *overuseDetector) adaptThreshold(ts time.Time, modifiedTrend float64) { + if d.lastUpdate.IsZero() { + d.lastUpdate = ts + } + if math.Abs(modifiedTrend) > d.delayThreshold+15 { + d.lastUpdate = ts + return + } + k := kU + if math.Abs(modifiedTrend) < d.delayThreshold { + k = kD + } + delta := ts.Sub(d.lastUpdate) + if delta > 100*time.Millisecond { + delta = 100 * time.Millisecond + } + d.delayThreshold += k * (math.Abs(modifiedTrend) - d.delayThreshold) * float64(delta.Milliseconds()) + d.delayThreshold = max(min(d.delayThreshold, 600.0), 6.0) + d.lastUpdate = ts +} diff --git a/gcc/overuse_detector_test.go b/gcc/overuse_detector_test.go new file mode 100644 index 0000000..a83cff2 --- /dev/null +++ b/gcc/overuse_detector_test.go @@ -0,0 +1,186 @@ +package gcc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestOveruseDetectorUpdate(t *testing.T) { + type estimate struct { + ts time.Time + estimate float64 + numDeltas int + } + cases := []struct { + name string + adaptive bool + values []estimate + expected []usage + }{ + { + name: "noEstimateNoUsageStatic", + adaptive: false, + values: []estimate{}, + expected: []usage{}, + }, + { + name: "overuseStatic", + adaptive: false, + values: []estimate{ + {time.Time{}, 1.0, 1}, + {time.Time{}.Add(5 * time.Millisecond), 20, 2}, + {time.Time{}.Add(20 * time.Millisecond), 30, 3}, + }, + expected: []usage{usageNormal, usageNormal, usageOver}, + }, + { + name: "normaluseStatic", + adaptive: false, + values: []estimate{{estimate: 0}}, + expected: []usage{usageNormal}, + }, + { + name: "underuseStatic", + adaptive: false, + values: []estimate{{time.Time{}, -20, 2}}, + expected: []usage{usageUnder}, + }, + { + name: "noOverUseBeforeDelayStatic", + adaptive: false, + values: []estimate{ + {time.Time{}.Add(time.Millisecond), 20, 1}, + {time.Time{}.Add(2 * time.Millisecond), 30, 2}, + {time.Time{}.Add(30 * time.Millisecond), 50, 3}, + }, + expected: []usage{usageNormal, usageNormal, usageOver}, + }, + { + name: "noOverUseIfEstimateDecreasedStatic", + adaptive: false, + values: []estimate{ + {time.Time{}.Add(time.Millisecond), 20, 1}, + {time.Time{}.Add(10 * time.Millisecond), 40, 2}, + {time.Time{}.Add(30 * time.Millisecond), 50, 3}, + {time.Time{}.Add(35 * time.Millisecond), 3, 4}, + }, + expected: []usage{usageNormal, usageNormal, usageOver, usageNormal}, + }, + { + name: "noEstimateNoUsageAdaptive", + adaptive: true, + values: []estimate{}, + expected: []usage{}, + }, + { + name: "overuseAdaptive", + adaptive: true, + values: []estimate{ + {time.Time{}, 1, 1}, + {time.Time{}.Add(5 * time.Millisecond), 20, 2}, + {time.Time{}.Add(20 * time.Millisecond), 30, 3}, + }, + expected: []usage{usageNormal, usageNormal, usageOver}, + }, + { + name: "normaluseAdaptive", + adaptive: true, + values: []estimate{{estimate: 0}}, + expected: []usage{usageNormal}, + }, + { + name: "underuseAdaptive", + adaptive: true, + values: []estimate{{time.Time{}, -20, 2}}, + expected: []usage{usageUnder}, + }, + { + name: "noOverUseBeforeDelayAdaptive", + adaptive: true, + values: []estimate{ + {time.Time{}.Add(time.Millisecond), 20, 1}, + {time.Time{}.Add(2 * time.Millisecond), 30, 2}, + {time.Time{}.Add(30 * time.Millisecond), 50, 3}, + }, + expected: []usage{usageNormal, usageNormal, usageOver}, + }, + { + name: "noOverUseIfEstimateDecreasedAdaptive", + adaptive: true, + values: []estimate{ + {time.Time{}.Add(time.Millisecond), 20, 1}, + {time.Time{}.Add(10 * time.Millisecond), 40, 2}, + {time.Time{}.Add(30 * time.Millisecond), 50, 3}, + {time.Time{}.Add(35 * time.Millisecond), 3, 4}, + }, + expected: []usage{usageNormal, usageNormal, usageOver, usageNormal}, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + od := newOveruseDetector(tc.adaptive) + received := []usage{} + for _, e := range tc.values { + usage := od.update(e.ts, e.estimate, e.numDeltas) + received = append(received, usage) + } + assert.Equal(t, tc.expected, received) + }) + } +} + +func TestOveruseDetectorAdaptThreshold(t *testing.T) { + cases := []struct { + name string + od *overuseDetector + ts time.Time + estimate float64 + expectedThreshold float64 + }{ + { + name: "minThreshold", + od: &overuseDetector{}, + ts: time.Time{}, + estimate: 0, + expectedThreshold: 6, + }, + { + name: "increase", + od: &overuseDetector{ + delayThreshold: 12.5, + lastUpdate: time.Time{}.Add(time.Second), + }, + ts: time.Time{}.Add(2 * time.Second), + estimate: 25, + expectedThreshold: 25, + }, + { + name: "maxThreshold", + od: &overuseDetector{ + delayThreshold: 6, + lastUpdate: time.Time{}, + }, + ts: time.Time{}.Add(time.Second), + estimate: 6.1, + expectedThreshold: 6, + }, + { + name: "decrease", + od: &overuseDetector{ + delayThreshold: 12.5, + lastUpdate: time.Time{}, + }, + ts: time.Time{}.Add(time.Second), + estimate: 1, + expectedThreshold: 12.5, + }, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + tc.od.adaptThreshold(tc.ts, tc.estimate) + assert.Equal(t, tc.expectedThreshold, tc.od.delayThreshold) + }) + } +} diff --git a/gcc/rate_controller.go b/gcc/rate_controller.go new file mode 100644 index 0000000..d181420 --- /dev/null +++ b/gcc/rate_controller.go @@ -0,0 +1,76 @@ +package gcc + +import ( + "math" + "time" +) + +type rateController struct { + s state + rate int + + decreaseFactor float64 // (beta) + lastUpdate time.Time + lastDecrease *exponentialMovingAverage +} + +func newRateController(initialRate int) *rateController { + return &rateController{ + s: stateIncrease, + rate: initialRate, + decreaseFactor: 0.85, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{}, + } +} + +func (c *rateController) update(ts time.Time, u usage, deliveredRate int, rtt time.Duration) int { + nextState := c.s.transition(u) + c.s = nextState + + if c.s == stateIncrease { + var target float64 + if c.canIncreaseMultiplicatively(float64(deliveredRate)) { + window := ts.Sub(c.lastUpdate) + target = c.multiplicativeIncrease(float64(c.rate), window) + } else { + bitsPerFrame := float64(c.rate) / 30.0 + packetsPerFrame := math.Ceil(bitsPerFrame / (1200 * 8)) + expectedPacketSizeBits := bitsPerFrame / packetsPerFrame + target = c.additiveIncrease(float64(c.rate), int(expectedPacketSizeBits), rtt) + } + c.rate = int(max(min(target, 1.5*float64(deliveredRate)), float64(c.rate))) + } + + if c.s == stateDecrease { + c.rate = int(c.decreaseFactor * float64(deliveredRate)) + c.lastDecrease.update(float64(c.rate)) + } + + c.lastUpdate = ts + + return c.rate +} + +func (c *rateController) canIncreaseMultiplicatively(deliveredRate float64) bool { + if c.lastDecrease.average == 0 { + return true + } + stdDev := math.Sqrt(c.lastDecrease.variance) + lower := c.lastDecrease.average - 3*stdDev + upper := c.lastDecrease.average + 3*stdDev + return deliveredRate < lower || deliveredRate > upper +} + +func (c *rateController) multiplicativeIncrease(rate float64, window time.Duration) float64 { + exponent := min(window.Seconds(), 1.0) + eta := math.Pow(1.08, exponent) + target := eta * rate + return target +} + +func (c *rateController) additiveIncrease(rate float64, expectedPacketSizeBits int, window time.Duration) float64 { + alpha := 0.5 * min(window.Seconds(), 1.0) + target := rate + max(1000, alpha*float64(expectedPacketSizeBits)) + return target +} diff --git a/gcc/rate_controller_test.go b/gcc/rate_controller_test.go new file mode 100644 index 0000000..5e0a575 --- /dev/null +++ b/gcc/rate_controller_test.go @@ -0,0 +1,143 @@ +package gcc + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" +) + +func TestRateController(t *testing.T) { + cases := []struct { + name string + rc rateController + ts time.Time + u usage + delivered int + rtt time.Duration + expectedRate int + }{ + { + name: "zero", + rc: rateController{ + s: 0, + rate: 0, + decreaseFactor: 0, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{}, + }, + ts: time.Time{}, + u: 0, + delivered: 0, + rtt: 0, + expectedRate: 0, + }, + { + name: "multiplicativeIncrease", + rc: rateController{ + s: stateIncrease, + rate: 100, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{}, + }, + ts: time.Time{}.Add(time.Second), + u: usageNormal, + delivered: 100, + rtt: 0, + expectedRate: 108, + }, + { + name: "minimumAdditiveIncrease", + rc: rateController{ + s: stateIncrease, + rate: 100_000, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{ + average: 100_000, + }, + }, + ts: time.Time{}.Add(time.Second), + u: usageNormal, + delivered: 100_000, + rtt: 20 * time.Millisecond, + expectedRate: 101_000, + }, + { + name: "additiveIncrease", + rc: rateController{ + s: stateIncrease, + rate: 1_000_000, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{ + average: 1_000_000, + }, + }, + ts: time.Time{}.Add(time.Second), + u: usageNormal, + delivered: 1_000_000, + rtt: 2000 * time.Millisecond, + expectedRate: 1_004166, + }, + { + name: "minimumAdditiveIncreaseAppLimited", + rc: rateController{ + s: stateIncrease, + rate: 100_000, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{ + average: 100_000, + }, + }, + ts: time.Time{}.Add(time.Second), + u: usageNormal, + delivered: 50_000, + rtt: 20 * time.Millisecond, + expectedRate: 100_000, + }, + { + name: "additiveIncreaseAppLimited", + rc: rateController{ + s: stateIncrease, + rate: 1_000_000, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{ + average: 1_000_000, + }, + }, + ts: time.Time{}.Add(time.Second), + u: usageNormal, + delivered: 100_000, + rtt: 2000 * time.Millisecond, + expectedRate: 1_000_000, + }, + { + name: "decrease", + rc: rateController{ + s: stateDecrease, + rate: 1_000_000, + decreaseFactor: 0.9, + lastUpdate: time.Time{}, + lastDecrease: &exponentialMovingAverage{ + average: 1_000_000, + }, + }, + ts: time.Time{}.Add(time.Second), + u: usageOver, + delivered: 1_000_000, + rtt: 2000 * time.Millisecond, + expectedRate: 900_000, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + res := tc.rc.update(tc.ts, tc.u, tc.delivered, tc.rtt) + assert.Equal(t, tc.expectedRate, res) + }) + } +} diff --git a/gcc/rtt.go b/gcc/rtt.go new file mode 100644 index 0000000..a6f49e4 --- /dev/null +++ b/gcc/rtt.go @@ -0,0 +1,8 @@ +package gcc + +import "time" + +func MeasureRTT(reportSent, reportReceived, latestAckedSent, latestAckedArrival time.Time) time.Duration { + pendingTime := reportSent.Sub(latestAckedArrival) + return reportReceived.Sub(latestAckedSent) - pendingTime +} diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go new file mode 100644 index 0000000..34472e9 --- /dev/null +++ b/gcc/send_side_bwe.go @@ -0,0 +1,50 @@ +package gcc + +import ( + "time" + + "github.com/pion/logging" +) + +type SendSideController struct { + log logging.LeveledLogger + dre *deliveryRateEstimator + lbc *LossRateController + drc *DelayRateController + rate int +} + +func NewSendSideController(initialRate, minRate, maxRate int) *SendSideController { + return &SendSideController{ + log: logging.NewDefaultLoggerFactory().NewLogger("bwe_send_side_controller"), + dre: newDeliveryRateEstimator(time.Second), + lbc: NewLossRateController(initialRate, minRate, maxRate), + drc: NewDelayRateController(initialRate), + rate: initialRate, + } +} + +func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks []Acknowledgment) int { + if len(acks) == 0 { + return c.rate + } + + for _, ack := range acks { + if ack.Arrived { + c.lbc.OnPacketAcked() + if !ack.Arrival.IsZero() { + c.dre.OnPacketAcked(ack.Arrival, int(ack.Size)) + c.drc.OnPacketAcked(ack) + } + } else { + c.lbc.OnPacketLost() + } + } + + delivered := c.dre.GetRate() + lossTarget := c.lbc.Update(delivered) + delayTarget := c.drc.Update(arrival, delivered, rtt) + c.rate = min(lossTarget, delayTarget) + c.log.Tracef("rtt=%v, delivered=%v, lossTarget=%v, delayTarget=%v, target=%v", rtt.Nanoseconds(), delivered, lossTarget, delayTarget, c.rate) + return c.rate +} diff --git a/gcc/state.go b/gcc/state.go new file mode 100644 index 0000000..055967b --- /dev/null +++ b/gcc/state.go @@ -0,0 +1,59 @@ +package gcc + +import "fmt" + +type state int + +const ( + stateDecrease state = -1 + stateHold state = 0 + stateIncrease state = 1 +) + +func (s state) transition(u usage) state { + switch s { + case stateHold: + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateIncrease + case usageUnder: + return stateHold + } + + case stateIncrease: + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateIncrease + case usageUnder: + return stateHold + } + + case stateDecrease: + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateHold + case usageUnder: + return stateHold + } + } + return stateIncrease +} + +func (s state) String() string { + switch s { + case stateIncrease: + return "increase" + case stateDecrease: + return "decrease" + case stateHold: + return "hold" + default: + return fmt.Sprintf("invalid state: %d", s) + } +} diff --git a/gcc/state_test.go b/gcc/state_test.go new file mode 100644 index 0000000..991c2bc --- /dev/null +++ b/gcc/state_test.go @@ -0,0 +1,27 @@ +package gcc + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestState(t *testing.T) { + t.Run("hold", func(t *testing.T) { + assert.Equal(t, stateDecrease, stateHold.transition(usageOver)) + assert.Equal(t, stateIncrease, stateHold.transition(usageNormal)) + assert.Equal(t, stateHold, stateHold.transition(usageUnder)) + }) + + t.Run("increase", func(t *testing.T) { + assert.Equal(t, stateDecrease, stateIncrease.transition(usageOver)) + assert.Equal(t, stateIncrease, stateIncrease.transition(usageNormal)) + assert.Equal(t, stateHold, stateIncrease.transition(usageUnder)) + }) + + t.Run("decrease", func(t *testing.T) { + assert.Equal(t, stateDecrease, stateDecrease.transition(usageOver)) + assert.Equal(t, stateHold, stateDecrease.transition(usageNormal)) + assert.Equal(t, stateHold, stateDecrease.transition(usageUnder)) + }) +} diff --git a/gcc/usage.go b/gcc/usage.go new file mode 100644 index 0000000..030b133 --- /dev/null +++ b/gcc/usage.go @@ -0,0 +1,24 @@ +package gcc + +import "fmt" + +type usage int + +const ( + usageUnder usage = -1 + usageNormal usage = 0 + usageOver usage = 1 +) + +func (u usage) String() string { + switch u { + case usageOver: + return "overuse" + case usageUnder: + return "underuse" + case usageNormal: + return "normal" + default: + return fmt.Sprintf("invalid usage: %d", u) + } +} diff --git a/go.mod b/go.mod index ca12b57..37f72ba 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/pion/bwe-test -go 1.20 +go 1.21 require ( github.com/gorilla/websocket v1.5.3 @@ -14,24 +14,21 @@ require ( golang.org/x/sync v0.11.0 ) -require ( - github.com/pion/dtls/v3 v3.0.6 // indirect - github.com/pion/ice/v4 v4.0.10 // indirect - github.com/pion/mdns/v2 v2.0.7 // indirect - github.com/pion/srtp/v3 v3.0.4 // indirect - github.com/pion/stun/v3 v3.0.0 // indirect - github.com/pion/turn/v4 v4.0.0 // indirect - github.com/wlynxg/anet v0.0.5 // indirect -) - require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/google/uuid v1.6.0 // indirect github.com/pion/datachannel v1.5.10 // indirect + github.com/pion/dtls/v3 v3.0.6 // indirect + github.com/pion/ice/v4 v4.0.10 // indirect + github.com/pion/mdns/v2 v2.0.7 // indirect github.com/pion/randutil v0.1.0 // indirect github.com/pion/sctp v1.8.39 // indirect github.com/pion/sdp/v3 v3.0.11 // indirect + github.com/pion/srtp/v3 v3.0.4 // indirect + github.com/pion/stun/v3 v3.0.0 // indirect + github.com/pion/turn/v4 v4.0.0 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/wlynxg/anet v0.0.5 // indirect golang.org/x/crypto v0.33.0 // indirect golang.org/x/net v0.35.0 // indirect golang.org/x/sys v0.30.0 // indirect diff --git a/go.sum b/go.sum index a49db00..2627438 100644 --- a/go.sum +++ b/go.sum @@ -5,15 +5,13 @@ github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pion/datachannel v1.5.10 h1:ly0Q26K1i6ZkGf42W7D4hQYR90pZwzFOjTq5AuCKk4o= github.com/pion/datachannel v1.5.10/go.mod h1:p/jJfC9arb29W7WrxyKbepTU20CFgyx5oLo8Rs4Py/M= -github.com/pion/dtls/v3 v3.0.4 h1:44CZekewMzfrn9pmGrj5BNnTMDCFwr+6sLH+cCuLM7U= -github.com/pion/dtls/v3 v3.0.4/go.mod h1:R373CsjxWqNPf6MEkfdy3aSe9niZvL/JaKlGeFphtMg= github.com/pion/dtls/v3 v3.0.6 h1:7Hkd8WhAJNbRgq9RgdNh1aaWlZlGpYTzdqjy9x9sK2E= github.com/pion/dtls/v3 v3.0.6/go.mod h1:iJxNQ3Uhn1NZWOMWlLxEEHAN5yX7GyPvvKw04v9bzYU= -github.com/pion/ice/v4 v4.0.8 h1:ajNx0idNG+S+v9Phu4LSn2cs8JEfTsA1/tEjkkAVpFY= -github.com/pion/ice/v4 v4.0.8/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/ice/v4 v4.0.10 h1:P59w1iauC/wPk9PdY8Vjl4fOFL5B+USq1+xbDcN6gT4= github.com/pion/ice/v4 v4.0.10/go.mod h1:y3M18aPhIxLlcO/4dn9X8LzLLSma84cx6emMSu14FGw= github.com/pion/interceptor v0.1.37 h1:aRA8Zpab/wE7/c0O3fh1PqY0AJI3fCSEM5lRWJVorwI= @@ -28,8 +26,6 @@ github.com/pion/rtcp v1.2.15 h1:LZQi2JbdipLOj4eBjK4wlVoQWfrZbh3Q6eHtWtJBZBo= github.com/pion/rtcp v1.2.15/go.mod h1:jlGuAjHMEXwMUHK78RgX0UmEJFV4zUKOFHR7OP+D3D0= github.com/pion/rtp v1.8.15 h1:MuhuGn1cxpVCPLNY1lI7F1tQ8Spntpgf12ob+pOYT8s= github.com/pion/rtp v1.8.15/go.mod h1:bAu2UFKScgzyFqvUKmbvzSdPr+NGbZtv6UB2hesqXBk= -github.com/pion/sctp v1.8.37 h1:ZDmGPtRPX9mKCiVXtMbTWybFw3z/hVKAZgU81wcOrqs= -github.com/pion/sctp v1.8.37/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= github.com/pion/sctp v1.8.39 h1:PJma40vRHa3UTO3C4MyeJDQ+KIobVYRZQZ0Nt7SjQnE= github.com/pion/sctp v1.8.39/go.mod h1:cNiLdchXra8fHQwmIoqw0MbLLMs+f7uQ+dGMG2gWebE= github.com/pion/sdp/v3 v3.0.11 h1:VhgVSopdsBKwhCFoyyPmT1fKMeV9nLMrEKxNOdy3IVI= @@ -42,8 +38,6 @@ github.com/pion/transport/v3 v3.0.7 h1:iRbMH05BzSNwhILHoBoAPxoB9xQgOaJk+591KC9P1 github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uPLGhhz9rwo= github.com/pion/turn/v4 v4.0.0 h1:qxplo3Rxa9Yg1xXDxxH8xaqcyGUtbHYw4QSCvmFWvhM= github.com/pion/turn/v4 v4.0.0/go.mod h1:MuPDkm15nYSklKpN8vWJ9W2M0PlyQZqYt1McGuxG7mA= -github.com/pion/webrtc/v4 v4.0.14 h1:nyds/sFRR+HvmWoBa6wrL46sSfpArE0qR883MBW96lg= -github.com/pion/webrtc/v4 v4.0.14/go.mod h1:R3+qTnQTS03UzwDarYecgioNf7DYgTsldxnCXB821Kk= github.com/pion/webrtc/v4 v4.1.0 h1:yq/p0G5nKGbHISf0YKNA8Yk+kmijbblBvuSLwaJ4QYg= github.com/pion/webrtc/v4 v4.1.0/go.mod h1:cgEGkcpxGkT6Di2ClBYO5lP9mFXbCfEOrkYUpjjCQO4= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -62,5 +56,6 @@ golang.org/x/sys v0.30.0 h1:QjkSwP/36a20jFYWkSue1YwXzLmsV5Gfq7Eiy72C1uc= golang.org/x/sys v0.30.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= From 79f04e775d9d31f149c6492cc9f9e96c4afc7516 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Sat, 22 Mar 2025 21:38:31 +0800 Subject: [PATCH 02/11] Add SPDX license headers --- gcc/acknowledgment.go | 3 +++ gcc/arrival_group_accumulator.go | 3 +++ gcc/arrival_group_accumulator_test.go | 3 +++ gcc/delay_rate_controller.go | 3 +++ gcc/delivery_rate_estimator.go | 3 +++ gcc/delivery_rate_estimator_test.go | 3 +++ gcc/exponential_moving_average.go | 3 +++ gcc/exponential_moving_average_test.go | 3 +++ gcc/kalman.go | 3 +++ gcc/loss_rate_controller.go | 3 +++ gcc/loss_rate_controller_test.go | 3 +++ gcc/overuse_detector.go | 3 +++ gcc/overuse_detector_test.go | 3 +++ gcc/rate_controller.go | 3 +++ gcc/rate_controller_test.go | 3 +++ gcc/rtt.go | 3 +++ gcc/send_side_bwe.go | 3 +++ gcc/state.go | 3 +++ gcc/state_test.go | 3 +++ gcc/usage.go | 3 +++ 20 files changed, 60 insertions(+) diff --git a/gcc/acknowledgment.go b/gcc/acknowledgment.go index c23347f..441fc41 100644 --- a/gcc/acknowledgment.go +++ b/gcc/acknowledgment.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/arrival_group_accumulator.go b/gcc/arrival_group_accumulator.go index a83e09f..cc2d33b 100644 --- a/gcc/arrival_group_accumulator.go +++ b/gcc/arrival_group_accumulator.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/arrival_group_accumulator_test.go b/gcc/arrival_group_accumulator_test.go index b40e454..2e244e5 100644 --- a/gcc/arrival_group_accumulator_test.go +++ b/gcc/arrival_group_accumulator_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go index 5c4f7f2..8cdccf8 100644 --- a/gcc/delay_rate_controller.go +++ b/gcc/delay_rate_controller.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/delivery_rate_estimator.go b/gcc/delivery_rate_estimator.go index 5bbf547..6cfee86 100644 --- a/gcc/delivery_rate_estimator.go +++ b/gcc/delivery_rate_estimator.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/delivery_rate_estimator_test.go b/gcc/delivery_rate_estimator_test.go index 5fd4424..d50dec1 100644 --- a/gcc/delivery_rate_estimator_test.go +++ b/gcc/delivery_rate_estimator_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/exponential_moving_average.go b/gcc/exponential_moving_average.go index 8b568bd..c20450b 100644 --- a/gcc/exponential_moving_average.go +++ b/gcc/exponential_moving_average.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc type exponentialMovingAverage struct { diff --git a/gcc/exponential_moving_average_test.go b/gcc/exponential_moving_average_test.go index 6ccab62..293b90c 100644 --- a/gcc/exponential_moving_average_test.go +++ b/gcc/exponential_moving_average_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/kalman.go b/gcc/kalman.go index 236f7a9..1c77846 100644 --- a/gcc/kalman.go +++ b/gcc/kalman.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/loss_rate_controller.go b/gcc/loss_rate_controller.go index 3db8976..4779953 100644 --- a/gcc/loss_rate_controller.go +++ b/gcc/loss_rate_controller.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc type LossRateController struct { diff --git a/gcc/loss_rate_controller_test.go b/gcc/loss_rate_controller_test.go index 3821425..ac881da 100644 --- a/gcc/loss_rate_controller_test.go +++ b/gcc/loss_rate_controller_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/overuse_detector.go b/gcc/overuse_detector.go index 61c8217..9b6fa9a 100644 --- a/gcc/overuse_detector.go +++ b/gcc/overuse_detector.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/overuse_detector_test.go b/gcc/overuse_detector_test.go index a83cff2..a55368e 100644 --- a/gcc/overuse_detector_test.go +++ b/gcc/overuse_detector_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/rate_controller.go b/gcc/rate_controller.go index d181420..095eabf 100644 --- a/gcc/rate_controller.go +++ b/gcc/rate_controller.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/rate_controller_test.go b/gcc/rate_controller_test.go index 5e0a575..ebef313 100644 --- a/gcc/rate_controller_test.go +++ b/gcc/rate_controller_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/rtt.go b/gcc/rtt.go index a6f49e4..be7d612 100644 --- a/gcc/rtt.go +++ b/gcc/rtt.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import "time" diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go index 34472e9..5ce8c87 100644 --- a/gcc/send_side_bwe.go +++ b/gcc/send_side_bwe.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/state.go b/gcc/state.go index 055967b..04ef3bf 100644 --- a/gcc/state.go +++ b/gcc/state.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import "fmt" diff --git a/gcc/state_test.go b/gcc/state_test.go index 991c2bc..68acf56 100644 --- a/gcc/state_test.go +++ b/gcc/state_test.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import ( diff --git a/gcc/usage.go b/gcc/usage.go index 030b133..d3ccfac 100644 --- a/gcc/usage.go +++ b/gcc/usage.go @@ -1,3 +1,6 @@ +// SPDX-FileCopyrightText: 2025 The Pion community +// SPDX-License-Identifier: MIT + package gcc import "fmt" From b0c89047ea86ddd77ad84faae0167b8be2a3cff3 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Wed, 30 Jul 2025 18:11:09 +0200 Subject: [PATCH 03/11] Remove unused constant --- gcc/delay_rate_controller.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go index 8cdccf8..c2d5a1b 100644 --- a/gcc/delay_rate_controller.go +++ b/gcc/delay_rate_controller.go @@ -9,8 +9,6 @@ import ( "github.com/pion/logging" ) -const maxSamples = 1000 - type DelayRateController struct { log logging.LeveledLogger aga *arrivalGroupAccumulator From 241f08b9b1c5977e1b53dc2f4db29ff5e6466183 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Wed, 30 Jul 2025 18:12:14 +0200 Subject: [PATCH 04/11] Add options to allow custom logger --- gcc/delay_rate_controller.go | 4 ++-- gcc/send_side_bwe.go | 22 +++++++++++++++++++--- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go index c2d5a1b..8d1ff82 100644 --- a/gcc/delay_rate_controller.go +++ b/gcc/delay_rate_controller.go @@ -20,9 +20,9 @@ type DelayRateController struct { samples int } -func NewDelayRateController(initialRate int) *DelayRateController { +func NewDelayRateController(initialRate int, logger logging.LeveledLogger) *DelayRateController { return &DelayRateController{ - log: logging.NewDefaultLoggerFactory().NewLogger("bwe_delay_rate_controller"), + log: logger, aga: newArrivalGroupAccumulator(), last: []Acknowledgment{}, kf: newKalmanFilter(), diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go index 5ce8c87..dae618c 100644 --- a/gcc/send_side_bwe.go +++ b/gcc/send_side_bwe.go @@ -9,6 +9,16 @@ import ( "github.com/pion/logging" ) +type Option func(*SendSideController) error + +func Logger(l logging.LeveledLogger) Option { + return func(ssc *SendSideController) error { + ssc.log = l + ssc.drc.log = l + return nil + } +} + type SendSideController struct { log logging.LeveledLogger dre *deliveryRateEstimator @@ -17,14 +27,20 @@ type SendSideController struct { rate int } -func NewSendSideController(initialRate, minRate, maxRate int) *SendSideController { - return &SendSideController{ +func NewSendSideController(initialRate, minRate, maxRate int, opts ...Option) (*SendSideController, error) { + ssc := &SendSideController{ log: logging.NewDefaultLoggerFactory().NewLogger("bwe_send_side_controller"), dre: newDeliveryRateEstimator(time.Second), lbc: NewLossRateController(initialRate, minRate, maxRate), - drc: NewDelayRateController(initialRate), + drc: NewDelayRateController(initialRate, logging.NewDefaultLoggerFactory().NewLogger("bwe_delay_rate_controller")), rate: initialRate, } + for _, opt := range opts { + if err := opt(ssc); err != nil { + return nil, err + } + } + return ssc, nil } func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks []Acknowledgment) int { From 27dddf40dbdf9f12d1e0457825a0da7fd428300b Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Wed, 30 Jul 2025 18:12:52 +0200 Subject: [PATCH 05/11] Ignore all acks of out of order packets --- gcc/send_side_bwe.go | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go index dae618c..d6f8653 100644 --- a/gcc/send_side_bwe.go +++ b/gcc/send_side_bwe.go @@ -20,11 +20,12 @@ func Logger(l logging.LeveledLogger) Option { } type SendSideController struct { - log logging.LeveledLogger - dre *deliveryRateEstimator - lbc *LossRateController - drc *DelayRateController - rate int + log logging.LeveledLogger + dre *deliveryRateEstimator + lbc *LossRateController + drc *DelayRateController + rate int + highestAcked uint64 } func NewSendSideController(initialRate, minRate, maxRate int, opts ...Option) (*SendSideController, error) { @@ -49,7 +50,13 @@ func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks [ } for _, ack := range acks { + if ack.SeqNr < c.highestAcked { + continue + } if ack.Arrived { + if ack.SeqNr > c.highestAcked { + c.highestAcked = ack.SeqNr + } c.lbc.OnPacketAcked() if !ack.Arrival.IsZero() { c.dre.OnPacketAcked(ack.Arrival, int(ack.Size)) From 437822b0fcb9cc04a4c66c717f800379a1a5db63 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 15:50:57 +0200 Subject: [PATCH 06/11] Use unsigned sequence numbers --- gcc/acknowledgment.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/gcc/acknowledgment.go b/gcc/acknowledgment.go index 441fc41..efe0e9b 100644 --- a/gcc/acknowledgment.go +++ b/gcc/acknowledgment.go @@ -28,7 +28,7 @@ const ( ) type Acknowledgment struct { - SeqNr int64 + SeqNr uint64 Size uint16 Departure time.Time Arrived bool From a95d17cadf0558fafb040d3e88d64198aaaef7f7 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 15:51:05 +0200 Subject: [PATCH 07/11] Add package description --- gcc/gcc.go | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 gcc/gcc.go diff --git a/gcc/gcc.go b/gcc/gcc.go new file mode 100644 index 0000000..871b875 --- /dev/null +++ b/gcc/gcc.go @@ -0,0 +1,3 @@ +// Package gcc implements a congestion controller based on +// https://datatracker.ietf.org/doc/html/draft-ietf-rmcat-gcc-02. +package gcc From 32ce973e096a36f583495b09e49a0fc6462fcf20 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 15:53:20 +0200 Subject: [PATCH 08/11] Reduce code duplication --- gcc/arrival_group_accumulator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/gcc/arrival_group_accumulator.go b/gcc/arrival_group_accumulator.go index cc2d33b..a7c4d1a 100644 --- a/gcc/arrival_group_accumulator.go +++ b/gcc/arrival_group_accumulator.go @@ -29,12 +29,12 @@ func (a *arrivalGroupAccumulator) onPacketAcked(ack Acknowledgment) arrivalGroup return nil } - if ack.Departure.Sub(a.next[0].Departure) < a.burstInterval { + sendTimeDelta := ack.Departure.Sub(a.next[0].Departure) + if sendTimeDelta < a.burstInterval { a.next = append(a.next, ack) return nil } - sendTimeDelta := ack.Departure.Sub(a.next[0].Departure) arrivalTimeDeltaLast := ack.Arrival.Sub(a.next[len(a.next)-1].Arrival) arrivalTimeDeltaFirst := ack.Arrival.Sub(a.next[0].Arrival) propagationDelta := arrivalTimeDeltaFirst - sendTimeDelta From dd4e888d7761bd8e8db42aded8612f931d867b8e Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 15:55:40 +0200 Subject: [PATCH 09/11] Make zero departure times explicit --- gcc/arrival_group_accumulator_test.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/gcc/arrival_group_accumulator_test.go b/gcc/arrival_group_accumulator_test.go index 2e244e5..16befb5 100644 --- a/gcc/arrival_group_accumulator_test.go +++ b/gcc/arrival_group_accumulator_test.go @@ -46,7 +46,8 @@ func TestArrivalGroupAccumulator(t *testing.T) { name: "createsTwoElementGroup", log: []Acknowledgment{ { - Arrival: time.Time{}.Add(15 * time.Millisecond), + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), }, { Departure: time.Time{}.Add(3 * time.Millisecond), @@ -85,7 +86,8 @@ func TestArrivalGroupAccumulator(t *testing.T) { exp: []arrivalGroup{ { { - Arrival: time.Time{}.Add(15 * time.Millisecond), + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), }, { Departure: time.Time{}.Add(3 * time.Millisecond), @@ -120,7 +122,8 @@ func TestArrivalGroupAccumulator(t *testing.T) { exp: []arrivalGroup{ { { - Arrival: time.Time{}.Add(15 * time.Millisecond), + Departure: time.Time{}, + Arrival: time.Time{}.Add(15 * time.Millisecond), }, { Departure: time.Time{}.Add(3 * time.Millisecond), From dbc0c6bae4590fe43c8161b34e7c245911ada343 Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 15:58:56 +0200 Subject: [PATCH 10/11] Unexport types that are for internal use only --- gcc/delay_rate_controller.go | 10 +++++----- gcc/delivery_rate_estimator.go | 4 ++-- gcc/delivery_rate_estimator_test.go | 4 ++-- gcc/loss_rate_controller.go | 12 ++++++------ gcc/loss_rate_controller_test.go | 8 ++++---- gcc/send_side_bwe.go | 22 +++++++++++----------- 6 files changed, 30 insertions(+), 30 deletions(-) diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go index 8d1ff82..9bf7b87 100644 --- a/gcc/delay_rate_controller.go +++ b/gcc/delay_rate_controller.go @@ -9,7 +9,7 @@ import ( "github.com/pion/logging" ) -type DelayRateController struct { +type delayRateController struct { log logging.LeveledLogger aga *arrivalGroupAccumulator last arrivalGroup @@ -20,8 +20,8 @@ type DelayRateController struct { samples int } -func NewDelayRateController(initialRate int, logger logging.LeveledLogger) *DelayRateController { - return &DelayRateController{ +func newDelayRateController(initialRate int, logger logging.LeveledLogger) *delayRateController { + return &delayRateController{ log: logger, aga: newArrivalGroupAccumulator(), last: []Acknowledgment{}, @@ -33,7 +33,7 @@ func NewDelayRateController(initialRate int, logger logging.LeveledLogger) *Dela } } -func (c *DelayRateController) OnPacketAcked(ack Acknowledgment) { +func (c *delayRateController) onPacketAcked(ack Acknowledgment) { next := c.aga.onPacketAcked(ack) if next == nil { return @@ -74,7 +74,7 @@ func (c *DelayRateController) OnPacketAcked(ack Acknowledgment) { ) } -func (c *DelayRateController) Update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int { +func (c *delayRateController) update(ts time.Time, lastDeliveryRate int, rtt time.Duration) int { return c.rc.update(ts, c.latestUsage, lastDeliveryRate, rtt) } diff --git a/gcc/delivery_rate_estimator.go b/gcc/delivery_rate_estimator.go index 6cfee86..2892942 100644 --- a/gcc/delivery_rate_estimator.go +++ b/gcc/delivery_rate_estimator.go @@ -58,7 +58,7 @@ func newDeliveryRateEstimator(window time.Duration) *deliveryRateEstimator { } } -func (e *deliveryRateEstimator) OnPacketAcked(arrival time.Time, size int) { +func (e *deliveryRateEstimator) onPacketAcked(arrival time.Time, size int) { if arrival.After(e.latestArrival) { e.latestArrival = arrival } @@ -68,7 +68,7 @@ func (e *deliveryRateEstimator) OnPacketAcked(arrival time.Time, size int) { }) } -func (e *deliveryRateEstimator) GetRate() int { +func (e *deliveryRateEstimator) getRate() int { deadline := e.latestArrival.Add(-e.window) for len(*e.history) > 0 && (*e.history)[0].arrival.Before(deadline) { heap.Pop(e.history) diff --git a/gcc/delivery_rate_estimator_test.go b/gcc/delivery_rate_estimator_test.go index d50dec1..0324ffb 100644 --- a/gcc/delivery_rate_estimator_test.go +++ b/gcc/delivery_rate_estimator_test.go @@ -69,9 +69,9 @@ func TestDeliveryRateEstimator(t *testing.T) { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { e := newDeliveryRateEstimator(tc.window) for _, ack := range tc.acks { - e.OnPacketAcked(ack.arrival, ack.size) + e.onPacketAcked(ack.arrival, ack.size) } - assert.Equal(t, tc.expectedRate, e.GetRate()) + assert.Equal(t, tc.expectedRate, e.getRate()) }) } } diff --git a/gcc/loss_rate_controller.go b/gcc/loss_rate_controller.go index 4779953..63fa41c 100644 --- a/gcc/loss_rate_controller.go +++ b/gcc/loss_rate_controller.go @@ -3,7 +3,7 @@ package gcc -type LossRateController struct { +type lossRateController struct { bitrate int min, max float64 @@ -12,8 +12,8 @@ type LossRateController struct { lostSinceLastUpdate int } -func NewLossRateController(initialRate, minRate, maxRate int) *LossRateController { - return &LossRateController{ +func newLossRateController(initialRate, minRate, maxRate int) *lossRateController { + return &lossRateController{ bitrate: initialRate, min: float64(minRate), max: float64(maxRate), @@ -23,17 +23,17 @@ func NewLossRateController(initialRate, minRate, maxRate int) *LossRateControlle } } -func (l *LossRateController) OnPacketAcked() { +func (l *lossRateController) onPacketAcked() { l.packetsSinceLastUpdate++ l.arrivedSinceLastUpdate++ } -func (l *LossRateController) OnPacketLost() { +func (l *lossRateController) onPacketLost() { l.packetsSinceLastUpdate++ l.lostSinceLastUpdate++ } -func (l *LossRateController) Update(lastDeliveryRate int) int { +func (l *lossRateController) update(lastDeliveryRate int) int { lossRate := float64(l.lostSinceLastUpdate) / float64(l.packetsSinceLastUpdate) var target float64 if lossRate > 0.1 { diff --git a/gcc/loss_rate_controller_test.go b/gcc/loss_rate_controller_test.go index ac881da..ad0a425 100644 --- a/gcc/loss_rate_controller_test.go +++ b/gcc/loss_rate_controller_test.go @@ -76,14 +76,14 @@ func TestLossRateController(t *testing.T) { } for i, tc := range cases { t.Run(fmt.Sprintf("%v", i), func(t *testing.T) { - lrc := NewLossRateController(tc.init, tc.min, tc.max) + lrc := newLossRateController(tc.init, tc.min, tc.max) for i := 0; i < tc.acked; i++ { - lrc.OnPacketAcked() + lrc.onPacketAcked() } for i := 0; i < tc.lost; i++ { - lrc.OnPacketLost() + lrc.onPacketLost() } - assert.Equal(t, tc.expectedRate, lrc.Update(tc.deliveredRate)) + assert.Equal(t, tc.expectedRate, lrc.update(tc.deliveredRate)) }) } } diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go index d6f8653..1a66101 100644 --- a/gcc/send_side_bwe.go +++ b/gcc/send_side_bwe.go @@ -22,8 +22,8 @@ func Logger(l logging.LeveledLogger) Option { type SendSideController struct { log logging.LeveledLogger dre *deliveryRateEstimator - lbc *LossRateController - drc *DelayRateController + lbc *lossRateController + drc *delayRateController rate int highestAcked uint64 } @@ -32,8 +32,8 @@ func NewSendSideController(initialRate, minRate, maxRate int, opts ...Option) (* ssc := &SendSideController{ log: logging.NewDefaultLoggerFactory().NewLogger("bwe_send_side_controller"), dre: newDeliveryRateEstimator(time.Second), - lbc: NewLossRateController(initialRate, minRate, maxRate), - drc: NewDelayRateController(initialRate, logging.NewDefaultLoggerFactory().NewLogger("bwe_delay_rate_controller")), + lbc: newLossRateController(initialRate, minRate, maxRate), + drc: newDelayRateController(initialRate, logging.NewDefaultLoggerFactory().NewLogger("bwe_delay_rate_controller")), rate: initialRate, } for _, opt := range opts { @@ -57,19 +57,19 @@ func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks [ if ack.SeqNr > c.highestAcked { c.highestAcked = ack.SeqNr } - c.lbc.OnPacketAcked() + c.lbc.onPacketAcked() if !ack.Arrival.IsZero() { - c.dre.OnPacketAcked(ack.Arrival, int(ack.Size)) - c.drc.OnPacketAcked(ack) + c.dre.onPacketAcked(ack.Arrival, int(ack.Size)) + c.drc.onPacketAcked(ack) } } else { - c.lbc.OnPacketLost() + c.lbc.onPacketLost() } } - delivered := c.dre.GetRate() - lossTarget := c.lbc.Update(delivered) - delayTarget := c.drc.Update(arrival, delivered, rtt) + delivered := c.dre.getRate() + lossTarget := c.lbc.update(delivered) + delayTarget := c.drc.update(arrival, delivered, rtt) c.rate = min(lossTarget, delayTarget) c.log.Tracef("rtt=%v, delivered=%v, lossTarget=%v, delayTarget=%v, target=%v", rtt.Nanoseconds(), delivered, lossTarget, delayTarget, c.rate) return c.rate From 5362b62c9a7556793b552cff7e94f4b92b4a0bde Mon Sep 17 00:00:00 2001 From: Mathis Engelbart Date: Thu, 31 Jul 2025 16:39:24 +0200 Subject: [PATCH 11/11] Fix linter issues --- gcc/acknowledgment.go | 17 ++++--- gcc/arrival_group_accumulator.go | 4 ++ gcc/arrival_group_accumulator_test.go | 45 ++--------------- gcc/delay_rate_controller.go | 4 +- gcc/delivery_rate_estimator.go | 3 ++ gcc/kalman.go | 20 +++----- gcc/overuse_detector.go | 14 +++--- gcc/overuse_detector_test.go | 4 +- gcc/rate_controller.go | 3 ++ gcc/rtt.go | 3 ++ gcc/send_side_bwe.go | 20 +++++++- gcc/state.go | 69 +++++++++++++++++---------- 12 files changed, 108 insertions(+), 98 deletions(-) diff --git a/gcc/acknowledgment.go b/gcc/acknowledgment.go index efe0e9b..7fb40c4 100644 --- a/gcc/acknowledgment.go +++ b/gcc/acknowledgment.go @@ -8,25 +8,28 @@ import ( "time" ) +// ECN represents the ECN bits of an IP packet header. type ECN uint8 const ( - //nolint:misspell - // ECNNonECT signals Non ECN-Capable Transport, Non-ECT + // ECNNonECT signals Non ECN-Capable Transport, Non-ECT. + // nolint:misspell ECNNonECT ECN = iota // 00 - //nolint:misspell - // ECNECT1 signals ECN Capable Transport, ECT(0) + // ECNECT1 signals ECN Capable Transport, ECT(0). + // nolint:misspell ECNECT1 // 01 - //nolint:misspell - // ECNECT0 signals ECN Capable Transport, ECT(1) + // ECNECT0 signals ECN Capable Transport, ECT(1). + // nolint:misspell ECNECT0 // 10 - // ECNCE signals ECN Congestion Encountered, CE + // ECNCE signals ECN Congestion Encountered, CE. + // nolint:misspell ECNCE // 11 ) +// An Acknowledgment stores send and receive information about a packet. type Acknowledgment struct { SeqNr uint64 Size uint16 diff --git a/gcc/arrival_group_accumulator.go b/gcc/arrival_group_accumulator.go index a7c4d1a..a40520e 100644 --- a/gcc/arrival_group_accumulator.go +++ b/gcc/arrival_group_accumulator.go @@ -26,12 +26,14 @@ func newArrivalGroupAccumulator() *arrivalGroupAccumulator { func (a *arrivalGroupAccumulator) onPacketAcked(ack Acknowledgment) arrivalGroup { if len(a.next) == 0 { a.next = append(a.next, ack) + return nil } sendTimeDelta := ack.Departure.Sub(a.next[0].Departure) if sendTimeDelta < a.burstInterval { a.next = append(a.next, ack) + return nil } @@ -41,11 +43,13 @@ func (a *arrivalGroupAccumulator) onPacketAcked(ack Acknowledgment) arrivalGroup if propagationDelta < 0 && arrivalTimeDeltaLast <= a.burstInterval && arrivalTimeDeltaFirst < a.maxBurstDuration { a.next = append(a.next, ack) + return nil } group := make(arrivalGroup, len(a.next)) copy(group, a.next) a.next = arrivalGroup{ack} + return group } diff --git a/gcc/arrival_group_accumulator_test.go b/gcc/arrival_group_accumulator_test.go index 16befb5..656827a 100644 --- a/gcc/arrival_group_accumulator_test.go +++ b/gcc/arrival_group_accumulator_test.go @@ -34,13 +34,14 @@ func TestArrivalGroupAccumulator(t *testing.T) { }, triggerNewGroupElement, }, - exp: []arrivalGroup{{ + exp: []arrivalGroup{ { - Departure: time.Time{}, - Arrival: time.Time{}.Add(time.Millisecond), + { + Departure: time.Time{}, + Arrival: time.Time{}.Add(time.Millisecond), + }, }, }, - }, }, { name: "createsTwoElementGroup", @@ -102,42 +103,6 @@ func TestArrivalGroupAccumulator(t *testing.T) { }, }, }, - { - name: "createsTwoArrivalGroups2", - log: []Acknowledgment{ - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(30 * time.Millisecond), - }, - triggerNewGroupElement, - }, - exp: []arrivalGroup{ - { - { - Departure: time.Time{}, - Arrival: time.Time{}.Add(15 * time.Millisecond), - }, - { - Departure: time.Time{}.Add(3 * time.Millisecond), - Arrival: time.Time{}.Add(20 * time.Millisecond), - }, - }, - { - { - Departure: time.Time{}.Add(9 * time.Millisecond), - Arrival: time.Time{}.Add(30 * time.Millisecond), - }, - }, - }, - }, { name: "ignoresOutOfOrderPackets", log: []Acknowledgment{ diff --git a/gcc/delay_rate_controller.go b/gcc/delay_rate_controller.go index 9bf7b87..3bdef07 100644 --- a/gcc/delay_rate_controller.go +++ b/gcc/delay_rate_controller.go @@ -44,6 +44,7 @@ func (c *delayRateController) onPacketAcked(ack Acknowledgment) { } if len(c.last) == 0 { c.last = next + return } @@ -59,7 +60,7 @@ func (c *delayRateController) onPacketAcked(ack Acknowledgment) { c.latestUsage = c.od.update(ack.Arrival, estimate, c.samples) c.last = next c.log.Tracef( - "ts=%v.%06d, seq=%v, size=%v, interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v, usage=%v, state=%v", + "ts=%v.%06d, seq=%v, size=%v, interArrivalTime=%v, interDepartureTime=%v, interGroupDelay=%v, estimate=%v, threshold=%v, usage=%v, state=%v", // nolint c.last[0].Departure.UTC().Format("2006/01/02 15:04:05"), c.last[0].Departure.UTC().Nanosecond()/1e3, next[0].SeqNr, @@ -83,5 +84,6 @@ func groupSize(group arrivalGroup) int { for _, ack := range group { sum += int(ack.Size) } + return sum } diff --git a/gcc/delivery_rate_estimator.go b/gcc/delivery_rate_estimator.go index 2892942..2e176c1 100644 --- a/gcc/delivery_rate_estimator.go +++ b/gcc/delivery_rate_estimator.go @@ -31,11 +31,13 @@ func (d *deliveryRateHeap) Pop() any { n := len(old) x := old[n-1] *d = old[0 : n-1] + return x } // Push implements heap.Interface. func (d *deliveryRateHeap) Push(x any) { + // nolint *d = append(*d, x.(deliveryRateHeapItem)) } @@ -86,5 +88,6 @@ func (e *deliveryRateEstimator) getRate() int { return 0 } rate := 8 * float64(sum) / d.Seconds() + return int(rate) } diff --git a/gcc/kalman.go b/gcc/kalman.go index 1c77846..3079f31 100644 --- a/gcc/kalman.go +++ b/gcc/kalman.go @@ -16,24 +16,14 @@ type kalmanFilter struct { varNoise float64 } -type kalmanOption func(*kalmanFilter) - -func initSlope(e float64) kalmanOption { - return func(k *kalmanFilter) { - k.state[0] = e - } -} - -func newKalmanFilter(opts ...kalmanOption) *kalmanFilter { +func newKalmanFilter() *kalmanFilter { kf := &kalmanFilter{ state: [2]float64{8.0 / 512.0, 0}, processNoise: [2]float64{1e-13, 1e-3}, e: [2][2]float64{{100.0, 0}, {0, 1e-1}}, varNoise: 50.0, } - for _, opt := range opts { - opt(kf) - } + return kf } @@ -41,6 +31,7 @@ func (k *kalmanFilter) update(timeDelta float64, sizeDelta float64) float64 { k.e[0][0] += k.processNoise[0] k.e[1][1] += k.processNoise[1] + // nolint h := [2]float64{sizeDelta, 1.0} Eh := [2]float64{ k.e[0][0]*h[0] + k.e[0][1]*h[1], @@ -61,6 +52,7 @@ func (k *kalmanFilter) update(timeDelta float64, sizeDelta float64) float64 { denom := k.varNoise + h[0]*Eh[0] + h[1]*Eh[1] + // nolint K := [2]float64{ Eh[0] / denom, Eh[1] / denom, } @@ -78,8 +70,8 @@ func (k *kalmanFilter) update(timeDelta float64, sizeDelta float64) float64 { k.e[1][0] = e00*IKh[1][0] + k.e[1][0]*IKh[1][1] k.e[1][1] = e01*IKh[1][0] + k.e[1][1]*IKh[1][1] - k.state[0] = k.state[0] + K[0]*residual - k.state[1] = k.state[1] + K[1]*residual + k.state[0] += K[0] * residual + k.state[1] += K[1] * residual return k.state[1] } diff --git a/gcc/overuse_detector.go b/gcc/overuse_detector.go index 9b6fa9a..2235e4f 100644 --- a/gcc/overuse_detector.go +++ b/gcc/overuse_detector.go @@ -44,7 +44,8 @@ func (d *overuseDetector) update(ts time.Time, trend float64, numDeltas int) usa } modifiedTrend := float64(min(numDeltas, maxNumDeltas)) * trend - if modifiedTrend > d.delayThreshold { + switch { + case modifiedTrend > d.delayThreshold: if d.firstOverUse.IsZero() { d.firstOverUse = ts } @@ -52,16 +53,17 @@ func (d *overuseDetector) update(ts time.Time, trend float64, numDeltas int) usa d.firstOverUse = time.Time{} d.lastUsage = usageOver } - } else if modifiedTrend < -d.delayThreshold { + case modifiedTrend < -d.delayThreshold: d.firstOverUse = time.Time{} d.lastUsage = usageUnder - } else { + default: d.firstOverUse = time.Time{} d.lastUsage = usageNormal } if d.adaptiveThreshold { d.adaptThreshold(ts, modifiedTrend) } + return d.lastUsage } @@ -71,16 +73,14 @@ func (d *overuseDetector) adaptThreshold(ts time.Time, modifiedTrend float64) { } if math.Abs(modifiedTrend) > d.delayThreshold+15 { d.lastUpdate = ts + return } k := kU if math.Abs(modifiedTrend) < d.delayThreshold { k = kD } - delta := ts.Sub(d.lastUpdate) - if delta > 100*time.Millisecond { - delta = 100 * time.Millisecond - } + delta := min(ts.Sub(d.lastUpdate), 100*time.Millisecond) d.delayThreshold += k * (math.Abs(modifiedTrend) - d.delayThreshold) * float64(delta.Milliseconds()) d.delayThreshold = max(min(d.delayThreshold, 600.0), 6.0) d.lastUpdate = ts diff --git a/gcc/overuse_detector_test.go b/gcc/overuse_detector_test.go index a55368e..11299b2 100644 --- a/gcc/overuse_detector_test.go +++ b/gcc/overuse_detector_test.go @@ -126,8 +126,8 @@ func TestOveruseDetectorUpdate(t *testing.T) { od := newOveruseDetector(tc.adaptive) received := []usage{} for _, e := range tc.values { - usage := od.update(e.ts, e.estimate, e.numDeltas) - received = append(received, usage) + u := od.update(e.ts, e.estimate, e.numDeltas) + received = append(received, u) } assert.Equal(t, tc.expected, received) }) diff --git a/gcc/rate_controller.go b/gcc/rate_controller.go index 095eabf..2690984 100644 --- a/gcc/rate_controller.go +++ b/gcc/rate_controller.go @@ -62,6 +62,7 @@ func (c *rateController) canIncreaseMultiplicatively(deliveredRate float64) bool stdDev := math.Sqrt(c.lastDecrease.variance) lower := c.lastDecrease.average - 3*stdDev upper := c.lastDecrease.average + 3*stdDev + return deliveredRate < lower || deliveredRate > upper } @@ -69,11 +70,13 @@ func (c *rateController) multiplicativeIncrease(rate float64, window time.Durati exponent := min(window.Seconds(), 1.0) eta := math.Pow(1.08, exponent) target := eta * rate + return target } func (c *rateController) additiveIncrease(rate float64, expectedPacketSizeBits int, window time.Duration) float64 { alpha := 0.5 * min(window.Seconds(), 1.0) target := rate + max(1000, alpha*float64(expectedPacketSizeBits)) + return target } diff --git a/gcc/rtt.go b/gcc/rtt.go index be7d612..bb5a4e2 100644 --- a/gcc/rtt.go +++ b/gcc/rtt.go @@ -5,7 +5,10 @@ package gcc import "time" +// MeasureRTT measures the RTT. TODO(ME): Remove this function from this +// package? func MeasureRTT(reportSent, reportReceived, latestAckedSent, latestAckedArrival time.Time) time.Duration { pendingTime := reportSent.Sub(latestAckedArrival) + return reportReceived.Sub(latestAckedSent) - pendingTime } diff --git a/gcc/send_side_bwe.go b/gcc/send_side_bwe.go index 1a66101..ea33a39 100644 --- a/gcc/send_side_bwe.go +++ b/gcc/send_side_bwe.go @@ -9,16 +9,20 @@ import ( "github.com/pion/logging" ) +// Option is a functional option for a SendSideController. type Option func(*SendSideController) error +// Logger configures a custom logger for a SendSideController. func Logger(l logging.LeveledLogger) Option { return func(ssc *SendSideController) error { ssc.log = l ssc.drc.log = l + return nil } } +// SendSideController is a sender side congestion controller. type SendSideController struct { log logging.LeveledLogger dre *deliveryRateEstimator @@ -28,6 +32,8 @@ type SendSideController struct { highestAcked uint64 } +// NewSendSideController creates a new SendSideController with initial, min and +// max rates. func NewSendSideController(initialRate, minRate, maxRate int, opts ...Option) (*SendSideController, error) { ssc := &SendSideController{ log: logging.NewDefaultLoggerFactory().NewLogger("bwe_send_side_controller"), @@ -41,9 +47,13 @@ func NewSendSideController(initialRate, minRate, maxRate int, opts ...Option) (* return nil, err } } + return ssc, nil } +// OnAcks must be called when new acknowledgments arrive. arrival is the arrival +// time of the feedback, RTT is the last measured RTT and acks is a list of +// Acknowledgments contained in the latest feedback. func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks []Acknowledgment) int { if len(acks) == 0 { return c.rate @@ -71,6 +81,14 @@ func (c *SendSideController) OnAcks(arrival time.Time, rtt time.Duration, acks [ lossTarget := c.lbc.update(delivered) delayTarget := c.drc.update(arrival, delivered, rtt) c.rate = min(lossTarget, delayTarget) - c.log.Tracef("rtt=%v, delivered=%v, lossTarget=%v, delayTarget=%v, target=%v", rtt.Nanoseconds(), delivered, lossTarget, delayTarget, c.rate) + c.log.Tracef( + "rtt=%v, delivered=%v, lossTarget=%v, delayTarget=%v, target=%v", + rtt.Nanoseconds(), + delivered, + lossTarget, + delayTarget, + c.rate, + ) + return c.rate } diff --git a/gcc/state.go b/gcc/state.go index 04ef3bf..6d0a274 100644 --- a/gcc/state.go +++ b/gcc/state.go @@ -16,35 +16,52 @@ const ( func (s state) transition(u usage) state { switch s { case stateHold: - switch u { - case usageOver: - return stateDecrease - case usageNormal: - return stateIncrease - case usageUnder: - return stateHold - } - + return transitionFromHold(u) case stateIncrease: - switch u { - case usageOver: - return stateDecrease - case usageNormal: - return stateIncrease - case usageUnder: - return stateHold - } - + return transitionFromIncrease(u) case stateDecrease: - switch u { - case usageOver: - return stateDecrease - case usageNormal: - return stateHold - case usageUnder: - return stateHold - } + return transitionFromDecrease(u) + } + + return stateIncrease +} + +func transitionFromHold(u usage) state { + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateIncrease + case usageUnder: + return stateHold + } + + return stateIncrease +} + +func transitionFromIncrease(u usage) state { + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateIncrease + case usageUnder: + return stateHold } + + return stateIncrease +} + +func transitionFromDecrease(u usage) state { + switch u { + case usageOver: + return stateDecrease + case usageNormal: + return stateHold + case usageUnder: + return stateHold + } + return stateIncrease }