Skip to content

Commit

Permalink
RSDK-7403: Improve remote camera clients.
Browse files Browse the repository at this point in the history
  • Loading branch information
dgottlieb committed Aug 16, 2024
1 parent ea22dd9 commit a2a389a
Show file tree
Hide file tree
Showing 21 changed files with 1,755 additions and 883 deletions.
510 changes: 311 additions & 199 deletions components/camera/client.go

Large diffs are not rendered by default.

509 changes: 509 additions & 0 deletions components/camera/client_test.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions components/camera/fake/camera.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,8 @@ func (c *Camera) SubscribeRTP(
bufferSize int,
packetsCB rtppassthrough.PacketCallback,
) (rtppassthrough.Subscription, error) {
logging.Global().Warnf("SubscribeRTP FAKE START %s", c.Name().String())
defer logging.Global().Warnf("SubscribeRTP FAKE END %s", c.Name().String())
if !c.RTPPassthrough {
return rtppassthrough.NilSubscription, ErrRTPPassthroughNotEnabled
}
Expand Down
2 changes: 2 additions & 0 deletions components/camera/rtppassthrough/buffer.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ func (w *Buffer) Close() {
// where it will be run in the future.
// If the buffer is full, it returnns an error and does
// add the callback to the buffer.
//
// Dan: Public rtp.Packets and not callbacks? Until we've proved a need for more generality?
func (w *Buffer) Publish(cb func()) error {
rawErr := w.err.Load()

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ require (
github.com/pion/stun v0.6.1 // indirect
github.com/pion/transport/v2 v2.2.10 // indirect
github.com/pion/turn/v2 v2.1.6 // indirect
github.com/pion/webrtc/v3 v3.2.36 // indirect
github.com/pion/webrtc/v3 v3.2.42 // indirect
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect
github.com/pkg/profile v1.6.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1145,8 +1145,8 @@ github.com/pion/transport/v3 v3.0.7/go.mod h1:YleKiTZ4vqNxVwh77Z0zytYi7rXHl7j6uP
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/turn/v2 v2.1.6 h1:Xr2niVsiPTB0FPtt+yAWKFUkU1eotQbGgpTIld4x1Gc=
github.com/pion/turn/v2 v2.1.6/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pion/webrtc/v3 v3.2.36 h1:RM/miAv0M4TrhhS7h2mcZXt44K68WmpVDkUOgz2l2l8=
github.com/pion/webrtc/v3 v3.2.36/go.mod h1:wWQz1PuKNSNK4VrJJNpPN3vZmKEi4zA6i2ynaQOlxIU=
github.com/pion/webrtc/v3 v3.2.42 h1:WN/ZuMjtpQOoGRCZUg/zFG+JHEvYLVyDKOxU6H1qWlE=
github.com/pion/webrtc/v3 v3.2.42/go.mod h1:M1RAe3TNTD1tzyvqHrbVODfwdPGSXOUo/OgpoGGJqFY=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU=
github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
8 changes: 8 additions & 0 deletions gostream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type Stream interface {
// Start starts processing frames.
Start()
WriteRTP(pkt *rtp.Packet) error
// VideoStreamSourceChanged()

// Ready signals that there is at least one client connected and that
// streams are ready for input. The returned context should be used for
Expand Down Expand Up @@ -161,6 +162,13 @@ func (bs *basicStream) Start() {
utils.ManagedGo(bs.processOutputAudioChunks, bs.activeBackgroundWorkers.Done)
}

func (bs *basicStream) VideoStreamSourceChanged() {
bs.videoTrackLocal.rtpTrack.StreamSourceChanged()
}

// NOTE: (Nick S) This only writes video RTP packets
// if we also need to support writing audio RTP packets, we should split
// this method into WriteVideoRTP and WriteAudioRTP
func (bs *basicStream) WriteRTP(pkt *rtp.Packet) error {
return bs.videoTrackLocal.rtpTrack.WriteRTP(pkt)
}
Expand Down
91 changes: 86 additions & 5 deletions gostream/webrtc_track.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ import (
"math"
"strings"
"sync"
"sync/atomic"
"time"

"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/multierr"
"go.viam.com/rdk/logging"
)

// Adapted from https://github.com/pion/webrtc/blob/master/track_local_static.go
Expand All @@ -34,15 +36,21 @@ type trackLocalStaticRTP struct {
bindings []trackBinding
codec webrtc.RTPCodecCapability
id, rid, streamID string

// testing
sequenceNumberOffset func(uint16) uint16
streamSourceChanged atomic.Bool
highestSequenceNumber uint16
}

// newtrackLocalStaticRTP returns a trackLocalStaticRTP.
func newtrackLocalStaticRTP(c webrtc.RTPCodecCapability, id, streamID string) *trackLocalStaticRTP {
return &trackLocalStaticRTP{
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
sequenceNumberOffset: func(u uint16) uint16 { return u },
codec: c,
bindings: []trackBinding{},
id: id,
streamID: streamID,
}
}

Expand Down Expand Up @@ -112,13 +120,86 @@ func (s *trackLocalStaticRTP) Codec() webrtc.RTPCodecCapability {
return s.codec
}

func sequenceNumberOffset(lastMaxPacketSequenceNumber uint16, firstNewPacketSequenceNumber uint16) func(uint16) uint16 {
if lastMaxPacketSequenceNumber > firstNewPacketSequenceNumber {
logging.Global().Warnf("new sequenceNumberGenerator which is not identity lastMaxPacketSequenceNumber(%d), firstNewPacketSequenceNumber(%d)", lastMaxPacketSequenceNumber, firstNewPacketSequenceNumber)
// continue with prev
return func(sequenceNumber uint16) uint16 {
return lastMaxPacketSequenceNumber + 1 + sequenceNumber - firstNewPacketSequenceNumber
}

}
return func(u uint16) uint16 { return u }
}

func (s *trackLocalStaticRTP) StreamSourceChanged() {
s.streamSourceChanged.Store(true)
}

func wrapped(currentSequenceNumber uint16, hightestSequenceNumber uint16) bool {
// if the current sequence number is smaller than the currentHighestSequenceNumber by more
// than half the u16, assume that the sequence number has wrapped
if currentSequenceNumber > hightestSequenceNumber {
return false
}
return hightestSequenceNumber-currentSequenceNumber > math.MaxUint16/2
}

func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
s.mu.RLock()
defer s.mu.RUnlock()

writeErrs := []error{}
outboundPacket := *p

for _, b := range s.bindings {
outboundPacket.Header.SSRC = uint32(b.ssrc)
outboundPacket.Header.PayloadType = uint8(b.payloadType)
if _, err := b.writeStream.WriteRTP(&outboundPacket.Header, outboundPacket.Payload); err != nil {
writeErrs = append(writeErrs, err)
}
}

return multierr.Combine(writeErrs...)
}

// WriteRTP writes a RTP Packet to the trackLocalStaticRTP
// If one PeerConnection fails the packets will still be sent to
// all PeerConnections. The error message will contain the ID of the failed
// PeerConnections so you can remove them.
func (s *trackLocalStaticRTP) WriteRTP(p *rtp.Packet) error {
func (s *trackLocalStaticRTP) WriteRTPModified(p *rtp.Packet) error {
s.mu.RLock()
defer s.mu.RUnlock()
originalSequenceNumber := p.Header.SequenceNumber

// create new generator if the stream has changed needed
if s.streamSourceChanged.CompareAndSwap(true, false) {
// TODO: Rollover
s.sequenceNumberOffset = sequenceNumberOffset(s.highestSequenceNumber, p.Header.SequenceNumber)
}

// Update the header's sequence number to
p.Header.SequenceNumber = s.sequenceNumberOffset(p.Header.SequenceNumber)

// set the currentHighestSequenceNumber to the current packet's sequence number
// if the packet's sequence number is greater than the current highest sequence number
// or if the sequence number wrapped
setHighest := false
if p.Header.SequenceNumber > s.highestSequenceNumber {
setHighest = true
}

if wrapped(p.Header.SequenceNumber, s.highestSequenceNumber) {
logging.Global().Warnf("updating highestSequenceNumber to %d due to wrapping, prevhighestSequenceNumber: %d, originalSN: %d", p.Header.SequenceNumber, s.highestSequenceNumber, originalSequenceNumber)
setHighest = true
}

if setHighest {
s.highestSequenceNumber = p.Header.SequenceNumber
} else {
logging.Global().Warnf("publishing out of order message p.Header.SequenceNumber(%d), originalSN: %d, highestSequenceNumber: %d", p.Header.SequenceNumber, originalSequenceNumber, s.highestSequenceNumber)
}
logging.Global().Debugf("SN: %d", p.Header.SequenceNumber)

writeErrs := []error{}
outboundPacket := *p
Expand Down
42 changes: 42 additions & 0 deletions grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ import (
"sync"

"github.com/viamrobotics/webrtc/v3"
"go.viam.com/rdk/logging"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
googlegrpc "google.golang.org/grpc"
)

// ReconfigurableClientConn allows for the underlying client connections to be swapped under the hood.
type ReconfigurableClientConn struct {
connMu sync.RWMutex
conn rpc.ClientConn

onTrackCBByTrackNameMu sync.Mutex
onTrackCBByTrackName map[string]OnTrackCB
}

// Return this constant such that backoff error logging can compare consecutive errors and reliably
Expand Down Expand Up @@ -57,8 +62,31 @@ func (c *ReconfigurableClientConn) NewStream(
// ReplaceConn replaces the underlying client connection with the connection passed in. This does not close the
// old connection, the caller is expected to close it if needed.
func (c *ReconfigurableClientConn) ReplaceConn(conn rpc.ClientConn) {
logging.Global().Info("ReplaceConn START")
defer logging.Global().Info("ReplaceConn END")
c.connMu.Lock()
c.conn = conn
// It is safe to access this without a mutex as it is only ever nil once at the beginning of the
// ReconfigurableClientConn's lifetime
if c.onTrackCBByTrackName == nil {
c.onTrackCBByTrackName = make(map[string]OnTrackCB)
}

if pc := conn.PeerConn(); pc != nil {
pc.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
logging.Global().Warnf("OnTrack START %s pc: %p", trackRemote.StreamID(), pc)
defer logging.Global().Warnf("OnTrack END %s pc: %p", trackRemote.StreamID(), pc)
c.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := c.onTrackCBByTrackName[trackRemote.StreamID()]
c.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID (trackName): %s, keys(resOnTrackCBs): %#v"
logging.Global().Errorf(msg, trackRemote.StreamID(), maps.Keys(c.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})
}
c.connMu.Unlock()
}

Expand All @@ -84,3 +112,17 @@ func (c *ReconfigurableClientConn) Close() error {
c.conn = nil
return conn.Close()
}

// AddOnTrackSub adds an OnTrack subscription for the track.
func (c *ReconfigurableClientConn) AddOnTrackSub(trackName string, onTrackCB OnTrackCB) {
c.onTrackCBByTrackNameMu.Lock()
defer c.onTrackCBByTrackNameMu.Unlock()
c.onTrackCBByTrackName[trackName] = onTrackCB
}

// RemoveOnTrackSub removes an OnTrack subscription for the track.
func (c *ReconfigurableClientConn) RemoveOnTrackSub(trackName string) {
c.onTrackCBByTrackNameMu.Lock()
defer c.onTrackCBByTrackNameMu.Unlock()
delete(c.onTrackCBByTrackName, trackName)
}
48 changes: 23 additions & 25 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package grpc
import (
"context"
"errors"
"fmt"
"net"
"sync"
"time"
Expand All @@ -15,10 +14,10 @@ import (
"go.uber.org/zap"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
googlegrpc "google.golang.org/grpc"

"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
rutils "go.viam.com/rdk/utils"
)

Expand Down Expand Up @@ -80,8 +79,8 @@ type SharedConn struct {
// set to nil before this channel is closed.
peerConnFailed chan struct{}

resOnTrackMu sync.Mutex
resOnTrackCBs map[resource.Name]OnTrackCB
onTrackCBByTrackNameMu sync.Mutex
onTrackCBByTrackName map[string]OnTrackCB

logger logging.Logger
}
Expand All @@ -106,18 +105,18 @@ func (sc *SharedConn) NewStream(
return sc.grpcConn.NewStream(ctx, desc, method, opts...)
}

// AddOnTrackSub adds an OnTrack subscription for the resource.
func (sc *SharedConn) AddOnTrackSub(name resource.Name, onTrackCB OnTrackCB) {
sc.resOnTrackMu.Lock()
defer sc.resOnTrackMu.Unlock()
sc.resOnTrackCBs[name] = onTrackCB
// AddOnTrackSub adds an OnTrack subscription for the track.
func (sc *SharedConn) AddOnTrackSub(trackName string, onTrackCB OnTrackCB) {
sc.onTrackCBByTrackNameMu.Lock()
defer sc.onTrackCBByTrackNameMu.Unlock()
sc.onTrackCBByTrackName[trackName] = onTrackCB
}

// RemoveOnTrackSub removes an OnTrack subscription for the resource.
func (sc *SharedConn) RemoveOnTrackSub(name resource.Name) {
sc.resOnTrackMu.Lock()
defer sc.resOnTrackMu.Unlock()
delete(sc.resOnTrackCBs, name)
// RemoveOnTrackSub removes an OnTrack subscription for the track.
func (sc *SharedConn) RemoveOnTrackSub(trackName string) {
sc.onTrackCBByTrackNameMu.Lock()
defer sc.onTrackCBByTrackNameMu.Unlock()
delete(sc.onTrackCBByTrackName, trackName)
}

// GrpcConn returns a gRPC capable client connection.
Expand Down Expand Up @@ -159,9 +158,11 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
sc.logger = moduleLogger.Sublogger("networking.conn")
}

if sc.resOnTrackCBs == nil {
// It is safe to access this without a mutex as it is only ever nil once at the beginning of the
// SharedConn's lifetime
if sc.onTrackCBByTrackName == nil {
// Same initilization argument as above with the logger.
sc.resOnTrackCBs = make(map[resource.Name]OnTrackCB)
sc.onTrackCBByTrackName = make(map[string]OnTrackCB)
}

sc.peerConnMu.Lock()
Expand Down Expand Up @@ -199,16 +200,12 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
}

sc.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
name, err := resource.NewFromString(trackRemote.StreamID())
if err != nil {
sc.logger.Errorw("StreamID did not parse as a ResourceName", "sharedConn", fmt.Sprintf("%p", sc), "streamID", trackRemote.StreamID())
return
}
sc.resOnTrackMu.Lock()
onTrackCB, ok := sc.resOnTrackCBs[name]
sc.resOnTrackMu.Unlock()
sc.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := sc.onTrackCBByTrackName[trackRemote.StreamID()]
sc.onTrackCBByTrackNameMu.Unlock()
if !ok {
sc.logger.Errorw("Callback not found for StreamID", "sharedConn", fmt.Sprintf("%p", sc), "streamID", trackRemote.StreamID())
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
sc.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(sc.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
Expand Down Expand Up @@ -339,6 +336,7 @@ func NewLocalPeerConnection(logger logging.Logger) (*webrtc.PeerConnection, erro

return false
})
settingEngine.DisableSRTPReplayProtection(true)

options := []func(a *webrtc.API){webrtc.WithMediaEngine(&m), webrtc.WithInterceptorRegistry(&i)}
if utils.Debug {
Expand Down
9 changes: 9 additions & 0 deletions grpc/tracker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package grpc

// Tracker allows callback functions to a WebRTC peer connection's OnTrack callback
// function by track name.
// Both grpc.SharedConn and grpc.ReconfigurableClientConn implement tracker.
type Tracker interface {
AddOnTrackSub(trackName string, onTrackCB OnTrackCB)
RemoveOnTrackSub(trackName string)
}
20 changes: 20 additions & 0 deletions grpc/tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package grpc

import (
"reflect"
"testing"

"go.viam.com/test"
)

func TestTrackerImplementations(t *testing.T) {
tracker := reflect.TypeOf((*Tracker)(nil)).Elem()

t.Run("*ReconfigurableClientConn should implement Tracker", func(t *testing.T) {
test.That(t, reflect.TypeOf(&ReconfigurableClientConn{}).Implements(tracker), test.ShouldBeTrue)
})

t.Run("*SharedConn should implement Tracker", func(t *testing.T) {
test.That(t, reflect.TypeOf(&SharedConn{}).Implements(tracker), test.ShouldBeTrue)
})
}
Loading

0 comments on commit a2a389a

Please sign in to comment.