Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -203,5 +203,6 @@ tests/appsec/iast/fixtures/taint_sinks/not_exists.txt
*.debug
*.dSYM/

# Rust build artifacts
# Rust build artifacts and dependencies
src/native/target*
src/native/Cargo.lock
46 changes: 40 additions & 6 deletions ddtrace/profiling/collector/_lock.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
from __future__ import annotations

import _thread
import abc
import os.path
import sys
import time
Expand Down Expand Up @@ -52,6 +51,7 @@ class _ProfiledLock:
"init_location",
"acquired_time",
"name",
"is_internal",
)

def __init__(
Expand All @@ -60,6 +60,7 @@ def __init__(
tracer: Optional[Tracer],
max_nframes: int,
capture_sampler: collector.CaptureSampler,
is_internal: bool = False,
) -> None:
self.__wrapped__: Any = wrapped
self.tracer: Optional[Tracer] = tracer
Expand All @@ -71,6 +72,9 @@ def __init__(
self.init_location: str = f"{os.path.basename(code.co_filename)}:{frame.f_lineno}"
self.acquired_time: int = 0
self.name: Optional[str] = None
# If True, this lock is internal to another sync primitive (e.g., Lock inside Semaphore)
# and should not generate profile samples to avoid double-counting
self.is_internal: bool = is_internal

### DUNDER methods ###

Expand Down Expand Up @@ -161,6 +165,11 @@ def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:
end: End timestamp in nanoseconds
is_acquire: True for acquire operations, False for release operations
"""
# Skip profiling for internal locks (e.g., Lock inside Semaphore/Condition)
# to avoid double-counting when multiple collectors are active
if self.is_internal:
return

handle: ddup.SampleHandle = ddup.SampleHandle()

handle.push_monotonic_ns(end)
Expand Down Expand Up @@ -262,6 +271,8 @@ class LockCollector(collector.CaptureSamplerCollector):
"""Record lock usage."""

PROFILED_LOCK_CLASS: Type[Any]
PATCH_MODULE: Any # e.g., threading module
PATCH_ATTR_NAME: str # e.g., "Lock", "RLock", "Semaphore"

def __init__(
self,
Expand All @@ -275,11 +286,11 @@ def __init__(
self.tracer: Optional[Tracer] = tracer
self._original_lock: Any = None

@abc.abstractmethod
def _get_patch_target(self) -> Callable[..., Any]: ...
def _get_patch_target(self) -> Callable[..., Any]:
return getattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME)

@abc.abstractmethod
def _set_patch_target(self, value: Any) -> None: ...
def _set_patch_target(self, value: Any) -> None:
setattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME, value)

def _start_service(self) -> None:
"""Start collecting lock usage."""
Expand All @@ -297,12 +308,35 @@ def patch(self) -> None:
original_lock: Any = self._original_lock # Capture non-None value

def _profiled_allocate_lock(*args: Any, **kwargs: Any) -> _ProfiledLock:
"""Simple wrapper that returns profiled locks."""
"""Simple wrapper that returns profiled locks.

Detects if the lock is being created from within threading.py stdlib
(i.e., internal to Semaphore/Condition) to avoid double-counting.
"""
import threading as threading_module

# Check if caller is from threading.py (internal lock)
is_internal: bool = False
try:
# Frame 0: _profiled_allocate_lock
# Frame 1: _LockAllocatorWrapper.__call__
# Frame 2: actual caller (threading.Lock() call site)
caller_filename = sys._getframe(2).f_code.co_filename
threading_module_file = threading_module.__file__
if threading_module_file and caller_filename:
# Normalize paths to handle symlinks and different path formats
caller_filename_normalized = os.path.normpath(os.path.realpath(caller_filename))
threading_file_normalized = os.path.normpath(os.path.realpath(threading_module_file))
is_internal = caller_filename_normalized == threading_file_normalized
except (ValueError, AttributeError, OSError):
pass

return self.PROFILED_LOCK_CLASS(
wrapped=original_lock(*args, **kwargs),
tracer=self.tracer,
max_nframes=self.nframes,
capture_sampler=self._capture_sampler,
is_internal=is_internal,
)

self._set_patch_target(_LockAllocatorWrapper(_profiled_allocate_lock))
Expand Down
31 changes: 14 additions & 17 deletions ddtrace/profiling/collector/threading.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import absolute_import

import threading
import typing

from ddtrace.internal._unpatched import _threading as ddtrace_threading
from ddtrace.internal.datadog.profiling import stack_v2
Expand All @@ -18,34 +17,32 @@ class _ProfiledThreadingRLock(_lock._ProfiledLock):
pass


class _ProfiledThreadingSemaphore(_lock._ProfiledLock):
pass


class ThreadingLockCollector(_lock.LockCollector):
"""Record threading.Lock usage."""

PROFILED_LOCK_CLASS = _ProfiledThreadingLock

def _get_patch_target(self) -> typing.Type[threading.Lock]:
return threading.Lock

def _set_patch_target(
self,
value: typing.Any,
) -> None:
threading.Lock = value
PATCH_MODULE = threading
PATCH_ATTR_NAME = "Lock"


class ThreadingRLockCollector(_lock.LockCollector):
"""Record threading.RLock usage."""

PROFILED_LOCK_CLASS = _ProfiledThreadingRLock
PATCH_MODULE = threading
PATCH_ATTR_NAME = "RLock"


def _get_patch_target(self) -> typing.Type[threading.RLock]:
return threading.RLock
class ThreadingSemaphoreCollector(_lock.LockCollector):
"""Record threading.Semaphore usage."""

def _set_patch_target(
self,
value: typing.Any,
) -> None:
threading.RLock = value
PROFILED_LOCK_CLASS = _ProfiledThreadingSemaphore
PATCH_MODULE = threading
PATCH_ATTR_NAME = "Semaphore"


# Also patch threading.Thread so echion can track thread lifetimes
Expand Down
1 change: 1 addition & 0 deletions ddtrace/profiling/profiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,7 @@ def start_collector(collector_class: Type) -> None:
self._collectors_on_import = [
("threading", lambda _: start_collector(threading.ThreadingLockCollector)),
("threading", lambda _: start_collector(threading.ThreadingRLockCollector)),
("threading", lambda _: start_collector(threading.ThreadingSemaphoreCollector)),
("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)),
]

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
features:
- |
profiling: Add support for ``threading.Semaphore`` locking type profiling in Python.
The Lock profiler now detects and marks "internal" Lock objects, i.e. those that are part of implementation of higher-level locking types.
One example of such higher-level primitive is ``threading.Semaphore``, which is implemented with ``threading.Condition``, which itself uses ``threading.Lock`` internally.
Marking a locks as internal will prevent it from being logged, which means the sample will only be counted once.
Loading
Loading