Skip to content

Commit 2c9ad2a

Browse files
feat: Support description-to-DCID resolution for Spanner backend (#1799)
This PR implements description-based entity resolution for the V3 Spanner data source (see b/477664860). Changes: - V3 Implementation: Updated `SpannerDataSource` to support the `<-description->dcid` arc. - Decoupled the entity recognition logic in the `recon` package from the Bigtable backend, so that the shared resolution process (In-memory -> Maps API -> Storage) can be used by both V2 and V3 paths.
1 parent 12b4ddc commit 2c9ad2a

12 files changed

Lines changed: 401 additions & 66 deletions

cmd/main.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,10 @@ func main() {
194194
sources := []datasource.DataSource{}
195195

196196
// Spanner Graph.
197+
var spannerClient spanner.SpannerClient
197198
if flags.EnableV3 && flags.UseSpannerGraph {
198-
spannerClient, err := spanner.NewSpannerClient(ctx, *spannerGraphInfo, flags.SpannerGraphDatabase, flags.UseStaleReads)
199+
var err error
200+
spannerClient, err = spanner.NewSpannerClient(ctx, *spannerGraphInfo, flags.SpannerGraphDatabase, flags.UseStaleReads)
199201
if err != nil {
200202
slog.Error("Failed to create Spanner client", "error", err)
201203
os.Exit(1)
@@ -204,9 +206,6 @@ func main() {
204206
spannerClient.Start()
205207
}
206208
defer spannerClient.Close()
207-
var ds datasource.DataSource = spanner.NewSpannerDataSource(spannerClient)
208-
// TODO: Order sources by priority once other implementations are added.
209-
sources = append(sources, ds)
210209
}
211210
slog.Info("After Spanner client creation")
212211

@@ -382,6 +381,13 @@ func main() {
382381
}
383382
}
384383

384+
// Initialize SpannerDataSource now that dependencies are ready.
385+
if spannerClient != nil {
386+
var ds datasource.DataSource = spanner.NewSpannerDataSource(spannerClient, store.RecogPlaceStore, mapsClient)
387+
// TODO: Order sources by priority once other implementations are added.
388+
sources = append(sources, ds)
389+
}
390+
385391
// Add remote data source if it was created.
386392
if remoteDataSource != nil {
387393
sources = append(sources, remoteDataSource)

internal/server/handler_v1.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -470,7 +470,7 @@ func (s *Server) VariableAncestors(
470470
func (s *Server) RecognizePlaces(
471471
ctx context.Context, in *pb.RecognizePlacesRequest,
472472
) (*pb.RecognizePlacesResponse, error) {
473-
return recon.RecognizePlaces(ctx, in, s.store, false)
473+
return recon.RecognizePlaces(ctx, in, s.store.RecogPlaceStore, false)
474474
}
475475

476476
// RecognizeEntities implements API for Mixer.RecognizeEntities.

internal/server/recon/find_entities.go

Lines changed: 71 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@ import (
2626
pbv1 "github.com/datacommonsorg/mixer/internal/proto/v1"
2727
"github.com/datacommonsorg/mixer/internal/server/v1/propertyvalues"
2828
"github.com/datacommonsorg/mixer/internal/store"
29+
"github.com/datacommonsorg/mixer/internal/store/files"
2930
"github.com/datacommonsorg/mixer/internal/util"
3031
"golang.org/x/sync/errgroup"
32+
"google.golang.org/grpc/codes"
33+
"google.golang.org/grpc/status"
3134
"googlemaps.github.io/maps"
3235
)
3336

@@ -36,9 +39,15 @@ const (
3639
maxMapsAPICallsInParallel = 25
3740
)
3841

39-
type entityInfo struct {
40-
description string
41-
typeOf string
42+
// PlaceIdToDcidFunc is a function that resolves a list of place IDs to a map of place ID to DCIDs.
43+
type PlaceIdToDcidFunc func(ctx context.Context, placeIds []string) (map[string][]string, error)
44+
45+
// GetPlaceTypesFunc is a function that resolves a set of DCIDs to their types.
46+
type GetPlaceTypesFunc func(ctx context.Context, dcidSet map[string]struct{}) (map[string]map[string]struct{}, error)
47+
48+
type EntityInfo struct {
49+
Description string
50+
TypeOf string
4251
}
4352

4453
// FindEntities implements API for Mixer.FindEntities.
@@ -88,18 +97,22 @@ func BulkFindEntities(
8897
}
8998

9099
// Load input.
91-
entityInfoSet := map[entityInfo]struct{}{}
100+
entityInfoSet := map[EntityInfo]struct{}{}
92101
for _, entity := range in.GetEntities() {
93102
description := entity.GetDescription()
94103
if description == "" {
95104
continue
96105
}
97-
entityInfoSet[entityInfo{description, entity.GetType()}] = struct{}{}
106+
entityInfoSet[EntityInfo{description, entity.GetType()}] = struct{}{}
98107
}
99108

100109
// Get DCIDs.
101-
entityInfoToDCIDs, dcidSet, err := resolveDCIDs(
102-
ctx, mapsClient, store, entityInfoSet)
110+
placeIdToDcidFunc := func(ctx context.Context, placeIds []string) (map[string][]string, error) {
111+
res, _, err := resolveDCIDsFromPlaceIDs(ctx, placeIds, store)
112+
return res, err
113+
}
114+
entityInfoToDCIDs, dcidSet, err := ResolveDCIDs(
115+
ctx, mapsClient, store.RecogPlaceStore, placeIdToDcidFunc, entityInfoSet)
103116
if err != nil {
104117
return nil, err
105118
}
@@ -114,12 +127,12 @@ func BulkFindEntities(
114127
resp := &pb.BulkFindEntitiesResponse{}
115128
for entityInfo, dcids := range entityInfoToDCIDs {
116129
entity := &pb.BulkFindEntitiesResponse_Entity{
117-
Description: entityInfo.description,
118-
Type: entityInfo.typeOf,
130+
Description: entityInfo.Description,
131+
Type: entityInfo.TypeOf,
119132
}
120133

121134
if len(dcids) != 0 {
122-
if entityInfo.typeOf == "" {
135+
if entityInfo.TypeOf == "" {
123136
// No type filtering.
124137
entity.Dcids = dcids
125138
} else {
@@ -130,7 +143,7 @@ func BulkFindEntities(
130143
if !ok {
131144
continue
132145
}
133-
if _, ok := typeSet[entityInfo.typeOf]; ok {
146+
if _, ok := typeSet[entityInfo.TypeOf]; ok {
134147
filteredDCIDs = append(filteredDCIDs, dcid)
135148
}
136149
}
@@ -158,34 +171,39 @@ func BulkFindEntities(
158171
//
159172
// Consider calling both, if both have results, prefer RecognizePlaces,
160173
// but use Maps API as signal to reorder the results.
161-
func resolveDCIDs(
174+
func ResolveDCIDs(
162175
ctx context.Context,
163176
mapsClient internalmaps.MapsClient,
164-
store *store.Store,
165-
entityInfoSet map[entityInfo]struct{},
177+
recogPlaceStore *files.RecogPlaceStore,
178+
placeIdToDcidFunc PlaceIdToDcidFunc,
179+
entityInfoSet map[EntityInfo]struct{},
166180
) (
167-
map[entityInfo][]string, /* entityInfo -> [DCID] */
181+
map[EntityInfo][]string, /* EntityInfo -> [DCID] */
168182
map[string]struct{}, /* [DCID] for all entities */
169183
error,
170184
) {
171185
// First try to resolve DCIDs by RecognizePlaces.
172186
entityInfoToDCIDs, dcidSet, err := resolveWithRecognizePlaces(
173-
ctx, store, entityInfoSet)
187+
ctx, recogPlaceStore, entityInfoSet)
174188
if err != nil {
175189
return nil, nil, err
176190
}
177191

178192
// See if there are any entities that cannot be resolved by RecognizePlaces.
179-
missingEntityInfoSet := map[entityInfo]struct{}{}
193+
missingEntityInfoSet := map[EntityInfo]struct{}{}
180194
for entityInfo := range entityInfoSet {
181195
if dcids, ok := entityInfoToDCIDs[entityInfo]; !ok || len(dcids) == 0 {
182196
missingEntityInfoSet[entityInfo] = struct{}{}
183197
}
184198
}
185199
if len(missingEntityInfoSet) > 0 {
200+
if mapsClient == nil {
201+
return nil, nil, status.Error(codes.FailedPrecondition, "Maps API client is required but not configured for description resolution")
202+
}
203+
186204
// For entities that cannot be resolved by RecognizePlaces, try Maps API.
187205
missingEntityInfoToDCIDSet, missingDcidSet, err := resolveWithMapsAPI(
188-
ctx, mapsClient, store, missingEntityInfoSet)
206+
ctx, mapsClient, placeIdToDcidFunc, missingEntityInfoSet)
189207
if err != nil {
190208
return nil, nil, err
191209
}
@@ -202,7 +220,7 @@ func resolveDCIDs(
202220
}
203221

204222
// Format the result, transform DCID set to DCID list.
205-
res := map[entityInfo][]string{}
223+
res := map[EntityInfo][]string{}
206224
for e, dcids := range entityInfoToDCIDs {
207225
res[e] = append(res[e], dcids...)
208226
}
@@ -212,10 +230,10 @@ func resolveDCIDs(
212230

213231
func resolveWithRecognizePlaces(
214232
ctx context.Context,
215-
store *store.Store,
216-
entityInfoSet map[entityInfo]struct{},
233+
recogPlaceStore *files.RecogPlaceStore,
234+
entityInfoSet map[EntityInfo]struct{},
217235
) (
218-
map[entityInfo][]string, /* entityInfo -> [DCID] */
236+
map[EntityInfo][]string, /* EntityInfo -> [DCID] */
219237
map[string]struct{}, /* DCID set for all entities */
220238
error,
221239
) {
@@ -226,7 +244,7 @@ func resolveWithRecognizePlaces(
226244
s := strings.ReplaceAll(strings.ToLower(n), " ", "")
227245
return strings.ReplaceAll(s, ",", "")
228246
}
229-
names, ok := store.RecogPlaceStore.DcidToNames[dcid]
247+
names, ok := recogPlaceStore.DcidToNames[dcid]
230248
if !ok {
231249
return false
232250
}
@@ -243,20 +261,20 @@ func resolveWithRecognizePlaces(
243261
}
244262
descriptionToType := map[string]string{}
245263
for e := range entityInfoSet {
246-
req.Queries = append(req.Queries, e.description)
247-
descriptionToType[e.description] = e.typeOf
264+
req.Queries = append(req.Queries, e.Description)
265+
descriptionToType[e.Description] = e.TypeOf
248266
}
249267

250-
resp, err := RecognizePlaces(ctx, req, store, true)
268+
resp, err := RecognizePlaces(ctx, req, recogPlaceStore, true)
251269
if err != nil {
252270
return nil, nil, err
253271
}
254272

255-
entityInfoToDCIDs := map[entityInfo][]string{}
273+
entityInfoToDCIDs := map[EntityInfo][]string{}
256274
dcidSet := map[string]struct{}{}
257275

258276
for query, items := range resp.GetQueryItems() {
259-
e := entityInfo{description: query, typeOf: descriptionToType[query]}
277+
e := EntityInfo{Description: query, TypeOf: descriptionToType[query]}
260278
entityInfoToDCIDs[e] = []string{}
261279
for _, item := range items.GetItems() {
262280
for _, place := range item.GetPlaces() {
@@ -275,10 +293,10 @@ func resolveWithRecognizePlaces(
275293
func resolveWithMapsAPI(
276294
ctx context.Context,
277295
mapsClient internalmaps.MapsClient,
278-
store *store.Store,
279-
entityInfoSet map[entityInfo]struct{},
296+
placeIdToDcidFunc PlaceIdToDcidFunc,
297+
entityInfoSet map[EntityInfo]struct{},
280298
) (
281-
map[entityInfo]map[string]struct{}, /* entityInfo -> DCID set */
299+
map[EntityInfo]map[string]struct{}, /* EntityInfo -> DCID set */
282300
map[string]struct{}, /* [DCID] for all entities */
283301
error,
284302
) {
@@ -290,16 +308,22 @@ func resolveWithMapsAPI(
290308
}
291309

292310
if len(placeIDSet) == 0 {
293-
return map[entityInfo]map[string]struct{}{}, map[string]struct{}{}, nil
311+
return map[EntityInfo]map[string]struct{}{}, map[string]struct{}{}, nil
294312
}
295313

296314
// Resolve place IDs to get DCIDs.
297-
placeIDToDCIDs, dcidSet, err := resolveDCIDsFromPlaceIDs(ctx, placeIDSet, store)
315+
placeIDToDCIDs, err := placeIdToDcidFunc(ctx, util.StringSetToSlice(placeIDSet))
298316
if err != nil {
299317
return nil, nil, err
300318
}
319+
dcidSet := map[string]struct{}{}
320+
for _, dcids := range placeIDToDCIDs {
321+
for _, dcid := range dcids {
322+
dcidSet[dcid] = struct{}{}
323+
}
324+
}
301325

302-
res := map[entityInfo]map[string]struct{}{}
326+
res := map[EntityInfo]map[string]struct{}{}
303327
for entityInfo, placeIDs := range entityInfoToPlaceIDs {
304328
if _, ok := res[entityInfo]; !ok {
305329
res[entityInfo] = map[string]struct{}{}
@@ -319,19 +343,19 @@ func resolveWithMapsAPI(
319343
func resolvePlaceIDsFromDescriptions(
320344
ctx context.Context,
321345
mapsClient internalmaps.MapsClient,
322-
entityInfoSet map[entityInfo]struct{},
346+
entityInfoSet map[EntityInfo]struct{},
323347
) (
324-
map[entityInfo][]string, /* entityInfo -> [place ID] */
348+
map[EntityInfo][]string, /* EntityInfo -> [place ID] */
325349
map[string]struct{}, /* [place ID] for all entities */
326350
error,
327351
) {
328352
type resolveResult struct {
329-
entityInfo *entityInfo
353+
entityInfo *EntityInfo
330354
placeIDs []string
331355
}
332356

333357
// Distribute entityInfoSet to maxMapsAPICallsInParallel shards for parallel processing.
334-
entityInfoListShards := make([][]entityInfo, maxMapsAPICallsInParallel)
358+
entityInfoListShards := make([][]EntityInfo, maxMapsAPICallsInParallel)
335359
idx := 0
336360
for entityInfo := range entityInfoSet {
337361
entityInfoListShards[idx] = append(entityInfoListShards[idx], entityInfo)
@@ -348,7 +372,8 @@ func resolvePlaceIDsFromDescriptions(
348372
mapsAPICallWorkerFunc := func(ctx context.Context, i int) func() error {
349373
return func() error {
350374
for _, entityInfo := range entityInfoListShards[i] {
351-
placeIDs, err := findPlaceIDsForEntity(ctx, mapsClient, &entityInfo)
375+
info := entityInfo
376+
placeIDs, err := findPlaceIDsForEntity(ctx, mapsClient, &info)
352377
if err != nil {
353378
return err
354379
}
@@ -358,7 +383,7 @@ func resolvePlaceIDsFromDescriptions(
358383
usedPlaceIds = []string{placeIDs[0]}
359384
}
360385
resolveResultChan <- resolveResult{
361-
entityInfo: &entityInfo,
386+
entityInfo: &info,
362387
placeIDs: usedPlaceIds}
363388
}
364389
return nil
@@ -377,7 +402,7 @@ func resolvePlaceIDsFromDescriptions(
377402
close(resolveResultChan)
378403

379404
// Read out the results sent by workers.
380-
entityInfoToPlaceIDs := map[entityInfo][]string{}
405+
entityInfoToPlaceIDs := map[EntityInfo][]string{}
381406
placeIDSet := map[string]struct{}{}
382407
for res := range resolveResultChan {
383408
entityInfoToPlaceIDs[*res.entityInfo] = res.placeIDs
@@ -392,11 +417,11 @@ func resolvePlaceIDsFromDescriptions(
392417
func findPlaceIDsForEntity(
393418
ctx context.Context,
394419
mapsClient internalmaps.MapsClient,
395-
entityInfo *entityInfo,
420+
entityInfo *EntityInfo,
396421
) ([]string, error) {
397422
// When type is supplied, we append it to the description to increase the accuracy.
398-
input := entityInfo.description
399-
if t := entityInfo.typeOf; t != "" {
423+
input := entityInfo.Description
424+
if t := entityInfo.TypeOf; t != "" {
400425
input += (" " + t)
401426
}
402427

@@ -419,7 +444,7 @@ func findPlaceIDsForEntity(
419444

420445
func resolveDCIDsFromPlaceIDs(
421446
ctx context.Context,
422-
placeIDSet map[string]struct{},
447+
placeIDs []string,
423448
store *store.Store,
424449
) (
425450
map[string][]string, /* Place ID -> [DCID] */
@@ -430,7 +455,7 @@ func resolveDCIDsFromPlaceIDs(
430455
&pb.ResolveIdsRequest{
431456
InProp: "placeId",
432457
OutProp: "dcid",
433-
Ids: util.StringSetToSlice(placeIDSet),
458+
Ids: placeIDs,
434459
},
435460
store)
436461
if err != nil {

internal/server/recon/recognize.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,11 +46,11 @@ var (
4646
func RecognizePlaces(
4747
ctx context.Context,
4848
in *pb.RecognizePlacesRequest,
49-
store *store.Store,
49+
recogPlaceStore *files.RecogPlaceStore,
5050
resolveBogusName bool,
5151
) (*pb.RecognizePlacesResponse, error) {
5252
pr := &placeRecognition{
53-
recogPlaceStore: store.RecogPlaceStore,
53+
recogPlaceStore: recogPlaceStore,
5454
resolveBogusName: resolveBogusName,
5555
}
5656

0 commit comments

Comments
 (0)