Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
nicksanford committed May 10, 2024
1 parent 74df8c1 commit 3751f6c
Show file tree
Hide file tree
Showing 8 changed files with 46 additions and 41 deletions.
15 changes: 6 additions & 9 deletions components/camera/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"go.viam.com/rdk/data"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/grpc"
rdkgrpc "go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/protoutils"
Expand Down Expand Up @@ -464,7 +463,10 @@ func (c *client) SubscribeRTP(
sc.AddOnTrackSub(c.Name(), c.addOnTrackSubFunc(trackReceived, trackClosed, sub.ID))
// remove the OnTrackSub once we either fail or succeed
defer sc.RemoveOnTrackSub(c.Name())
if _, err := c.streamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: c.Name().Name}); err != nil {

// c.logger.CDebugw(ctx, "SubscribeRTP calling AddStream", "subID", sub.ID.String(), "c.Name()", c.Name(), "c.Name().Name", c.Name().Name, "resource.RemoveRemoteName(c.Name())", resource.RemoveRemoteName(c.Name()), "resource.RemoveRemoteName(c.Name()).String()", resource.RemoveRemoteName(c.Name()), "err", err)

if _, err := c.streamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: resource.RemoveRemoteName(c.Name()).ShortName()}); err != nil {
c.logger.CDebugw(ctx, "SubscribeRTP AddStream hit error", "subID", sub.ID.String(), "name", c.Name(), "err", err)
return rtppassthrough.NilSubscription, err
}
Expand Down Expand Up @@ -520,7 +522,7 @@ func (c *client) SubscribeRTP(
func (c *client) addOnTrackSubFunc(
trackReceived, trackClosed chan struct{},
parentID rtppassthrough.SubscriptionID,
) rdkgrpc.OnTrackCB {
) grpc.OnTrackCB {
return func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver) {
close(trackReceived)
c.activeBackgroundWorkers.Add(1)
Expand Down Expand Up @@ -590,11 +592,6 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription
return ErrNoPeerConnection
}

_, ok := c.conn.(*rdkgrpc.SharedConn)
if !ok {
c.mu.Unlock()
return ErrNoSharedPeerConnection
}
c.logger.CDebugw(ctx, "Unsubscribe called with", "name", c.Name(), "subID", id.String())

bufAndCB, ok := c.bufAndCBByID[id]
Expand All @@ -606,7 +603,7 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription

if len(c.bufAndCBByID) == 1 {
c.logger.CDebugw(ctx, "Unsubscribe calling RemoveStream", "name", c.Name(), "subID", id.String())
if _, err := c.streamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{Name: c.Name().String()}); err != nil {
if _, err := c.streamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{Name: resource.RemoveRemoteName(c.Name()).ShortName()}); err != nil {
c.logger.CWarnw(ctx, "Unsubscribe RemoveStream returned err", "name", c.Name(), "subID", id.String(), "err", err)
c.mu.Unlock()
return err
Expand Down
7 changes: 3 additions & 4 deletions components/camera/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -772,15 +772,14 @@ func TestMultiplexOverRemoteConnection(t *testing.T) {
test.That(t, image, test.ShouldNotBeNil)
greenLog(t, "got images")

recvPktsCtx, recvPktsFn := context.WithCancel(context.Background())
sub, err := cameraClient.(rtppassthrough.Source).SubscribeRTP(mainCtx, 4096, func(pkts []*rtp.Packet) {
t.Log("got em")
t.FailNow()
recvPktsFn()
})
test.That(t, err, test.ShouldBeNil)
<-recvPktsCtx.Done()
greenLog(t, "got packets")

time.Sleep(time.Second)

err = cameraClient.(rtppassthrough.Source).Unsubscribe(mainCtx, sub.ID)
test.That(t, err, test.ShouldBeNil)
greenLog(t, "unsubscribe")
Expand Down
16 changes: 8 additions & 8 deletions grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/pion/webrtc/v3"
"go.viam.com/rdk/resource"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"
googlegrpc "google.golang.org/grpc"
)

Expand Down Expand Up @@ -72,16 +73,15 @@ func (c *ReconfigurableClientConn) ReplaceConn(conn rpc.ClientConn) {
golog.Global().Infof("DBG. OnTrack added. PC: %p\n", pc)
pc.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
golog.Global().Infof("DBG. OnTrack called. PC: %p\n", pc)
name, err := resource.NewFromString(trackRemote.StreamID())
if err != nil {
golog.Global().Errorf("StreamID did not parse as a StreamID ResourceName: %s", trackRemote.StreamID())
return
}
c.resOnTrackMu.Lock()

// hack to get around import cycle
// needs to be kept in sync with caemra.Named()
name := resource.NewName(resource.APINamespaceRDK.WithComponentType("camera"), trackRemote.StreamID())
onTrackCB, ok := c.resOnTrackCBs[name]
c.resOnTrackMu.Unlock()
if !ok {
golog.Global().Errorf("Callback not found for StreamID: %s", trackRemote.StreamID())
golog.Global().Fatalf("Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v", trackRemote.StreamID(), maps.Keys(c.resOnTrackCBs))
return
}
onTrackCB(trackRemote, rtpReceiver)
Expand Down Expand Up @@ -117,12 +117,12 @@ func (c *ReconfigurableClientConn) Close() error {
func (c *ReconfigurableClientConn) AddOnTrackSub(name resource.Name, onTrackCB OnTrackCB) {
c.resOnTrackMu.Lock()
defer c.resOnTrackMu.Unlock()
c.resOnTrackCBs[name] = onTrackCB
c.resOnTrackCBs[resource.RemoveRemoteName(name)] = onTrackCB
}

// RemoveOnTrackSub removes an OnTrack subscription for the resource.
func (c *ReconfigurableClientConn) RemoveOnTrackSub(name resource.Name) {
c.resOnTrackMu.Lock()
defer c.resOnTrackMu.Unlock()
delete(c.resOnTrackCBs, name)
delete(c.resOnTrackCBs, resource.RemoveRemoteName(name))
}
12 changes: 12 additions & 0 deletions resource/name.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,15 @@ func (n Name) String() string {
}
return name
}

// SDPTrackName returns a valid SDP video/audio track name as defined in RFC 4566 (https://www.rfc-editor.org/rfc/rfc4566)
// where track names should not include colons.
func SDPTrackName(name Name) string {
return strings.ReplaceAll(name.ShortName(), ":", "+")
}

// // SDPTrackName returns a valid SDP video/audio track name as defined in RFC 4566 (https://www.rfc-editor.org/rfc/rfc4566)
// // where track names should not include colons.
// func SDPTrackNameToResourceName(string) (Name, error) {
// return strings.ReplaceAll(name.String(), ":", "+")
// }
4 changes: 3 additions & 1 deletion robot/web/stream/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
streampb "go.viam.com/api/stream/v1"
"go.viam.com/utils"
"go.viam.com/utils/rpc"
"golang.org/x/exp/maps"

"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
Expand Down Expand Up @@ -127,7 +128,7 @@ func (ss *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequest)

// return error if there is no stream for that camera
if !ok {
err := fmt.Errorf("no stream for %q", req.Name)
err := fmt.Errorf("no stream for %q, available streams: %#v", req.Name, maps.Keys(ss.nameToStreamState))
ss.logger.Error(err.Error())
return nil, err
}
Expand Down Expand Up @@ -222,6 +223,7 @@ func (ss *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequest)

// if the stream supports video, add the video track
if trackLocal, haveTrackLocal := streamStateToAdd.Stream.VideoTrackLocal(); haveTrackLocal {
ss.logger.Infof("AddStream calling addTrack on trackLocal.StreamID(): %s", trackLocal.StreamID())
if err := addTrack(trackLocal); err != nil {
ss.logger.Error(err.Error())
return nil, err
Expand Down
13 changes: 6 additions & 7 deletions robot/web/stream/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package state
import (
"context"
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -423,13 +422,13 @@ func (ss *StreamState) restart(ctx context.Context) {
}

func (ss *StreamState) streamH264Passthrough(ctx context.Context) error {
for _, n := range camera.NamesFromRobot(ss.robot) {
ss.logger.CInfof(ctx, "NICK: %s", n)
}
n := strings.ReplaceAll(camera.Named(ss.Stream.Name()).ShortName(), "+", ":")
ss.logger.CInfof(ctx, "NICK: n: %s", n)
// for _, n := range camera.NamesFromRobot(ss.robot) {
// ss.logger.CInfof(ctx, "NICK: %s", n)
// }
// n := strings.ReplaceAll(camera.Named(ss.Stream.Name()).ShortName(), "+", ":")
// ss.logger.CInfof(ctx, "NICK: n: %s", n)

cam, err := camera.FromRobot(ss.robot, n)
cam, err := camera.FromRobot(ss.robot, ss.Stream.Name())
if err != nil {
return err
}
Expand Down
6 changes: 0 additions & 6 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,6 @@ func hasManagedAuthHandlers(handlers []config.AuthHandlerConfig) bool {
return false
}

// validSDPTrackName returns a valid SDP video/audio track name as defined in RFC 4566 (https://www.rfc-editor.org/rfc/rfc4566)
// where track names should not include colons.
func validSDPTrackName(name string) string {
return strings.ReplaceAll(name, ":", "+")
}

// A Service controls the web server for a robot.
type Service interface {
resource.Resource
Expand Down
14 changes: 8 additions & 6 deletions robot/web/web_c.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,18 +287,20 @@ func (svc *webService) startAudioStream(ctx context.Context, source gostream.Aud

// refreshVideoSources checks and initializes every possible video source that could be viewed from the robot.
func (svc *webService) refreshVideoSources() {
for _, name := range camera.ResourceNamesFromRobot(svc.r) {
cam, err := robot.ResourceFromRobot[camera.Camera](svc.r, name)
for _, name := range camera.NamesFromRobot(svc.r) {
cam, err := camera.FromRobot(svc.r, name)
// for _, name := range camera.ResourceNamesFromRobot(svc.r) {
// cam, err := robot.ResourceFromRobot[camera.Camera](svc.r, name)
if err != nil {
continue
}
existing, ok := svc.videoSources[validSDPTrackName(name)]
existing, ok := svc.videoSources[resource.SDPTrackName(cam.Name())]
if ok {
existing.Swap(cam)
continue
}
newSwapper := gostream.NewHotSwappableVideoSource(cam)
svc.videoSources[validSDPTrackName(name)] = newSwapper
svc.videoSources[resource.SDPTrackName(cam.Name())] = newSwapper
}
}

Expand All @@ -309,13 +311,13 @@ func (svc *webService) refreshAudioSources() {
if err != nil {
continue
}
existing, ok := svc.audioSources[validSDPTrackName(name)]
existing, ok := svc.audioSources[resource.SDPTrackName(input.Name())]
if ok {
existing.Swap(input)
continue
}
newSwapper := gostream.NewHotSwappableAudioSource(input)
svc.audioSources[validSDPTrackName(name)] = newSwapper
svc.audioSources[resource.SDPTrackName(input.Name())] = newSwapper
}
}

Expand Down

0 comments on commit 3751f6c

Please sign in to comment.