Skip to content

Commit 02d3cc3

Browse files
feat(profiling): track asyncio.as_completed
1 parent 61c07d1 commit 02d3cc3

File tree

2 files changed

+110
-0
lines changed

2 files changed

+110
-0
lines changed

ddtrace/profiling/_asyncio.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,22 @@ def _(f, args, kwargs):
145145
for future in futures:
146146
stack_v2.link_tasks(parent, future)
147147

148+
@partial(wrap, sys.modules["asyncio"].tasks.as_completed)
149+
def _(f: typing.Callable, args: typing.Tuple[typing.Any], kwargs: typing.Dict[str, typing.Any]) -> typing.Iterator["asyncio.Future[typing.Any]"]:
150+
loop = typing.cast(typing.Optional["asyncio.AbstractEventLoop"], kwargs.get("loop"))
151+
parent: typing.Optional["asyncio.Task"] = globals()["current_task"](loop)
152+
153+
if parent is not None:
154+
fs = typing.cast(typing.Iterable["asyncio.Future"], get_argument_value(args, kwargs, 0, "fs"))
155+
futures: typing.Set["asyncio.Future"] = {asyncio.ensure_future(f, loop=loop) for f in set(fs)}
156+
for future in futures:
157+
stack_v2.link_tasks(parent, future)
158+
159+
# Replace fs with the ensured futures to avoid double-wrapping
160+
args = (futures,) + args[1:]
161+
162+
return f(*args, **kwargs)
163+
148164
_call_init_asyncio(asyncio)
149165

150166

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
import pytest
2+
3+
4+
@pytest.mark.subprocess(
5+
env=dict(
6+
DD_PROFILING_OUTPUT_PPROF="/tmp/test_asyncio_as_completed",
7+
),
8+
err=None,
9+
)
10+
# For macOS: err=None ignores expected stderr from tracer failing to connect to agent (not relevant to this test)
11+
def test_asyncio_as_completed() -> None:
12+
import asyncio
13+
import os
14+
import random
15+
16+
from ddtrace.internal.datadog.profiling import stack_v2
17+
from ddtrace.profiling import profiler
18+
from tests.profiling.collector import pprof_utils
19+
20+
assert stack_v2.is_available, stack_v2.failure_msg
21+
22+
async def other(t: float) -> None:
23+
await asyncio.sleep(t)
24+
25+
async def wait_and_return_delay(t: float) -> float:
26+
await other(t)
27+
return t
28+
29+
async def main() -> None:
30+
# Create a mix of Tasks and Coroutines
31+
futures = [
32+
asyncio.create_task(wait_and_return_delay(i / 10)) if i % 2 == 0 else wait_and_return_delay(i / 10)
33+
for i in range(10)
34+
]
35+
36+
# Randomize the order of the futures
37+
random.shuffle(futures)
38+
39+
# Wait for the futures to complete and store their result (each Future will return
40+
# the time that it slept for)
41+
result: list[float] = []
42+
for future in asyncio.as_completed(futures):
43+
result.append(await future)
44+
45+
# Validate that the returned results are in ascending order
46+
# which should be the case since each future will wait x seconds
47+
# before returning x, and all tasks are started around the same time.
48+
assert sorted(result) == result
49+
50+
p = profiler.Profiler()
51+
p.start()
52+
53+
loop = asyncio.new_event_loop()
54+
asyncio.set_event_loop(loop)
55+
loop.run_until_complete(main())
56+
57+
p.stop()
58+
59+
output_filename = os.environ["DD_PROFILING_OUTPUT_PPROF"] + "." + str(os.getpid())
60+
61+
profile = pprof_utils.parse_newest_profile(output_filename)
62+
63+
samples = pprof_utils.get_samples_with_label_key(profile, "task name")
64+
assert len(samples) > 0
65+
66+
pprof_utils.assert_profile_has_sample(
67+
profile,
68+
samples,
69+
expected_sample=pprof_utils.StackEvent(
70+
thread_name="MainThread",
71+
locations=[
72+
pprof_utils.StackLocation(
73+
function_name="sleep",
74+
filename="",
75+
line_no=-1,
76+
),
77+
pprof_utils.StackLocation(
78+
function_name="other",
79+
filename="test_asyncio_as_completed.py",
80+
line_no=other.__code__.co_firstlineno + 1,
81+
),
82+
pprof_utils.StackLocation(
83+
function_name="wait_and_return_delay",
84+
filename="test_asyncio_as_completed.py",
85+
line_no=wait_and_return_delay.__code__.co_firstlineno + 1,
86+
),
87+
pprof_utils.StackLocation(
88+
function_name="main",
89+
filename="test_asyncio_as_completed.py",
90+
line_no=main.__code__.co_firstlineno + 16,
91+
),
92+
],
93+
),
94+
)

0 commit comments

Comments
 (0)