Skip to content

Commit

Permalink
RSDK-2880: Add timestamp information to the replay camera (viamroboti…
Browse files Browse the repository at this point in the history
  • Loading branch information
dmhilly authored May 22, 2023
1 parent 0d64962 commit 2af1019
Show file tree
Hide file tree
Showing 9 changed files with 302 additions and 20 deletions.
65 changes: 65 additions & 0 deletions components/camera/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/edaniels/gostream"
"go.viam.com/test"
"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.viam.com/rdk/components/camera"
viamgrpc "go.viam.com/rdk/grpc"
Expand All @@ -24,6 +26,7 @@ import (
"go.viam.com/rdk/testutils"
"go.viam.com/rdk/testutils/inject"
rutils "go.viam.com/rdk/utils"
"go.viam.com/rdk/utils/contextutils"
)

func TestClient(t *testing.T) {
Expand Down Expand Up @@ -390,3 +393,65 @@ func TestClientLazyImage(t *testing.T) {

test.That(t, conn.Close(), test.ShouldBeNil)
}

func TestClientWithInterceptor(t *testing.T) {
// Set up gRPC server
logger := golog.NewTestLogger(t)
listener1, err := net.Listen("tcp", "localhost:0")
test.That(t, err, test.ShouldBeNil)
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
test.That(t, err, test.ShouldBeNil)

// Set up camera that adds timestamps into the gRPC response header.
injectCamera := &inject.Camera{}

pcA := pointcloud.New()
err = pcA.Set(pointcloud.NewVector(5, 5, 5), nil)
test.That(t, err, test.ShouldBeNil)

k, v := "hello", "world"
injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) {
var grpcMetadata metadata.MD = make(map[string][]string)
grpcMetadata.Set(k, v)
grpc.SendHeader(ctx, grpcMetadata)
return pcA, nil
}

// Register CameraService API in our gRPC server.
resources := map[resource.Name]camera.Camera{
camera.Named(testCameraName): injectCamera,
}
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
test.That(t, err, test.ShouldBeNil)
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
test.That(t, err, test.ShouldBeNil)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)

// Start serving requests.
go rpcServer.Serve(listener1)
defer rpcServer.Stop()

// Set up gRPC client with context with metadata interceptor.
conn, err := viamgrpc.Dial(
context.Background(),
listener1.Addr().String(),
logger,
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
)
test.That(t, err, test.ShouldBeNil)
camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
test.That(t, err, test.ShouldBeNil)

// Construct a ContextWithMetadata to pass into NextPointCloud and check that the
// interceptor correctly injected the metadata from the gRPC response header into the
// context.
ctx, md := contextutils.ContextWithMetadata(context.Background())
pcB, err := camera1Client.NextPointCloud(ctx)
test.That(t, err, test.ShouldBeNil)
_, got := pcB.At(5, 5, 5)
test.That(t, got, test.ShouldBeTrue)
test.That(t, md[k][0], test.ShouldEqual, v)

test.That(t, conn.Close(), test.ShouldBeNil)
}
25 changes: 24 additions & 1 deletion components/camera/replaypcd/replaypcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,16 @@ import (
datapb "go.viam.com/api/app/data/v1"
goutils "go.viam.com/utils"
"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage/transform"
"go.viam.com/rdk/utils/contextutils"
)

// Model is the model of a replay camera.
Expand Down Expand Up @@ -146,8 +149,28 @@ func (replay *pcdCamera) NextPointCloud(ctx context.Context) (pointcloud.PointCl
return nil, errEndOfDataset
}

// If the caller is communicating with the replay camera over gRPC, set the timestamps on
// the gRPC header.
md := resp.GetData()[0].GetMetadata()
if stream := grpc.ServerTransportStreamFromContext(ctx); stream != nil {
var grpcMetadata metadata.MD = make(map[string][]string)

timeReq := md.GetTimeRequested()
if timeReq != nil {
grpcMetadata.Set(contextutils.TimeRequestedMetadataKey, timeReq.AsTime().Format(time.RFC3339Nano))
}
timeRec := md.GetTimeReceived()
if timeRec != nil {
grpcMetadata.Set(contextutils.TimeReceivedMetadataKey, timeRec.AsTime().Format(time.RFC3339Nano))
}

if err := grpc.SetHeader(ctx, grpcMetadata); err != nil {
return nil, err
}
}

replay.lastData = resp.GetLast()
data := resp.Data[0].GetBinary()
data := resp.GetData()[0].GetBinary()

r, err := gzip.NewReader(bytes.NewBuffer(data))
if err != nil {
Expand Down
43 changes: 43 additions & 0 deletions components/camera/replaypcd/replaypcd_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@ import (
"go.viam.com/test"
"go.viam.com/utils"
"go.viam.com/utils/artifact"
"google.golang.org/grpc"

"go.viam.com/rdk/internal/cloud"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/testutils"
"go.viam.com/rdk/utils/contextutils"
)

const datasetDirectory = "slam/mock_lidar/%d.pcd"
Expand Down Expand Up @@ -424,3 +427,43 @@ func TestUnimplementedFunctions(t *testing.T) {

test.That(t, serverClose(), test.ShouldBeNil)
}

// TestNextPointCloudTimestamps tests that calls to NextPointCloud on the replay camera will inject
// the time received and time requested metadata into the gRPC response header.
func TestNextPointCloudTimestamps(t *testing.T) {
// Construct replay camera.
ctx := context.Background()
cfg := &Config{Source: "source"}
replayCamera, serverClose, err := createNewReplayPCDCamera(ctx, t, cfg, true)
test.That(t, err, test.ShouldBeNil)
test.That(t, replayCamera, test.ShouldNotBeNil)

// Repeatedly call NextPointCloud, checking for timestamps in the gRPC header.
for i := 0; i < numPCDFiles; i++ {
serverStream := testutils.NewServerTransportStream()
ctx = grpc.NewContextWithServerTransportStream(ctx, serverStream)
pc, err := replayCamera.NextPointCloud(ctx)
test.That(t, err, test.ShouldBeNil)
test.That(t, pc, test.ShouldResemble, getPointCloudFromArtifact(t, i))

expectedTimeReq := fmt.Sprintf(testTime, i)
expectedTimeRec := fmt.Sprintf(testTime, i+1)

actualTimeReq := serverStream.Value(contextutils.TimeRequestedMetadataKey)[0]
actualTimeRec := serverStream.Value(contextutils.TimeReceivedMetadataKey)[0]

test.That(t, expectedTimeReq, test.ShouldEqual, actualTimeReq)
test.That(t, expectedTimeRec, test.ShouldEqual, actualTimeRec)
}

// Confirm the end of the dataset was reached when expected
pc, err := replayCamera.NextPointCloud(ctx)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errEndOfDataset.Error())
test.That(t, pc, test.ShouldBeNil)

err = replayCamera.Close(ctx)
test.That(t, err, test.ShouldBeNil)

test.That(t, serverClose(), test.ShouldBeNil)
}
16 changes: 14 additions & 2 deletions components/camera/replaypcd/replaypcd_utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (
"path/filepath"
"strconv"
"testing"
"time"

"github.com/edaniels/golog"
"github.com/pkg/errors"
datapb "go.viam.com/api/app/data/v1"
"go.viam.com/test"
"go.viam.com/utils/artifact"
"go.viam.com/utils/rpc"
"google.golang.org/protobuf/types/known/timestamppb"

"go.viam.com/rdk/components/camera"
viamgrpc "go.viam.com/rdk/grpc"
Expand All @@ -28,6 +30,8 @@ import (
"go.viam.com/rdk/testutils/inject"
)

const testTime = "2000-01-01T12:00:%02dZ"

// mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These
// can be overwritten to allow developers to trigger desired behaviors during testing.
type mockDataServiceServer struct {
Expand Down Expand Up @@ -60,9 +64,17 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r
gz.Close()

// Construct response
timeReq, err := time.Parse(time.RFC3339, fmt.Sprintf(testTime, newFileNum))
if err != nil {
return nil, errors.Wrap(err, "failed parsing time")
}
timeRec := timeReq.Add(time.Second)
binaryData := &datapb.BinaryData{
Binary: dataBuf.Bytes(),
Metadata: &datapb.BinaryMetadata{},
Binary: dataBuf.Bytes(),
Metadata: &datapb.BinaryMetadata{
TimeRequested: timestamppb.New(timeReq),
TimeReceived: timestamppb.New(timeRec),
},
}

resp := &datapb.BinaryDataByFilterResponse{
Expand Down
2 changes: 2 additions & 0 deletions robot/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (
"go.viam.com/rdk/robot/packages"
"go.viam.com/rdk/session"
"go.viam.com/rdk/spatialmath"
"go.viam.com/rdk/utils/contextutils"
)

var (
Expand Down Expand Up @@ -278,6 +279,7 @@ func New(ctx context.Context, address string, logger golog.Logger, opts ...Robot
// interceptors are applied in order from first to last
rc.dialOptions = append(
rc.dialOptions,
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
// error handling
rpc.WithUnaryClientInterceptor(rc.handleUnaryDisconnect),
rpc.WithStreamClientInterceptor(rc.handleStreamDisconnect),
Expand Down
21 changes: 4 additions & 17 deletions session/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import (
"github.com/google/uuid"
"go.viam.com/test"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"

"go.viam.com/rdk/resource"
"go.viam.com/rdk/session"
"go.viam.com/rdk/testutils"
)

func TestToFromContext(t *testing.T) {
Expand Down Expand Up @@ -64,7 +64,7 @@ func TestSafetyMonitor(t *testing.T) {
}

func TestSafetyMonitorForMetadata(t *testing.T) {
stream1 := &myStream{}
stream1 := testutils.NewServerTransportStream()
streamCtx := grpc.NewContextWithServerTransportStream(context.Background(), stream1)

sess1 := session.New(context.Background(), "ownerID", time.Minute, nil)
Expand All @@ -73,26 +73,13 @@ func TestSafetyMonitorForMetadata(t *testing.T) {
name1 := resource.NewName(resource.APINamespace("foo").WithType("bar").WithSubtype("baz"), "barf")
name2 := resource.NewName(resource.APINamespace("woo").WithType("war").WithSubtype("waz"), "warf")
session.SafetyMonitor(nextCtx, myThing{Named: name1.AsNamed()})
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name1.String()})
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name1.String()})
session.SafetyMonitor(nextCtx, myThing{Named: name2.AsNamed()})
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name2.String()})
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name2.String()})
}

type myThing struct {
resource.Named
resource.AlwaysRebuild
resource.TriviallyCloseable
}

type myStream struct {
mu sync.Mutex
grpc.ServerTransportStream
md metadata.MD
}

func (s *myStream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
s.md = md.Copy()
return nil
}
34 changes: 34 additions & 0 deletions testutils/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ package testutils

import (
"context"
"sync"

"go.viam.com/utils/rpc"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
)

// TrackingDialer tracks dial attempts.
Expand Down Expand Up @@ -48,3 +50,35 @@ func (td *TrackingDialer) DialFunc(
}
return conn, cached, err
}

// ServerTransportStream implements grpc.ServerTransportStream and can be used to test setting
// metadata in the gRPC response header.
type ServerTransportStream struct {
mu sync.Mutex
grpc.ServerTransportStream
md metadata.MD
}

// NewServerTransportStream creates a new ServerTransportStream.
func NewServerTransportStream() *ServerTransportStream {
return &ServerTransportStream{
md: metadata.New(make(map[string]string)),
}
}

// SetHeader implements grpc.ServerTransportStream.
func (s *ServerTransportStream) SetHeader(md metadata.MD) error {
s.mu.Lock()
defer s.mu.Unlock()
for k, v := range md {
s.md[k] = v
}
return nil
}

// Value returns the value in the metadata map corresponding to a given key.
func (s *ServerTransportStream) Value(key string) []string {
s.mu.Lock()
defer s.mu.Unlock()
return s.md[key]
}
Loading

0 comments on commit 2af1019

Please sign in to comment.