Skip to content

Commit e4ffe7b

Browse files
add support for profiling threading.Semaphore objects in Lock profiler
1 parent 441fc1c commit e4ffe7b

File tree

6 files changed

+250
-71
lines changed

6 files changed

+250
-71
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -201,3 +201,5 @@ tests/appsec/iast/fixtures/taint_sinks/not_exists.txt
201201

202202
# Rust build artifacts
203203
src/native/target*
204+
205+
src/native/Cargo.lock

ddtrace/profiling/collector/_lock.py

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ class _ProfiledLock:
5252
"init_location",
5353
"acquired_time",
5454
"name",
55+
"is_internal",
5556
)
5657

5758
def __init__(
@@ -60,6 +61,7 @@ def __init__(
6061
tracer: Optional[Tracer],
6162
max_nframes: int,
6263
capture_sampler: collector.CaptureSampler,
64+
is_internal: bool = False,
6365
) -> None:
6466
self.__wrapped__: Any = wrapped
6567
self.tracer: Optional[Tracer] = tracer
@@ -71,6 +73,9 @@ def __init__(
7173
self.init_location: str = f"{os.path.basename(code.co_filename)}:{frame.f_lineno}"
7274
self.acquired_time: int = 0
7375
self.name: Optional[str] = None
76+
# If True, this lock is internal to another sync primitive (e.g., Lock inside Semaphore)
77+
# and should not generate profile samples to avoid double-counting
78+
self.is_internal: bool = is_internal
7479

7580
### DUNDER methods ###
7681

@@ -161,6 +166,11 @@ def _flush_sample(self, start: int, end: int, is_acquire: bool) -> None:
161166
end: End timestamp in nanoseconds
162167
is_acquire: True for acquire operations, False for release operations
163168
"""
169+
# Skip profiling for internal locks (e.g., Lock inside Semaphore/Condition)
170+
# to avoid double-counting when multiple collectors are active
171+
if self.is_internal:
172+
return
173+
164174
handle: ddup.SampleHandle = ddup.SampleHandle()
165175

166176
handle.push_monotonic_ns(end)
@@ -262,6 +272,8 @@ class LockCollector(collector.CaptureSamplerCollector):
262272
"""Record lock usage."""
263273

264274
PROFILED_LOCK_CLASS: Type[Any]
275+
PATCH_MODULE: Any # e.g., threading module
276+
PATCH_ATTR_NAME: str # e.g., "Lock", "RLock", "Semaphore"
265277

266278
def __init__(
267279
self,
@@ -275,11 +287,11 @@ def __init__(
275287
self.tracer: Optional[Tracer] = tracer
276288
self._original_lock: Any = None
277289

278-
@abc.abstractmethod
279-
def _get_patch_target(self) -> Callable[..., Any]: ...
290+
def _get_patch_target(self) -> Callable[..., Any]:
291+
return getattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME)
280292

281-
@abc.abstractmethod
282-
def _set_patch_target(self, value: Any) -> None: ...
293+
def _set_patch_target(self, value: Any) -> None:
294+
setattr(self.PATCH_MODULE, self.PATCH_ATTR_NAME, value)
283295

284296
def _start_service(self) -> None:
285297
"""Start collecting lock usage."""
@@ -297,12 +309,30 @@ def patch(self) -> None:
297309
original_lock: Any = self._original_lock # Capture non-None value
298310

299311
def _profiled_allocate_lock(*args: Any, **kwargs: Any) -> _ProfiledLock:
300-
"""Simple wrapper that returns profiled locks."""
312+
"""Simple wrapper that returns profiled locks.
313+
314+
Detects if the lock is being created from within threading.py stdlib
315+
(i.e., internal to Semaphore/Condition) to avoid double-counting.
316+
"""
317+
import threading as threading_module
318+
319+
# Check if caller is from threading.py (internal lock)
320+
is_internal: bool = False
321+
try:
322+
# Frame 0: _profiled_allocate_lock
323+
# Frame 1: _LockAllocatorWrapper.__call__
324+
# Frame 2: actual caller (threading.Lock() call site)
325+
caller_filename = sys._getframe(2).f_code.co_filename
326+
is_internal = bool(threading_module.__file__) and caller_filename == threading_module.__file__
327+
except (ValueError, AttributeError):
328+
pass
329+
301330
return self.PROFILED_LOCK_CLASS(
302331
wrapped=original_lock(*args, **kwargs),
303332
tracer=self.tracer,
304333
max_nframes=self.nframes,
305334
capture_sampler=self._capture_sampler,
335+
is_internal=is_internal,
306336
)
307337

308338
self._set_patch_target(_LockAllocatorWrapper(_profiled_allocate_lock))

ddtrace/profiling/collector/threading.py

Lines changed: 14 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,34 +18,32 @@ class _ProfiledThreadingRLock(_lock._ProfiledLock):
1818
pass
1919

2020

21+
class _ProfiledThreadingSemaphore(_lock._ProfiledLock):
22+
pass
23+
24+
2125
class ThreadingLockCollector(_lock.LockCollector):
2226
"""Record threading.Lock usage."""
2327

2428
PROFILED_LOCK_CLASS = _ProfiledThreadingLock
25-
26-
def _get_patch_target(self) -> typing.Type[threading.Lock]:
27-
return threading.Lock
28-
29-
def _set_patch_target(
30-
self,
31-
value: typing.Any,
32-
) -> None:
33-
threading.Lock = value
29+
PATCH_MODULE = threading
30+
PATCH_ATTR_NAME = "Lock"
3431

3532

3633
class ThreadingRLockCollector(_lock.LockCollector):
3734
"""Record threading.RLock usage."""
3835

3936
PROFILED_LOCK_CLASS = _ProfiledThreadingRLock
37+
PATCH_MODULE = threading
38+
PATCH_ATTR_NAME = "RLock"
39+
4040

41-
def _get_patch_target(self) -> typing.Type[threading.RLock]:
42-
return threading.RLock
41+
class ThreadingSemaphoreCollector(_lock.LockCollector):
42+
"""Record threading.Semaphore usage."""
4343

44-
def _set_patch_target(
45-
self,
46-
value: typing.Any,
47-
) -> None:
48-
threading.RLock = value
44+
PROFILED_LOCK_CLASS = _ProfiledThreadingSemaphore
45+
PATCH_MODULE = threading
46+
PATCH_ATTR_NAME = "Semaphore"
4947

5048

5149
# Also patch threading.Thread so echion can track thread lifetimes

ddtrace/profiling/profiler.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,7 @@ def start_collector(collector_class: Type) -> None:
220220
self._collectors_on_import = [
221221
("threading", lambda _: start_collector(threading.ThreadingLockCollector)),
222222
("threading", lambda _: start_collector(threading.ThreadingRLockCollector)),
223+
("threading", lambda _: start_collector(threading.ThreadingSemaphoreCollector)),
223224
("asyncio", lambda _: start_collector(asyncio.AsyncioLockCollector)),
224225
]
225226

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
---
2+
features:
3+
- |
4+
profiling: Add support for ``threading.Semaphore`` locking type profiling in Python.
5+
The Lock profiler now detects "internal" ``threading.Lock`` primitives, i.e. those that are part of implementation of higher-level locking types (such as ``threading.Semaphore``).
6+
Doing so prevents double-counting samples for Semaphore objects, which are implemented with using Condition, which itself uses Lock.
7+
The internal lock, while still being patched due to the profiler design, is designated as internal (`self.is_internal == True`), which prevents it from being logged.

0 commit comments

Comments
 (0)