Skip to content

Commit 2af1019

Browse files
authored
RSDK-2880: Add timestamp information to the replay camera (#2371)
1 parent 0d64962 commit 2af1019

File tree

9 files changed

+302
-20
lines changed

9 files changed

+302
-20
lines changed

components/camera/client_test.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ import (
1414
"github.com/edaniels/gostream"
1515
"go.viam.com/test"
1616
"go.viam.com/utils/rpc"
17+
"google.golang.org/grpc"
18+
"google.golang.org/grpc/metadata"
1719

1820
"go.viam.com/rdk/components/camera"
1921
viamgrpc "go.viam.com/rdk/grpc"
@@ -24,6 +26,7 @@ import (
2426
"go.viam.com/rdk/testutils"
2527
"go.viam.com/rdk/testutils/inject"
2628
rutils "go.viam.com/rdk/utils"
29+
"go.viam.com/rdk/utils/contextutils"
2730
)
2831

2932
func TestClient(t *testing.T) {
@@ -390,3 +393,65 @@ func TestClientLazyImage(t *testing.T) {
390393

391394
test.That(t, conn.Close(), test.ShouldBeNil)
392395
}
396+
397+
func TestClientWithInterceptor(t *testing.T) {
398+
// Set up gRPC server
399+
logger := golog.NewTestLogger(t)
400+
listener1, err := net.Listen("tcp", "localhost:0")
401+
test.That(t, err, test.ShouldBeNil)
402+
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
403+
test.That(t, err, test.ShouldBeNil)
404+
405+
// Set up camera that adds timestamps into the gRPC response header.
406+
injectCamera := &inject.Camera{}
407+
408+
pcA := pointcloud.New()
409+
err = pcA.Set(pointcloud.NewVector(5, 5, 5), nil)
410+
test.That(t, err, test.ShouldBeNil)
411+
412+
k, v := "hello", "world"
413+
injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) {
414+
var grpcMetadata metadata.MD = make(map[string][]string)
415+
grpcMetadata.Set(k, v)
416+
grpc.SendHeader(ctx, grpcMetadata)
417+
return pcA, nil
418+
}
419+
420+
// Register CameraService API in our gRPC server.
421+
resources := map[resource.Name]camera.Camera{
422+
camera.Named(testCameraName): injectCamera,
423+
}
424+
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
425+
test.That(t, err, test.ShouldBeNil)
426+
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
427+
test.That(t, err, test.ShouldBeNil)
428+
test.That(t, ok, test.ShouldBeTrue)
429+
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)
430+
431+
// Start serving requests.
432+
go rpcServer.Serve(listener1)
433+
defer rpcServer.Stop()
434+
435+
// Set up gRPC client with context with metadata interceptor.
436+
conn, err := viamgrpc.Dial(
437+
context.Background(),
438+
listener1.Addr().String(),
439+
logger,
440+
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
441+
)
442+
test.That(t, err, test.ShouldBeNil)
443+
camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
444+
test.That(t, err, test.ShouldBeNil)
445+
446+
// Construct a ContextWithMetadata to pass into NextPointCloud and check that the
447+
// interceptor correctly injected the metadata from the gRPC response header into the
448+
// context.
449+
ctx, md := contextutils.ContextWithMetadata(context.Background())
450+
pcB, err := camera1Client.NextPointCloud(ctx)
451+
test.That(t, err, test.ShouldBeNil)
452+
_, got := pcB.At(5, 5, 5)
453+
test.That(t, got, test.ShouldBeTrue)
454+
test.That(t, md[k][0], test.ShouldEqual, v)
455+
456+
test.That(t, conn.Close(), test.ShouldBeNil)
457+
}

components/camera/replaypcd/replaypcd.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,16 @@ import (
1414
datapb "go.viam.com/api/app/data/v1"
1515
goutils "go.viam.com/utils"
1616
"go.viam.com/utils/rpc"
17+
"google.golang.org/grpc"
18+
"google.golang.org/grpc/metadata"
1719
"google.golang.org/protobuf/types/known/timestamppb"
1820

1921
"go.viam.com/rdk/components/camera"
2022
"go.viam.com/rdk/internal/cloud"
2123
"go.viam.com/rdk/pointcloud"
2224
"go.viam.com/rdk/resource"
2325
"go.viam.com/rdk/rimage/transform"
26+
"go.viam.com/rdk/utils/contextutils"
2427
)
2528

2629
// Model is the model of a replay camera.
@@ -146,8 +149,28 @@ func (replay *pcdCamera) NextPointCloud(ctx context.Context) (pointcloud.PointCl
146149
return nil, errEndOfDataset
147150
}
148151

152+
// If the caller is communicating with the replay camera over gRPC, set the timestamps on
153+
// the gRPC header.
154+
md := resp.GetData()[0].GetMetadata()
155+
if stream := grpc.ServerTransportStreamFromContext(ctx); stream != nil {
156+
var grpcMetadata metadata.MD = make(map[string][]string)
157+
158+
timeReq := md.GetTimeRequested()
159+
if timeReq != nil {
160+
grpcMetadata.Set(contextutils.TimeRequestedMetadataKey, timeReq.AsTime().Format(time.RFC3339Nano))
161+
}
162+
timeRec := md.GetTimeReceived()
163+
if timeRec != nil {
164+
grpcMetadata.Set(contextutils.TimeReceivedMetadataKey, timeRec.AsTime().Format(time.RFC3339Nano))
165+
}
166+
167+
if err := grpc.SetHeader(ctx, grpcMetadata); err != nil {
168+
return nil, err
169+
}
170+
}
171+
149172
replay.lastData = resp.GetLast()
150-
data := resp.Data[0].GetBinary()
173+
data := resp.GetData()[0].GetBinary()
151174

152175
r, err := gzip.NewReader(bytes.NewBuffer(data))
153176
if err != nil {

components/camera/replaypcd/replaypcd_test.go

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,12 @@ import (
1212
"go.viam.com/test"
1313
"go.viam.com/utils"
1414
"go.viam.com/utils/artifact"
15+
"google.golang.org/grpc"
1516

1617
"go.viam.com/rdk/internal/cloud"
1718
"go.viam.com/rdk/pointcloud"
19+
"go.viam.com/rdk/testutils"
20+
"go.viam.com/rdk/utils/contextutils"
1821
)
1922

2023
const datasetDirectory = "slam/mock_lidar/%d.pcd"
@@ -424,3 +427,43 @@ func TestUnimplementedFunctions(t *testing.T) {
424427

425428
test.That(t, serverClose(), test.ShouldBeNil)
426429
}
430+
431+
// TestNextPointCloudTimestamps tests that calls to NextPointCloud on the replay camera will inject
432+
// the time received and time requested metadata into the gRPC response header.
433+
func TestNextPointCloudTimestamps(t *testing.T) {
434+
// Construct replay camera.
435+
ctx := context.Background()
436+
cfg := &Config{Source: "source"}
437+
replayCamera, serverClose, err := createNewReplayPCDCamera(ctx, t, cfg, true)
438+
test.That(t, err, test.ShouldBeNil)
439+
test.That(t, replayCamera, test.ShouldNotBeNil)
440+
441+
// Repeatedly call NextPointCloud, checking for timestamps in the gRPC header.
442+
for i := 0; i < numPCDFiles; i++ {
443+
serverStream := testutils.NewServerTransportStream()
444+
ctx = grpc.NewContextWithServerTransportStream(ctx, serverStream)
445+
pc, err := replayCamera.NextPointCloud(ctx)
446+
test.That(t, err, test.ShouldBeNil)
447+
test.That(t, pc, test.ShouldResemble, getPointCloudFromArtifact(t, i))
448+
449+
expectedTimeReq := fmt.Sprintf(testTime, i)
450+
expectedTimeRec := fmt.Sprintf(testTime, i+1)
451+
452+
actualTimeReq := serverStream.Value(contextutils.TimeRequestedMetadataKey)[0]
453+
actualTimeRec := serverStream.Value(contextutils.TimeReceivedMetadataKey)[0]
454+
455+
test.That(t, expectedTimeReq, test.ShouldEqual, actualTimeReq)
456+
test.That(t, expectedTimeRec, test.ShouldEqual, actualTimeRec)
457+
}
458+
459+
// Confirm the end of the dataset was reached when expected
460+
pc, err := replayCamera.NextPointCloud(ctx)
461+
test.That(t, err, test.ShouldNotBeNil)
462+
test.That(t, err.Error(), test.ShouldContainSubstring, errEndOfDataset.Error())
463+
test.That(t, pc, test.ShouldBeNil)
464+
465+
err = replayCamera.Close(ctx)
466+
test.That(t, err, test.ShouldBeNil)
467+
468+
test.That(t, serverClose(), test.ShouldBeNil)
469+
}

components/camera/replaypcd/replaypcd_utils_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,13 +11,15 @@ import (
1111
"path/filepath"
1212
"strconv"
1313
"testing"
14+
"time"
1415

1516
"github.com/edaniels/golog"
1617
"github.com/pkg/errors"
1718
datapb "go.viam.com/api/app/data/v1"
1819
"go.viam.com/test"
1920
"go.viam.com/utils/artifact"
2021
"go.viam.com/utils/rpc"
22+
"google.golang.org/protobuf/types/known/timestamppb"
2123

2224
"go.viam.com/rdk/components/camera"
2325
viamgrpc "go.viam.com/rdk/grpc"
@@ -28,6 +30,8 @@ import (
2830
"go.viam.com/rdk/testutils/inject"
2931
)
3032

33+
const testTime = "2000-01-01T12:00:%02dZ"
34+
3135
// mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These
3236
// can be overwritten to allow developers to trigger desired behaviors during testing.
3337
type mockDataServiceServer struct {
@@ -60,9 +64,17 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r
6064
gz.Close()
6165

6266
// Construct response
67+
timeReq, err := time.Parse(time.RFC3339, fmt.Sprintf(testTime, newFileNum))
68+
if err != nil {
69+
return nil, errors.Wrap(err, "failed parsing time")
70+
}
71+
timeRec := timeReq.Add(time.Second)
6372
binaryData := &datapb.BinaryData{
64-
Binary: dataBuf.Bytes(),
65-
Metadata: &datapb.BinaryMetadata{},
73+
Binary: dataBuf.Bytes(),
74+
Metadata: &datapb.BinaryMetadata{
75+
TimeRequested: timestamppb.New(timeReq),
76+
TimeReceived: timestamppb.New(timeRec),
77+
},
6678
}
6779

6880
resp := &datapb.BinaryDataByFilterResponse{

robot/client/client.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import (
3838
"go.viam.com/rdk/robot/packages"
3939
"go.viam.com/rdk/session"
4040
"go.viam.com/rdk/spatialmath"
41+
"go.viam.com/rdk/utils/contextutils"
4142
)
4243

4344
var (
@@ -278,6 +279,7 @@ func New(ctx context.Context, address string, logger golog.Logger, opts ...Robot
278279
// interceptors are applied in order from first to last
279280
rc.dialOptions = append(
280281
rc.dialOptions,
282+
rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor),
281283
// error handling
282284
rpc.WithUnaryClientInterceptor(rc.handleUnaryDisconnect),
283285
rpc.WithStreamClientInterceptor(rc.handleStreamDisconnect),

session/context_test.go

Lines changed: 4 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -10,10 +10,10 @@ import (
1010
"github.com/google/uuid"
1111
"go.viam.com/test"
1212
"google.golang.org/grpc"
13-
"google.golang.org/grpc/metadata"
1413

1514
"go.viam.com/rdk/resource"
1615
"go.viam.com/rdk/session"
16+
"go.viam.com/rdk/testutils"
1717
)
1818

1919
func TestToFromContext(t *testing.T) {
@@ -64,7 +64,7 @@ func TestSafetyMonitor(t *testing.T) {
6464
}
6565

6666
func TestSafetyMonitorForMetadata(t *testing.T) {
67-
stream1 := &myStream{}
67+
stream1 := testutils.NewServerTransportStream()
6868
streamCtx := grpc.NewContextWithServerTransportStream(context.Background(), stream1)
6969

7070
sess1 := session.New(context.Background(), "ownerID", time.Minute, nil)
@@ -73,26 +73,13 @@ func TestSafetyMonitorForMetadata(t *testing.T) {
7373
name1 := resource.NewName(resource.APINamespace("foo").WithType("bar").WithSubtype("baz"), "barf")
7474
name2 := resource.NewName(resource.APINamespace("woo").WithType("war").WithSubtype("waz"), "warf")
7575
session.SafetyMonitor(nextCtx, myThing{Named: name1.AsNamed()})
76-
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name1.String()})
76+
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name1.String()})
7777
session.SafetyMonitor(nextCtx, myThing{Named: name2.AsNamed()})
78-
test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name2.String()})
78+
test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name2.String()})
7979
}
8080

8181
type myThing struct {
8282
resource.Named
8383
resource.AlwaysRebuild
8484
resource.TriviallyCloseable
8585
}
86-
87-
type myStream struct {
88-
mu sync.Mutex
89-
grpc.ServerTransportStream
90-
md metadata.MD
91-
}
92-
93-
func (s *myStream) SetHeader(md metadata.MD) error {
94-
s.mu.Lock()
95-
defer s.mu.Unlock()
96-
s.md = md.Copy()
97-
return nil
98-
}

testutils/rpc.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ package testutils
33

44
import (
55
"context"
6+
"sync"
67

78
"go.viam.com/utils/rpc"
89
"google.golang.org/grpc"
10+
"google.golang.org/grpc/metadata"
911
)
1012

1113
// TrackingDialer tracks dial attempts.
@@ -48,3 +50,35 @@ func (td *TrackingDialer) DialFunc(
4850
}
4951
return conn, cached, err
5052
}
53+
54+
// ServerTransportStream implements grpc.ServerTransportStream and can be used to test setting
55+
// metadata in the gRPC response header.
56+
type ServerTransportStream struct {
57+
mu sync.Mutex
58+
grpc.ServerTransportStream
59+
md metadata.MD
60+
}
61+
62+
// NewServerTransportStream creates a new ServerTransportStream.
63+
func NewServerTransportStream() *ServerTransportStream {
64+
return &ServerTransportStream{
65+
md: metadata.New(make(map[string]string)),
66+
}
67+
}
68+
69+
// SetHeader implements grpc.ServerTransportStream.
70+
func (s *ServerTransportStream) SetHeader(md metadata.MD) error {
71+
s.mu.Lock()
72+
defer s.mu.Unlock()
73+
for k, v := range md {
74+
s.md[k] = v
75+
}
76+
return nil
77+
}
78+
79+
// Value returns the value in the metadata map corresponding to a given key.
80+
func (s *ServerTransportStream) Value(key string) []string {
81+
s.mu.Lock()
82+
defer s.mu.Unlock()
83+
return s.md[key]
84+
}

0 commit comments

Comments
 (0)