Skip to content

Commit

Permalink
RSDK-9818: Annotate gRPC requests from modules to the viam-server wit…
Browse files Browse the repository at this point in the history
…h module names. (#4749)
  • Loading branch information
dgottlieb authored Jan 30, 2025
1 parent 56719cc commit 7824694
Show file tree
Hide file tree
Showing 8 changed files with 200 additions and 3 deletions.
60 changes: 60 additions & 0 deletions grpc/interceptors.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"time"

"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// DefaultMethodTimeout is the default context timeout for all inbound gRPC
Expand Down Expand Up @@ -43,3 +44,62 @@ func EnsureTimeoutUnaryClientInterceptor(

return invoker(ctx, method, req, reply, cc, opts...)
}

// The following code is for appending/extracting grpc metadata regarding module names/origins via
// contexts.
type modNameKeyType int

const modNameKeyID = modNameKeyType(iota)

// GetModuleName returns the module name (if any) the request came from. The module name will match
// a string from the robot config.
func GetModuleName(ctx context.Context) string {
valI := ctx.Value(modNameKeyID)
if val, ok := valI.(string); ok {
return val
}

return ""
}

const modNameMetadataKey = "modName"

// ModInterceptors takes a user input `ModName` and exposes an interceptor method that will attach
// it to outgoing gRPC requests.
type ModInterceptors struct {
ModName string
}

// UnaryClientInterceptor adds a module name to any outgoing unary gRPC request.
func (mc *ModInterceptors) UnaryClientInterceptor(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
ctx = metadata.AppendToOutgoingContext(ctx, modNameMetadataKey, mc.ModName)
return invoker(ctx, method, req, reply, cc, opts...)
}

// ModNameUnaryServerInterceptor checks the incoming RPC metadata for a module name and attaches any
// information to a context that can be retrieved with `GetModuleName`.
func ModNameUnaryServerInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
meta, ok := metadata.FromIncomingContext(ctx)
if !ok {
return handler(ctx, req)
}

values := meta.Get(modNameMetadataKey)
if len(values) == 1 {
ctx = context.WithValue(ctx, modNameKeyID, values[0])
}

return handler(ctx, req)
}
1 change: 1 addition & 0 deletions module/modmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1436,6 +1436,7 @@ func getFullEnvironment(
environment := map[string]string{
"VIAM_HOME": viamHomeDir,
"VIAM_MODULE_DATA": dataDir,
"VIAM_MODULE_NAME": cfg.Name,
}
if cfg.Type == config.ModuleTypeRegistry {
environment["VIAM_MODULE_ID"] = cfg.ModuleID
Expand Down
22 changes: 21 additions & 1 deletion module/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,10 @@ type peerResourceState struct {

// Module represents an external resource module that services components/services.
type Module struct {
// The name of the module as per the robot config. This value is communicated via the
// `VIAM_MODULE_NAME` env var.
name string

shutdownCtx context.Context
shutdownFn context.CancelFunc
parent *client.RobotClient
Expand Down Expand Up @@ -219,7 +223,12 @@ func NewModule(ctx context.Context, address string, logger logging.Logger) (*Mod
}

cancelCtx, cancel := context.WithCancel(context.Background())

// If the env variable does not exist, the empty string is returned.
modName, _ := os.LookupEnv("VIAM_MODULE_NAME")

m := &Module{
name: modName,
shutdownCtx: cancelCtx,
shutdownFn: cancel,
logger: logger,
Expand Down Expand Up @@ -369,7 +378,18 @@ func (m *Module) connectParent(ctx context.Context) error {
clientLogger := logging.NewLogger("networking.module-connection")
clientLogger.SetLevel(m.logger.GetLevel())
// TODO(PRODUCT-343): add session support to modules
rc, err := client.New(ctx, fullAddr, clientLogger, client.WithDisableSessions())

connectOptions := []client.RobotClientOption{
client.WithDisableSessions(),
}

// Modules compiled against newer SDKs may be running against older `viam-server`s that do not
// provide the module name as an env variable.
if m.name != "" {
connectOptions = append(connectOptions, client.WithModName(m.name))
}

rc, err := client.New(ctx, fullAddr, m.logger, connectOptions...)
if err != nil {
return err
}
Expand Down
23 changes: 21 additions & 2 deletions module/testmodule/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (

"go.viam.com/rdk/components/generic"
"go.viam.com/rdk/components/motor"
"go.viam.com/rdk/components/sensor"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/module"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -106,9 +107,23 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) err
func newHelper(
ctx context.Context, deps resource.Dependencies, conf resource.Config, logger logging.Logger,
) (resource.Resource, error) {
var dependsOnSensor sensor.Sensor
var err error
if len(conf.DependsOn) > 0 {
dependsOnSensor, err = sensor.FromDependencies(deps, conf.DependsOn[0])
if err != nil {
return nil, err
}
}

if len(deps) > 0 && dependsOnSensor == nil {
return nil, fmt.Errorf("sensor not found in deps: %v", deps)
}

return &helper{
Named: conf.ResourceName().AsNamed(),
logger: logger,
Named: conf.ResourceName().AsNamed(),
logger: logger,
dependsOnSensor: dependsOnSensor,
}, nil
}

Expand All @@ -117,6 +132,7 @@ type helper struct {
resource.TriviallyCloseable
logger logging.Logger
numReconfigurations int
dependsOnSensor sensor.Sensor
}

// DoCommand looks up the "real" command from the map it's passed.
Expand Down Expand Up @@ -191,6 +207,9 @@ func (h *helper) DoCommand(ctx context.Context, req map[string]interface{}) (map
return map[string]any{}, nil
case "get_num_reconfigurations":
return map[string]any{"num_reconfigurations": h.numReconfigurations}, nil
case "do_readings_on_dep":
_, err := h.dependsOnSensor.Readings(ctx, nil)
return nil, err
default:
return nil, fmt.Errorf("unknown command string %s", cmd)
}
Expand Down
5 changes: 5 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,11 @@ func New(ctx context.Context, address string, clientLogger logging.ZapCompatible
rpc.WithStreamClientInterceptor(streamClientInterceptor()),
)

if rOpts.modName != "" {
inter := &grpc.ModInterceptors{ModName: rOpts.modName}
rc.dialOptions = append(rc.dialOptions, rpc.WithUnaryClientInterceptor(inter.UnaryClientInterceptor))
}

if err := rc.Connect(ctx); err != nil {
return nil, err
}
Expand Down
10 changes: 10 additions & 0 deletions robot/client/client_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ type robotClientOpts struct {

// controls whether or not sessions are disabled.
disableSessions bool

modName string
}

// RobotClientOption configures how we set up the connection.
Expand All @@ -56,6 +58,14 @@ func newFuncRobotClientOption(f func(*robotClientOpts)) *funcRobotClientOption {
}
}

// WithModName attaches a unary interceptor that attaches the module name for each outgoing gRPC
// request. Should only be used in Viam module library code.
func WithModName(modName string) RobotClientOption {
return newFuncRobotClientOption(func(o *robotClientOpts) {
o.modName = modName
})
}

// WithRefreshEvery returns a RobotClientOption for how often to refresh the status/parts of the
// robot.
func WithRefreshEvery(refreshEvery time.Duration) RobotClientOption {
Expand Down
78 changes: 78 additions & 0 deletions robot/impl/local_robot_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4534,3 +4534,81 @@ func TestRemovingOfflineRemotes(t *testing.T) {
cancelReconfig()
wg.Wait()
}

// TestModuleNamePassing asserts that module names are passed from viam-server -> module
// properly. Such that incoming requests from module -> viam-server identify themselves. And can be
// observed on contexts via `[r]grpc.GetModuleName(ctx)`.
func TestModuleNamePassing(t *testing.T) {
logger := logging.NewTestLogger(t)

ctx := context.Background()

// We will inject a `ReadingsFunc` handler. The request should come from the `testmodule` and
// the interceptors should pass along a module name. Which will get captured in the
// `moduleNameCh` that the end of the test will assert on.
//
// The channel must be buffered to such that the `ReadingsFunc` returns without waiting on a
// reader of the channel.
moduleNameCh := make(chan string, 1)
callbackSensor := &inject.Sensor{
ReadingsFunc: func(ctx context.Context, extra map[string]any) (map[string]any, error) {
moduleNameCh <- rgrpc.GetModuleName(ctx)
return map[string]any{
"reading": 42,
}, nil
},
CloseFunc: func(ctx context.Context) error {
return nil
},
}

// The resource registry is a global. We must use unique model names to avoid unexpected
// collisions.
callbackModelName := resource.DefaultModelFamily.WithModel(utils.RandomAlphaString(8))
resource.RegisterComponent(
sensor.API,
callbackModelName,
resource.Registration[sensor.Sensor, resource.NoNativeConfig]{Constructor: func(
ctx context.Context,
deps resource.Dependencies,
conf resource.Config,
logger logging.Logger,
) (sensor.Sensor, error) {
// Be lazy -- just return an a singleton object.
return callbackSensor, nil
}})

const moduleName = "fancy_module_name"
localRobot := setupLocalRobot(t, ctx, &config.Config{
Modules: []config.Module{
{
Name: moduleName,
ExePath: rtestutils.BuildTempModule(t, "module/testmodule"),
Type: config.ModuleTypeLocal,
},
},
Components: []resource.Config{
// We will invoke a special `DoCommand` on `modularComp`. It will expect its `DependsOn:
// "foo"` to be a sensor. And call the `Readings` API on that sensor.
{
Name: "modularComp",
API: generic.API,
Model: resource.NewModel("rdk", "test", "helper"),
DependsOn: []string{"foo"},
},
// `foo` will be a sensor that we've instrumented with the injected `ReadingsFunc`.
{
Name: "foo",
API: sensor.API,
Model: callbackModelName,
},
},
}, logger)

res, err := localRobot.ResourceByName(generic.Named("modularComp"))
test.That(t, err, test.ShouldBeNil)

_, err = res.DoCommand(ctx, map[string]interface{}{"command": "do_readings_on_dep"})
test.That(t, err, test.ShouldBeNil)
test.That(t, <-moduleNameCh, test.ShouldEqual, moduleName)
}
4 changes: 4 additions & 0 deletions robot/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,10 @@ func (svc *webService) StartModule(ctx context.Context) error {

unaryInterceptors = append(unaryInterceptors, grpc.EnsureTimeoutUnaryServerInterceptor)

// Attach the module name (as defined by the robot config) to the handler context. Can be
// accessed via `grpc.GetModuleName`.
unaryInterceptors = append(unaryInterceptors, grpc.ModNameUnaryServerInterceptor)

opManager := svc.r.OperationManager()
unaryInterceptors = append(unaryInterceptors,
opManager.UnaryServerInterceptor, logging.UnaryServerInterceptor)
Expand Down

0 comments on commit 7824694

Please sign in to comment.