diff --git a/internal/proto/storage.pb.go b/internal/proto/storage.pb.go new file mode 100644 index 000000000..c77ea1077 --- /dev/null +++ b/internal/proto/storage.pb.go @@ -0,0 +1,156 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.30.0 +// protoc v3.21.12 +// source: storage.proto + +package proto + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// Observations represent the observations (time series) stored in the observations column in the Observation table. +type Observations struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Map from date to value. 
+ // Examples: "2024" -> "123", "2025-05" -> "-456.78" + Values map[string]string `protobuf:"bytes,1,rep,name=values,proto3" json:"values,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Observations) Reset() { + *x = Observations{} + if protoimpl.UnsafeEnabled { + mi := &file_storage_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Observations) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Observations) ProtoMessage() {} + +func (x *Observations) ProtoReflect() protoreflect.Message { + mi := &file_storage_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Observations.ProtoReflect.Descriptor instead. +func (*Observations) Descriptor() ([]byte, []int) { + return file_storage_proto_rawDescGZIP(), []int{0} +} + +func (x *Observations) GetValues() map[string]string { + if x != nil { + return x.Values + } + return nil +} + +var File_storage_proto protoreflect.FileDescriptor + +var file_storage_proto_rawDesc = []byte{ + 0x0a, 0x0d, 0x73, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, + 0x0b, 0x64, 0x61, 0x74, 0x61, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x73, 0x22, 0x88, 0x01, 0x0a, + 0x0c, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x3d, 0x0a, + 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x25, 0x2e, + 0x64, 0x61, 0x74, 0x61, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, 0x73, 0x2e, 0x4f, 0x62, 0x73, 0x65, + 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x2e, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x52, 0x06, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x73, 0x1a, 0x39, 0x0a, 0x0b, + 0x56, 0x61, 0x6c, 0x75, 
0x65, 0x73, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, + 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, + 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, + 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x42, 0x30, 0x5a, 0x2e, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x63, 0x6f, 0x6d, 0x6d, 0x6f, 0x6e, + 0x73, 0x6f, 0x72, 0x67, 0x2f, 0x6d, 0x69, 0x78, 0x65, 0x72, 0x2f, 0x69, 0x6e, 0x74, 0x65, 0x72, + 0x6e, 0x61, 0x6c, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x33, +} + +var ( + file_storage_proto_rawDescOnce sync.Once + file_storage_proto_rawDescData = file_storage_proto_rawDesc +) + +func file_storage_proto_rawDescGZIP() []byte { + file_storage_proto_rawDescOnce.Do(func() { + file_storage_proto_rawDescData = protoimpl.X.CompressGZIP(file_storage_proto_rawDescData) + }) + return file_storage_proto_rawDescData +} + +var file_storage_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_storage_proto_goTypes = []interface{}{ + (*Observations)(nil), // 0: datacommons.Observations + nil, // 1: datacommons.Observations.ValuesEntry +} +var file_storage_proto_depIdxs = []int32{ + 1, // 0: datacommons.Observations.values:type_name -> datacommons.Observations.ValuesEntry + 1, // [1:1] is the sub-list for method output_type + 1, // [1:1] is the sub-list for method input_type + 1, // [1:1] is the sub-list for extension type_name + 1, // [1:1] is the sub-list for extension extendee + 0, // [0:1] is the sub-list for field type_name +} + +func init() { file_storage_proto_init() } +func file_storage_proto_init() { + if File_storage_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_storage_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Observations); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: 
+ return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_storage_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_storage_proto_goTypes, + DependencyIndexes: file_storage_proto_depIdxs, + MessageInfos: file_storage_proto_msgTypes, + }.Build() + File_storage_proto = out.File + file_storage_proto_rawDesc = nil + file_storage_proto_goTypes = nil + file_storage_proto_depIdxs = nil +} diff --git a/internal/server/spanner/model.go b/internal/server/spanner/model.go index 4c90f4b69..8c98e6e03 100644 --- a/internal/server/spanner/model.go +++ b/internal/server/spanner/model.go @@ -16,10 +16,11 @@ package spanner import ( - "encoding/json" + "encoding/base64" "fmt" - "google.golang.org/protobuf/types/known/structpb" + pb "github.com/datacommonsorg/mixer/internal/proto" + "google.golang.org/protobuf/proto" ) // Property struct represents a subset of a row in the Edge table. @@ -65,25 +66,23 @@ type TimeSeries []*DateValue // DecodeSpanner decodes the observations field to a TimeSeries value. // This is inherited from the spanner Decoder interface to decode from a spanner type to a custom type. // Reference: https://cloud.google.com/go/docs/reference/cloud.google.com/go/spanner/latest#cloud_google_com_go_spanner_Decoder -// Note that the undecoded values are of type ListValue and each element a string value. +// Note that the undecoded value is a base64 encoded string. 
func (ts *TimeSeries) DecodeSpanner(val interface{}) (err error) { - listVal, ok := val.(*structpb.ListValue) - if !ok { - return fmt.Errorf("failed to decode TimeSeries: (%v)", val) + obs := &pb.Observations{} + decodedVal, err := base64.StdEncoding.DecodeString(val.(string)) + if err != nil { + return fmt.Errorf("failed to decode base64 encoded string: (%v)", err) + } + err = proto.Unmarshal(decodedVal, obs) + if err != nil { + return fmt.Errorf("failed to decode Observations: (%v)", err) } *ts = []*DateValue{} - for _, v := range listVal.Values { - var data map[string]string - err := json.Unmarshal([]byte(v.GetStringValue()), &data) - if err != nil { - return fmt.Errorf("failed to decode TimeSeries value: (%v)", v) - } - for date, strVal := range data { - *ts = append(*ts, &DateValue{ - Date: date, - Value: strVal, - }) - } + for date, value := range obs.Values { + *ts = append(*ts, &DateValue{ + Date: date, + Value: value, + }) } return nil } diff --git a/internal/server/spanner/statements.go b/internal/server/spanner/statements.go index 504b5e2bd..bbc7ce802 100644 --- a/internal/server/spanner/statements.go +++ b/internal/server/spanner/statements.go @@ -256,7 +256,7 @@ var statements = struct { variable_measured, observation_about, %s, - provenance, + provenance_url AS provenance, COALESCE(observation_period, '') AS observation_period, COALESCE(measurement_method, '') AS measurement_method, COALESCE(unit, '') AS unit, diff --git a/internal/server/spanner/temp/main.go b/internal/server/spanner/temp/main.go new file mode 100644 index 000000000..30a8dddd5 --- /dev/null +++ b/internal/server/spanner/temp/main.go @@ -0,0 +1,39 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/datacommonsorg/mixer/internal/server/spanner" +) + +const spannerInfoYaml = ` +project: datcom-store +instance: dc-kg-test +database: dc_graph_stable +` + +// This is a temporary program to test proto fields in spanner. 
+// Usage: go run internal/server/spanner/temp/main.go + +func main() { + ctx := context.Background() + client, err := spanner.NewSpannerClient(ctx, spannerInfoYaml) + if err != nil { + log.Fatalf("Failed to create SpannerClient: %v", err) + } + variables := []string{"AirPollutant_Cancer_Risk"} + entities := []string{"geoId/01001", "geoId/02013"} + obs, err := client.GetObservations(ctx, variables, entities, "", false) + if err != nil { + log.Fatalf("Failed to get observations: %v", err) + } + for _, o := range obs { + fmt.Printf("Observations for %s %s:\n", o.VariableMeasured, o.ObservationAbout) + for _, o2 := range o.Observations { + fmt.Printf(" %v %v\n", o2.Date, o2.Value) + } + + } +} diff --git a/proto/storage.proto b/proto/storage.proto new file mode 100644 index 000000000..109a2e840 --- /dev/null +++ b/proto/storage.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package datacommons; + +option go_package = "github.com/datacommonsorg/mixer/internal/proto"; + +// Includes protos that are used in spanner storage. + +// Observations represent the observations (time series) stored in the observations column in the Observation table. +message Observations { + // Map from date to value. + // Examples: "2024" -> "123", "2025-05" -> "-456.78" + map<string, string> values = 1; +} \ No newline at end of file