Skip to content

Commit 3ed3cc1

Browse files
authored
RSDK-9839: Have Go module clients be webrtc aware. (#4751)
1 parent 1d4d42e commit 3ed3cc1

File tree

4 files changed

+114
-12
lines changed

4 files changed

+114
-12
lines changed

components/camera/client.go

+11-3
Original file line numberDiff line numberDiff line change
@@ -661,9 +661,16 @@ func (c *client) Unsubscribe(ctx context.Context, id rtppassthrough.Subscription
661661
}
662662

663663
func (c *client) trackName() string {
664-
// if c.conn is a *grpc.SharedConn then the client
665-
// is talking to a module and we need to send the fully qualified name
666-
if _, ok := c.conn.(*grpc.SharedConn); ok {
664+
// if c.conn is a *grpc.SharedConn then, this is being used for communication between a
665+
// viam-server and module. The viam-server will have one SharedConn and the module will have a
666+
// different one.
667+
//
668+
// When asking a module to start a video stream, we create a track name with the full resource
669+
// name (i.e: rdk:components:camera/foo).
670+
//
671+
// Modules talking back to the viam-server for a camera stream should use the "short
672+
// name"/`SDPTrackName` (i.e: `foo`).
673+
if sc, ok := c.conn.(*grpc.SharedConn); ok && sc.IsConnectedToModule() {
667674
return c.Name().String()
668675
}
669676

@@ -673,6 +680,7 @@ func (c *client) trackName() string {
673680
// as the remote doesn't know it's own name from the perspective of the main part
674681
return c.Name().PopRemote().SDPTrackName()
675682
}
683+
676684
// in this case we are talking to a main part & the remote name (if it exists) needs to be preserved
677685
return c.Name().SDPTrackName()
678686
}

grpc/shared_conn.go

+70-7
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ import (
2525
// a resource may register with a SharedConn which supports WebRTC.
2626
type OnTrackCB func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver)
2727

28+
//nolint
29+
// The following describes the SharedConn lifetime for viam-server's modmanager communicating with
30+
// modules it has spawned.
31+
//
2832
// SharedConn wraps both a GRPC connection & (optionally) a peer connection & controls access to both.
2933
// For modules, the grpc connection is over a Unix socket. The WebRTC `PeerConnection` is made
3034
// separately. The `SharedConn` continues to implement the `rpc.ClientConn` interface by pairing up
@@ -71,20 +75,70 @@ type SharedConn struct {
7175
// `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive
7276
// calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not
7377
// prior) "generations".
74-
peerConnMu sync.Mutex
75-
peerConn *webrtc.PeerConnection
76-
peerConnReady <-chan struct{}
77-
peerConnClosed <-chan struct{}
78+
peerConnMu sync.Mutex
79+
peerConn *webrtc.PeerConnection
80+
peerConnReady <-chan struct{}
7881
// peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is
7982
// set to nil before this channel is closed.
8083
peerConnFailed chan struct{}
8184

8285
onTrackCBByTrackNameMu sync.Mutex
8386
onTrackCBByTrackName map[string]OnTrackCB
8487

88+
// isConnectedToViamServer identifies whether this SharedConn is running inside a viam-server
89+
// talking to a module, or a module talking to a viam-server. We use this to determine whether
90+
// to use long names or short names for PeerConnection video (or audio) track names.
91+
isConnectedToViamServer bool
92+
8593
logger logging.Logger
8694
}
8795

96+
// NewSharedConnForModule acts as a constructor for `SharedConn` for modules that are communicating
97+
// back to their parent viam-server.
98+
func NewSharedConnForModule(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn {
99+
// We must be passed a ready connection.
100+
pcReady := make(chan struct{})
101+
close(pcReady)
102+
103+
ret := &SharedConn{
104+
peerConn: peerConn,
105+
peerConnReady: pcReady,
106+
// We were passed in a ready connection. Only create this for when `Close` is called.
107+
peerConnFailed: make(chan struct{}),
108+
onTrackCBByTrackName: make(map[string]OnTrackCB),
109+
isConnectedToViamServer: true,
110+
logger: logger,
111+
}
112+
ret.grpcConn.ReplaceConn(grpcConn)
113+
114+
ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
115+
ret.onTrackCBByTrackNameMu.Lock()
116+
onTrackCB, ok := ret.onTrackCBByTrackName[trackRemote.StreamID()]
117+
ret.onTrackCBByTrackNameMu.Unlock()
118+
if !ok {
119+
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
120+
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
121+
return
122+
}
123+
onTrackCB(trackRemote, rtpReceiver)
124+
})
125+
126+
return ret
127+
}
128+
129+
// IsConnectedToModule returns whether this shared conn is being used to communicate with a module.
130+
func (sc *SharedConn) IsConnectedToModule() bool {
131+
return !sc.isConnectedToViamServer
132+
}
133+
134+
// IsConnectedToViamServer returns whether this shared conn is being used to communicate with a
135+
// viam-server. Note this implies the client is running within a module process. Typical
136+
// clients/remote connections are a pure webrtc connection. As opposed to a frankenstein tcp/unix
137+
// socket + webrtc connection.
138+
func (sc *SharedConn) IsConnectedToViamServer() bool {
139+
return sc.isConnectedToViamServer
140+
}
141+
88142
// Invoke forwards to the underlying GRPC Connection.
89143
func (sc *SharedConn) Invoke(
90144
ctx context.Context,
@@ -143,8 +197,8 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
143197
return ret
144198
}
145199

146-
// ResetConn acts as a constructor for `SharedConn`. ResetConn replaces the underlying
147-
// connection objects in addition to some other initialization.
200+
// ResetConn acts as a constructor for `SharedConn` inside the viam-server (not modules). ResetConn
201+
// replaces the underlying connection objects in addition to some other initialization.
148202
//
149203
// The first call to `ResetConn` is guaranteed to happen before any access to connection objects
150204
// happens. But subequent calls can be entirely asynchronous to components/services accessing
@@ -193,7 +247,16 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
193247
}
194248

195249
sc.peerConn = peerConn
196-
sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
250+
// When communicating with modules, we may both call `AddStream` on the module (we are the
251+
// client) _and_ the module may call `AddStream` on us (for example, to stream video data from a
252+
// different module). Thus we must use the `PeerRoleServer` role such that we install an
253+
// `OnNegotiationNeeded` callback for when the stream server handle for `AddStream` attempts to
254+
// call `peerConn.AddTrack`.
255+
//
256+
// That said, it's not been rigorously exercised that when both ends initiate a renegotiation
257+
// for different video tracks, that we end up in a state where both video tracks are
258+
// successfully created.
259+
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleServer, sc.logger)
197260
if err != nil {
198261
sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err)
199262
return

module/module.go

+3
Original file line numberDiff line numberDiff line change
@@ -395,6 +395,9 @@ func (m *Module) connectParent(ctx context.Context) error {
395395
}
396396

397397
m.parent = rc
398+
if m.pc != nil {
399+
m.parent.SetPeerConnection(m.pc)
400+
}
398401
return nil
399402
}
400403

robot/client/client.go

+30-2
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
1717
"github.com/jhump/protoreflect/desc"
1818
"github.com/jhump/protoreflect/grpcreflect"
19+
"github.com/viamrobotics/webrtc/v3"
1920
"go.uber.org/multierr"
2021
"go.uber.org/zap"
2122
"go.uber.org/zap/zapcore"
@@ -112,6 +113,9 @@ type RobotClient struct {
112113
// webrtc. We don't want a network disconnect to result in reconnecting over tcp such that
113114
// performance would be impacted.
114115
serverIsWebrtcEnabled bool
116+
117+
pc *webrtc.PeerConnection
118+
sharedConn *grpc.SharedConn
115119
}
116120

117121
// RemoteTypeName is the type name used for a remote. This is for internal use.
@@ -291,6 +295,9 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
291295
rpc.WithStreamClientInterceptor(streamClientInterceptor()),
292296
)
293297

298+
// If we're a client running as part of a module, we annotate our requests with our module
299+
// name. That way the receiver (e.g: viam-server) can execute logic based on where a request
300+
// came from. Such as knowing what WebRTC connection to add a video track to.
294301
if rOpts.modName != "" {
295302
inter := &grpc.ModInterceptors{ModName: rOpts.modName}
296303
rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor))
@@ -680,10 +687,10 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er
680687
func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, error) {
681688
apiInfo, ok := resource.LookupGenericAPIRegistration(name.API)
682689
if !ok || apiInfo.RPCClient == nil {
683-
return grpc.NewForeignResource(name, &rc.conn), nil
690+
return grpc.NewForeignResource(name, rc.getClientConn()), nil
684691
}
685692
logger := rc.Logger().Sublogger(resource.RemoveRemoteName(name).ShortName())
686-
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger)
693+
return apiInfo.RPCClient(rc.backgroundCtx, rc.getClientConn(), rc.remoteName, name, logger)
687694
}
688695

689696
func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) {
@@ -1299,6 +1306,27 @@ func (rc *RobotClient) Tunnel(ctx context.Context, conn io.ReadWriteCloser, dest
12991306
return errors.Join(err, readerSenderErr, recvWriterErr)
13001307
}
13011308

1309+
// SetPeerConnection is only to be called internally from modules.
1310+
func (rc *RobotClient) SetPeerConnection(pc *webrtc.PeerConnection) {
1311+
rc.mu.Lock()
1312+
rc.pc = pc
1313+
rc.mu.Unlock()
1314+
}
1315+
1316+
func (rc *RobotClient) getClientConn() rpc.ClientConn {
1317+
// Must be called with `rc.mu` in ReadLock+ mode.
1318+
if rc.sharedConn != nil {
1319+
return rc.sharedConn
1320+
}
1321+
1322+
if rc.pc == nil {
1323+
return &rc.conn
1324+
}
1325+
1326+
rc.sharedConn = grpc.NewSharedConnForModule(&rc.conn, rc.pc, rc.logger.Sublogger("shared_conn"))
1327+
return rc.sharedConn
1328+
}
1329+
13021330
func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor {
13031331
return func(
13041332
ctx context.Context,

0 commit comments

Comments
 (0)