Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RSDK-9839: Have Go module clients be webrtc aware. #4751

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 60 additions & 0 deletions grpc/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// DefaultMethodTimeout is the default context timeout for all inbound gRPC
Expand Down Expand Up @@ -43,3 +44,62 @@ func EnsureTimeoutUnaryClientInterceptor(

return invoker(ctx, method, req, reply, cc, opts...)
}

// The following code is for appending/extracting grpc metadata regarding module names/origins via
// contexts.
type modNameKeyType int

const modNameKeyID = modNameKeyType(iota)

// GetModuleName returns the module name (if any) the request came from. The module name will match
// a string from the robot config.
func GetModuleName(ctx context.Context) string {
valI := ctx.Value(modNameKeyID)
if val, ok := valI.(string); ok {
return val
}

return ""
}

const modNameMetadataKey = "modName"

// ModInterceptors takes a user input `ModName` and exposes an interceptor method that will attach
// it to outgoing gRPC requests.
type ModInterceptors struct {
ModName string
}

// UnaryClientInterceptor adds a module name to any outgoing unary gRPC request.
func (mc *ModInterceptors) UnaryClientInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, modNameMetadataKey, mc.ModName)
return invoker(ctx, method, req, reply, cc, opts...)
}

// ModNameUnaryServerInterceptor checks the incoming RPC metadata for a module name and attaches any
// information to a context that can be retrieved with `GetModuleName`.
func ModNameUnaryServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(ctx, req)
}

values := meta.Get(modNameMetadataKey)
if len(values) == 1 {
ctx = context.WithValue(ctx, modNameKeyID, values[0])
}

return handler(ctx, req)
}
53 changes: 46 additions & 7 deletions grpc/shared_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
// a resource may register with a SharedConn which supports WebRTC.
type OnTrackCB func(tr *webrtc.TrackRemote, r *webrtc.RTPReceiver)

//nolint
// The following describes the SharedConn lifetime for viam-server's modmanager communicating with
// modules it has spawned.
//
// SharedConn wraps both a GRPC connection & (optionally) a peer connection & controls access to both.
// For modules, the grpc connection is over a Unix socket. The WebRTC `PeerConnection` is made
// separately. The `SharedConn` continues to implement the `rpc.ClientConn` interface by pairing up
Expand Down Expand Up @@ -71,10 +75,9 @@ type SharedConn struct {
// `peerConnMu` synchronizes changes to the underlying `peerConn`. Such that calls consecutive
// calls to `GrpcConn` and `PeerConn` will return connections from the same (or newer, but not
// prior) "generations".
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
peerConnClosed <-chan struct{}
peerConnMu sync.Mutex
peerConn *webrtc.PeerConnection
peerConnReady <-chan struct{}
// peerConnFailed gets closed when a PeerConnection fails to connect. The peerConn pointer is
// set to nil before this channel is closed.
peerConnFailed chan struct{}
Expand All @@ -85,6 +88,42 @@ type SharedConn struct {
logger logging.Logger
}

// NewSharedConnForModule acts as a constructor for `SharedConn` for modules that are communicating
// back to their parent viam-server.
func NewSharedConnForModule(grpcConn rpc.ClientConn, peerConn *webrtc.PeerConnection, logger logging.Logger) *SharedConn {
// We must be passed a ready connection.
pcReady := make(chan struct{})
close(pcReady)

ret := &SharedConn{
peerConn: peerConn,
peerConnReady: pcReady,
// We were passed in a ready connection. Only create this for when `Close` is called.
peerConnFailed: make(chan struct{}),
onTrackCBByTrackName: make(map[string]OnTrackCB),
logger: logger,
}
ret.grpcConn.ReplaceConn(grpcConn)

ret.peerConn.OnTrack(func(trackRemote *webrtc.TrackRemote, rtpReceiver *webrtc.RTPReceiver) {
sid := trackRemote.StreamID()
if sid == "rtsp-1" {
sid = "rdk:component:camera/rtsp-1"
}
ret.onTrackCBByTrackNameMu.Lock()
onTrackCB, ok := ret.onTrackCBByTrackName[sid]
ret.onTrackCBByTrackNameMu.Unlock()
if !ok {
msg := "Callback not found for StreamID: %s, keys(resOnTrackCBs): %#v"
ret.logger.Errorf(msg, trackRemote.StreamID(), maps.Keys(ret.onTrackCBByTrackName))
return
}
onTrackCB(trackRemote, rtpReceiver)
})

return ret
}

// Invoke forwards to the underlying GRPC Connection.
func (sc *SharedConn) Invoke(
ctx context.Context,
Expand Down Expand Up @@ -143,8 +182,8 @@ func (sc *SharedConn) PeerConn() *webrtc.PeerConnection {
return ret
}

// ResetConn acts as a constructor for `SharedConn`. ResetConn replaces the underlying
// connection objects in addition to some other initialization.
// ResetConn acts as a constructor for `SharedConn` inside the viam-server (not modules). ResetConn
// replaces the underlying connection objects in addition to some other initialization.
//
// The first call to `ResetConn` is guaranteed to happen before any access to connection objects
// happens. But subequent calls can be entirely asynchronous to components/services accessing
Expand Down Expand Up @@ -193,7 +232,7 @@ func (sc *SharedConn) ResetConn(conn rpc.ClientConn, moduleLogger logging.Logger
}

sc.peerConn = peerConn
sc.peerConnReady, sc.peerConnClosed, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleClient, sc.logger)
sc.peerConnReady, _, err = rpc.ConfigureForRenegotiation(peerConn, rpc.PeerRoleServer, sc.logger)
if err != nil {
sc.logger.Warnw("Unable to create optional renegotiation channel for module. Ignoring.", "err", err)
return
Expand Down
1 change: 1 addition & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1408,6 +1408,7 @@ func getFullEnvironment(
environment := map[string]string{
"VIAM_HOME": viamHomeDir,
"VIAM_MODULE_DATA": dataDir,
"VIAM_MODULE_NAME": cfg.Name,
}
if cfg.Type == config.ModuleTypeRegistry {
environment["VIAM_MODULE_ID"] = cfg.ModuleID
Expand Down
24 changes: 23 additions & 1 deletion module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,9 @@ type peerResourceState struct {

// Module represents an external resource module that services components/services.
type Module struct {
// The name of the module as per the robot config.
name string

shutdownCtx context.Context
shutdownFn context.CancelFunc
parent *client.RobotClient
Expand Down Expand Up @@ -219,7 +222,12 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
}

cancelCtx, cancel := context.WithCancel(context.Background())

// If the env variable does not exist, the empty string is returned.
modName, _ := os.LookupEnv("VIAM_MODULE_NAME")

m := &Module{
name: modName,
shutdownCtx: cancelCtx,
shutdownFn: cancel,
logger: logger,
Expand Down Expand Up @@ -369,12 +377,26 @@ func (m *Module) connectParent(ctx context.Context) error {
clientLogger := logging.NewLogger("networking.module-connection")
clientLogger.SetLevel(m.logger.GetLevel())
// TODO(PRODUCT-343): add session support to modules
rc, err := client.New(ctx, fullAddr, clientLogger, client.WithDisableSessions())

connectOptions := []client.RobotClientOption{
client.WithDisableSessions(),
}

// Modules compiled against newer SDKs may be running against older `viam-server`s that do not
// provide the module name as an env variable.
if m.name != "" {
connectOptions = append(connectOptions, client.WithModName(m.name))
}

rc, err := client.New(ctx, fullAddr, m.logger, connectOptions...)
if err != nil {
return err
}

m.parent = rc
if m.pc != nil {
m.parent.SetPeerConnection(m.pc)
}
return nil
}

Expand Down
21 changes: 19 additions & 2 deletions module/testmodule/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/motor"
"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/module"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -106,9 +107,21 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
func newHelper(
ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger,
) (resource.Resource, error) {
var dependsOnSensor sensor.Sensor
for _, resObj := range deps {
if resSensor, ok := resObj.(sensor.Sensor); ok {
dependsOnSensor = resSensor
}
}

if len(deps) > 0 && dependsOnSensor == nil {
panic("bad")
}

return &helper{
Named: conf.ResourceName().AsNamed(),
logger: logger,
Named: conf.ResourceName().AsNamed(),
logger: logger,
dependsOnSensor: dependsOnSensor,
}, nil
}

Expand All @@ -117,6 +130,7 @@ type helper struct {
resource.TriviallyCloseable
logger logging.Logger
numReconfigurations int
dependsOnSensor sensor.Sensor
}

// DoCommand looks up the "real" command from the map it's passed.
Expand Down Expand Up @@ -191,6 +205,9 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map
return map[string]any{}, nil
case "get_num_reconfigurations":
return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil
case "do_readings_on_dep":
_, err := h.dependsOnSensor.Readings(ctx, nil)
return nil, err
default:
return nil, fmt.Errorf("unknown command string %s", cmd)
}
Expand Down
38 changes: 36 additions & 2 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/jhump/protoreflect/desc"
"github.com/jhump/protoreflect/grpcreflect"
"github.com/viamrobotics/webrtc/v3"
"go.uber.org/multierr"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
Expand Down Expand Up @@ -111,6 +112,12 @@ type RobotClient struct {
// webrtc. We don't want a network disconnect to result in reconnecting over tcp such that
// performance would be impacted.
serverIsWebrtcEnabled bool

// If this client is running in a module process and this client represents the gRPC connection
// back to a viam-server. If this is true, `pc` and `sharedConn` are expected to be set.
isModuleConnection bool
pc *webrtc.PeerConnection
sharedConn *grpc.SharedConn
}

// RemoteTypeName is the type name used for a remote. This is for internal use.
Expand Down Expand Up @@ -267,6 +274,7 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
sessionsDisabled: rOpts.disableSessions,
heartbeatCtx: heartbeatCtx,
heartbeatCtxCancel: heartbeatCtxCancel,
isModuleConnection: rOpts.modName != "",
}

// interceptors are applied in order from first to last
Expand All @@ -290,6 +298,11 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
rpc.WithStreamClientInterceptor(streamClientInterceptor()),
)

if rOpts.modName != "" {
inter := &grpc.ModInterceptors{ModName: rOpts.modName}
rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor))
}

if err := rc.Connect(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -674,10 +687,10 @@ func (rc *RobotClient) ResourceByName(name resource.Name) (resource.Resource, er
func (rc *RobotClient) createClient(name resource.Name) (resource.Resource, error) {
apiInfo, ok := resource.LookupGenericAPIRegistration(name.API)
if !ok || apiInfo.RPCClient == nil {
return grpc.NewForeignResource(name, &rc.conn), nil
return grpc.NewForeignResource(name, rc.getClientConn()), nil
}
logger := rc.Logger().Sublogger(resource.RemoveRemoteName(name).ShortName())
return apiInfo.RPCClient(rc.backgroundCtx, &rc.conn, rc.remoteName, name, logger)
return apiInfo.RPCClient(rc.backgroundCtx, rc.getClientConn(), rc.remoteName, name, logger)
}

func (rc *RobotClient) resources(ctx context.Context) ([]resource.Name, []resource.RPCAPI, error) {
Expand Down Expand Up @@ -1185,6 +1198,27 @@ func (rc *RobotClient) Version(ctx context.Context) (robot.VersionResponse, erro
return mVersion, nil
}

// SetPeerConnection is only to be called internally from modules.
func (rc *RobotClient) SetPeerConnection(pc *webrtc.PeerConnection) {
rc.mu.Lock()
rc.pc = pc
rc.mu.Unlock()
}

func (rc *RobotClient) getClientConn() rpc.ClientConn {
// Must be called with `rc.mu` in ReadLock+ mode.
if rc.sharedConn != nil {
return rc.sharedConn
}

if rc.pc == nil {
return &rc.conn
}

rc.sharedConn = grpc.NewSharedConnForModule(&rc.conn, rc.pc, rc.logger.Sublogger("shared_conn"))
return rc.sharedConn
}

func unaryClientInterceptor() googlegrpc.UnaryClientInterceptor {
return func(
ctx context.Context,
Expand Down
10 changes: 10 additions & 0 deletions robot/client/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type robotClientOpts struct {

// controls whether or not sessions are disabled.
disableSessions bool

modName string
}

// RobotClientOption configures how we set up the connection.
Expand All @@ -56,6 +58,14 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption {
}
}

// WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC
// request.
func WithModName(modName string) RobotClientOption {
return newFuncRobotClientOption(func(o *robotClientOpts) {
o.modName = modName
})
}

// WithRefreshEvery returns a RobotClientOption for how often to refresh the status/parts of the
// robot.
func WithRefreshEvery(refreshEvery time.Duration) RobotClientOption {
Expand Down
Loading
Loading