From e492d4218a59cadf350c31ca1c14faf224a4d350 Mon Sep 17 00:00:00 2001 From: Alex Van Boxel Date: Sun, 26 Jan 2025 20:59:25 +0100 Subject: [PATCH] [receiver/googlecloudpubsub] Turn noisy warn in reset metric (#37571) --- ...breceiver-turn-warn-into-reset-metric.yaml | 17 +++++ .../documentation.md | 20 +++++ receiver/googlecloudpubsubreceiver/factory.go | 10 +-- receiver/googlecloudpubsubreceiver/go.mod | 36 ++++----- receiver/googlecloudpubsubreceiver/go.sum | 56 +++++++------- .../internal/handler.go | 58 +++++++++------ .../internal/handler_test.go | 10 ++- .../internal/metadata/generated_telemetry.go | 56 ++++++++++++++ .../metadata/generated_telemetry_test.go | 74 +++++++++++++++++++ .../metadatatest/generated_telemetrytest.go | 64 ++++++++++++++++ .../generated_telemetrytest_test.go | 40 ++++++++++ .../googlecloudpubsubreceiver/metadata.yaml | 14 ++++ .../googlecloudpubsubreceiver/receiver.go | 23 ++++-- .../receiver_test.go | 14 ++-- 14 files changed, 403 insertions(+), 89 deletions(-) create mode 100644 .chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml create mode 100644 receiver/googlecloudpubsubreceiver/documentation.md create mode 100644 receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go create mode 100644 receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go diff --git a/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml new file mode 100644 index 0000000000000..ca4d1c8218f07 --- /dev/null +++ b/.chloggen/pubsubreceiver-turn-warn-into-reset-metric.yaml @@ -0,0 +1,17 @@ +change_type: enhancement + +component: googlecloudpubsubreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Turn noisy `warn` log about Pub/Sub servers into `debug`, and turn the reset count into a metric + +issues: [37571] + +subtext: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. Before the + receiver logged `warn` log lines everytime this happened. These log lines are moved to debug so that fleets with + lots of collectors with the receiver don't span logs at warn level. To keep track of the resets, whenever a + connection reset happens a `otelcol_receiver_googlecloudpubsub_stream_restarts` metric is increased by one. + +change_logs: [user] diff --git a/receiver/googlecloudpubsubreceiver/documentation.md b/receiver/googlecloudpubsubreceiver/documentation.md new file mode 100644 index 0000000000000..7cde645140ccb --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/documentation.md @@ -0,0 +1,20 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# googlecloudpubsub + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_receiver.googlecloudpubsub.stream_restarts + +Number of times the stream (re)starts due to a Pub/Sub servers connection close + +The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers +recurrently close the connection after a time period to avoid a long-running sticky connection. This metric +counts the number of the resets that occurred during the lifetime of the container. + + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| 1 | Sum | Int | true | diff --git a/receiver/googlecloudpubsubreceiver/factory.go b/receiver/googlecloudpubsubreceiver/factory.go index 802718a55fb42..d627c3d0efbc1 100644 --- a/receiver/googlecloudpubsubreceiver/factory.go +++ b/receiver/googlecloudpubsubreceiver/factory.go @@ -41,24 +41,24 @@ func (factory *pubsubReceiverFactory) CreateDefaultConfig() component.Config { return &Config{} } -func (factory *pubsubReceiverFactory) ensureReceiver(params receiver.Settings, config component.Config) (*pubsubReceiver, error) { +func (factory *pubsubReceiverFactory) ensureReceiver(settings receiver.Settings, config component.Config) (*pubsubReceiver, error) { receiver := factory.receivers[config.(*Config)] if receiver != nil { return receiver, nil } rconfig := config.(*Config) obsrecv, err := receiverhelper.NewObsReport(receiverhelper.ObsReportSettings{ - ReceiverID: params.ID, + ReceiverID: settings.ID, Transport: reportTransport, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) if err != nil { return nil, err } receiver = &pubsubReceiver{ - logger: params.Logger, + settings: settings, obsrecv: obsrecv, - userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", params.BuildInfo.Version), + userAgent: strings.ReplaceAll(rconfig.UserAgent, "{{version}}", settings.BuildInfo.Version), config: rconfig, } factory.receivers[config.(*Config)] = receiver diff --git a/receiver/googlecloudpubsubreceiver/go.mod b/receiver/googlecloudpubsubreceiver/go.mod index a18dfec2d35b3..9ac2409f5b27e 100644 --- a/receiver/googlecloudpubsubreceiver/go.mod +++ b/receiver/googlecloudpubsubreceiver/go.mod @@ -20,6 +20,10 @@ require ( go.opentelemetry.io/collector/pdata v1.24.1-0.20250130000211-c119b2a55eb4 go.opentelemetry.io/collector/receiver v0.118.1-0.20250130000211-c119b2a55eb4 go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250130000211-c119b2a55eb4 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/metric v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 go.uber.org/zap v1.27.0 @@ -31,12 +35,12 @@ require ( ) require ( - cloud.google.com/go v0.117.0 // indirect - cloud.google.com/go/auth v0.14.0 // indirect + cloud.google.com/go v0.118.0 // indirect + cloud.google.com/go/auth v0.14.1 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.7 // indirect cloud.google.com/go/compute/metadata v0.6.0 // indirect - cloud.google.com/go/iam v1.2.2 // indirect - cloud.google.com/go/longrunning v0.6.2 // indirect + cloud.google.com/go/iam v1.3.1 // indirect + cloud.google.com/go/longrunning v0.6.4 // indirect github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -60,23 +64,19 @@ require ( go.einride.tech/aip v0.68.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130000211-c119b2a55eb4 // indirect + go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250123125445-24f88da7b583 // indirect go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130000211-c119b2a55eb4 // indirect + go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250123125445-24f88da7b583 // indirect + go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250123125445-24f88da7b583 // indirect go.opentelemetry.io/collector/extension v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130000211-c119b2a55eb4 // indirect + go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250123125445-24f88da7b583 // indirect + go.opentelemetry.io/collector/featuregate v1.24.1-0.20250123125445-24f88da7b583 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130000211-c119b2a55eb4 // indirect - go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 // indirect - go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect - go.opentelemetry.io/otel/metric v1.34.0 // indirect + go.opentelemetry.io/collector/pipeline v0.118.1-0.20250123125445-24f88da7b583 // indirect + go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250123125445-24f88da7b583 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect - go.opentelemetry.io/otel/trace v1.34.0 // indirect golang.org/x/crypto v0.32.0 // indirect golang.org/x/net v0.34.0 // indirect golang.org/x/oauth2 v0.25.0 // indirect @@ -84,7 +84,7 @@ require ( golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.9.0 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/receiver/googlecloudpubsubreceiver/go.sum b/receiver/googlecloudpubsubreceiver/go.sum index 864d2d7dffe18..087d317994772 100644 --- a/receiver/googlecloudpubsubreceiver/go.sum +++ b/receiver/googlecloudpubsubreceiver/go.sum @@ -1,18 +1,18 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= -cloud.google.com/go v0.117.0 h1:Z5TNFfQxj7WG2FgOGX1ekC5RiXrYgms6QscOm32M/4s= -cloud.google.com/go v0.117.0/go.mod h1:ZbwhVTb1DBGt2Iwb3tNO6SEK4q+cplHZmLWH+DelYYc= -cloud.google.com/go/auth v0.14.0 h1:A5C4dKV/Spdvxcl0ggWwWEzzP7AZMJSEIgrkngwhGYM= -cloud.google.com/go/auth v0.14.0/go.mod h1:CYsoRL1PdiDuqeQpZE0bP2pnPrGqFcOkI0nldEQis+A= +cloud.google.com/go v0.118.0 h1:tvZe1mgqRxpiVa3XlIGMiPcEUbP1gNXELgD4y/IXmeQ= +cloud.google.com/go v0.118.0/go.mod h1:zIt2pkedt/mo+DQjcT4/L3NDxzHPR29j5HcclNH+9PM= +cloud.google.com/go/auth v0.14.1 h1:AwoJbzUdxA/whv1qj3TLKwh3XX5sikny2fc40wUl+h0= +cloud.google.com/go/auth v0.14.1/go.mod h1:4JHUxlGXisL0AW8kXPtUF6ztuOksyfUQNFjfsOCXkPM= cloud.google.com/go/auth/oauth2adapt v0.2.7 h1:/Lc7xODdqcEw8IrZ9SvwnlLX6j9FHQM74z6cBk9Rw6M= cloud.google.com/go/auth/oauth2adapt v0.2.7/go.mod h1:NTbTTzfvPl1Y3V1nPpOgl2w6d/FjO7NNUQaWSox6ZMc= cloud.google.com/go/compute/metadata v0.6.0 h1:A6hENjEsCDtC1k8byVsgwvVcioamEHvZ4j01OwKxG9I= cloud.google.com/go/compute/metadata v0.6.0/go.mod h1:FjyFAW1MW0C203CEOMDTu3Dk1FlqW3Rga40jzHL4hfg= -cloud.google.com/go/iam v1.2.2 h1:ozUSofHUGf/F4tCNy/mu9tHLTaxZFLOUiKzjcgWHGIA= -cloud.google.com/go/iam v1.2.2/go.mod h1:0Ys8ccaZHdI1dEUilwzqng/6ps2YB6vRsjIe00/+6JY= +cloud.google.com/go/iam v1.3.1 h1:KFf8SaT71yYq+sQtRISn90Gyhyf4X8RGgeAVC8XGf3E= +cloud.google.com/go/iam v1.3.1/go.mod h1:3wMtuyT4NcbnYNPLMBzYRFiEfjKfJlLVLrisE7bwm34= cloud.google.com/go/logging v1.13.0 h1:7j0HgAp0B94o1YRDqiqm26w4q1rDMH7XNRU34lJXHYc= cloud.google.com/go/logging v1.13.0/go.mod h1:36CoKh6KA/M0PbhPKMq6/qety2DCAErbhXT62TuXALA= -cloud.google.com/go/longrunning v0.6.2 h1:xjDfh1pQcWPEvnfjZmwjKQEcHnpz6lHjfy7Fo0MK+hc= -cloud.google.com/go/longrunning v0.6.2/go.mod h1:k/vIs83RN4bE3YCswdXC5PFfWVILjm3hpEUlSko4PiI= +cloud.google.com/go/longrunning v0.6.4 h1:3tyw9rO3E2XVXzSApn1gyEEnH2K9SynNQjMlBi3uHLg= +cloud.google.com/go/longrunning v0.6.4/go.mod h1:ttZpLCe6e7EXvn9OxpBRx7kZEB0efv8yBO6YnVMfhJs= cloud.google.com/go/pubsub v1.45.3 h1:prYj8EEAAAwkp6WNoGTE4ahe0DgHoyJd5Pbop931zow= cloud.google.com/go/pubsub v1.45.3/go.mod h1:cGyloK/hXC4at7smAtxFnXprKEFTqmMXNNd9w+bd94Q= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= @@ -128,20 +128,20 @@ go.opentelemetry.io/collector/component v0.118.1-0.20250130000211-c119b2a55eb4 h go.opentelemetry.io/collector/component v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:X6S0R6vXECThDa1q+m682asO/WEfMWIMeafJsFidr3E= go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250130000211-c119b2a55eb4 h1:77B30QO8IrBUtD3mhvq4g1IiYDXerFb35g91+lK14Wk= go.opentelemetry.io/collector/component/componenttest v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:VyxLB3apQi4ureRTv8HhSGrKpimubvgWs1yiXMubYmc= -go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130000211-c119b2a55eb4 h1:Rc46d7Cw4xdnqU5u+/glso1oIHx+y6nHXwwiu+5bVu8= -go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250130000211-c119b2a55eb4/go.mod h1:cleBc9I0DIWpTiiHfu9v83FUaCTqcPXmebpLxjEIqro= +go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250123125445-24f88da7b583 h1:lYbRbZvxqwB7TuzZb2wDX6pCZ0e0Sfjk5o6wD8Tz0gE= +go.opentelemetry.io/collector/config/configretry v1.24.1-0.20250123125445-24f88da7b583/go.mod h1:cleBc9I0DIWpTiiHfu9v83FUaCTqcPXmebpLxjEIqro= go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250130000211-c119b2a55eb4 h1:Cr/TN9njhCCu6YdqkWaNvQlSZVLI/W/EpWawGGymZgI= go.opentelemetry.io/collector/config/configtelemetry v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:SlBEwQg0qly75rXZ6W1Ig8jN25KBVBkFIIAUI1GiAAE= go.opentelemetry.io/collector/confmap v1.24.1-0.20250130000211-c119b2a55eb4 h1:XMGNUDNAMmlGVGjTQgG9BCh1WOKSOZQSER47ZRi2gWk= go.opentelemetry.io/collector/confmap v1.24.1-0.20250130000211-c119b2a55eb4/go.mod h1:Rrhs+MWoaP6AswZp+ReQ2VO9dfOfcUjdjiSHBsG+nec= go.opentelemetry.io/collector/consumer v1.24.1-0.20250130000211-c119b2a55eb4 h1:39o68Sq9sE29nwMcHZYka4H3av1AbrCmPWC70/e65DE= go.opentelemetry.io/collector/consumer v1.24.1-0.20250130000211-c119b2a55eb4/go.mod h1:LN0625PHzPlGbzwGlRj5SG4/URKxkw/aFsBvvO4GQWU= -go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130000211-c119b2a55eb4 h1:+Pe1VVAE1VDtJy15pP3+CUx5uIfssSCT/f4rLdzAR3Q= -go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:wJiNWl/ieSTcok2WE4f1H9whdmS1N0KZrEwub85PMtw= +go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250123125445-24f88da7b583 h1:Cr3hudfh+YNB6F+wWlsLzZza3XvdkYApQsOgQ1Z739g= +go.opentelemetry.io/collector/consumer/consumererror v0.118.1-0.20250123125445-24f88da7b583/go.mod h1:/fhqEIxH0hmnDa6zm38XzsdURr5GrlC9oKO70JVorHU= go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250130000211-c119b2a55eb4 h1:/LvvZftwxAHPQiu81JKt0kpLZ1Pb6fMQT0/xZk0uMcE= go.opentelemetry.io/collector/consumer/consumertest v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:RTVwAJCP2PHuSGTmsqa66pQqX6rSdkDqzjdXftmiO2w= -go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130000211-c119b2a55eb4 h1:jeynUhsoUF2gUBEcFyB9mrlUikCHkN2V+aZhcZIa8aA= -go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:++/yvA1L5F8+5dU7PdZ4Y4il5OtzJhdKjVYLureVsJY= +go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250123125445-24f88da7b583 h1:kVq7l/BjtuqjWSBuYAA6pEB2ucYF/6qOXGOyrFgKhzk= +go.opentelemetry.io/collector/consumer/xconsumer v0.118.1-0.20250123125445-24f88da7b583/go.mod h1:Ij9o9d7hZb4be6ql6yqMR7xy5fcFR0SSD6RRIYWlu88= go.opentelemetry.io/collector/exporter v0.118.1-0.20250130000211-c119b2a55eb4 h1:u6NzPqppLcVgebTo5FbqNfkHjTm+qNbfCjewcD29HWc= go.opentelemetry.io/collector/exporter v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:dOEuzXCvDxGDRwQepMPly5q5H8p+ccRElaAbshWQIzA= go.opentelemetry.io/collector/exporter/exportertest v0.118.0 h1:8gWky42BcJsxoaqWbnqCDUjP3Y84hjC6RD/UWHwR7sI= @@ -152,28 +152,28 @@ go.opentelemetry.io/collector/extension v0.118.1-0.20250130000211-c119b2a55eb4 h go.opentelemetry.io/collector/extension v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:PnkzHrY8LNYZsUqiI2wqBp441hdx2gs/LjKJkvM8Ses= go.opentelemetry.io/collector/extension/extensiontest v0.118.0 h1:rKBUaFS9elGfENG45wANmrwx7mHsmt1+YWCzxjftElg= go.opentelemetry.io/collector/extension/extensiontest v0.118.0/go.mod h1:CqNXzkIOR32D8EUpptpOXhpFkibs3kFlRyNMEgIW8l4= -go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130000211-c119b2a55eb4 h1:3uhbiLfV5l6VXg8lDXv2UvHROXBlF0T2qVNKE6RIdr4= -go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:XyApiptNHX3S2vG3R3yg89gP94QyJ8bcUgasP5XSzdo= -go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130000211-c119b2a55eb4 h1:FQaYa9hV8DlUwV7NCdegMQTXCFvTjfx5kKM2MnjT6YI= -go.opentelemetry.io/collector/featuregate v1.24.1-0.20250130000211-c119b2a55eb4/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= +go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250123125445-24f88da7b583 h1:ZIJzVVwOgOyse1dYtjIGuWJQ1TW0i7JsS6b4VIiHReY= +go.opentelemetry.io/collector/extension/xextension v0.118.1-0.20250123125445-24f88da7b583/go.mod h1:oNYxFX5QI2BUb01qZz23x4yxCletpiGwoZLB+JcHd2M= +go.opentelemetry.io/collector/featuregate v1.24.1-0.20250123125445-24f88da7b583 h1:DRn96fZ0iGmvvGGyYj0oVrGrZy+N/UmVxi34X8KqIiI= +go.opentelemetry.io/collector/featuregate v1.24.1-0.20250123125445-24f88da7b583/go.mod h1:3GaXqflNDVwWndNGBJ1+XJFy3Fv/XrFgjMN60N3z7yg= go.opentelemetry.io/collector/pdata v1.24.1-0.20250130000211-c119b2a55eb4 h1:jMNwHuYLgYKbBdv4m52lD22sqPI7tnC0uNd/QrjVANA= go.opentelemetry.io/collector/pdata v1.24.1-0.20250130000211-c119b2a55eb4/go.mod h1:Zs7D4RXOGS7E2faGc/jfWdbmhoiHBxA7QbpuJOioxq8= go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250130000211-c119b2a55eb4 h1:g6XWLE+31oLoMfpr6dipOxS3cCWai+IoGX7nZWuf5+E= go.opentelemetry.io/collector/pdata/pprofile v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:9J3kPFbrqgkgzQM3mukDr4daWmD08NvRCCymoAPiusI= go.opentelemetry.io/collector/pdata/testdata v0.118.0 h1:5N0w1SX9KIRkwvtkrpzQgXy9eGk3vfNG0ds6mhEPMIM= go.opentelemetry.io/collector/pdata/testdata v0.118.0/go.mod h1:UY+GHV5bOC1BnFburOZ0wiHReJj1XbW12mi2Ogbc5Lw= -go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130000211-c119b2a55eb4 h1:Zv0i8l1HzArbqEyOF3hSJuaIQixLDScry/vhwnD19xw= -go.opentelemetry.io/collector/pipeline v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= +go.opentelemetry.io/collector/pipeline v0.118.1-0.20250123125445-24f88da7b583 h1:RKMWMoxRzs5x11tKmU5MhoCzu+Ms1cIBwe2y/dpDPWo= +go.opentelemetry.io/collector/pipeline v0.118.1-0.20250123125445-24f88da7b583/go.mod h1:qE3DmoB05AW0C3lmPvdxZqd/H4po84NPzd5MrqgtL74= go.opentelemetry.io/collector/receiver v0.118.1-0.20250130000211-c119b2a55eb4 h1:4S7CUbP9SKskntHMvfnkogA5OenIwlnQZUpqY4yBZIk= go.opentelemetry.io/collector/receiver v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:huwfVvNKWyMZjmeFOM1YB9HWx3cbJdjzNV9RCwFyl70= go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250130000211-c119b2a55eb4 h1:1EIizwzmdLiW+ah1LatGqmozQgkXZ0TNnRq5OpLTQ40= go.opentelemetry.io/collector/receiver/receivertest v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:w8VuS9eBscFCjjAHc1NKGlCZfJFngpAFfZ/xf/k2gHg= -go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130000211-c119b2a55eb4 h1:pk8VJJ9GsA4hNtCMxsQrSfsG0It6KjEbshl0ERNMXRs= -go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250130000211-c119b2a55eb4/go.mod h1:ScFWnHuCXV0Z30cePBXS7Ud9IXfF8pBQujDCRUZolXw= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0 h1:r6I7RJCN86bpD/FQwedZ0vSixDpwuWREjW9oRMsmqDc= -go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.54.0/go.mod h1:B9yO6b04uB80CzjedvewuqDhxJxi11s7/GtiGa8bAjI= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0 h1:TT4fX+nBOA/+LUkobKGW1ydGcn+G3vRw9+g5HwCphpk= -go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.54.0/go.mod h1:L7UH0GbB0p47T4Rri3uHjbpCFYrVrwc1I25QhNPiGK8= +go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250123125445-24f88da7b583 h1:J4Pqe4mB7+clH50XeXqfAStYD7a/9Y0mG6C7aFi0k/w= +go.opentelemetry.io/collector/receiver/xreceiver v0.118.1-0.20250123125445-24f88da7b583/go.mod h1:WLPXXIuodY7quBgqCz3OIsPNdBMLDej5nUIbiyyfoUc= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 h1:rgMkmiGfix9vFJDcDi1PK8WEQP4FLQwLDfhp5ZLpFeE= +go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0/go.mod h1:ijPqXp5P6IRRByFVVg9DY8P5HkxkHE5ARIa+86aXPf4= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0 h1:CV7UdSGJt/Ao6Gp4CXckLxVRRsRgDHoI8XjbL3PDl8s= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.59.0/go.mod h1:FRmFuRJfag1IZ2dPkHnEoSFVgTVPUd2qf5Vi69hLb8I= go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= @@ -257,8 +257,8 @@ google.golang.org/genproto v0.0.0-20241118233622-e639e219e697 h1:ToEetK57OidYuqD google.golang.org/genproto v0.0.0-20241118233622-e639e219e697/go.mod h1:JJrvXBWRZaFMxBufik1a4RpFw4HhgVtBBWQeQgUj2cc= google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576 h1:CkkIfIt50+lT6NHAVoRYEyAvQGFM7xEwXUUywFvEb3Q= google.golang.org/genproto/googleapis/api v0.0.0-20241209162323-e6fa225c2576/go.mod h1:1R3kvZ1dtP3+4p4d3G8uJ8rFk/fWlScl38vanWACI08= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47 h1:91mG8dNTpkC0uChJUQ9zCiRqx3GEEFOWaRZ0mI6Oj2I= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250124145028-65684f501c47/go.mod h1:+2Yz8+CLJbIfL9z73EW45avw8Lmge3xVElCP9zEKi50= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287 h1:J1H9f+LEdWAfHcez/4cvaVBox7cOYT+IU6rgqj5x++8= +google.golang.org/genproto/googleapis/rpc v0.0.0-20250127172529-29210b9bc287/go.mod h1:8BS3B93F/U1juMFq9+EDk+qOT5CO1R9IzXxG3PTqiRk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= diff --git a/receiver/googlecloudpubsubreceiver/internal/handler.go b/receiver/googlecloudpubsubreceiver/internal/handler.go index 58695666ed942..3925dcb3114a2 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler.go @@ -12,6 +12,12 @@ import ( "sync/atomic" "time" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" "go.uber.org/zap" "google.golang.org/grpc/codes" @@ -36,7 +42,8 @@ type StreamHandler struct { streamWaitGroup sync.WaitGroup // wait group for the handler handlerWaitGroup sync.WaitGroup - logger *zap.Logger + settings receiver.Settings + telemetryBuilder *metadata.TelemetryBuilder // time that acknowledge loop waits before acknowledging messages ackBatchWait time.Duration @@ -51,19 +58,21 @@ func (handler *StreamHandler) ack(ackID string) { func NewHandler( ctx context.Context, - logger *zap.Logger, + settings receiver.Settings, + telemetryBuilder *metadata.TelemetryBuilder, client SubscriberClient, clientID string, subscription string, callback func(ctx context.Context, message *pubsubpb.ReceivedMessage) error, ) (*StreamHandler, error) { handler := StreamHandler{ - logger: logger, - client: client, - clientID: clientID, - subscription: subscription, - pushMessage: callback, - ackBatchWait: 10 * time.Second, + settings: settings, + telemetryBuilder: telemetryBuilder, + client: client, + clientID: clientID, + subscription: subscription, + pushMessage: callback, + ackBatchWait: 10 * time.Second, } return &handler, handler.initStream(ctx) } @@ -85,6 +94,11 @@ func (handler *StreamHandler) initStream(ctx context.Context) error { _ = handler.stream.CloseSend() return err } + handler.telemetryBuilder.ReceiverGooglecloudpubsubStreamRestarts.Add(ctx, 1, + metric.WithAttributes( + attribute.String("otelcol.component.kind", "receiver"), + attribute.String("otelcol.component.id", handler.settings.ID.String()), + )) return nil } @@ -102,7 +116,7 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { var loopCtx context.Context loopCtx, cancel := context.WithCancel(ctx) - handler.logger.Info("Starting Streaming Pull") + handler.settings.Logger.Debug("Starting Streaming Pull") handler.streamWaitGroup.Add(2) go handler.requestStream(loopCtx, cancel) go handler.responseStream(loopCtx, cancel) @@ -117,13 +131,13 @@ func (handler *StreamHandler) recoverableStream(ctx context.Context) { if handler.isRunning.Load() { err := handler.initStream(ctx) if err != nil { - handler.logger.Error("Failed to recovery stream.") + handler.settings.Logger.Error("Failed to recovery stream.") } } - handler.logger.Warn("End of recovery loop, restarting.") + handler.settings.Logger.Debug("End of recovery loop, restarting.") time.Sleep(streamRecoveryBackoffPeriod) } - handler.logger.Warn("Shutting down recovery loop.") + handler.settings.Logger.Warn("Shutting down recovery loop.") handler.handlerWaitGroup.Done() } @@ -157,15 +171,15 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. for { if err := handler.acknowledgeMessages(); err != nil { if errors.Is(err, io.EOF) { - handler.logger.Warn("EOF reached") + handler.settings.Logger.Warn("EOF reached") break } - handler.logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) + handler.settings.Logger.Error(fmt.Sprintf("Failed in acknowledge messages with error %v", err)) break } select { case <-ctx.Done(): - handler.logger.Warn("requestStream <-ctx.Done()") + handler.settings.Logger.Debug("requestStream <-ctx.Done()") case <-timer.C: timer.Reset(handler.ackBatchWait) } @@ -176,7 +190,7 @@ func (handler *StreamHandler) requestStream(ctx context.Context, cancel context. } } cancel() - handler.logger.Warn("Request Stream loop ended.") + handler.settings.Logger.Debug("Request Stream loop ended.") _ = handler.stream.CloseSend() handler.streamWaitGroup.Done() } @@ -202,18 +216,18 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context case errors.Is(err, io.EOF): activeStreaming = false case !grpcStatus: - handler.logger.Warn("response stream breaking on error", + handler.settings.Logger.Warn("response stream breaking on error", zap.Error(err)) activeStreaming = false case s.Code() == codes.Unavailable: - handler.logger.Info("response stream breaking on gRPC s 'Unavailable'") + handler.settings.Logger.Debug("response stream breaking on gRPC s 'Unavailable'") activeStreaming = false case s.Code() == codes.NotFound: - handler.logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") + handler.settings.Logger.Error("resource doesn't exist, wait 60 seconds, and restarting stream") time.Sleep(time.Second * 60) activeStreaming = false default: - handler.logger.Warn("response stream breaking on gRPC s "+s.Message(), + handler.settings.Logger.Warn("response stream breaking on gRPC s "+s.Message(), zap.String("s", s.Message()), zap.Error(err)) activeStreaming = false @@ -221,11 +235,11 @@ func (handler *StreamHandler) responseStream(ctx context.Context, cancel context } if errors.Is(ctx.Err(), context.Canceled) { // Canceling the loop, collector is probably stopping - handler.logger.Warn("response stream ctx.Err() == context.Canceled") + handler.settings.Logger.Warn("response stream ctx.Err() == context.Canceled") break } } cancel() - handler.logger.Warn("Response Stream loop ended.") + handler.settings.Logger.Debug("Response Stream loop ended.") handler.streamWaitGroup.Done() } diff --git a/receiver/googlecloudpubsubreceiver/internal/handler_test.go b/receiver/googlecloudpubsubreceiver/internal/handler_test.go index 94b285eb35caf..5642bc22c262b 100644 --- a/receiver/googlecloudpubsubreceiver/internal/handler_test.go +++ b/receiver/googlecloudpubsubreceiver/internal/handler_test.go @@ -12,10 +12,13 @@ import ( "cloud.google.com/go/pubsub/apiv1/pubsubpb" "cloud.google.com/go/pubsub/pstest" "github.com/stretchr/testify/assert" - "go.uber.org/zap/zaptest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + + "go.opentelemetry.io/collector/receiver/receivertest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" ) func TestCancelStream(t *testing.T) { @@ -41,10 +44,13 @@ func TestCancelStream(t *testing.T) { }) assert.NoError(t, err) + settings := receivertest.NewNopSettings() + telemetryBuilder, _ := metadata.NewTelemetryBuilder(settings.TelemetrySettings) + client, err := pubsub.NewSubscriberClient(ctx, copts...) assert.NoError(t, err) - handler, err := NewHandler(ctx, zaptest.NewLogger(t), client, "client-id", "projects/my-project/subscriptions/otlp", + handler, err := NewHandler(ctx, settings, telemetryBuilder, client, "client-id", "projects/my-project/subscriptions/otlp", func(context.Context, *pubsubpb.ReceivedMessage) error { return nil }) diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go new file mode 100644 index 0000000000000..8f1c73f763a3c --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry.go @@ -0,0 +1,56 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + ReceiverGooglecloudpubsubStreamRestarts metric.Int64Counter +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.ReceiverGooglecloudpubsubStreamRestarts, err = builder.meter.Int64Counter( + "otelcol_receiver.googlecloudpubsub.stream_restarts", + metric.WithDescription("Number of times the stream (re)starts due to a Pub/Sub servers connection close"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go new file mode 100644 index 0000000000000..8e1906aa90f4f --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 0000000000000..2b88391ae7105 --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,64 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +type Telemetry struct { + componenttest.Telemetry +} + +func SetupTelemetry(opts ...componenttest.TelemetryOption) Telemetry { + return Telemetry{Telemetry: componenttest.NewTelemetry(opts...)} +} +func (tt *Telemetry) NewSettings() receiver.Settings { + set := receivertest.NewNopSettings() + set.ID = component.NewID(component.MustNewType("googlecloudpubsub")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func (tt *Telemetry) AssertMetrics(t *testing.T, expected []metricdata.Metrics, opts ...metricdatatest.Option) { + var md metricdata.ResourceMetrics + require.NoError(t, tt.Reader.Collect(context.Background(), &md)) + // ensure all required metrics are present + for _, want := range expected { + got := getMetric(want.Name, md) + metricdatatest.AssertEqual(t, want, got, opts...) + } + + // ensure no additional metrics are emitted + require.Equal(t, len(expected), lenMetrics(md)) +} + +func getMetric(name string, got metricdata.ResourceMetrics) metricdata.Metrics { + for _, sm := range got.ScopeMetrics { + for _, m := range sm.Metrics { + if m.Name == name { + return m + } + } + } + + return metricdata.Metrics{} +} + +func lenMetrics(got metricdata.ResourceMetrics) int { + metricsCount := 0 + for _, sm := range got.ScopeMetrics { + metricsCount += len(sm.Metrics) + } + + return metricsCount +} diff --git a/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 0000000000000..9e0d82d024c5c --- /dev/null +++ b/receiver/googlecloudpubsubreceiver/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,40 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := SetupTelemetry() + tb, err := metadata.NewTelemetryBuilder( + testTel.NewTelemetrySettings(), + ) + require.NoError(t, err) + require.NotNil(t, tb) + tb.ReceiverGooglecloudpubsubStreamRestarts.Add(context.Background(), 1) + + testTel.AssertMetrics(t, []metricdata.Metrics{ + { + Name: "otelcol_receiver.googlecloudpubsub.stream_restarts", + Description: "Number of times the stream (re)starts due to a Pub/Sub servers connection close", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + {}, + }, + }, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreValue()) + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/receiver/googlecloudpubsubreceiver/metadata.yaml b/receiver/googlecloudpubsubreceiver/metadata.yaml index 28d8f8e03fbd0..f5fbc6ac1c876 100644 --- a/receiver/googlecloudpubsubreceiver/metadata.yaml +++ b/receiver/googlecloudpubsubreceiver/metadata.yaml @@ -8,6 +8,20 @@ status: codeowners: active: [alexvanboxel] +telemetry: + metrics: + receiver.googlecloudpubsub.stream_restarts: + enabled: true + description: Number of times the stream (re)starts due to a Pub/Sub servers connection close + unit: "1" + sum: + value_type: int + monotonic: true + extended_documentation: | + The receiver uses the Google Cloud Pub/Sub StreamingPull API and keeps a open connection. The Pub/Sub servers + recurrently close the connection after a time period to avoid a long-running sticky connection. This metric + counts the number of the resets that occurred during the lifetime of the container. + tests: config: project: my-project diff --git a/receiver/googlecloudpubsubreceiver/receiver.go b/receiver/googlecloudpubsubreceiver/receiver.go index 9fb36f6b1d6d9..5f316abde3a3c 100644 --- a/receiver/googlecloudpubsubreceiver/receiver.go +++ b/receiver/googlecloudpubsubreceiver/receiver.go @@ -14,6 +14,10 @@ import ( "sync" "time" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" + + "go.opentelemetry.io/collector/receiver" + "cloud.google.com/go/pubsub/apiv1/pubsubpb" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" @@ -22,7 +26,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver/receiverhelper" - "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal" @@ -30,7 +33,7 @@ import ( // https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#streamingpullrequest type pubsubReceiver struct { - logger *zap.Logger + settings receiver.Settings obsrecv *receiverhelper.ObsReport tracesConsumer consumer.Traces metricsConsumer consumer.Metrics @@ -43,6 +46,7 @@ type pubsubReceiver struct { logsUnmarshaler plog.Unmarshaler handler *internal.StreamHandler startOnce sync.Once + telemetryBuilder *metadata.TelemetryBuilder } type buildInEncoding int @@ -118,6 +122,11 @@ func (receiver *pubsubReceiver) Start(ctx context.Context, host component.Host) return } receiver.client = client + receiver.telemetryBuilder, err = metadata.NewTelemetryBuilder(receiver.settings.TelemetrySettings) + if err != nil { + startErr = fmt.Errorf("failed to create telemetry builder: %w", err) + return + } err = createHandlerFn(ctx) if err != nil { @@ -194,9 +203,9 @@ func (receiver *pubsubReceiver) setMarshallerFromEncodingID(encodingID buildInEn func (receiver *pubsubReceiver) Shutdown(_ context.Context) error { if receiver.handler != nil { - receiver.logger.Info("Stopping Google Pubsub receiver") + receiver.settings.Logger.Info("Stopping Google Pubsub receiver") receiver.handler.CancelNow() - receiver.logger.Info("Stopped Google Pubsub receiver") + receiver.settings.Logger.Info("Stopped Google Pubsub receiver") receiver.handler = nil } if receiver.client == nil { @@ -370,7 +379,8 @@ func (receiver *pubsubReceiver) createMultiplexingReceiverHandler(ctx context.Co var err error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, @@ -432,7 +442,8 @@ func (receiver *pubsubReceiver) createReceiverHandler(ctx context.Context) error receiver.handler, err = internal.NewHandler( ctx, - receiver.logger, + receiver.settings, + receiver.telemetryBuilder, receiver.client, receiver.config.ClientID, receiver.config.Subscription, diff --git a/receiver/googlecloudpubsubreceiver/receiver_test.go b/receiver/googlecloudpubsubreceiver/receiver_test.go index 72eca61b13158..5ea5c252daf6b 100644 --- a/receiver/googlecloudpubsubreceiver/receiver_test.go +++ b/receiver/googlecloudpubsubreceiver/receiver_test.go @@ -12,14 +12,13 @@ import ( "cloud.google.com/go/pubsub/pstest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" - "go.uber.org/zap" - "go.uber.org/zap/zaptest/observer" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/internal/metadata" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/googlecloudpubsubreceiver/testdata" @@ -27,9 +26,9 @@ import ( func createBaseReceiver() (*pstest.Server, *pubsubReceiver) { srv := pstest.NewServer() - core, _ := observer.New(zap.WarnLevel) + settings := receivertest.NewNopSettings() return srv, &pubsubReceiver{ - logger: zap.New(core), + settings: settings, userAgent: "test-user-agent", config: &Config{ @@ -100,8 +99,7 @@ func TestReceiver(t *testing.T) { }) assert.NoError(t, err) - core, _ := observer.New(zap.WarnLevel) - params := receivertest.NewNopSettings() + settings := receivertest.NewNopSettings() traceSink := new(consumertest.TracesSink) metricSink := new(consumertest.MetricsSink) logSink := new(consumertest.LogsSink) @@ -110,12 +108,12 @@ func TestReceiver(t *testing.T) { ReceiverID: component.NewID(metadata.Type), Transport: reportTransport, LongLivedCtx: false, - ReceiverCreateSettings: params, + ReceiverCreateSettings: settings, }) require.NoError(t, err) receiver := &pubsubReceiver{ - logger: zap.New(core), + settings: settings, obsrecv: obsrecv, userAgent: "test-user-agent",