Skip to content

Commit

Permalink
[RSDK-8294] Use Global Connection to App in Internal Cloud Service an…
Browse files Browse the repository at this point in the history
…d Local Robot (#4782)
  • Loading branch information
bashar-515 authored Feb 18, 2025
1 parent 49cc054 commit 1c8c1a3
Show file tree
Hide file tree
Showing 23 changed files with 144 additions and 164 deletions.
2 changes: 1 addition & 1 deletion cli/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,7 @@ func setupWithRunningPart(
Model: resource.DefaultServiceModel,
},
},
}, logging.NewInMemoryLogger(t))
}, nil, logging.NewInMemoryLogger(t))
test.That(t, err, test.ShouldBeNil)

options, _, addr := robottestutils.CreateBaseOptionsAndListener(t)
Expand Down
4 changes: 2 additions & 2 deletions components/camera/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,7 +627,7 @@ func setupRealRobot(
t.Helper()

ctx := context.Background()
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, logger)
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, nil, logger)
test.That(t, err, test.ShouldBeNil)

// We initialize with a stream config such that the stream server is capable of creating video stream and
Expand All @@ -652,7 +652,7 @@ func setupRealRobotWithOptions(
t.Helper()

ctx := context.Background()
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, logger)
robot, err := robotimpl.RobotFromConfig(ctx, robotConfig, nil, logger)
test.That(t, err, test.ShouldBeNil)

// We initialize with a stream config such that the stream server is capable of creating video stream and
Expand Down
2 changes: 1 addition & 1 deletion components/camera/transformpipeline/classifier_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) {
defer os.Remove(newConfFile)

// make the robot from new config
r, err := robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
r, err := robotimpl.RobotFromConfigPath(context.Background(), newConfFile, nil, logger)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion components/camera/transformpipeline/detector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
}
defer os.Remove(newConfFile)
// make the robot from new config
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, nil, logger)
}

func TestColorDetectionSource(t *testing.T) {
Expand Down
14 changes: 13 additions & 1 deletion examples/customresources/demos/remoteserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import (
"errors"

goutils "go.viam.com/utils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/config"
_ "go.viam.com/rdk/examples/customresources/models/mygizmo"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
robotimpl "go.viam.com/rdk/robot/impl"
"go.viam.com/rdk/robot/web"
Expand Down Expand Up @@ -40,7 +42,17 @@ func mainWithArgs(ctx context.Context, args []string, logger logging.Logger) (er
return err
}

myRobot, err := robotimpl.RobotFromConfig(ctx, cfg, logger)
var appConn rpc.ClientConn
if cfg.Cloud != nil && cfg.Cloud.AppAddress != "" {
appConn, err = grpc.NewAppConn(ctx, cfg.Cloud.AppAddress, cfg.Cloud.Secret, cfg.Cloud.ID, logger)
if err != nil {
return nil
}

defer goutils.UncheckedErrorFunc(appConn.Close)
}

myRobot, err := robotimpl.RobotFromConfig(ctx, cfg, appConn, logger)
if err != nil {
return err
}
Expand Down
4 changes: 2 additions & 2 deletions examples/customresources/demos/remoteserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func TestGizmo(t *testing.T) {

cfgServer, err := config.Read(ctx, utils.ResolveFile("./examples/customresources/demos/remoteserver/remote.json"), logger, nil)
test.That(t, err, test.ShouldBeNil)
remoteB, err := robotimpl.New(ctx, cfgServer, logger.Sublogger("remoteB"))
remoteB, err := robotimpl.New(ctx, cfgServer, nil, logger.Sublogger("remoteB"))
test.That(t, err, test.ShouldBeNil)
options := weboptions.New()
options.Network.BindAddress = remoteAddrB
Expand Down Expand Up @@ -116,7 +116,7 @@ func TestGizmo(t *testing.T) {
},
},
}
mainPart, err := robotimpl.New(ctx, mainPartConfig, logger.Sublogger("mainPart.client"))
mainPart, err := robotimpl.New(ctx, mainPartConfig, nil, logger.Sublogger("mainPart.client"))
defer func() {
test.That(t, mainPart.Close(context.Background()), test.ShouldBeNil)
}()
Expand Down
9 changes: 4 additions & 5 deletions grpc/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,8 @@ type ReconfigurableClientConn struct {
onTrackCBByTrackName map[string]OnTrackCB
}

// Return this constant such that backoff error logging can compare consecutive errors and reliably
// conclude they are the same.
var errNotConnected = errors.New("not connected")
// ErrNotConnected returns so that backoff error logging can compare consecutive errors and reliably conclude they are the same.
var ErrNotConnected = errors.New("not connected")

// Invoke invokes using the underlying client connection. In the case of c.conn being closed in the middle of
// an Invoke call, it is expected that c.conn can handle that and return a well-formed error.
Expand All @@ -39,7 +38,7 @@ func (c *ReconfigurableClientConn) Invoke(
conn := c.conn
c.connMu.RUnlock()
if conn == nil {
return errNotConnected
return ErrNotConnected
}
return conn.Invoke(ctx, method, args, reply, opts...)
}
Expand All @@ -56,7 +55,7 @@ func (c *ReconfigurableClientConn) NewStream(
conn := c.conn
c.connMu.RUnlock()
if conn == nil {
return nil, errNotConnected
return nil, ErrNotConnected
}
return conn.NewStream(ctx, desc, method, opts...)
}
Expand Down
31 changes: 12 additions & 19 deletions internal/cloud/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,32 @@ type ConnectionService interface {

// NewCloudConnectionService makes a new cloud connection service to get gRPC connections
// to a cloud service managing robots.
func NewCloudConnectionService(cfg *config.Cloud, logger logging.Logger) ConnectionService {
func NewCloudConnectionService(cfg *config.Cloud, conn rpc.ClientConn, logger logging.Logger) ConnectionService {
if cfg == nil || cfg.AppAddress == "" {
return &cloudManagedService{
Named: InternalServiceName.AsNamed(),
}
}
return &cloudManagedService{

cm := &cloudManagedService{
Named: InternalServiceName.AsNamed(),
conn: conn,
managed: true,
dialer: rpc.NewCachedDialer(),
cloudCfg: *cfg,
logger: logger,
}

return cm
}

type cloudManagedService struct {
resource.Named
// we assume the config is immutable for the lifetime of the process
resource.TriviallyReconfigurable

conn rpc.ClientConn

managed bool
cloudCfg config.Cloud
logger logging.Logger
Expand All @@ -69,27 +75,14 @@ type cloudManagedService struct {
dialer rpc.Dialer
}

// AcquireConnection returns the connection provided to `NewCloudConnectionService` regardless of the state of the `cloudManagedService`.
// This means that if `Close` has been called on the `cloudManagedService`, `AcquireConnection` can still return an open connection.
func (cm *cloudManagedService) AcquireConnection(ctx context.Context) (string, rpc.ClientConn, error) {
cm.dialerMu.RLock()
defer cm.dialerMu.RUnlock()
if !cm.managed {
if cm.conn == nil {
return "", nil, ErrNotCloudManaged
}
if cm.dialer == nil {
return "", nil, errors.New("service closed")
}

ctx = rpc.ContextWithDialer(ctx, cm.dialer)
timeout := connectTimeout
// When environment indicates we are behind a proxy, bump timeout. Network
// operations tend to take longer when behind a proxy.
if os.Getenv(rpc.SocksProxyEnvVar) != "" {
timeout = connectTimeoutBehindProxy
}
timeOutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
conn, err := config.CreateNewGRPCClient(timeOutCtx, &cm.cloudCfg, cm.logger)
return cm.cloudCfg.ID, conn, err
return cm.cloudCfg.ID, cm.conn, nil
}

func (cm *cloudManagedService) AcquireConnectionAPIKey(ctx context.Context,
Expand Down
Loading

0 comments on commit 1c8c1a3

Please sign in to comment.