Skip to content

Commit 9e1cdba

Browse files
committed
wip
1 parent 8ec39cc commit 9e1cdba

File tree

3 files changed

+42
-21
lines changed

3 files changed

+42
-21
lines changed

components/camera/client.go

+10-4
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ import (
88
"image"
99
"io"
1010
"os"
11-
"runtime/debug"
1211
"sync"
1312
"sync/atomic"
1413
"time"
@@ -97,7 +96,6 @@ func NewClientFromConn(
9796
associatedSubs: map[int][]rtppassthrough.SubscriptionID{},
9897
logger: logger,
9998
}
100-
debug.PrintStack()
10199
logger.Infof("%p cameraClient %#v", c, c)
102100
return cc, nil
103101
}
@@ -484,7 +482,11 @@ func (c *client) SubscribeRTP(
484482
// PeerConnection renegotiation to add this camera's video track and have the `OnTrack`
485483
// callback invoked.
486484
c.logger.Infof("%p SubscribeRTP calling AddStream on %s", c, c.trackName())
487-
if _, err := c.streamClient.AddStream(ctx, &streampb.AddStreamRequest{Name: c.trackName()}); err != nil {
485+
486+
if _, err := c.streamClient.AddStream(
487+
ctx,
488+
&streampb.AddStreamRequest{Name: c.trackName()},
489+
); err != nil {
488490
c.logger.CDebugf(ctx, "%p SubscribeRTP AddStream hit error subID: %s, trackName: %s, err: %s", c, sub.ID.String(), c.trackName(), err.Error())
489491
return rtppassthrough.NilSubscription, err
490492
}
@@ -643,7 +645,11 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription
643645
request := &streampb.RemoveStreamRequest{Name: c.trackName()}
644646
// We assume the server responds with a success if the requested `Name` is unknown/already
645647
// removed.
646-
if _, err := c.streamClient.RemoveStream(ctx, request); err != nil {
648+
649+
if _, err := c.streamClient.RemoveStream(
650+
ctx,
651+
request,
652+
); err != nil {
647653
c.logger.CWarnw(ctx, "Unsubscribe RemoveStream returned err", "trackName",
648654
c.trackName(), "subID", id.String(), "err", err)
649655
c.rtpPassthroughMu.Unlock()

grpc/shared_conn.go

+17-15
Original file line numberDiff line numberDiff line change
@@ -95,22 +95,22 @@ func NewSharedConn(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, log
9595
// We were passed in a ready connection. Only create this for when `Close` is called.
9696
peerConnFailed: make(chan struct{}),
9797
onTrackCBByTrackName: make(map[string]OnTrackCB),
98+
logger: logger,
9899
}
99-
ret.ResetConn(&ret.grpcConn, logger)
100-
// peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
101-
// ret.logger.Warnf("OnTrack called on pc: %p, %#v, trackRemote: %p, %#v, rtpReceiver: %p, %#v", peerConn, peerConn, trackRemote, trackRemote, rtpReceiver, rtpReceiver)
102-
// ret.onTrackCBByTrackNameMu.Lock()
103-
// onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()]
104-
// ret.onTrackCBByTrackNameMu.Unlock()
105-
// if !ok {
106-
// msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
107-
// ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
108-
// return
109-
// }
110-
// onTrackCB(trackRemote, rtpReceiver)
111-
// })
112-
113-
// debug.PrintStack()
100+
ret.grpcConn.ReplaceConn(grpcConn)
101+
ret.logger.Infof("OnTrack installed on %p", peerConn)
102+
ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
103+
ret.logger.Info("OnTrack called:", trackRemote.ID())
104+
ret.onTrackCBByTrackNameMu.Lock()
105+
onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()]
106+
ret.onTrackCBByTrackNameMu.Unlock()
107+
if !ok {
108+
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
109+
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
110+
return
111+
}
112+
onTrackCB(trackRemote, rtpReceiver)
113+
})
114114

115115
return ret
116116
}
@@ -184,6 +184,7 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
184184
// happens. But subequent calls can be entirely asynchronous to components/services accessing
185185
// `SharedConn` for connection objects.
186186
func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger) {
187+
moduleLogger.Infof("ResetConn called on %p, sharedConn: %p", conn, sc)
187188
sc.grpcConn.ReplaceConn(conn)
188189
if sc.logger == nil {
189190
// The first call to `ResetConn` happens before anything can access `sc.logger`. So long as
@@ -226,6 +227,7 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
226227
return
227228
}
228229

230+
sc.logger.Infof("shared conn: %p, peerConnection: %p", sc, peerConn)
229231
sc.peerConn = peerConn
230232
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
231233
if err != nil {

robot/web/web.go

+15-2
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"goji.io"
3333
"goji.io/pat"
3434
googlegrpc "google.golang.org/grpc"
35+
"google.golang.org/grpc/metadata"
3536

3637
"go.viam.com/rdk/config"
3738
"go.viam.com/rdk/gostream"
@@ -218,6 +219,18 @@ func (svc *webService) StartModule(ctx context.Context) error {
218219
if !strings.HasPrefix(info.FullMethod, "/proto.stream.v1.StreamService") {
219220
return handler(ctx, req)
220221
}
222+
svc.logger.Infof("middleware START")
223+
defer svc.logger.Infof("middleware END")
224+
crn := metadata.ValueFromIncomingContext(ctx, "calling_resource_name")
225+
svc.logger.Infof("NICK!!! crn: %#v", crn)
226+
if len(crn) != 1 {
227+
return handler(ctx, req)
228+
}
229+
230+
callerResourceName, err := resource.NewFromString(crn[0])
231+
if err != nil {
232+
return handler(ctx, req)
233+
}
221234
svc.logger.Infof("ctx before: %#v", ctx)
222235
svc.logger.Infof("req: %#v", req)
223236
svc.logger.Infof("info: %#v", info)
@@ -245,7 +258,7 @@ func (svc *webService) StartModule(ctx context.Context) error {
245258
return handler(ctx, req)
246259
}
247260
svc.resourceNameToPeerConnectionMu.Lock()
248-
pc, ok := svc.resourceNameToPeerConnectionMap[name]
261+
pc, ok := svc.resourceNameToPeerConnectionMap[callerResourceName.String()]
249262
svc.resourceNameToPeerConnectionMu.Unlock()
250263

251264
if !ok {
@@ -258,7 +271,7 @@ func (svc *webService) StartModule(ctx context.Context) error {
258271
return handler(ctx, req)
259272
}
260273
ctx = rpc.UnsafeContextWithPeerConnection(ctx, pc)
261-
svc.logger.Infof("ctx after: %#v", ctx)
274+
svc.logger.Infof("ctx after: %#v, callerResourceName: %s, pc: %p", ctx, callerResourceName, pc)
262275
return handler(ctx, req)
263276
}
264277
unaryInterceptors = append(unaryInterceptors, f)

0 commit comments

Comments
 (0)