diff --git a/internal/server/spanner/client.go b/internal/server/spanner/client.go index 35600db4d..a9df51e0d 100644 --- a/internal/server/spanner/client.go +++ b/internal/server/spanner/client.go @@ -24,6 +24,7 @@ import ( "time" "cloud.google.com/go/spanner" + pb "github.com/datacommonsorg/mixer/internal/proto" v2 "github.com/datacommonsorg/mixer/internal/server/v2" "github.com/datacommonsorg/mixer/internal/translator/types" "gopkg.in/yaml.v3" @@ -39,6 +40,7 @@ type SpannerClient interface { ResolveByID(ctx context.Context, nodes []string, in, out string) (map[string][]string, error) GetEventCollectionDate(ctx context.Context, placeID, eventType string) ([]string, error) Sparql(ctx context.Context, nodes []types.Node, queries []*types.Query, opts *types.QueryOptions) ([][]string, error) + GetVariableMetadata(ctx context.Context, ids []string) (map[string][]*pb.StatVarSummary_ProvenanceSummary, error) Id() string Start() Close() diff --git a/internal/server/spanner/golden/datasource_test.go b/internal/server/spanner/golden/datasource_test.go index e8c1d0fe3..c6c5384c2 100644 --- a/internal/server/spanner/golden/datasource_test.go +++ b/internal/server/spanner/golden/datasource_test.go @@ -61,12 +61,15 @@ func (m *mockSpannerClient) ResolveByID(ctx context.Context, nodes []string, in, func (m *mockSpannerClient) Sparql(ctx context.Context, nodes []types.Node, queries []*types.Query, opts *types.QueryOptions) ([][]string, error) { return nil, nil } +func (m *mockSpannerClient) GetVariableMetadata(ctx context.Context, ids []string) (map[string][]*pb.StatVarSummary_ProvenanceSummary, error) { + return nil, nil +} func (m *mockSpannerClient) GetEventCollectionDate(ctx context.Context, placeID, eventType string) ([]string, error) { return nil, nil } func (m *mockSpannerClient) Id() string { return "mock" } -func (m *mockSpannerClient) Start() {} -func (m *mockSpannerClient) Close() {} +func (m *mockSpannerClient) Start() {} +func (m *mockSpannerClient) Close() {} func TestSpannerResolve(t *testing.T) { client := test.NewSpannerClient() diff --git a/internal/server/spanner/golden/query/get_variable_metadata.json b/internal/server/spanner/golden/query/get_variable_metadata.json new file mode 100644 index 000000000..9a1f61b8d --- /dev/null +++ b/internal/server/spanner/golden/query/get_variable_metadata.json @@ -0,0 +1,91 @@ +{ + "Count_Household_FamilyHousehold": [ + { + "import_name": "CensusACS5YearSurvey", + "series_summary": [ + { + "series_key": { + "measurement_method": "CensusACS5yrSurvey" + }, + "earliest_date": "2011", + "latest_date": "2023", + "place_type_summary": { + "AdministrativeArea1": {}, + "AdministrativeArea2": {}, + "AdministrativeArea4": {}, + "AdministrativeArea5": {}, + "Borough": {}, + "CensusCoreBasedStatisticalArea": {}, + "CensusCountyDivision": {}, + "CensusDivision": {}, + "CensusTract": {}, + "CensusZipCodeTabulationArea": {}, + "City": {}, + "CongressionalDistrict": {}, + "Country": {}, + "County": {}, + "ElementarySchoolDistrict": {}, + "HighSchoolDistrict": {}, + "Neighborhood": {}, + "Place": {}, + "ProvisionalNode": {}, + "SchoolDistrict": {}, + "State": {}, + "StateComponent": {}, + "Town": {}, + "Village": {} + }, + "observation_count": 2547106, + "time_series_count": 220438 + } + ], + "observation_count": 2547106, + "time_series_count": 220438 + } + ], + "Count_Household_HasComputer": [ + { + "import_name": "CensusACS5YearSurvey", + "series_summary": [ + { + "series_key": { + "measurement_method": "CensusACS5yrSurvey" + }, + "earliest_date": "2017", + "latest_date": "2023", + "place_type_summary": { + "AdministrativeArea1": {}, + "AdministrativeArea2": {}, + "AdministrativeArea4": {}, + "AdministrativeArea5": {}, + "Borough": {}, + "CensusBlockGroup": {}, + "CensusCoreBasedStatisticalArea": {}, + "CensusCountyDivision": {}, + "CensusDivision": {}, + "CensusTract": {}, + "CensusZipCodeTabulationArea": {}, + "City": {}, + "CongressionalDistrict": {}, + "Country": {}, + "County": {}, + "ElementarySchoolDistrict": {}, + "HighSchoolDistrict": {}, + "Neighborhood": {}, + "Place": {}, + "ProvisionalNode": {}, + "SchoolDistrict": {}, + "State": {}, + "StateComponent": {}, + "Town": {}, + "Village": {} + }, + "observation_count": 2805865, + "time_series_count": 511517 + } + ], + "observation_count": 2805865, + "time_series_count": 511517 + } + ] +} \ No newline at end of file diff --git a/internal/server/spanner/golden/query_builder/get_variable_metadata.sql b/internal/server/spanner/golden/query_builder/get_variable_metadata.sql new file mode 100644 index 000000000..d145241bb --- /dev/null +++ b/internal/server/spanner/golden/query_builder/get_variable_metadata.sql @@ -0,0 +1,8 @@ + SELECT + key, + TO_JSON_STRING(value) AS value, + FROM + Cache + WHERE + type = 'ProvenanceSummary' + AND key IN ('Count_Household_FamilyHousehold','Count_Household_HasComputer') \ No newline at end of file diff --git a/internal/server/spanner/golden/query_builder_test.go b/internal/server/spanner/golden/query_builder_test.go index 6f5cbcdb5..54de2f6cb 100644 --- a/internal/server/spanner/golden/query_builder_test.go +++ b/internal/server/spanner/golden/query_builder_test.go @@ -125,6 +125,18 @@ func TestSparqlQuery(t *testing.T) { } } +func TestGetVariableMetadataQuery(t *testing.T) { + t.Parallel() + + for _, c := range variableMetadataTestCases { + goldenFile := c.golden + ".sql" + + runQueryBuilderGoldenTest(t, goldenFile, func(ctx context.Context) (interface{}, error) { + return spanner.GetCacheDataQuery(spanner.TypeProvenanceSummary, c.variables), nil + }) + } +} + func TestGetEventCollectionDateQuery(t *testing.T) { t.Parallel() diff --git a/internal/server/spanner/golden/query_cases_test.go b/internal/server/spanner/golden/query_cases_test.go index e53d50853..e16c0f76b 100644 --- a/internal/server/spanner/golden/query_cases_test.go +++ b/internal/server/spanner/golden/query_cases_test.go @@ -411,6 +411,16 @@ var sparqlTestCases = []struct { }, } +var variableMetadataTestCases = []struct { + variables []string + golden string +}{ + { + variables: []string{"Count_Household_FamilyHousehold", "Count_Household_HasComputer"}, + golden: "get_variable_metadata", + }, +} + var eventCollectionDateTestCases = []struct { placeDcid string eventType string diff --git a/internal/server/spanner/golden/query_test.go b/internal/server/spanner/golden/query_test.go index fd19ce569..37cc329f1 100644 --- a/internal/server/spanner/golden/query_test.go +++ b/internal/server/spanner/golden/query_test.go @@ -190,6 +190,23 @@ func TestSparql(t *testing.T) { } } +func TestGetVariableMetadata(t *testing.T) { + client := test.NewSpannerClient() + if client == nil { + return + } + + t.Parallel() + + for _, c := range variableMetadataTestCases { + goldenFile := c.golden + ".json" + + runQueryGoldenTest(t, goldenFile, func(ctx context.Context) (interface{}, error) { + return client.GetVariableMetadata(ctx, c.variables) + }) + } +} + func TestGetEventCollectionDate(t *testing.T) { client := test.NewSpannerClient() if client == nil { diff --git a/internal/server/spanner/model.go b/internal/server/spanner/model.go index 81c259dc9..c51a8d8a0 100644 --- a/internal/server/spanner/model.go +++ b/internal/server/spanner/model.go @@ -24,6 +24,13 @@ import ( "google.golang.org/protobuf/proto" ) +// CacheDataType represents the type of cache data. +type CacheDataType string + +const ( + TypeProvenanceSummary = "ProvenanceSummary" +) + // Property struct represents a subset of a row in the Edge table. type Property struct { SubjectID string `spanner:"subject_id"` diff --git a/internal/server/spanner/query.go b/internal/server/spanner/query.go index 37426b201..1e0dec52b 100644 --- a/internal/server/spanner/query.go +++ b/internal/server/spanner/query.go @@ -23,10 +23,13 @@ import ( "cloud.google.com/go/spanner" "github.com/datacommonsorg/mixer/internal/metrics" + pb "github.com/datacommonsorg/mixer/internal/proto" v2 "github.com/datacommonsorg/mixer/internal/server/v2" "github.com/datacommonsorg/mixer/internal/translator/types" "google.golang.org/api/iterator" "google.golang.org/grpc/codes" + "google.golang.org/protobuf/encoding/protojson" + "google.golang.org/protobuf/proto" ) const ( @@ -46,8 +49,9 @@ func (sc *spannerDatabaseClient) GetNodeProps(ctx context.Context, ids []string, props[id] = []*Property{} } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *GetNodePropsQuery(ids, out), func() interface{} { return &Property{} @@ -75,8 +79,9 @@ func (sc *spannerDatabaseClient) GetNodeEdgesByID(ctx context.Context, ids []str edges[id] = []*Edge{} } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *GetNodeEdgesByIDQuery(ids, arc, pageSize, offset), func() interface{} { return &Edge{} @@ -101,8 +106,9 @@ func (sc *spannerDatabaseClient) GetObservations(ctx context.Context, variables return nil, fmt.Errorf("entity must be specified") } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *GetObservationsQuery(variables, entities), func() interface{} { return &Observation{} @@ -126,8 +132,9 @@ func (sc *spannerDatabaseClient) GetObservationsContainedInPlace(ctx context.Con return observations, nil } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *GetObservationsContainedInPlaceQuery(variables, containedInPlace), func() interface{} { return &Observation{} @@ -153,8 +160,9 @@ func (sc *spannerDatabaseClient) SearchNodes(ctx context.Context, query string, return nodes, nil } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *SearchNodesQuery(query, types), func() interface{} { return &SearchNode{} @@ -186,8 +194,9 @@ func (sc *spannerDatabaseClient) ResolveByID(ctx context.Context, nodes []string valueMap[value] = node } - err := sc.queryStructs( + err := queryStructs( ctx, + sc, *ResolveByIDQuery(nodes, in, out), func() interface{} { return &ResolutionCandidate{} @@ -208,7 +217,7 @@ func (sc *spannerDatabaseClient) ResolveByID(ctx context.Context, nodes []string // GetEventCollectionDate retrieves event collection dates from Spanner. func (sc *spannerDatabaseClient) GetEventCollectionDate(ctx context.Context, placeID, eventType string) ([]string, error) { stmt := GetEventCollectionDateQuery(placeID, eventType) - rows, err := sc.queryDynamic(ctx, *stmt) + rows, err := queryDynamic(ctx, sc, *stmt) if err != nil { return nil, err } @@ -228,7 +237,28 @@ func (sc *spannerDatabaseClient) Sparql(ctx context.Context, nodes []types.Node, return nil, fmt.Errorf("error building sparql query: %v", err) } - return sc.queryDynamic(ctx, *query) + return queryDynamic(ctx, sc, *query) +} + +func (sc *spannerDatabaseClient) GetVariableMetadata(ctx context.Context, variables []string) (map[string][]*pb.StatVarSummary_ProvenanceSummary, error) { + if len(variables) == 0 { + return map[string][]*pb.StatVarSummary_ProvenanceSummary{}, + nil + } + + results, err := queryCache( + ctx, + sc, + *GetCacheDataQuery(TypeProvenanceSummary, variables), + func() *pb.StatVarSummary_ProvenanceSummary { + return &pb.StatVarSummary_ProvenanceSummary{} + }, + ) + if err != nil { + return nil, err + } + + return results, nil } // fetchAndUpdateTimestamp queries Spanner and updates the timestamp. @@ -293,32 +323,50 @@ func (sc *spannerDatabaseClient) executeQuery( } // queryStructs executes a query and maps the results to an input struct. -func (sc *spannerDatabaseClient) queryStructs( +func queryStructs( ctx context.Context, + sc *spannerDatabaseClient, stmt spanner.Statement, newStruct func() interface{}, withStruct func(interface{}), ) error { return sc.executeQuery(ctx, stmt, func(iter *spanner.RowIterator) error { - return sc.processRows(iter, newStruct, withStruct) + return processRows(iter, newStruct, withStruct) }) } // queryDynamic executes a dynamically constructed query and returns the results as a slice of string slices. -func (sc *spannerDatabaseClient) queryDynamic( +func queryDynamic( ctx context.Context, + sc *spannerDatabaseClient, stmt spanner.Statement, ) ([][]string, error) { var rowData [][]string err := sc.executeQuery(ctx, stmt, func(iter *spanner.RowIterator) error { - result, err := sc.processDynamicRows(iter) + result, err := processDynamicRows(iter) rowData = result return err }) return rowData, err } -func (sc *spannerDatabaseClient) processRows(iter *spanner.RowIterator, newStruct func() interface{}, withStruct func(interface{})) error { +// queryCache executes a query and maps the results to an input cache proto. +func queryCache[T proto.Message]( + ctx context.Context, + sc *spannerDatabaseClient, + stmt spanner.Statement, + newProto func() T, +) (map[string][]T, error) { + var data map[string][]T + err := sc.executeQuery(ctx, stmt, func(iter *spanner.RowIterator) error { + result, err := processCacheRows(iter, newProto) + data = result + return err + }) + return data, err +} + +func processRows(iter *spanner.RowIterator, newStruct func() interface{}, withStruct func(interface{})) error { for { row, err := iter.Next() if err == iterator.Done { @@ -339,7 +387,7 @@ func (sc *spannerDatabaseClient) processRows(iter *spanner.RowIterator, newStruc } // processDynamicRows processes rows from dynamically constructed queries. -func (sc *spannerDatabaseClient) processDynamicRows(iter *spanner.RowIterator) ([][]string, error) { +func processDynamicRows(iter *spanner.RowIterator) ([][]string, error) { rowData := [][]string{} for { row, err := iter.Next() @@ -362,3 +410,39 @@ func (sc *spannerDatabaseClient) processDynamicRows(iter *spanner.RowIterator) ( } return rowData, nil } + +// processCacheRows processes rows and maps them to a proto struct. +func processCacheRows[T proto.Message](iter *spanner.RowIterator, newProto func() T) (map[string][]T, error) { + results := make(map[string][]T) + unmarshaler := protojson.UnmarshalOptions{DiscardUnknown: true} + + for { + row, err := iter.Next() + if err == iterator.Done { + break + } + if err != nil { + return nil, fmt.Errorf("failed to fetch row: %w", err) + } + + var key string + if err := row.ColumnByName("key", &key); err != nil { + return nil, fmt.Errorf("failed to read key column: %w", err) + } + + var jsonStr spanner.NullString + if err := row.ColumnByName("value", &jsonStr); err != nil { + return nil, fmt.Errorf("failed to read value column: %w", err) + } + + if jsonStr.Valid { + msg := newProto() + if err := unmarshaler.Unmarshal([]byte(jsonStr.StringVal), msg); err != nil { + return nil, fmt.Errorf("failed to unmarshal proto: %w", err) + } + results[key] = append(results[key], msg) + } + } + + return results, nil +} diff --git a/internal/server/spanner/query_builder.go b/internal/server/spanner/query_builder.go index 004cde0a5..656f42740 100644 --- a/internal/server/spanner/query_builder.go +++ b/internal/server/spanner/query_builder.go @@ -348,6 +348,19 @@ func SparqlQuery(nodes []types.Node, queries []*types.Query, opts *types.QueryOp }, nil } +func GetCacheDataQuery(typeFilter CacheDataType, keys []string) *spanner.Statement { + keyFilter, keyVal := getParamStatement("key", keys) + params := map[string]interface{}{ + "type": string(typeFilter), + "key": keyVal, + } + + return &spanner.Statement{ + SQL: fmt.Sprintf(statements.getCacheData, keyFilter), + Params: params, + } +} + // generateSafeAliasMap generates a map of safe aliases for SPARQL queries. func generateSafeAliasMap(queries []*types.Query) map[string]string { safeAliasMap := make(map[string]string) @@ -403,6 +416,7 @@ func getParamStatement(param string, inputs []string) (string, interface{}) { } return fmt.Sprintf(statements.getParams, param), inputs } + func GetEventCollectionDateQuery(placeID, eventType string) *spanner.Statement { return &spanner.Statement{ SQL: statements.getEventCollectionDate, diff --git a/internal/server/spanner/statements.go b/internal/server/spanner/statements.go index fc8ed90bb..38c28cc95 100644 --- a/internal/server/spanner/statements.go +++ b/internal/server/spanner/statements.go @@ -85,6 +85,8 @@ var statements = struct { nodeFilter string // Generic triple pattern. triple string + // Get data from Cache table. + getCacheData string // Fetch event dates for a given type and location. getEventCollectionDate string }{ @@ -294,6 +296,14 @@ var statements = struct { WHERE %[1]s.subject_id IN UNNEST(@%[1]s)`, triple: `(%[1]s:Node%[2]s)-[:Edge {predicate: @predicate%[3]d}]->(%[4]s:Node%[5]s)`, + getCacheData: ` SELECT + key, + TO_JSON_STRING(value) AS value, + FROM + Cache + WHERE + type = @type + AND key %s`, getEventCollectionDate: ` @{force_join_order=true} GRAPH DCGraph MATCH (event:Node)-[:Edge {predicate: 'affectedPlace', object_id: @placeID}]->(), (event)-[:Edge {predicate: 'typeOf', object_id: @eventType}]->(), (event)-[:Edge {predicate: 'startDate'}]->(dateNode:Node) RETURN DISTINCT