Skip to content

Use Task.start_link in stream worker for caller propagation#11

Open
MikaAK wants to merge 1 commit into
freshaengineering:mainfrom
MikaAK:fix/propagate-callers-in-stream
Open

Use Task.start_link in stream worker for caller propagation#11
MikaAK wants to merge 1 commit into
freshaengineering:mainfrom
MikaAK:fix/propagate-callers-in-stream

Conversation

@MikaAK

@MikaAK MikaAK commented Apr 23, 2026

Copy link
Copy Markdown

Summary

LangEx.Graph.Stream.start_execution/3 spawns its worker via spawn_link/1, which does not propagate the parent's $callers / $ancestors process-dictionary entries. This breaks libraries that rely on per-PID ownership to route calls back to the test process, including:

  • Ecto.Adapters.SQL.Sandbox.allow/3 / Mode.SandboxOwner when the worker touches a shared DB connection
  • Custom per-PID test stubs (registries keyed on $callers)
  • Caller-chain tracking for telemetry / tracing

Today the stream worker is invisible to these mechanisms, so tests that stub LLM responses per-PID silently fall through to the real HTTP layer (or fail with "no stub registered") when the code under test flows through Graphs.stream/3.

The fix

Swap spawn_link/1 for Task.start_link/1. Task already captures and restores $callers / $ancestors internally (that is what Task.Supervised.start_link does under the hood), and brings a few additional wins for free:

  • OTP-style crash logging via Task's crash handler
  • Correct participation in OTP telemetry / tracing
  • No manual Process.put(:"$callers", ...) dance
 defp start_execution(graph, input, opts) do
   parent = self()
-
-  spawn_link(fn ->
-    state = State.apply_update(graph.initial_state, input, graph.reducers)
-    ...
-  end)
+
+  {:ok, pid} =
+    Task.start_link(fn ->
+      state = State.apply_update(graph.initial_state, input, graph.reducers)
+      ...
+    end)
+
+  pid
 end

Regression test

Added in test/lang_ex/graph/stream_test.exs: invokes the stream, has a node send its Process.get(:"$callers") / Process.get(:"$ancestors") back to the consumer, and asserts the consumer PID appears in both lists.

Compatibility

  • No public API changes
  • No new dependencies (Task is stdlib)
  • Behaviour change is additive: consumers that do not rely on caller ownership see no difference
  • Link semantics preserved — Task.start_link links to the calling process just like spawn_link

Testing

mix test
# 145 tests, 0 failures

@MikaAK MikaAK force-pushed the fix/propagate-callers-in-stream branch from f2b3be3 to ceb38c1 Compare April 23, 2026 19:32
MikaAK added a commit to MikaAK/lang_ex that referenced this pull request Apr 23, 2026
@MikaAK MikaAK changed the title Propagate $callers and $ancestors into stream worker Use Task.start_link in stream worker for caller propagation Apr 23, 2026
The stream worker was spawn_link'd without propagating the caller's
process dictionary, so libraries relying on per-PID ownership (e.g.
Ecto.Adapters.SQL.Sandbox.allow/3, custom per-test sandboxes keyed by
$callers, or process-dictionary-based stubs) could not see the stream
consumer as a caller.

Switch spawn_link to Task.start_link: Task internally captures and
restores $callers and $ancestors, gets OTP-style crash logging for
free, and participates correctly in telemetry/tracing trees. No manual
Process.put dance and no behaviour change for callers that did not
rely on ownership propagation.

Includes a regression test asserting the worker sees the consumer PID
in both $callers and $ancestors.
@MikaAK MikaAK force-pushed the fix/propagate-callers-in-stream branch from ceb38c1 to 92a3818 Compare April 23, 2026 19:52
@twist900

twist900 commented Jun 4, 2026

Copy link
Copy Markdown
Contributor

@claude review

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants