Skip to content

Commit e337514

Browse files
feat(profiling): track asyncio.as_completed
1 parent cc972d6 commit e337514

File tree

3 files changed

+121
-0
lines changed

3 files changed

+121
-0
lines changed

ddtrace/profiling/_asyncio.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
if typing.TYPE_CHECKING:
99
import asyncio
10+
import asyncio as aio_types
1011

1112
from ddtrace.internal._unpatched import _threading as ddtrace_threading
1213
from ddtrace.internal.datadog.profiling import stack_v2
@@ -145,6 +146,22 @@ def _(f, args, kwargs):
145146
for future in futures:
146147
stack_v2.link_tasks(parent, future)
147148

149+
@partial(wrap, sys.modules["asyncio"].tasks.as_completed)
150+
def _(f, args, kwargs):
151+
loop = typing.cast(typing.Optional["asyncio.AbstractEventLoop"], kwargs.get("loop"))
152+
parent: typing.Optional["aio_types.Task[typing.Any]"] = globals()["current_task"](loop)
153+
154+
if parent is not None:
155+
fs = typing.cast(typing.Iterable["asyncio.Future"], get_argument_value(args, kwargs, 0, "fs"))
156+
futures: typing.Set["asyncio.Future"] = {asyncio.ensure_future(f, loop=loop) for f in set(fs)}
157+
for future in futures:
158+
stack_v2.link_tasks(parent, future)
159+
160+
# Replace fs with the ensured futures to avoid double-wrapping
161+
args = (futures,) + args[1:]
162+
163+
return f(*args, **kwargs)
164+
148165
_call_init_asyncio(asyncio)
149166

150167

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
features:
2+
- |
3+
profiling: This introduces tracking for the ``asyncio.as_completed`` util in the Profiler.
Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
import pytest
2+
3+
4+
@pytest.mark.subprocess(
5+
env=dict(
6+
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_as_completed",
7+
),
8+
err=None,
9+
)
10+
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
11+
def test_asyncio_as_completed() -> None:
12+
import asyncio
13+
import os
14+
import random
15+
from sys import version_info as PYVERSION
16+
17+
from ddtrace.internal.datadog.profiling import stack_v2
18+
from ddtrace.profiling import profiler
19+
from tests.profiling.collector import pprof_utils
20+
21+
assert stack_v2.is_available, stack_v2.failure_msg
22+
23+
async def other(t: float) -> None:
24+
await asyncio.sleep(t)
25+
26+
async def wait_and_return_delay(t: float) -> float:
27+
await other(t)
28+
return t
29+
30+
async def main() -> None:
31+
# Create a mix of Tasks and Coroutines
32+
futures = [
33+
asyncio.create_task(wait_and_return_delay(i / 10)) if i % 2 == 0 else wait_and_return_delay(i / 10)
34+
for i in range(10)
35+
]
36+
37+
# Randomize the order of the futures
38+
random.shuffle(futures)
39+
40+
# Wait for the futures to complete and store their result (each Future will return
41+
# the time that it slept for)
42+
result: list[float] = []
43+
for future in asyncio.as_completed(futures):
44+
result.append(await future)
45+
46+
# Validate that the returned results are in ascending order
47+
# which should be the case since each future will wait x seconds
48+
# before returning x, and all tasks are started around the same time.
49+
assert sorted(result) == result
50+
51+
p = profiler.Profiler()
52+
p.start()
53+
54+
loop = asyncio.new_event_loop()
55+
asyncio.set_event_loop(loop)
56+
loop.run_until_complete(main())
57+
58+
p.stop()
59+
60+
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
61+
62+
profile = pprof_utils.parse_newest_profile(output_filename)
63+
64+
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
65+
assert len(samples) > 0
66+
67+
locations = [
68+
pprof_utils.StackLocation(
69+
function_name="wait_and_return_delay",
70+
filename="test_asyncio_as_completed.py",
71+
line_no=wait_and_return_delay.__code__.co_firstlineno + 1,
72+
),
73+
pprof_utils.StackLocation(
74+
function_name="main",
75+
filename="test_asyncio_as_completed.py",
76+
line_no=main.__code__.co_firstlineno + 14,
77+
),
78+
]
79+
80+
if PYVERSION < (3, 13):
81+
locations = [
82+
pprof_utils.StackLocation(
83+
function_name="sleep",
84+
filename="",
85+
line_no=-1,
86+
),
87+
pprof_utils.StackLocation(
88+
function_name="other",
89+
filename="test_asyncio_as_completed.py",
90+
line_no=other.__code__.co_firstlineno + 1,
91+
),
92+
] + locations
93+
94+
pprof_utils.assert_profile_has_sample(
95+
profile,
96+
samples,
97+
expected_sample=pprof_utils.StackEvent(
98+
thread_name="MainThread",
99+
locations=locations,
100+
),
101+
)

0 commit comments

Comments
 (0)