From dbd1d23e96d8cd88fabc279a4014d3e9a8405c0e Mon Sep 17 00:00:00 2001 From: Islam Sharabash Date: Mon, 3 Mar 2025 17:50:57 -0800 Subject: [PATCH] Fix tracing of async cursors for psycopg This copies the traced_execution of AsyncCursorTracer except query_method is awaited within the span. Fixes https://github.com/open-telemetry/opentelemetry-python-contrib/issues/2486 --- .../instrumentation/psycopg/__init__.py | 44 ++++++++++++++++++- 1 file changed, 42 insertions(+), 2 deletions(-) diff --git a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py index 38a6264c6d..19f3576c32 100644 --- a/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-psycopg/src/opentelemetry/instrumentation/psycopg/__init__.py @@ -149,7 +149,7 @@ from opentelemetry.instrumentation.instrumentor import BaseInstrumentor from opentelemetry.instrumentation.psycopg.package import _instruments from opentelemetry.instrumentation.psycopg.version import __version__ -from opentelemetry.trace import TracerProvider +from opentelemetry.trace import SpanKind, TracerProvider _logger = logging.getLogger(__name__) _OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory" @@ -381,6 +381,46 @@ def callproc(self, *args: Any, **kwargs: Any): return TracedCursorFactory +class AsyncCursorTracer(CursorTracer): + async def traced_execution( + self, + cursor: dbapi.CursorT, + query_method: Callable[..., Any], + *args: tuple[Any, ...], + **kwargs: dict[Any, Any], + ): + name = self.get_operation_name(cursor, args) + if not name: + name = ( + self._db_api_integration.database + if self._db_api_integration.database + else self._db_api_integration.name + ) + + with self._db_api_integration._tracer.start_as_current_span( + name, kind=SpanKind.CLIENT + ) as span: + if span.is_recording(): + if args and self._commenter_enabled: + if self._enable_attribute_commenter: + # sqlcomment is added to executed query and db.statement span attribute + args = self._update_args_with_added_sql_comment( + args, cursor + ) + self._populate_span(span, cursor, *args) + else: + # sqlcomment is only added to executed query + # so db.statement is set before add_sql_comment + self._populate_span(span, cursor, *args) + args = self._update_args_with_added_sql_comment( + args, cursor + ) + else: + # no sqlcomment anywhere + self._populate_span(span, cursor, *args) + return await query_method(*args, **kwargs) + + def _new_cursor_async_factory( db_api: DatabaseApiAsyncIntegration | None = None, base_factory: type[psycopg.AsyncCursor] | None = None, @@ -395,7 +435,7 @@ def _new_cursor_async_factory( tracer_provider=tracer_provider, ) base_factory = base_factory or psycopg.AsyncCursor - _cursor_tracer = CursorTracer(db_api) + _cursor_tracer = AsyncCursorTracer(db_api) class TracedCursorAsyncFactory(base_factory): async def execute(self, *args: Any, **kwargs: Any):