Skip to content

Commit

Permalink
Fix tracing of async cursors for psycopg
Browse files Browse the repository at this point in the history
This copies the traced_execution of AsyncCursorTracer except
query_method is awaited within the span.

Fixes #2486
  • Loading branch information
ibash committed Mar 4, 2025
1 parent 81eaea5 commit dbd1d23
Showing 1 changed file with 42 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@
from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.psycopg.package import _instruments
from opentelemetry.instrumentation.psycopg.version import __version__
from opentelemetry.trace import TracerProvider
from opentelemetry.trace import SpanKind, TracerProvider

_logger = logging.getLogger(__name__)
_OTEL_CURSOR_FACTORY_KEY = "_otel_orig_cursor_factory"
Expand Down Expand Up @@ -381,6 +381,46 @@ def callproc(self, *args: Any, **kwargs: Any):
return TracedCursorFactory


class AsyncCursorTracer(CursorTracer):
async def traced_execution(
self,
cursor: dbapi.CursorT,
query_method: Callable[..., Any],
*args: tuple[Any, ...],
**kwargs: dict[Any, Any],
):
name = self.get_operation_name(cursor, args)
if not name:
name = (
self._db_api_integration.database
if self._db_api_integration.database
else self._db_api_integration.name
)

with self._db_api_integration._tracer.start_as_current_span(
name, kind=SpanKind.CLIENT
) as span:
if span.is_recording():
if args and self._commenter_enabled:
if self._enable_attribute_commenter:
# sqlcomment is added to executed query and db.statement span attribute
args = self._update_args_with_added_sql_comment(
args, cursor
)
self._populate_span(span, cursor, *args)
else:
# sqlcomment is only added to executed query
# so db.statement is set before add_sql_comment
self._populate_span(span, cursor, *args)
args = self._update_args_with_added_sql_comment(
args, cursor
)
else:
# no sqlcomment anywhere
self._populate_span(span, cursor, *args)
return await query_method(*args, **kwargs)


def _new_cursor_async_factory(
db_api: DatabaseApiAsyncIntegration | None = None,
base_factory: type[psycopg.AsyncCursor] | None = None,
Expand All @@ -395,7 +435,7 @@ def _new_cursor_async_factory(
tracer_provider=tracer_provider,
)
base_factory = base_factory or psycopg.AsyncCursor
_cursor_tracer = CursorTracer(db_api)
_cursor_tracer = AsyncCursorTracer(db_api)

class TracedCursorAsyncFactory(base_factory):
async def execute(self, *args: Any, **kwargs: Any):
Expand Down

0 comments on commit dbd1d23

Please sign in to comment.