Skip to content

Commit 96a2e1a

Browse files
Sean-Dercptpcrd
andcommitted
Don't drop packets when probing Simulcast
Before any packets that we read during the probe would get lost Co-authored-by: cptpcrd <[email protected]>
1 parent ed9f7fa commit 96a2e1a

File tree

7 files changed

+141
-34
lines changed

7 files changed

+141
-34
lines changed

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ require (
1313
github.com/pion/rtp v1.8.25
1414
github.com/pion/sctp v1.8.40
1515
github.com/pion/sdp/v3 v3.0.16
16-
github.com/pion/srtp/v3 v3.0.8
16+
github.com/pion/srtp/v3 v3.0.9
1717
github.com/pion/stun/v3 v3.0.1
1818
github.com/pion/transport/v3 v3.1.1
1919
github.com/pion/turn/v4 v4.1.3

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,8 @@ github.com/pion/sctp v1.8.40 h1:bqbgWYOrUhsYItEnRObUYZuzvOMsVplS3oNgzedBlG8=
5959
github.com/pion/sctp v1.8.40/go.mod h1:SPBBUENXE6ThkEksN5ZavfAhFYll+h+66ZiG6IZQuzo=
6060
github.com/pion/sdp/v3 v3.0.16 h1:0dKzYO6gTAvuLaAKQkC02eCPjMIi4NuAr/ibAwrGDCo=
6161
github.com/pion/sdp/v3 v3.0.16/go.mod h1:9tyKzznud3qiweZcD86kS0ff1pGYB3VX+Bcsmkx6IXo=
62-
github.com/pion/srtp/v3 v3.0.8 h1:RjRrjcIeQsilPzxvdaElN0CpuQZdMvcl9VZ5UY9suUM=
63-
github.com/pion/srtp/v3 v3.0.8/go.mod h1:2Sq6YnDH7/UDCvkSoHSDNDeyBcFgWL0sAVycVbAsXFg=
62+
github.com/pion/srtp/v3 v3.0.9 h1:lRGF4G61xxj+m/YluB3ZnBpiALSri2lTzba0kGZMrQY=
63+
github.com/pion/srtp/v3 v3.0.9/go.mod h1:E+AuWd7Ug2Fp5u38MKnhduvpVkveXJX6J4Lq4rxUYt8=
6464
github.com/pion/stun/v3 v3.0.1 h1:jx1uUq6BdPihF0yF33Jj2mh+C9p0atY94IkdnW174kA=
6565
github.com/pion/stun/v3 v3.0.1/go.mod h1:RHnvlKFg+qHgoKIqtQWMOJF52wsImCAf/Jh5GjX+4Tw=
6666
github.com/pion/transport/v3 v3.1.1 h1:Tr684+fnnKlhPceU+ICdrw6KKkTms+5qHMgw6bIkYOM=

peerconnection.go

Lines changed: 12 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ import (
1212
"crypto/rand"
1313
"errors"
1414
"fmt"
15-
"io"
1615
"slices"
1716
"strconv"
1817
"strings"
@@ -1688,7 +1687,7 @@ func (pc *PeerConnection) handleNonMediaBandwidthProbe() {
16881687
}
16891688
}
16901689

1691-
func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) error { //nolint:gocyclo,gocognit,cyclop
1690+
func (pc *PeerConnection) handleIncomingSSRC(rtpStream *srtp.ReadStreamSRTP, ssrc SSRC) error { //nolint:gocyclo,gocognit,cyclop,lll
16921691
remoteDescription := pc.RemoteDescription()
16931692
if remoteDescription == nil {
16941693
return errPeerConnRemoteDescriptionNil
@@ -1725,7 +1724,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
17251724
// We read the RTP packet to determine the payload type
17261725
b := make([]byte, pc.api.settingEngine.getReceiveMTU())
17271726

1728-
i, err := rtpStream.Read(b)
1727+
i, err := rtpStream.Peek(b)
17291728
if err != nil {
17301729
return err
17311730
}
@@ -1802,6 +1801,8 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
18021801
return err
18031802
}
18041803

1804+
peekedPackets := []*peekedPacket{}
1805+
18051806
// if the first packet didn't contain simuilcast IDs, then probe more packets
18061807
var paddingOnly bool
18071808
for readCount := 0; readCount <= simulcastProbeCount; readCount++ {
@@ -1811,11 +1812,16 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
18111812
readCount--
18121813
}
18131814

1814-
i, _, err := interceptor.Read(b, nil)
1815+
i, attributes, err := interceptor.Read(b, nil)
18151816
if err != nil {
18161817
return err
18171818
}
18181819

1820+
peekedPackets = append(peekedPackets, &peekedPacket{
1821+
payload: slices.Clone(b[:i]),
1822+
attributes: attributes,
1823+
})
1824+
18191825
if paddingOnly, err = handleUnknownRTPPacket(
18201826
b[:i], uint8(midExtensionID), //nolint:gosec // G115
18211827
uint8(streamIDExtensionID), //nolint:gosec // G115
@@ -1851,6 +1857,7 @@ func (pc *PeerConnection) handleIncomingSSRC(rtpStream io.Reader, ssrc SSRC) err
18511857
interceptor,
18521858
rtcpReadStream,
18531859
rtcpInterceptor,
1860+
peekedPackets,
18541861
)
18551862
if err != nil {
18561863
return err
@@ -1930,7 +1937,7 @@ func (pc *PeerConnection) undeclaredRTPMediaProcessor() { //nolint:cyclop
19301937
continue
19311938
}
19321939

1933-
go func(rtpStream io.Reader, ssrc SSRC) {
1940+
go func(rtpStream *srtp.ReadStreamSRTP, ssrc SSRC) {
19341941
if err := pc.handleIncomingSSRC(rtpStream, ssrc); err != nil {
19351942
pc.log.Errorf(incomingUnhandledRTPSsrc, ssrc, err)
19361943
}

peerconnection_media_test.go

Lines changed: 105 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"bufio"
1111
"bytes"
1212
"context"
13+
"crypto/rand"
1314
"errors"
1415
"fmt"
1516
"io"
@@ -2062,14 +2063,13 @@ func TestPeerConnection_Simulcast_RTX(t *testing.T) { //nolint:cyclop
20622063
assert.NotZero(t, ridID)
20632064
assert.NotZero(t, rsid)
20642065

2065-
err = signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
2066+
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
20662067
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
20672068
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
20682069
res := re.ReplaceAllString(sdp, "")
20692070

20702071
return res
2071-
})
2072-
assert.NoError(t, err)
2072+
}))
20732073

20742074
// padding only packets should not affect simulcast probe
20752075
var sequenceNumber uint16
@@ -2493,3 +2493,105 @@ func Test_PeerConnection_RTX_E2E(t *testing.T) { //nolint:cyclop
24932493
closePairNow(t, pcOffer, pcAnswer)
24942494
assert.NoError(t, wan.Stop())
24952495
}
2496+
2497+
// Simulcast probe would lose packets. This test asserts that we don't drop
2498+
// any packets during the probe.
2499+
func TestPeerConnection_Simulcast_Probe_PacketLoss(t *testing.T) { //nolint:cyclop
2500+
lim := test.TimeOut(time.Second * 30)
2501+
defer lim.Stop()
2502+
2503+
report := test.CheckRoutines(t)
2504+
defer report()
2505+
2506+
const rtpPktCount = 10
2507+
pcOffer, pcAnswer, wan := createVNetPair(t, nil)
2508+
2509+
rids := []string{"a", "b", "c"}
2510+
vp8WriterA, err := NewTrackLocalStaticRTP(
2511+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[0]),
2512+
)
2513+
assert.NoError(t, err)
2514+
2515+
vp8WriterB, err := NewTrackLocalStaticRTP(
2516+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[1]),
2517+
)
2518+
assert.NoError(t, err)
2519+
2520+
vp8WriterC, err := NewTrackLocalStaticRTP(
2521+
RTPCodecCapability{MimeType: MimeTypeVP8}, "video", "pion2", WithRTPStreamID(rids[2]),
2522+
)
2523+
assert.NoError(t, err)
2524+
2525+
sender, err := pcOffer.AddTrack(vp8WriterA)
2526+
assert.NoError(t, err)
2527+
assert.NotNil(t, sender)
2528+
2529+
assert.NoError(t, sender.AddEncoding(vp8WriterB))
2530+
assert.NoError(t, sender.AddEncoding(vp8WriterC))
2531+
2532+
expectedBuffer := make([]byte, outboundMTU*rtpPktCount)
2533+
_, err = rand.Read(expectedBuffer)
2534+
assert.NoError(t, err)
2535+
2536+
ctx, cancel := context.WithCancel(context.Background())
2537+
pcAnswer.OnTrack(func(trackRemote *TrackRemote, _ *RTPReceiver) {
2538+
actualBuffer := []byte{}
2539+
2540+
for i := 0; i < rtpPktCount; i++ {
2541+
pkt, _, err := trackRemote.ReadRTP()
2542+
assert.NoError(t, err)
2543+
2544+
actualBuffer = append(actualBuffer, pkt.Payload...)
2545+
}
2546+
2547+
assert.Equal(t, actualBuffer, expectedBuffer)
2548+
cancel()
2549+
})
2550+
2551+
var midID, ridID uint8
2552+
for _, extension := range sender.GetParameters().HeaderExtensions {
2553+
switch extension.URI {
2554+
case sdp.SDESMidURI:
2555+
midID = uint8(extension.ID) //nolint:gosec // G115
2556+
case sdp.SDESRTPStreamIDURI:
2557+
ridID = uint8(extension.ID) //nolint:gosec // G115
2558+
}
2559+
}
2560+
assert.NotZero(t, midID)
2561+
assert.NotZero(t, ridID)
2562+
2563+
assert.NoError(t, signalPairWithModification(pcOffer, pcAnswer, func(sdp string) string {
2564+
// Original chrome sdp contains no ssrc info https://pastebin.com/raw/JTjX6zg6
2565+
re := regexp.MustCompile("(?m)[\r\n]+^.*a=ssrc.*$")
2566+
res := re.ReplaceAllString(sdp, "")
2567+
2568+
return res
2569+
}))
2570+
2571+
peerConnectionConnected := untilConnectionState(PeerConnectionStateConnected, pcOffer, pcAnswer)
2572+
peerConnectionConnected.Wait()
2573+
2574+
for sequenceNumber := uint16(0); sequenceNumber < rtpPktCount; sequenceNumber++ {
2575+
pkt := &rtp.Packet{
2576+
Header: rtp.Header{
2577+
Version: 2,
2578+
PayloadType: 96,
2579+
SequenceNumber: sequenceNumber,
2580+
},
2581+
}
2582+
2583+
// Make sure that packets for Stream received before MID/RID don't get dropped
2584+
if sequenceNumber > 3 {
2585+
assert.NoError(t, pkt.SetExtension(midID, []byte("0")))
2586+
assert.NoError(t, pkt.SetExtension(ridID, []byte(vp8WriterA.RID())))
2587+
}
2588+
2589+
offset := int(sequenceNumber) * outboundMTU
2590+
pkt.Payload = expectedBuffer[offset : offset+outboundMTU]
2591+
assert.NoError(t, vp8WriterA.WriteRTP(pkt))
2592+
}
2593+
2594+
<-ctx.Done()
2595+
assert.NoError(t, wan.Stop())
2596+
closePairNow(t, pcOffer, pcAnswer)
2597+
}

rtpreceiver.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -540,6 +540,7 @@ func (r *RTPReceiver) receiveForRid(
540540
rtpInterceptor interceptor.RTPReader,
541541
rtcpReadStream *srtp.ReadStreamSRTCP,
542542
rtcpInterceptor interceptor.RTCPReader,
543+
peekedPackets []*peekedPacket,
543544
) (*TrackRemote, error) {
544545
r.mu.Lock()
545546
defer r.mu.Unlock()
@@ -551,6 +552,7 @@ func (r *RTPReceiver) receiveForRid(
551552
r.tracks[i].track.codec = params.Codecs[0]
552553
r.tracks[i].track.params = params
553554
r.tracks[i].track.ssrc = SSRC(streamInfo.SSRC)
555+
r.tracks[i].track.peekedPackets = peekedPackets
554556
r.tracks[i].track.mu.Unlock()
555557

556558
r.tracks[i].streamInfo = streamInfo

track_local_static_test.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -827,8 +827,7 @@ func Test_TrackRemote_ReadRTP_UnmarshalError(t *testing.T) {
827827
tr := newTrackRemote(RTPCodecTypeVideo, 0, 0, "", recv)
828828

829829
tr.mu.Lock()
830-
tr.peeked = []byte{0x80, 96}
831-
tr.peekedAttributes = nil
830+
tr.peekedPackets = []*peekedPacket{{payload: []byte{0x80, 96}}}
832831
tr.mu.Unlock()
833832

834833
pkt, attrs, err := tr.ReadRTP()

track_remote.go

Lines changed: 18 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,11 @@ import (
1515
"github.com/pion/rtp"
1616
)
1717

18+
type peekedPacket struct {
19+
payload []byte
20+
attributes interceptor.Attributes
21+
}
22+
1823
// TrackRemote represents a single inbound source of media.
1924
type TrackRemote struct {
2025
mu sync.RWMutex
@@ -30,9 +35,9 @@ type TrackRemote struct {
3035
params RTPParameters
3136
rid string
3237

33-
receiver *RTPReceiver
34-
peeked []byte
35-
peekedAttributes interceptor.Attributes
38+
receiver *RTPReceiver
39+
40+
peekedPackets []*peekedPacket
3641

3742
audioPlayoutStatsProviders []AudioPlayoutStatsProvider
3843
}
@@ -116,25 +121,18 @@ func (t *TrackRemote) Codec() RTPCodecParameters {
116121
func (t *TrackRemote) Read(b []byte) (n int, attributes interceptor.Attributes, err error) {
117122
t.mu.RLock()
118123
receiver := t.receiver
119-
peeked := t.peeked != nil
124+
var peekedPkt *peekedPacket
125+
if len(t.peekedPackets) != 0 {
126+
peekedPkt = t.peekedPackets[0]
127+
t.peekedPackets = t.peekedPackets[1:]
128+
}
120129
t.mu.RUnlock()
121130

122-
if peeked {
123-
t.mu.Lock()
124-
data := t.peeked
125-
attributes = t.peekedAttributes
126-
127-
t.peeked = nil
128-
t.peekedAttributes = nil
129-
t.mu.Unlock()
130-
// someone else may have stolen our packet when we
131-
// released the lock. Deal with it.
132-
if data != nil {
133-
n = copy(b, data)
134-
err = t.checkAndUpdateTrack(b)
131+
if peekedPkt != nil {
132+
n = copy(b, peekedPkt.payload)
133+
err = t.checkAndUpdateTrack(b)
135134

136-
return n, attributes, err
137-
}
135+
return n, peekedPkt.attributes, err
138136
}
139137

140138
// If there's a separate RTX track and an RTX packet is available, return that
@@ -212,8 +210,7 @@ func (t *TrackRemote) peek(b []byte) (n int, a interceptor.Attributes, err error
212210
// that case.
213211
data := make([]byte, n)
214212
n = copy(data, b[:n])
215-
t.peeked = data
216-
t.peekedAttributes = a
213+
t.peekedPackets = append(t.peekedPackets, &peekedPacket{payload: data, attributes: a})
217214
t.mu.Unlock()
218215

219216
return

0 commit comments

Comments
 (0)