Skip to content

Commit

Permalink
RSDK-6281 pass extra through the server (#3430)
Browse files Browse the repository at this point in the history
  • Loading branch information
bazile-clyde authored and 10zingpd committed Jan 18, 2024
1 parent 2fc551b commit 5828e97
Show file tree
Hide file tree
Showing 7 changed files with 227 additions and 6 deletions.
23 changes: 22 additions & 1 deletion components/camera/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ import (
"go.opencensus.io/trace"
pb "go.viam.com/api/component/camera/v1"
goutils "go.viam.com/utils"
goprotoutils "go.viam.com/utils/protoutils"
"go.viam.com/utils/rpc"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/structpb"

"go.viam.com/rdk/data"
"go.viam.com/rdk/gostream"
Expand Down Expand Up @@ -60,13 +63,31 @@ func NewClientFromConn(
}, nil
}

func getExtra(ctx context.Context) (*structpb.Struct, error) {
ext := &structpb.Struct{}
if extra, ok := FromContext(ctx); ok {
var err error
if ext, err = goprotoutils.StructToStructPb(extra); err != nil {
return nil, err
}
}

dataExt, err := data.GetExtraFromContext(ctx)
if err != nil {
return nil, err
}

proto.Merge(ext, dataExt)
return ext, nil
}

func (c *client) Read(ctx context.Context) (image.Image, func(), error) {
ctx, span := trace.StartSpan(ctx, "camera::client::Read")
defer span.End()
mimeType := gostream.MIMETypeHint(ctx, "")
expectedType, _ := utils.CheckLazyMIMEType(mimeType)

ext, err := data.GetExtraFromContext(ctx)
ext, err := getExtra(ctx)
if err != nil {
return nil, nil, err
}
Expand Down
68 changes: 68 additions & 0 deletions components/camera/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"google.golang.org/grpc/metadata"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/data"
"go.viam.com/rdk/gostream"
viamgrpc "go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
Expand Down Expand Up @@ -259,6 +260,73 @@ func TestClient(t *testing.T) {
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errPropertiesFailed.Error())

test.That(t, conn.Close(), test.ShouldBeNil)
})
t.Run("camera client extra", func(t *testing.T) {
conn, err := viamgrpc.Dial(context.Background(), listener1.Addr().String(), logger)
test.That(t, err, test.ShouldBeNil)

camClient, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
test.That(t, err, test.ShouldBeNil)

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, extra, test.ShouldBeEmpty)
return nil, errStreamFailed
}

ctx := context.Background()
_, _, err = camera.ReadImage(ctx, camClient)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 1)
test.That(t, extra["hello"], test.ShouldEqual, "world")
return nil, errStreamFailed
}

// one kvp created with camera.Extra
ext := camera.Extra{"hello": "world"}
ctx = camera.NewContext(ctx, ext)
_, _, err = camera.ReadImage(ctx, camClient)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 1)
test.That(t, extra[data.FromDMString], test.ShouldBeTrue)

return nil, errStreamFailed
}

// one kvp created with data.FromDMContextKey
ctx = context.WithValue(context.Background(), data.FromDMContextKey{}, true)
_, _, err = camera.ReadImage(ctx, camClient)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 2)
test.That(t, extra["hello"], test.ShouldEqual, "world")
test.That(t, extra[data.FromDMString], test.ShouldBeTrue)
return nil, errStreamFailed
}

// merge values from data and camera
ext = camera.Extra{"hello": "world"}
ctx = camera.NewContext(ctx, ext)
_, _, err = camera.ReadImage(ctx, camClient)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

test.That(t, conn.Close(), test.ShouldBeNil)
})
}
Expand Down
22 changes: 22 additions & 0 deletions components/camera/extra.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package camera

import "context"

// Extra is the type of value stored in the Contexts.
type (
Extra map[string]interface{}
key int
)

var extraKey key

// NewContext returns a new Context that carries value Extra.
func NewContext(ctx context.Context, e Extra) context.Context {
return context.WithValue(ctx, extraKey, e)
}

// FromContext returns the Extra value stored in ctx, if any.
func FromContext(ctx context.Context) (Extra, bool) {
ext, ok := ctx.Value(extraKey).(Extra)
return ext, ok
}
27 changes: 27 additions & 0 deletions components/camera/extra_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package camera

import (
"context"
"testing"

"go.viam.com/test"
)

func TestExtraEmpty(t *testing.T) {
ctx := context.Background()
_, ok := FromContext(ctx)
test.That(t, ok, test.ShouldBeFalse)
}

func TestExtraRoundtrip(t *testing.T) {
ctx := context.Background()
expected := Extra{
"It goes one": "by one",
"even two": "by two",
}

ctx = NewContext(ctx, expected)
actual, ok := FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, actual, test.ShouldEqual, expected)
}
7 changes: 2 additions & 5 deletions components/camera/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
pb "go.viam.com/api/component/camera/v1"
"google.golang.org/genproto/googleapis/api/httpbody"

"go.viam.com/rdk/data"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/pointcloud"
Expand Down Expand Up @@ -73,10 +72,8 @@ func (s *serviceServer) GetImage(

req.MimeType = utils.WithLazyMIMEType(req.MimeType)

// Add 'fromDataManagement' to context to avoid threading extra through gostream API.
if req.Extra.AsMap()[data.FromDMString] == true {
ctx = context.WithValue(ctx, data.FromDMContextKey{}, true)
}
ext := req.Extra.AsMap()
ctx = NewContext(ctx, ext)

img, release, err := ReadImage(gostream.WithMIMETypeHint(ctx, req.MimeType), cam)
if err != nil {
Expand Down
84 changes: 84 additions & 0 deletions components/camera/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,10 @@ import (

pb "go.viam.com/api/component/camera/v1"
"go.viam.com/test"
goprotoutils "go.viam.com/utils/protoutils"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/data"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
Expand Down Expand Up @@ -419,4 +421,86 @@ func TestServer(t *testing.T) {
test.That(t, resp.MimeTypes, test.ShouldContain, utils.MimeTypePNG)
test.That(t, resp.MimeTypes, test.ShouldContain, utils.MimeTypeH264)
})

t.Run("GetImage with extra", func(t *testing.T) {
injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, extra, test.ShouldBeEmpty)
return nil, errStreamFailed
}

_, err := cameraServer.GetImage(context.Background(), &pb.GetImageRequest{
Name: testCameraName,
})

test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 1)
test.That(t, extra["hello"], test.ShouldEqual, "world")
return nil, errStreamFailed
}

ext, err := goprotoutils.StructToStructPb(camera.Extra{"hello": "world"})
test.That(t, err, test.ShouldBeNil)

_, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{
Name: testCameraName,
Extra: ext,
})

test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 1)
test.That(t, extra[data.FromDMString], test.ShouldBeTrue)

return nil, errStreamFailed
}

// one kvp created with data.FromDMContextKey
ext, err = goprotoutils.StructToStructPb(map[string]interface{}{data.FromDMString: true})
test.That(t, err, test.ShouldBeNil)

_, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{
Name: testCameraName,
Extra: ext,
})

test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())

injectCamera.StreamFunc = func(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
extra, ok := camera.FromContext(ctx)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, len(extra), test.ShouldEqual, 2)
test.That(t, extra["hello"], test.ShouldEqual, "world")
test.That(t, extra[data.FromDMString], test.ShouldBeTrue)
return nil, errStreamFailed
}

// use values from data and camera
ext, err = goprotoutils.StructToStructPb(
map[string]interface{}{
data.FromDMString: true,
"hello": "world",
},
)
test.That(t, err, test.ShouldBeNil)

_, err = cameraServer.GetImage(context.Background(), &pb.GetImageRequest{
Name: testCameraName,
Extra: ext,
})

test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errStreamFailed.Error())
})
}
2 changes: 2 additions & 0 deletions data/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ var sleepCaptureCutoff = 2 * time.Millisecond
type CaptureFunc func(ctx context.Context, params map[string]*anypb.Any) (interface{}, error)

// FromDMContextKey is used to check whether the context is from data management.
// Deprecated: use a camera.Extra with camera.NewContext instead.
type FromDMContextKey struct{}

// FromDMString is used to access the 'fromDataManagement' value from a request's Extra struct.
Expand Down Expand Up @@ -327,6 +328,7 @@ func FailedToReadErr(component, method string, err error) error {
}

// GetExtraFromContext sets the extra struct with "fromDataManagement": true if the flag is true in the context.
// Deprecated: Use camera.FromContext instead.
func GetExtraFromContext(ctx context.Context) (*structpb.Struct, error) {
extra := make(map[string]interface{})
if ctx.Value(FromDMContextKey{}) == true {
Expand Down

0 comments on commit 5828e97

Please sign in to comment.