Skip to content

Commit beba940

Browse files
Sean-Dercptpcrd
andcommitted
Don't drop packets when probing Simulcast
Before any packets that we read during the probe would get lost Co-authored-by: cptpcrd <[email protected]>
1 parent ed9f7fa commit beba940

File tree

6 files changed

+139
-33
lines changed

6 files changed

+139
-33
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/pion/rtp v1.8.25
1414
github.com/pion/sctp v1.8.40
1515
github.com/pion/sdp/v3 v3.0.16
16-
github.com/pion/srtp/v3 v3.0.8
16+
github.com/pion/srtp/v3 v3.0.9
1717
github.com/pion/stun/v3 v3.0.1
1818
github.com/pion/transport/v3 v3.1.1
1919
github.com/pion/turn/v4 v4.1.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ github.com/pion/sctp v1.8.40 h1:bqbgWYOrUhsYItEnRObUYZuzvOMsVplS3oNgzedBlG8=
5959
github.com/pion/sctp v1.8.40/go.mod h1:SPBBUENXE6ThkEksN5ZavfAhFYll+h+66ZiG6IZQuzo=
6060
github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo=
6161
github.com/pion/sdp/v3 v3.0.16/go.mod h1:9tyKzznud3qiweZcD86kS0ff1pGYB3VX+Bcsmkx6IXo=
62-
github.com/pion/srtp/v3 v3.0.8 h1:RjRrjcIeQsilPzxvdaElN0CpuQZdMvcl9VZ5UY9suUM=
63-
github.com/pion/srtp/v3 v3.0.8/go.mod h1:2Sq6YnDH7/UDCvkSoHSDNDeyBcFgWL0sAVycVbAsXFg=
62+
github.com/pion/srtp/v3 v3.0.9 h1:lRGF4G61xxj+m/YluB3ZnBpiALSri2lTzba0kGZMrQY=
63+
github.com/pion/srtp/v3 v3.0.9/go.mod h1:E+AuWd7Ug2Fp5u38MKnhduvpVkveXJX6J4Lq4rxUYt8=
6464
github.com/pion/stun/v3 v3.0.1 h1:jx1uUq6BdPihF0yF33Jj2mh+C9p0atY94IkdnW174kA=
6565
github.com/pion/stun/v3 v3.0.1/go.mod h1:RHnvlKFg+qHgoKIqtQWMOJF52wsImCAf/Jh5GjX+4Tw=
6666
github.com/pion/transport/v3 v3.1.1 h1:Tr684+fnnKlhPceU+ICdrw6KKkTms+5qHMgw6bIkYOM=

peerconnection.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"crypto/rand"
1313
"errors"
1414
"fmt"
15-
"io"
1615
"slices"
1716
"strconv"
1817
"strings"
@@ -1688,7 +1687,7 @@ func (pc *PeerConnection) handleNonMediaBandwidthProbe() {
16881687
}
16891688
}
16901689

1691-
func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocyclo,gocognit,cyclop
1690+
func (pc *PeerConnection) handleIncomingSSRC(rtpStream *srtp.ReadStreamSRTP, ssrc SSRC) error { //nolint:gocyclo,gocognit,cyclop
16921691
remoteDescription := pc.RemoteDescription()
16931692
if remoteDescription == nil {
16941693
return errPeerConnRemoteDescriptionNil
@@ -1725,7 +1724,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
17251724
// We read the RTP packet to determine the payload type
17261725
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
17271726

1728-
i, err := rtpStream.Read(b)
1727+
i, err := rtpStream.Peek(b)
17291728
if err != nil {
17301729
return err
17311730
}
@@ -1930,7 +1929,7 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() { //nolint:cyclop
19301929
continue
19311930
}
19321931

1933-
go func(rtpStream io.Reader, ssrc SSRC) {
1932+
go func(rtpStream *srtp.ReadStreamSRTP, ssrc SSRC) {
19341933
if err := pc.handleIncomingSSRC(rtpStream, ssrc); err != nil {
19351934
pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err)
19361935
}

peerconnection_media_test.go

Lines changed: 113 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bufio"
1111
"bytes"
1212
"context"
13+
"crypto/rand"
1314
"errors"
1415
"fmt"
1516
"io"
@@ -2062,14 +2063,13 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) { //nolint:cyclop
20622063
assert.NotZero(t, ridID)
20632064
assert.NotZero(t, rsid)
20642065

2065-
err = signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
2066+
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
20662067
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
20672068
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
20682069
res := re.ReplaceAllString(sdp, "")
20692070

20702071
return res
2071-
})
2072-
assert.NoError(t, err)
2072+
}))
20732073

20742074
// padding only packets should not affect simulcast probe
20752075
var sequenceNumber uint16
@@ -2493,3 +2493,113 @@ func Test_PeerConnection_RTX_E2E(t *testing.T) { //nolint:cyclop
24932493
closePairNow(t, pcOffer, pcAnswer)
24942494
assert.NoError(t, wan.Stop())
24952495
}
2496+
2497+
// Simulcast probe would lose packets. This test asserts that we don't drop
2498+
// any packets during the probe
2499+
func TestPeerConnection_Simulcast_Probe_PacketLoss(t *testing.T) { //nolint:cyclop
2500+
t.Skip()
2501+
2502+
lim := test.TimeOut(time.Second * 30)
2503+
defer lim.Stop()
2504+
2505+
report := test.CheckRoutines(t)
2506+
defer report()
2507+
2508+
const rtpPktCount = 10
2509+
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
2510+
2511+
rids := []string{"a", "b", "c"}
2512+
vp8WriterA, err := NewTrackLocalStaticRTP(
2513+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]),
2514+
)
2515+
assert.NoError(t, err)
2516+
2517+
vp8WriterB, err := NewTrackLocalStaticRTP(
2518+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]),
2519+
)
2520+
assert.NoError(t, err)
2521+
2522+
vp8WriterC, err := NewTrackLocalStaticRTP(
2523+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]),
2524+
)
2525+
assert.NoError(t, err)
2526+
2527+
sender, err := pcOffer.AddTrack(vp8WriterA)
2528+
assert.NoError(t, err)
2529+
assert.NotNil(t, sender)
2530+
2531+
assert.NoError(t, sender.AddEncoding(vp8WriterB))
2532+
assert.NoError(t, sender.AddEncoding(vp8WriterC))
2533+
2534+
expectedBuffers := [][]byte{
2535+
make([]byte, outboundMTU*rtpPktCount),
2536+
make([]byte, outboundMTU*rtpPktCount),
2537+
make([]byte, outboundMTU*rtpPktCount),
2538+
}
2539+
for i := range expectedBuffers {
2540+
_, err := rand.Read(expectedBuffers[i])
2541+
assert.NoError(t, err)
2542+
}
2543+
2544+
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
2545+
actualBuffer := []byte{}
2546+
2547+
for i := 0; i < rtpPktCount; i++ {
2548+
pkt, _, err := trackRemote.ReadRTP()
2549+
assert.NoError(t, err)
2550+
2551+
actualBuffer = append(actualBuffer, pkt.Payload...)
2552+
fmt.Println(pkt.SequenceNumber)
2553+
}
2554+
2555+
assert.Equal(t, actualBuffer, expectedBuffers[0])
2556+
})
2557+
2558+
var midID, ridID uint8
2559+
for _, extension := range sender.GetParameters().HeaderExtensions {
2560+
switch extension.URI {
2561+
case sdp.SDESMidURI:
2562+
midID = uint8(extension.ID) //nolint:gosec // G115
2563+
case sdp.SDESRTPStreamIDURI:
2564+
ridID = uint8(extension.ID) //nolint:gosec // G115
2565+
}
2566+
}
2567+
assert.NotZero(t, midID)
2568+
assert.NotZero(t, ridID)
2569+
2570+
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
2571+
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
2572+
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
2573+
res := re.ReplaceAllString(sdp, "")
2574+
2575+
return res
2576+
}))
2577+
2578+
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, pcOffer, pcAnswer)
2579+
peerConnectionConnected.Wait()
2580+
2581+
for sequenceNumber := uint16(0); sequenceNumber < rtpPktCount; sequenceNumber++ {
2582+
for i, track := range []*TrackLocalStaticRTP{vp8WriterA} {
2583+
pkt := &rtp.Packet{
2584+
Header: rtp.Header{
2585+
Version: 2,
2586+
PayloadType: 96,
2587+
SequenceNumber: sequenceNumber,
2588+
},
2589+
}
2590+
2591+
// Make sure that packets for Stream received before MID/RID don't get dropped
2592+
if sequenceNumber > 3 {
2593+
assert.NoError(t, pkt.SetExtension(midID, []byte("0")))
2594+
assert.NoError(t, pkt.SetExtension(ridID, []byte(track.RID())))
2595+
}
2596+
2597+
offset := int(sequenceNumber) * outboundMTU
2598+
pkt.Payload = expectedBuffers[i][offset : offset+outboundMTU]
2599+
assert.NoError(t, track.WriteRTP(pkt))
2600+
}
2601+
}
2602+
2603+
assert.NoError(t, wan.Stop())
2604+
closePairNow(t, pcOffer, pcAnswer)
2605+
}

track_local_static_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -827,8 +827,7 @@ func Test_TrackRemote_ReadRTP_UnmarshalError(t *testing.T) {
827827
tr := newTrackRemote(RTPCodecTypeVideo, 0, 0, "", recv)
828828

829829
tr.mu.Lock()
830-
tr.peeked = []byte{0x80, 96}
831-
tr.peekedAttributes = nil
830+
tr.peekedPackets = []*peekedPacket{{payload: []byte{0x80, 96}}}
832831
tr.mu.Unlock()
833832

834833
pkt, attrs, err := tr.ReadRTP()

track_remote.go

Lines changed: 19 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"github.com/pion/rtp"
1616
)
1717

18+
type peekedPacket struct {
19+
payload []byte
20+
attributes interceptor.Attributes
21+
}
22+
1823
// TrackRemote represents a single inbound source of media.
1924
type TrackRemote struct {
2025
mu sync.RWMutex
@@ -30,9 +35,9 @@ type TrackRemote struct {
3035
params RTPParameters
3136
rid string
3237

33-
receiver *RTPReceiver
34-
peeked []byte
35-
peekedAttributes interceptor.Attributes
38+
receiver *RTPReceiver
39+
40+
peekedPackets []*peekedPacket
3641

3742
audioPlayoutStatsProviders []AudioPlayoutStatsProvider
3843
}
@@ -116,25 +121,19 @@ func (t *TrackRemote) Codec() RTPCodecParameters {
116121
func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, err error) {
117122
t.mu.RLock()
118123
receiver := t.receiver
119-
peeked := t.peeked != nil
124+
var peekedPkt *peekedPacket
125+
if len(t.peekedPackets) != 0 {
126+
peekedPkt = t.peekedPackets[0]
127+
t.peekedPackets = t.peekedPackets[1:]
128+
}
120129
t.mu.RUnlock()
121130

122-
if peeked {
123-
t.mu.Lock()
124-
data := t.peeked
125-
attributes = t.peekedAttributes
126-
127-
t.peeked = nil
128-
t.peekedAttributes = nil
129-
t.mu.Unlock()
130-
// someone else may have stolen our packet when we
131-
// released the lock. Deal with it.
132-
if data != nil {
133-
n = copy(b, data)
134-
err = t.checkAndUpdateTrack(b)
131+
if peekedPkt != nil {
132+
n = copy(b, peekedPkt.payload)
133+
attributes = peekedPkt.attributes
134+
err = t.checkAndUpdateTrack(b)
135135

136-
return n, attributes, err
137-
}
136+
return
138137
}
139138

140139
// If there's a separate RTX track and an RTX packet is available, return that
@@ -212,8 +211,7 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error
212211
// that case.
213212
data := make([]byte, n)
214213
n = copy(data, b[:n])
215-
t.peeked = data
216-
t.peekedAttributes = a
214+
t.peekedPackets = append(t.peekedPackets, &peekedPacket{payload: data, attributes: a})
217215
t.mu.Unlock()
218216

219217
return

0 commit comments

Comments
 (0)