forked from viamrobotics/rdk
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdata_client.go
1591 lines (1474 loc) · 46.4 KB
/
data_client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Package app contains a gRPC based data client.
package app
import (
"context"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"time"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
pb "go.viam.com/api/app/data/v1"
setPb "go.viam.com/api/app/dataset/v1"
syncPb "go.viam.com/api/app/datasync/v1"
"go.viam.com/utils/rpc"
"google.golang.org/protobuf/types/known/anypb"
"google.golang.org/protobuf/types/known/structpb"
"google.golang.org/protobuf/types/known/timestamppb"
"go.viam.com/rdk/protoutils"
)
// Order specifies the order in which data is returned.
type Order int32

// Order constants define the possible ordering options.
const (
	Unspecified Order = iota
	Descending
	Ascending
)

// DataRequest encapsulates the filter for the data, a limit on the max results returned,
// a last string associated with the last returned document, and the sorting order by time.
//
//nolint:revive // stutter: Ignore the "stuttering" warning for this type name
type DataRequest struct {
	Filter    Filter // criteria the returned data must match
	Limit     int    // maximum number of entries per page
	Last      string // identifier of the last-returned document, for pagination
	SortOrder Order  // ordering of results by time
}

// Filter defines the fields over which we can filter data using a logic AND.
type Filter struct {
	ComponentName   string
	ComponentType   string
	Method          string
	RobotName       string
	RobotID         string
	PartName        string
	PartID          string
	LocationIDs     []string
	OrganizationIDs []string
	MimeType        []string
	Interval        CaptureInterval // time window the data must fall within
	TagsFilter      TagsFilter      // tag-based matching criteria
	BboxLabels      []string
	DatasetID       string
}

// TagsFilterType specifies how data can be filtered based on tags.
type TagsFilterType int32

// TagsFilterType constants define the ways data can be filtered based on tag matching criteria.
const (
	TagsFilterTypeUnspecified TagsFilterType = iota
	TagsFilterTypeMatchByOr // match data carrying any of the given tags
	TagsFilterTypeTagged    // match all tagged data
	TagsFilterTypeUntagged  // match all untagged data
)

// TagsFilter defines the type of filtering and, if applicable, over which tags to perform a logical OR.
type TagsFilter struct {
	Type TagsFilterType
	// Tags is only consulted for TagsFilterTypeMatchByOr-style filtering.
	Tags []string
}

// CaptureMetadata contains information on the settings used for the data capture.
type CaptureMetadata struct {
	OrganizationID   string
	LocationID       string
	RobotName        string
	RobotID          string
	PartName         string
	PartID           string
	ComponentType    string
	ComponentName    string
	MethodName       string
	MethodParameters map[string]interface{}
	Tags             []string
	MimeType         string
}

// CaptureInterval describes the start and end time of the capture in this file.
type CaptureInterval struct {
	Start time.Time
	End   time.Time
}
// TabularData contains data and metadata associated with tabular data.
type TabularData struct {
	Data map[string]interface{}
	// MetadataIndex is the index of this datum's CaptureMetadata within the
	// enclosing query response.
	MetadataIndex int
	Metadata      *CaptureMetadata
	TimeRequested time.Time
	TimeReceived  time.Time
}

// BinaryData contains data and metadata associated with binary data.
type BinaryData struct {
	Binary   []byte
	Metadata *BinaryMetadata
}

// BinaryMetadata is the metadata associated with binary data.
type BinaryMetadata struct {
	ID              string
	CaptureMetadata CaptureMetadata
	TimeRequested   time.Time
	TimeReceived    time.Time
	FileName        string
	FileExt         string
	URI             string
	Annotations     *Annotations
	DatasetIDs      []string
}

// BinaryID is the unique identifier for a file that one can request to be retrieved or modified.
type BinaryID struct {
	FileID         string
	OrganizationID string
	LocationID     string
}

// BoundingBox represents a labeled bounding box on an image.
// x and y values are normalized ratios between 0 and 1.
type BoundingBox struct {
	ID             string
	Label          string
	XMinNormalized float64
	YMinNormalized float64
	XMaxNormalized float64
	YMaxNormalized float64
}

// Annotations are data annotations used for machine learning.
type Annotations struct {
	Bboxes []*BoundingBox
}

// TabularDataByFilterResponse represents the result of a TabularDataByFilter query.
// It contains the retrieved tabular data and associated metadata,
// the total number of entries retrieved (Count), and the ID of the last returned page (Last).
type TabularDataByFilterResponse struct {
	TabularData []*TabularData
	Count       int
	Last        string
}

// BinaryDataByFilterResponse represents the result of a BinaryDataByFilter query.
// It contains the retrieved binary data and associated metadata,
// the total number of entries retrieved (Count), and the ID of the last returned page (Last).
type BinaryDataByFilterResponse struct {
	BinaryData []*BinaryData
	Count      int
	Last       string
}

// GetDatabaseConnectionResponse represents the response returned by GetDatabaseConnection.
// It contains the hostname endpoint, a URI for connecting to the MongoDB Atlas Data Federation instance,
// and a flag indicating whether a database user is configured for the Viam organization.
type GetDatabaseConnectionResponse struct {
	Hostname        string
	MongodbURI      string
	HasDatabaseUser bool
}

// GetLatestTabularDataResponse represents the response returned by GetLatestTabularData. It contains the most recently captured data
// payload, the time it was captured, and the time it was synced.
type GetLatestTabularDataResponse struct {
	TimeCaptured time.Time
	TimeSynced   time.Time
	Payload      map[string]interface{}
}

// ExportTabularDataResponse represents the result of an ExportTabularData API call.
type ExportTabularDataResponse struct {
	OrganizationID   string
	LocationID       string
	RobotID          string
	RobotName        string
	PartID           string
	PartName         string
	ResourceName     string
	ResourceSubtype  string
	MethodName       string
	TimeCaptured     time.Time
	MethodParameters map[string]interface{}
	Tags             []string
	Payload          map[string]interface{}
}
// DataSyncClient structs.

// SensorMetadata contains the time the sensor data was requested and was received.
type SensorMetadata struct {
	TimeRequested time.Time
	TimeReceived  time.Time
	MimeType      MimeType
	Annotations   *Annotations
}

// SensorData contains the contents and metadata for tabular data.
type SensorData struct {
	Metadata SensorMetadata
	// Exactly one of SDStruct (tabular payload) or SDBinary (raw bytes) is
	// populated for a given datum.
	SDStruct map[string]interface{}
	SDBinary []byte
}

// DataType specifies the type of data uploaded.
type DataType int32

// DataType constants define the possible DataType options.
const (
	DataTypeUnspecified DataType = iota
	DataTypeBinarySensor
	DataTypeTabularSensor
	DataTypeFile
)

// MimeType specifies the format of a file being uploaded.
type MimeType int32

// MimeType constants define the possible MimeType options.
const (
	MimeTypeUnspecified MimeType = iota
	MimeTypeJPEG
	MimeTypePNG
	MimeTypePCD
)

// UploadMetadata contains the metadata for binary (image + file) data.
type UploadMetadata struct {
	PartID           string
	ComponentType    string
	ComponentName    string
	MethodName       string
	Type             DataType
	FileName         string
	MethodParameters map[string]interface{}
	FileExtension    string
	Tags             []string
}

// FileData contains the contents of binary (image + file) data.
type FileData struct {
	Data []byte
}

// DataByFilterOptions contains optional parameters for TabularDataByFilter and BinaryDataByFilter.
type DataByFilterOptions struct {
	// No Filter implies all data.
	Filter *Filter
	// Limit is the maximum number of entries to include in a page. Limit defaults to 50 if unspecified.
	Limit int
	// Last indicates the object identifier of the Last-returned data.
	// This is returned by calls to TabularDataByFilter and BinaryDataByFilter as the `Last` value.
	// If provided, the server will return the next data entries after the last object identifier.
	Last                string
	SortOrder           Order
	CountOnly           bool
	IncludeInternalData bool
}

// TabularDataByMQLOptions contains optional parameters for TabularDataByMQL.
type TabularDataByMQLOptions struct {
	UseRecentData bool
}

// BinaryDataCaptureUploadOptions represents optional parameters for the BinaryDataCaptureUpload method.
type BinaryDataCaptureUploadOptions struct {
	Type             *DataType
	FileName         *string
	MethodParameters map[string]interface{}
	Tags             []string
	// DataRequestTimes holds the request time (index 0) and receive time (index 1).
	DataRequestTimes *[2]time.Time
}

// TabularDataCaptureUploadOptions represents optional parameters for the TabularDataCaptureUpload method.
type TabularDataCaptureUploadOptions struct {
	Type             *DataType
	FileName         *string
	MethodParameters map[string]interface{}
	FileExtension    *string
	Tags             []string
}

// StreamingDataCaptureUploadOptions represents optional parameters for the StreamingDataCaptureUpload method.
type StreamingDataCaptureUploadOptions struct {
	ComponentType    *string
	ComponentName    *string
	MethodName       *string
	Type             *DataType
	FileName         *string
	MethodParameters map[string]interface{}
	Tags             []string
	// DataRequestTimes holds the request time (index 0) and receive time (index 1).
	DataRequestTimes *[2]time.Time
}

// FileUploadOptions represents optional parameters for the FileUploadFromPath & FileUploadFromBytes methods.
type FileUploadOptions struct {
	ComponentType    *string
	ComponentName    *string
	MethodName       *string
	FileName         *string
	MethodParameters map[string]interface{}
	FileExtension    *string
	Tags             []string
}

// UpdateBoundingBoxOptions contains optional parameters for UpdateBoundingBox.
type UpdateBoundingBoxOptions struct {
	Label *string
	// Normalized coordinates where all coordinates must be in the range [0, 1].
	XMinNormalized *float64
	YMinNormalized *float64
	XMaxNormalized *float64
	YMaxNormalized *float64
}

// Dataset contains the information of a dataset.
type Dataset struct {
	ID             string
	Name           string
	OrganizationID string
	TimeCreated    *time.Time
}
// DataClient implements the DataServiceClient interface.
// It bundles the three gRPC service clients used for data access,
// data sync, and dataset management over a single connection.
type DataClient struct {
	dataClient     pb.DataServiceClient
	dataSyncClient syncPb.DataSyncServiceClient
	datasetClient  setPb.DatasetServiceClient
}
// newDataClient constructs a DataClient whose data, datasync, and dataset
// service clients all share the provided connection.
func newDataClient(conn rpc.ClientConn) *DataClient {
	return &DataClient{
		dataClient:     pb.NewDataServiceClient(conn),
		dataSyncClient: syncPb.NewDataSyncServiceClient(conn),
		datasetClient:  setPb.NewDatasetServiceClient(conn),
	}
}
// BsonToGo converts raw BSON data (as [][]byte) into native Go types and interfaces.
// Returns a slice of maps representing the data objects.
func BsonToGo(rawData [][]byte) ([]map[string]interface{}, error) {
	dataObjects := []map[string]interface{}{}
	for _, doc := range rawData {
		// Decode the BSON document into a generic map.
		decoded := map[string]interface{}{}
		if err := bson.Unmarshal(doc, &decoded); err != nil {
			return nil, err
		}
		// Convert BSON-specific values in the map to native Go types.
		native := convertBsonToNative(decoded).(map[string]interface{})
		dataObjects = append(dataObjects, native)
	}
	return dataObjects, nil
}
// TabularDataByFilter queries tabular data and metadata based on given filters.
//
// Deprecated: This endpoint will be removed in a future version.
func (d *DataClient) TabularDataByFilter(ctx context.Context, opts *DataByFilterOptions) (*TabularDataByFilterResponse, error) {
	var (
		dataReq             pb.DataRequest
		countOnly           bool
		includeInternalData bool
	)
	if opts != nil {
		dataReq.Filter = filterToProto(opts.Filter)
		if opts.Limit != 0 {
			dataReq.Limit = uint64(opts.Limit)
		}
		if opts.Last != "" {
			dataReq.Last = opts.Last
		}
		dataReq.SortOrder = orderToProto(opts.SortOrder)
		countOnly = opts.CountOnly
		includeInternalData = opts.IncludeInternalData
	}
	//nolint:deprecated,staticcheck
	resp, err := d.dataClient.TabularDataByFilter(ctx, &pb.TabularDataByFilterRequest{
		DataRequest:         &dataReq,
		CountOnly:           countOnly,
		IncludeInternalData: includeInternalData,
	})
	if err != nil {
		return nil, err
	}
	// Pair each datum with its capture metadata, which is referenced by index
	// into resp.Metadata; fall back to an empty metadata message when the
	// index is out of range.
	tabularData := []*TabularData{}
	for _, pbData := range resp.Data {
		md := &pb.CaptureMetadata{}
		if idx := int(pbData.MetadataIndex); idx < len(resp.Metadata) {
			md = resp.Metadata[idx]
		}
		converted, convErr := tabularDataFromProto(pbData, md)
		if convErr != nil {
			return nil, convErr
		}
		tabularData = append(tabularData, converted)
	}
	return &TabularDataByFilterResponse{
		TabularData: tabularData,
		Count:       int(resp.Count),
		Last:        resp.Last,
	}, nil
}
// TabularDataBySQL queries tabular data with a SQL query.
func (d *DataClient) TabularDataBySQL(ctx context.Context, organizationID, sqlQuery string) ([]map[string]interface{}, error) {
	req := &pb.TabularDataBySQLRequest{
		OrganizationId: organizationID,
		SqlQuery:       sqlQuery,
	}
	resp, err := d.dataClient.TabularDataBySQL(ctx, req)
	if err != nil {
		return nil, err
	}
	// The server returns rows as raw BSON documents; decode them into Go maps.
	return BsonToGo(resp.RawData)
}
// TabularDataByMQL queries tabular data with MQL (MongoDB Query Language) queries.
//
// Each element of query is BSON-marshalled before being sent to the server.
// opts may be nil, in which case UseRecentData defaults to false.
// It returns the result rows decoded into native Go maps.
func (d *DataClient) TabularDataByMQL(
	ctx context.Context, organizationID string, query []map[string]interface{}, opts *TabularDataByMQLOptions,
) ([]map[string]interface{}, error) {
	// Pre-size: exactly one BSON document is produced per query element.
	mqlBinary := make([][]byte, 0, len(query))
	for _, q := range query {
		binary, err := bson.Marshal(q)
		if err != nil {
			return nil, fmt.Errorf("failed to marshal BSON query: %w", err)
		}
		mqlBinary = append(mqlBinary, binary)
	}
	useRecentData := false
	if opts != nil {
		useRecentData = opts.UseRecentData
	}
	resp, err := d.dataClient.TabularDataByMQL(ctx, &pb.TabularDataByMQLRequest{
		OrganizationId: organizationID,
		MqlBinary:      mqlBinary,
		UseRecentData:  &useRecentData,
	})
	if err != nil {
		return nil, err
	}
	return BsonToGo(resp.RawData)
}
// GetLatestTabularData gets the most recent tabular data captured from the specified data source, as well as the time that it was captured
// and synced. If no data was synced to the data source within the last year, LatestTabularDataReturn will be empty.
func (d *DataClient) GetLatestTabularData(ctx context.Context, partID, resourceName, resourceSubtype, methodName string) (
	*GetLatestTabularDataResponse, error,
) {
	req := &pb.GetLatestTabularDataRequest{
		PartId:          partID,
		ResourceName:    resourceName,
		ResourceSubtype: resourceSubtype,
		MethodName:      methodName,
	}
	resp, err := d.dataClient.GetLatestTabularData(ctx, req)
	if err != nil {
		return nil, err
	}
	result := &GetLatestTabularDataResponse{
		TimeCaptured: resp.TimeCaptured.AsTime(),
		TimeSynced:   resp.TimeSynced.AsTime(),
		Payload:      resp.Payload.AsMap(),
	}
	return result, nil
}
// ExportTabularData returns a stream of ExportTabularDataResponses.
func (d *DataClient) ExportTabularData(
	ctx context.Context, partID, resourceName, resourceSubtype, method string, interval CaptureInterval,
) ([]*ExportTabularDataResponse, error) {
	stream, err := d.dataClient.ExportTabularData(ctx, &pb.ExportTabularDataRequest{
		PartId:          partID,
		ResourceName:    resourceName,
		ResourceSubtype: resourceSubtype,
		MethodName:      method,
		Interval:        captureIntervalToProto(interval),
	})
	if err != nil {
		return nil, err
	}
	// Drain the server stream; io.EOF marks normal end-of-stream.
	var responses []*ExportTabularDataResponse
	for {
		msg, recvErr := stream.Recv()
		if recvErr != nil {
			if errors.Is(recvErr, io.EOF) {
				return responses, nil
			}
			return nil, recvErr
		}
		responses = append(responses, exportTabularDataResponseFromProto(msg))
	}
}
// BinaryDataByFilter queries binary data and metadata based on given filters.
func (d *DataClient) BinaryDataByFilter(
	ctx context.Context, includeBinary bool, opts *DataByFilterOptions,
) (*BinaryDataByFilterResponse, error) {
	var (
		dataReq             pb.DataRequest
		countOnly           bool
		includeInternalData bool
	)
	if opts != nil {
		dataReq.Filter = filterToProto(opts.Filter)
		if opts.Limit != 0 {
			dataReq.Limit = uint64(opts.Limit)
		}
		if opts.Last != "" {
			dataReq.Last = opts.Last
		}
		dataReq.SortOrder = orderToProto(opts.SortOrder)
		countOnly = opts.CountOnly
		includeInternalData = opts.IncludeInternalData
	}
	resp, err := d.dataClient.BinaryDataByFilter(ctx, &pb.BinaryDataByFilterRequest{
		DataRequest:         &dataReq,
		IncludeBinary:       includeBinary,
		CountOnly:           countOnly,
		IncludeInternalData: includeInternalData,
	})
	if err != nil {
		return nil, err
	}
	converted := make([]*BinaryData, len(resp.Data))
	for i := range resp.Data {
		binData, convErr := binaryDataFromProto(resp.Data[i])
		if convErr != nil {
			return nil, convErr
		}
		converted[i] = binData
	}
	return &BinaryDataByFilterResponse{
		BinaryData: converted,
		Count:      int(resp.Count),
		Last:       resp.Last,
	}, nil
}
// BinaryDataByIDs queries binary data and metadata based on given IDs.
func (d *DataClient) BinaryDataByIDs(ctx context.Context, binaryIDs []*BinaryID) ([]*BinaryData, error) {
	req := &pb.BinaryDataByIDsRequest{
		IncludeBinary: true,
		BinaryIds:     binaryIDsToProto(binaryIDs),
	}
	resp, err := d.dataClient.BinaryDataByIDs(ctx, req)
	if err != nil {
		return nil, err
	}
	converted := make([]*BinaryData, len(resp.Data))
	for i := range resp.Data {
		binData, convErr := binaryDataFromProto(resp.Data[i])
		if convErr != nil {
			return nil, convErr
		}
		converted[i] = binData
	}
	return converted, nil
}
// DeleteTabularData deletes tabular data older than a number of days, based on the given organization ID.
// It returns the number of tabular datapoints deleted.
// deleteOlderThanDays must be non-negative; a negative value returns an error.
func (d *DataClient) DeleteTabularData(ctx context.Context, organizationID string, deleteOlderThanDays int) (int, error) {
	// Guard the int -> uint32 conversion below: a negative value would
	// silently wrap to a huge unsigned threshold instead of failing.
	if deleteOlderThanDays < 0 {
		return 0, fmt.Errorf("deleteOlderThanDays must be non-negative, got %d", deleteOlderThanDays)
	}
	resp, err := d.dataClient.DeleteTabularData(ctx, &pb.DeleteTabularDataRequest{
		OrganizationId:      organizationID,
		DeleteOlderThanDays: uint32(deleteOlderThanDays),
	})
	if err != nil {
		return 0, err
	}
	return int(resp.DeletedCount), nil
}
// DeleteBinaryDataByFilter deletes binary data based on given filters. If filter is empty, delete all data.
// It returns the number of binary datapoints deleted.
func (d *DataClient) DeleteBinaryDataByFilter(ctx context.Context, filter *Filter) (int, error) {
	req := &pb.DeleteBinaryDataByFilterRequest{
		Filter:              filterToProto(filter),
		IncludeInternalData: true,
	}
	resp, err := d.dataClient.DeleteBinaryDataByFilter(ctx, req)
	if err != nil {
		return 0, err
	}
	return int(resp.DeletedCount), nil
}
// DeleteBinaryDataByIDs deletes binary data based on given IDs.
// It returns the number of binary datapoints deleted.
func (d *DataClient) DeleteBinaryDataByIDs(ctx context.Context, binaryIDs []*BinaryID) (int, error) {
	req := &pb.DeleteBinaryDataByIDsRequest{
		BinaryIds: binaryIDsToProto(binaryIDs),
	}
	resp, err := d.dataClient.DeleteBinaryDataByIDs(ctx, req)
	if err != nil {
		return 0, err
	}
	return int(resp.DeletedCount), nil
}
// AddTagsToBinaryDataByIDs adds string tags, unless the tags are already present, to binary data based on given IDs.
func (d *DataClient) AddTagsToBinaryDataByIDs(ctx context.Context, tags []string, binaryIDs []*BinaryID) error {
	req := &pb.AddTagsToBinaryDataByIDsRequest{
		BinaryIds: binaryIDsToProto(binaryIDs),
		Tags:      tags,
	}
	_, err := d.dataClient.AddTagsToBinaryDataByIDs(ctx, req)
	return err
}
// AddTagsToBinaryDataByFilter adds string tags, unless the tags are already present, to binary data based on the given filter.
// If no filter is given, all data will be tagged.
func (d *DataClient) AddTagsToBinaryDataByFilter(ctx context.Context, tags []string, filter *Filter) error {
	req := &pb.AddTagsToBinaryDataByFilterRequest{
		Filter: filterToProto(filter),
		Tags:   tags,
	}
	_, err := d.dataClient.AddTagsToBinaryDataByFilter(ctx, req)
	return err
}
// RemoveTagsFromBinaryDataByIDs removes string tags from binary data based on given IDs.
// It returns the number of binary files which had tags removed.
func (d *DataClient) RemoveTagsFromBinaryDataByIDs(ctx context.Context,
	tags []string, binaryIDs []*BinaryID,
) (int, error) {
	req := &pb.RemoveTagsFromBinaryDataByIDsRequest{
		BinaryIds: binaryIDsToProto(binaryIDs),
		Tags:      tags,
	}
	resp, err := d.dataClient.RemoveTagsFromBinaryDataByIDs(ctx, req)
	if err != nil {
		return 0, err
	}
	return int(resp.DeletedCount), nil
}
// RemoveTagsFromBinaryDataByFilter removes the specified string tags from binary data that match the given filter.
// If no filter is given, all data will be untagged.
// It returns the number of binary files from which tags were removed.
func (d *DataClient) RemoveTagsFromBinaryDataByFilter(ctx context.Context,
	tags []string, filter *Filter,
) (int, error) {
	req := &pb.RemoveTagsFromBinaryDataByFilterRequest{
		Filter: filterToProto(filter),
		Tags:   tags,
	}
	resp, err := d.dataClient.RemoveTagsFromBinaryDataByFilter(ctx, req)
	if err != nil {
		return 0, err
	}
	return int(resp.DeletedCount), nil
}
// TagsByFilter retrieves all unique tags associated with the data that match the specified filter.
// It returns the list of these unique tags. If no filter is given, all data tags are returned.
func (d *DataClient) TagsByFilter(ctx context.Context, filter *Filter) ([]string, error) {
	req := &pb.TagsByFilterRequest{
		Filter: filterToProto(filter),
	}
	resp, err := d.dataClient.TagsByFilter(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.Tags, nil
}
// AddBoundingBoxToImageByID adds a bounding box to an image with the specified ID,
// using the provided label and position in normalized coordinates.
// All normalized coordinates (xMin, yMin, xMax, yMax) must be float values in the range [0, 1].
// It returns the ID of the newly created bounding box.
func (d *DataClient) AddBoundingBoxToImageByID(
	ctx context.Context,
	binaryID *BinaryID,
	label string,
	xMinNormalized float64,
	yMinNormalized float64,
	xMaxNormalized float64,
	yMaxNormalized float64,
) (string, error) {
	req := &pb.AddBoundingBoxToImageByIDRequest{
		BinaryId:       binaryIDToProto(binaryID),
		Label:          label,
		XMinNormalized: xMinNormalized,
		YMinNormalized: yMinNormalized,
		XMaxNormalized: xMaxNormalized,
		YMaxNormalized: yMaxNormalized,
	}
	resp, err := d.dataClient.AddBoundingBoxToImageByID(ctx, req)
	if err != nil {
		return "", err
	}
	return resp.BboxId, nil
}
// RemoveBoundingBoxFromImageByID removes a bounding box from an image with the given ID.
func (d *DataClient) RemoveBoundingBoxFromImageByID(
	ctx context.Context,
	bboxID string,
	binaryID *BinaryID,
) error {
	req := &pb.RemoveBoundingBoxFromImageByIDRequest{
		BinaryId: binaryIDToProto(binaryID),
		BboxId:   bboxID,
	}
	_, err := d.dataClient.RemoveBoundingBoxFromImageByID(ctx, req)
	return err
}
// BoundingBoxLabelsByFilter retrieves all unique string labels for bounding boxes that match the specified filter.
// It returns a list of these labels. If no filter is given, all labels are returned.
func (d *DataClient) BoundingBoxLabelsByFilter(ctx context.Context, filter *Filter) ([]string, error) {
	req := &pb.BoundingBoxLabelsByFilterRequest{
		Filter: filterToProto(filter),
	}
	resp, err := d.dataClient.BoundingBoxLabelsByFilter(ctx, req)
	if err != nil {
		return nil, err
	}
	return resp.Labels, nil
}
// UpdateBoundingBox updates the bounding box for a given bbox ID for the file represented by the binary ID.
// Only the fields set in opts are sent; a nil opts leaves every attribute unchanged.
func (d *DataClient) UpdateBoundingBox(ctx context.Context, binaryID *BinaryID, bboxID string, opts *UpdateBoundingBoxOptions) error {
	req := &pb.UpdateBoundingBoxRequest{
		BinaryId: binaryIDToProto(binaryID),
		BboxId:   bboxID,
	}
	if opts != nil {
		req.Label = opts.Label
		req.XMinNormalized = opts.XMinNormalized
		req.YMinNormalized = opts.YMinNormalized
		req.XMaxNormalized = opts.XMaxNormalized
		req.YMaxNormalized = opts.YMaxNormalized
	}
	_, err := d.dataClient.UpdateBoundingBox(ctx, req)
	return err
}
// GetDatabaseConnection establishes a connection to a MongoDB Atlas Data Federation instance.
// It returns the hostname endpoint, a URI for connecting to the database via MongoDB clients,
// and a flag indicating whether a database user is configured for the Viam organization.
func (d *DataClient) GetDatabaseConnection(ctx context.Context, organizationID string) (*GetDatabaseConnectionResponse, error) {
	req := &pb.GetDatabaseConnectionRequest{
		OrganizationId: organizationID,
	}
	resp, err := d.dataClient.GetDatabaseConnection(ctx, req)
	if err != nil {
		return nil, err
	}
	result := &GetDatabaseConnectionResponse{
		Hostname:        resp.Hostname,
		MongodbURI:      resp.MongodbUri,
		HasDatabaseUser: resp.HasDatabaseUser,
	}
	return result, nil
}
// ConfigureDatabaseUser configures a database user for the Viam organization's MongoDB Atlas Data Federation instance.
func (d *DataClient) ConfigureDatabaseUser(
	ctx context.Context,
	organizationID string,
	password string,
) error {
	req := &pb.ConfigureDatabaseUserRequest{
		OrganizationId: organizationID,
		Password:       password,
	}
	_, err := d.dataClient.ConfigureDatabaseUser(ctx, req)
	return err
}
// AddBinaryDataToDatasetByIDs adds the binary data with the given binary IDs to the dataset.
func (d *DataClient) AddBinaryDataToDatasetByIDs(
	ctx context.Context,
	binaryIDs []*BinaryID,
	datasetID string,
) error {
	req := &pb.AddBinaryDataToDatasetByIDsRequest{
		BinaryIds: binaryIDsToProto(binaryIDs),
		DatasetId: datasetID,
	}
	_, err := d.dataClient.AddBinaryDataToDatasetByIDs(ctx, req)
	return err
}
// RemoveBinaryDataFromDatasetByIDs removes the binary data with the given binary IDs from the dataset.
func (d *DataClient) RemoveBinaryDataFromDatasetByIDs(
	ctx context.Context,
	binaryIDs []*BinaryID,
	datasetID string,
) error {
	req := &pb.RemoveBinaryDataFromDatasetByIDsRequest{
		BinaryIds: binaryIDsToProto(binaryIDs),
		DatasetId: datasetID,
	}
	_, err := d.dataClient.RemoveBinaryDataFromDatasetByIDs(ctx, req)
	return err
}
// BinaryDataCaptureUpload uploads the contents and metadata for binary data.
// It returns the file ID associated with the uploaded data and metadata.
// options may be nil; any options left unset fall back to their zero values.
func (d *DataClient) BinaryDataCaptureUpload(
	ctx context.Context,
	binaryData []byte,
	partID string,
	componentType string,
	componentName string,
	methodName string,
	fileExtension string,
	options *BinaryDataCaptureUploadOptions,
) (string, error) {
	metadata := UploadMetadata{
		PartID:        partID,
		ComponentType: componentType,
		ComponentName: componentName,
		MethodName:    methodName,
		Type:          DataTypeBinarySensor,
		FileExtension: formatFileExtension(fileExtension),
	}
	var sensorMetadata SensorMetadata
	if options != nil {
		if options.FileName != nil {
			metadata.FileName = *options.FileName
		}
		if options.MethodParameters != nil {
			metadata.MethodParameters = options.MethodParameters
		}
		if options.Tags != nil {
			metadata.Tags = options.Tags
		}
		// DataRequestTimes is a pointer to a fixed [2]time.Time:
		// index 0 is the request time, index 1 the receive time.
		if times := options.DataRequestTimes; times != nil {
			sensorMetadata.TimeRequested = times[0]
			sensorMetadata.TimeReceived = times[1]
		}
	}
	sensorData := SensorData{
		Metadata: sensorMetadata,
		SDStruct: nil,
		SDBinary: binaryData,
	}
	return d.dataCaptureUpload(ctx, metadata, []SensorData{sensorData})
}
// TabularDataCaptureUpload uploads the contents and metadata for tabular data.
// dataRequestTimes must contain exactly one (requested, received) pair per
// element of tabularData. It returns the file ID associated with the upload.
func (d *DataClient) TabularDataCaptureUpload(
	ctx context.Context,
	tabularData []map[string]interface{},
	partID string,
	componentType string,
	componentName string,
	methodName string,
	dataRequestTimes [][2]time.Time,
	options *TabularDataCaptureUploadOptions,
) (string, error) {
	if len(dataRequestTimes) != len(tabularData) {
		return "", errors.New("dataRequestTimes and tabularData lengths must be equal")
	}
	// Pair each tabular datum with its (requested, received) timestamps.
	var sensorContents []SensorData
	for i, datum := range tabularData {
		times := dataRequestTimes[i]
		sensorContents = append(sensorContents, SensorData{
			Metadata: SensorMetadata{
				TimeRequested: times[0],
				TimeReceived:  times[1],
			},
			SDStruct: datum,
			SDBinary: nil,
		})
	}
	metadata := UploadMetadata{
		PartID:        partID,
		ComponentType: componentType,
		ComponentName: componentName,
		MethodName:    methodName,
		Type:          DataTypeTabularSensor,
	}
	if options != nil {
		if options.FileName != nil {
			metadata.FileName = *options.FileName
		}
		if options.MethodParameters != nil {
			metadata.MethodParameters = options.MethodParameters
		}
		if options.FileExtension != nil {
			metadata.FileExtension = formatFileExtension(*options.FileExtension)
		}
		if options.Tags != nil {
			metadata.Tags = options.Tags
		}
	}
	return d.dataCaptureUpload(ctx, metadata, sensorContents)
}
// dataCaptureUpload uploads the metadata and contents for either tabular or binary data,
// and returns the file ID associated with the uploaded data and metadata.
func (d *DataClient) dataCaptureUpload(ctx context.Context, metadata UploadMetadata, sensorContents []SensorData) (string, error) {
	contents, err := sensorContentsToProto(sensorContents)
	if err != nil {
		return "", err
	}
	req := &syncPb.DataCaptureUploadRequest{
		Metadata:       uploadMetadataToProto(metadata),
		SensorContents: contents,
	}
	resp, err := d.dataSyncClient.DataCaptureUpload(ctx, req)
	if err != nil {
		return "", err
	}
	return resp.FileId, nil
}
// StreamingDataCaptureUpload uploads metadata and streaming binary data in chunks.
func (d *DataClient) StreamingDataCaptureUpload(
ctx context.Context,
data []byte,
partID string,
fileExt string,
options *StreamingDataCaptureUploadOptions,
) (string, error) {
uploadMetadata := UploadMetadata{
PartID: partID,
Type: DataTypeBinarySensor,
FileExtension: fileExt,
}
var sensorMetadata SensorMetadata
if options != nil {
if options.ComponentType != nil {
uploadMetadata.ComponentType = *options.ComponentType
}
if options.ComponentName != nil {
uploadMetadata.ComponentName = *options.ComponentName
}
if options.MethodName != nil {
uploadMetadata.MethodName = *options.MethodName
}
if options.FileName != nil {
uploadMetadata.FileName = *options.FileName
}
if options.MethodParameters != nil {
uploadMetadata.MethodParameters = options.MethodParameters
}
if options.Tags != nil {
uploadMetadata.Tags = options.Tags
}
if options.DataRequestTimes != nil && len(options.DataRequestTimes) == 2 {
sensorMetadata = SensorMetadata{
TimeRequested: options.DataRequestTimes[0],
TimeReceived: options.DataRequestTimes[1],
}
}
}
uploadMetadataPb := uploadMetadataToProto(uploadMetadata)
sensorMetadataPb := sensorMetadataToProto(sensorMetadata)
metadata := &syncPb.DataCaptureUploadMetadata{
UploadMetadata: uploadMetadataPb,
SensorMetadata: sensorMetadataPb,
}
// establish a streaming connection.
stream, err := d.dataSyncClient.StreamingDataCaptureUpload(ctx)
if err != nil {
return "", err
}