Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add Attempt, Operation and GFE Metrics #1302

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 23 additions & 20 deletions google/cloud/spanner_v1/_opentelemetry_tracing.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
except ImportError:
HAS_OPENTELEMETRY_INSTALLED = False

from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture

TRACER_NAME = "cloud.google.com/python/spanner"
TRACER_VERSION = gapic_version.__version__
extended_tracing_globally_disabled = (
Expand Down Expand Up @@ -107,26 +109,27 @@ def trace_call(name, session=None, extra_attributes=None, observability_options=
with tracer.start_as_current_span(
name, kind=trace.SpanKind.CLIENT, attributes=attributes
) as span:
try:
yield span
except Exception as error:
span.set_status(Status(StatusCode.ERROR, str(error)))
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
# on any exception. We should file a bug later on with them to only
# invoke .record_exception if not already invoked, hence we should not
# invoke .record_exception on our own else we shall have 2 exceptions.
raise
else:
# All spans still have set_status available even if for example
# NonRecordingSpan doesn't have "_status".
absent_span_status = getattr(span, "_status", None) is None
if absent_span_status or span._status.status_code == StatusCode.UNSET:
# OpenTelemetry-Python only allows a status change
# if the current code is UNSET or ERROR. At the end
# of the generator's consumption, only set it to OK
# if it wasn't previously set otherwise.
# https://github.com/googleapis/python-spanner/issues/1246
span.set_status(Status(StatusCode.OK))
with MetricsCapture():
try:
yield span
except Exception as error:
span.set_status(Status(StatusCode.ERROR, str(error)))
# OpenTelemetry-Python imposes invoking span.record_exception on __exit__
# on any exception. We should file a bug later on with them to only
# invoke .record_exception if not already invoked, hence we should not
# invoke .record_exception on our own else we shall have 2 exceptions.
raise
else:
# All spans still have set_status available even if for example
# NonRecordingSpan doesn't have "_status".
absent_span_status = getattr(span, "_status", None) is None
if absent_span_status or span._status.status_code == StatusCode.UNSET:
# OpenTelemetry-Python only allows a status change
# if the current code is UNSET or ERROR. At the end
# of the generator's consumption, only set it to OK
# if it wasn't previously set otherwise.
# https://github.com/googleapis/python-spanner/issues/1246
span.set_status(Status(StatusCode.OK))


def get_current_span():
Expand Down
5 changes: 3 additions & 2 deletions google/cloud/spanner_v1/batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from google.cloud.spanner_v1._helpers import _retry_on_aborted_exception
from google.cloud.spanner_v1._helpers import _check_rst_stream_error
from google.api_core.exceptions import InternalServerError
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture
import time

DEFAULT_RETRY_TIMEOUT_SECS = 30
Expand Down Expand Up @@ -226,7 +227,7 @@ def commit(
self._session,
trace_attributes,
observability_options=observability_options,
):
), MetricsCapture():
method = functools.partial(
api.commit,
request=request,
Expand Down Expand Up @@ -348,7 +349,7 @@ def batch_write(self, request_options=None, exclude_txn_from_change_streams=Fals
self._session,
trace_attributes,
observability_options=observability_options,
):
), MetricsCapture():
method = functools.partial(
api.batch_write,
request=request,
Expand Down
44 changes: 44 additions & 0 deletions google/cloud/spanner_v1/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,30 @@
from google.cloud.spanner_v1._helpers import _merge_query_options
from google.cloud.spanner_v1._helpers import _metadata_with_prefix
from google.cloud.spanner_v1.instance import Instance
from google.cloud.spanner_v1.metrics.constants import (
ENABLE_SPANNER_METRICS_ENV_VAR,
METRIC_EXPORT_INTERVAL_MS,
)
from google.cloud.spanner_v1.metrics.spanner_metrics_tracer_factory import (
SpannerMetricsTracerFactory,
)
from google.cloud.spanner_v1.metrics.metrics_exporter import (
CloudMonitoringMetricsExporter,
)

try:
from opentelemetry import metrics
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader

HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = True
except ImportError: # pragma: NO COVER
HAS_GOOGLE_CLOUD_MONITORING_INSTALLED = False


_CLIENT_INFO = client_info.ClientInfo(client_library_version=__version__)
EMULATOR_ENV_VAR = "SPANNER_EMULATOR_HOST"
ENABLE_BUILTIN_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"
_EMULATOR_HOST_HTTP_SCHEME = (
"%s contains a http scheme. When used with a scheme it may cause gRPC's "
"DNS resolver to endlessly attempt to resolve. %s is intended to be used "
Expand All @@ -73,6 +94,10 @@ def _get_spanner_optimizer_statistics_package():
return os.getenv(OPTIMIZER_STATISITCS_PACKAGE_ENV_VAR, "")


def _get_spanner_enable_builtin_metrics():
    """Report whether built-in Spanner metrics were requested via the environment.

    Returns:
        bool: True only when the environment variable named by
        ``ENABLE_SPANNER_METRICS_ENV_VAR`` is set to exactly ``"true"``
        (case-sensitive); False when it is unset or holds any other value.
    """
    flag_value = os.getenv(ENABLE_SPANNER_METRICS_ENV_VAR)
    return flag_value == "true"


class Client(ClientWithProject):
"""Client for interacting with Cloud Spanner API.

Expand Down Expand Up @@ -195,6 +220,25 @@ def __init__(
"http://" in self._emulator_host or "https://" in self._emulator_host
):
warnings.warn(_EMULATOR_HOST_HTTP_SCHEME)
# Check flag to enable Spanner builtin metrics
if (
_get_spanner_enable_builtin_metrics()
and HAS_GOOGLE_CLOUD_MONITORING_INSTALLED
):
meter_provider = metrics.NoOpMeterProvider()
if not _get_spanner_emulator_host():
meter_provider = MeterProvider(
metric_readers=[
PeriodicExportingMetricReader(
CloudMonitoringMetricsExporter(),
export_interval_millis=METRIC_EXPORT_INTERVAL_MS,
)
]
)
metrics.set_meter_provider(meter_provider)
SpannerMetricsTracerFactory()
else:
SpannerMetricsTracerFactory(enabled=False)

self._route_to_leader_enabled = route_to_leader_enabled
self._directed_read_options = directed_read_options
Expand Down
15 changes: 8 additions & 7 deletions google/cloud/spanner_v1/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
get_current_span,
trace_call,
)
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture


SPANNER_DATA_SCOPE = "https://www.googleapis.com/auth/spanner.data"
Expand Down Expand Up @@ -702,7 +703,7 @@ def execute_pdml():
with trace_call(
"CloudSpanner.Database.execute_partitioned_pdml",
observability_options=self.observability_options,
) as span:
) as span, MetricsCapture():
with SessionCheckout(self._pool) as session:
add_span_event(span, "Starting BeginTransaction")
txn = api.begin_transaction(
Expand Down Expand Up @@ -897,7 +898,7 @@ def run_in_transaction(self, func, *args, **kw):
with trace_call(
"CloudSpanner.Database.run_in_transaction",
observability_options=observability_options,
):
), MetricsCapture():
# Sanity check: Is there a transaction already running?
# If there is, then raise a red flag. Otherwise, mark that this one
# is running.
Expand Down Expand Up @@ -1489,7 +1490,7 @@ def generate_read_batches(
f"CloudSpanner.{type(self).__name__}.generate_read_batches",
extra_attributes=dict(table=table, columns=columns),
observability_options=self.observability_options,
):
), MetricsCapture():
partitions = self._get_snapshot().partition_read(
table=table,
columns=columns,
Expand Down Expand Up @@ -1540,7 +1541,7 @@ def process_read_batch(
with trace_call(
f"CloudSpanner.{type(self).__name__}.process_read_batch",
observability_options=observability_options,
):
), MetricsCapture():
kwargs = copy.deepcopy(batch["read"])
keyset_dict = kwargs.pop("keyset")
kwargs["keyset"] = KeySet._from_dict(keyset_dict)
Expand Down Expand Up @@ -1625,7 +1626,7 @@ def generate_query_batches(
f"CloudSpanner.{type(self).__name__}.generate_query_batches",
extra_attributes=dict(sql=sql),
observability_options=self.observability_options,
):
), MetricsCapture():
partitions = self._get_snapshot().partition_query(
sql=sql,
params=params,
Expand Down Expand Up @@ -1681,7 +1682,7 @@ def process_query_batch(
with trace_call(
f"CloudSpanner.{type(self).__name__}.process_query_batch",
observability_options=self.observability_options,
):
), MetricsCapture():
return self._get_snapshot().execute_sql(
partition=batch["partition"],
**batch["query"],
Expand Down Expand Up @@ -1746,7 +1747,7 @@ def run_partitioned_query(
f"CloudSpanner.${type(self).__name__}.run_partitioned_query",
extra_attributes=dict(sql=sql),
observability_options=self.observability_options,
):
), MetricsCapture():
partitions = list(
self.generate_query_batches(
sql,
Expand Down
3 changes: 2 additions & 1 deletion google/cloud/spanner_v1/merged_result_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from threading import Lock, Event

from google.cloud.spanner_v1._opentelemetry_tracing import trace_call
from google.cloud.spanner_v1.metrics.metrics_capture import MetricsCapture

if TYPE_CHECKING:
from google.cloud.spanner_v1.database import BatchSnapshot
Expand Down Expand Up @@ -45,7 +46,7 @@ def run(self):
with trace_call(
"CloudSpanner.PartitionExecutor.run",
observability_options=observability_options,
):
), MetricsCapture():
self.__run()

def __run(self):
Expand Down
10 changes: 9 additions & 1 deletion google/cloud/spanner_v1/metrics/constants.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2025 Google LLC
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,12 @@
BUILT_IN_METRICS_METER_NAME = "gax-python"
NATIVE_METRICS_PREFIX = "spanner.googleapis.com/internal/client"
SPANNER_RESOURCE_TYPE = "spanner_instance_client"
SPANNER_SERVICE_NAME = "spanner-python"
GOOGLE_CLOUD_RESOURCE_KEY = "google-cloud-resource-prefix"
GOOGLE_CLOUD_REGION_KEY = "cloud.region"
GOOGLE_CLOUD_REGION_GLOBAL = "global"
SPANNER_METHOD_PREFIX = "/google.spanner.v1."
ENABLE_SPANNER_METRICS_ENV_VAR = "SPANNER_ENABLE_BUILTIN_METRICS"

# Monitored resource labels
MONITORED_RES_LABEL_KEY_PROJECT = "project_id"
Expand Down Expand Up @@ -61,3 +67,5 @@
METRIC_NAME_OPERATION_COUNT,
METRIC_NAME_ATTEMPT_COUNT,
]

METRIC_EXPORT_INTERVAL_MS = 60000 # 1 Minute
75 changes: 75 additions & 0 deletions google/cloud/spanner_v1/metrics/metrics_capture.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright 2025 Google LLC All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
This module provides functionality for capturing metrics in Cloud Spanner operations.

It includes a context manager class, MetricsCapture, which automatically handles the
start and completion of metrics tracing for a given operation. This ensures that metrics
are consistently recorded for Cloud Spanner operations, facilitating observability and
performance monitoring.
"""

from .spanner_metrics_tracer_factory import SpannerMetricsTracerFactory


class MetricsCapture:
    """Context manager that brackets a Cloud Spanner operation with metrics calls.

    On entry a new metrics tracer is created and the operation start is
    recorded; on exit the operation completion is recorded. When the
    ``SpannerMetricsTracerFactory`` reports that metrics are not enabled,
    both entry and exit do nothing.

    The tracer is stored on the class attribute
    ``SpannerMetricsTracerFactory.current_metrics_tracer`` rather than on
    this instance, and ``__exit__`` acts on whatever tracer is stored there
    at exit time.
    """

    def __enter__(self):
        """Start metrics capture for a new operation.

        Returns:
            MetricsCapture: This context manager instance.
        """
        factory = SpannerMetricsTracerFactory()
        if factory.enabled:
            # Replace the shared "current" tracer with a fresh one for this
            # operation; the previous value is not saved.
            # NOTE(review): a nested or concurrent MetricsCapture would
            # overwrite this attribute as well -- confirm that is intended.
            SpannerMetricsTracerFactory.current_metrics_tracer = (
                factory.create_metrics_tracer()
            )
            tracer = SpannerMetricsTracerFactory.current_metrics_tracer
            # The factory may hand back a falsy tracer; only record on a real one.
            if tracer:
                tracer.record_operation_start()
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Finish metrics capture for the operation.

        Args:
            exc_type (Type[BaseException]): The exception type, if any.
            exc_value (BaseException): The exception value, if any.
            traceback (TracebackType): The traceback object, if any.

        Returns:
            bool: Always False, so an exception raised inside the ``with``
            body is never suppressed.
        """
        # Nothing to record when metrics are disabled.
        if SpannerMetricsTracerFactory().enabled:
            tracer = SpannerMetricsTracerFactory.current_metrics_tracer
            if tracer:
                tracer.record_operation_completion()
        return False
Loading
Loading