Skip to content

Commit d8cc57b

Browse files
authored
fix: Avoid mixed Langfuse traces in async envs (#2207)
* LangfuseTracer is now thread-safe, avoids mixed traces in async environments * Fix post merge * fix test * Remove duplicate test
1 parent 01506ad commit d8cc57b

File tree

2 files changed

+124
-30
lines changed

2 files changed

+124
-30
lines changed

integrations/langfuse/src/haystack_integrations/tracing/langfuse/tracer.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,14 @@
6161
_COMPONENT_OUTPUT_KEY = "haystack.component.output"
6262
_COMPONENT_INPUT_KEY = "haystack.component.input"
6363

64-
# Context var used to keep track of tracing related info.
65-
# This mainly useful for parents spans.
64+
# External session metadata for trace correlation (Haystack system)
65+
# Stores trace_id, user_id, session_id, tags, version for root trace creation
6666
tracing_context_var: ContextVar[Dict[Any, Any]] = ContextVar("tracing_context")
6767

68+
# Internal span execution hierarchy for our tracer
69+
# Manages parent-child relationships and prevents cross-request span interleaving
70+
span_stack_var: ContextVar[Optional[List["LangfuseSpan"]]] = ContextVar("span_stack", default=None)
71+
6872

6973
class LangfuseSpan(Span):
7074
"""
@@ -265,6 +269,7 @@ def create_span(self, context: SpanContext) -> LangfuseSpan:
265269
)
266270
raise RuntimeError(message)
267271

272+
# Get external tracing context for root trace creation (correlation metadata)
268273
tracing_ctx = tracing_context_var.get({})
269274
if not context.parent_span:
270275
# Create a new trace when there's no parent span
@@ -360,6 +365,7 @@ def __init__(
360365
"before importing Haystack."
361366
)
362367
self._tracer = tracer
368+
# Keep _context as deprecated shim to avoid AttributeError if anyone uses it
363369
self._context: List[LangfuseSpan] = []
364370
self._name = name
365371
self._public = public
@@ -391,7 +397,12 @@ def trace(
391397
# Create span using the handler
392398
span = self._span_handler.create_span(span_context)
393399

394-
self._context.append(span)
400+
# Build new span hierarchy: copy existing stack, add new span, save for restoration
401+
prev_stack = span_stack_var.get()
402+
new_stack = (prev_stack or []).copy()
403+
new_stack.append(span)
404+
token = span_stack_var.set(new_stack)
405+
395406
span.set_tags(tags)
396407

397408
try:
@@ -414,10 +425,8 @@ def trace(
414425
cleanup_error=cleanup_error,
415426
)
416427
finally:
417-
# CRITICAL: Always pop context to prevent corruption
418-
# This is especially important for nested pipeline scenarios
419-
if self._context and self._context[-1] == span:
420-
self._context.pop()
428+
# Restore previous span stack using saved token - ensures proper cleanup
429+
span_stack_var.reset(token)
421430

422431
if self.enforce_flush:
423432
self.flush()
@@ -431,7 +440,9 @@ def current_span(self) -> Optional[Span]:
431440
432441
:return: The current span if available, else None.
433442
"""
434-
return self._context[-1] if self._context else None
443+
# Get top of span stack (most recent span) from context-local storage
444+
stack = span_stack_var.get()
445+
return stack[-1] if stack else None
435446

436447
def get_trace_url(self) -> str:
437448
"""

integrations/langfuse/tests/test_tracer.py

Lines changed: 105 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -2,48 +2,57 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5+
import asyncio
56
import datetime
67
import logging
78
import sys
9+
import json
810
from typing import Optional
911
from unittest.mock import MagicMock, Mock, patch
1012

1113
import pytest
14+
from haystack import Pipeline, component
1215
from haystack.dataclasses import ChatMessage, ToolCall
1316

1417
from haystack_integrations.tracing.langfuse.tracer import (
15-
_COMPONENT_OUTPUT_KEY, DefaultSpanHandler, LangfuseSpan, LangfuseTracer,
16-
SpanContext)
18+
_COMPONENT_OUTPUT_KEY,
19+
DefaultSpanHandler,
20+
LangfuseSpan,
21+
LangfuseTracer,
22+
SpanContext,
23+
)
24+
from haystack_integrations.components.connectors.langfuse import LangfuseConnector
1725

1826

1927
class MockSpan:
20-
def __init__(self):
28+
def __init__(self, name="mock_span"):
2129
self._data = {}
2230
self._span = self
23-
self.operation_name = "operation_name"
31+
self.operation_name = name
32+
self._name = name
2433

2534
def raw_span(self):
2635
return self
2736

2837
def span(self, name=None):
29-
# assert correct operation name passed to the span
30-
assert name == "operation_name"
31-
return self
38+
# Return a new mock span for child spans
39+
return MockSpan(name=name or "child_span")
3240

3341
def update(self, **kwargs):
3442
self._data.update(kwargs)
3543

3644
def generation(self, name=None):
37-
return self
45+
# Return a new mock span for generation spans
46+
return MockSpan(name=name or "generation_span")
3847

3948
def end(self):
4049
pass
4150

4251

4352
class MockTracer:
44-
4553
def trace(self, name, **kwargs):
46-
return MockSpan()
54+
# Return a unique mock span for each trace call
55+
return MockSpan(name=name)
4756

4857
def flush(self):
4958
pass
@@ -59,7 +68,6 @@ def handle(self, span: LangfuseSpan, component_type: Optional[str]) -> None:
5968

6069

6170
class TestLangfuseSpan:
62-
6371
# LangfuseSpan can be initialized with a span object
6472
def test_initialized_with_span_object(self):
6573
mock_span = Mock()
@@ -232,7 +240,8 @@ def test_initialization(self):
232240
langfuse_instance = Mock()
233241
tracer = LangfuseTracer(tracer=langfuse_instance, name="Haystack", public=True)
234242
assert tracer._tracer == langfuse_instance
235-
assert tracer._context == []
243+
# Check behavioral state instead of internal _context list
244+
assert tracer.current_span() is None
236245
assert tracer._name == "Haystack"
237246
assert tracer._public
238247

@@ -255,13 +264,14 @@ def test_create_new_span(self):
255264

256265
# check that the trace method is called on the tracer instance with the provided operation name and tags
257266
with tracer.trace("operation_name", tags={"tag1": "value1", "tag2": "value2"}) as span:
258-
assert len(tracer._context) == 1, "The trace span should have been added to the the root context span"
267+
# Check that there is a current active span during tracing
268+
assert tracer.current_span() is not None, "There should be an active span during tracing"
269+
assert tracer.current_span() == span, "The current span should be the active span"
259270
assert span.raw_span().operation_name == "operation_name"
260271
assert span.raw_span().metadata == {"tag1": "value1", "tag2": "value2"}
261272

262-
assert (
263-
len(tracer._context) == 0
264-
), "The trace span should have been popped, and the root span is closed as well"
273+
# Check that the span is cleaned up after tracing
274+
assert tracer.current_span() is None, "There should be no active span after tracing completes"
265275

266276
# check that update method is called on the span instance with the provided key value pairs
267277
def test_update_span_with_pipeline_input_output_data(self):
@@ -324,12 +334,12 @@ def test_handle_tool_invoker(self):
324334
assert mock_span.update.call_count >= 1
325335
name_update_call = None
326336
for call in mock_span.update.call_args_list:
327-
if 'name' in call[1]:
337+
if "name" in call[1]:
328338
name_update_call = call
329339
break
330340

331341
assert name_update_call is not None, "No call to update the span name was made"
332-
updated_name = name_update_call[1]['name']
342+
updated_name = name_update_call[1]["name"]
333343

334344
# verify the format of the updated span name to be: `original_component_name - [list_of_tool_names]`
335345
assert updated_name != "tool_invoker", f"Expected 'tool_invoker` to be upddated with tool names"
@@ -369,8 +379,7 @@ def test_update_span_flush_disable(self, monkeypatch):
369379
monkeypatch.setenv("HAYSTACK_LANGFUSE_ENFORCE_FLUSH", "false")
370380
tracer_mock = Mock()
371381

372-
from haystack_integrations.tracing.langfuse.tracer import \
373-
LangfuseTracer
382+
from haystack_integrations.tracing.langfuse.tracer import LangfuseTracer
374383

375384
tracer = LangfuseTracer(tracer=tracer_mock, name="Haystack", public=False)
376385
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
@@ -385,11 +394,12 @@ def test_context_is_empty_after_tracing(self):
385394
with tracer.trace(operation_name="operation_name", tags={"haystack.pipeline.input_data": "hello"}) as span:
386395
pass
387396

388-
assert tracer._context == []
397+
# Check behavioral state instead of internal _context list
398+
assert tracer.current_span() is None
389399

390400
def test_init_with_tracing_disabled(self, monkeypatch, caplog):
391401
# Clear haystack modules because ProxyTracer is initialized whenever haystack is imported
392-
modules_to_clear = [name for name in sys.modules if name.startswith('haystack')]
402+
modules_to_clear = [name for name in sys.modules if name.startswith("haystack")]
393403
for name in modules_to_clear:
394404
sys.modules.pop(name, None)
395405

@@ -400,3 +410,76 @@ def test_init_with_tracing_disabled(self, monkeypatch, caplog):
400410

401411
LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False)
402412
assert "tracing is disabled" in caplog.text
413+
414+
def test_async_concurrency_span_isolation(self):
415+
"""
416+
Test that concurrent async traces maintain isolated span contexts.
417+
418+
This test verifies that the context-local span stack prevents cross-request
419+
span interleaving in concurrent environments like FastAPI servers.
420+
"""
421+
tracer = LangfuseTracer(tracer=MockTracer(), name="Haystack", public=False)
422+
423+
# Track spans from each task for verification
424+
task1_spans = []
425+
task2_spans = []
426+
427+
async def trace_task(task_id: str, spans_list: list):
428+
"""Simulate a request with nested tracing operations"""
429+
with tracer.trace(f"outer_operation_{task_id}") as outer_span:
430+
spans_list.append(("outer", outer_span, tracer.current_span()))
431+
432+
# Simulate some async work
433+
await asyncio.sleep(0.01)
434+
435+
with tracer.trace(f"inner_operation_{task_id}") as inner_span:
436+
spans_list.append(("inner", inner_span, tracer.current_span()))
437+
438+
# Simulate more async work
439+
await asyncio.sleep(0.01)
440+
441+
# Verify nested relationship within this task
442+
assert tracer.current_span() == inner_span
443+
444+
# After inner span, outer should be current again
445+
spans_list.append(("after_inner", None, tracer.current_span()))
446+
assert tracer.current_span() == outer_span
447+
448+
# After all spans, should be None
449+
spans_list.append(("after_outer", None, tracer.current_span()))
450+
assert tracer.current_span() is None
451+
452+
async def run_concurrent_traces():
453+
"""Run two concurrent tracing tasks"""
454+
await asyncio.gather(trace_task("task1", task1_spans), trace_task("task2", task2_spans))
455+
456+
# Run the concurrent test
457+
asyncio.run(run_concurrent_traces())
458+
459+
# Verify both tasks completed successfully
460+
assert len(task1_spans) == 4
461+
assert len(task2_spans) == 4
462+
463+
# Verify each task had proper span isolation
464+
# Task 1 spans should be different from Task 2 spans
465+
task1_outer = task1_spans[0][1] # outer span from task1
466+
task2_outer = task2_spans[0][1] # outer span from task2
467+
assert task1_outer != task2_outer
468+
469+
task1_inner = task1_spans[1][1] # inner span from task1
470+
task2_inner = task2_spans[1][1] # inner span from task2
471+
assert task1_inner != task2_inner
472+
473+
# Verify proper nesting within each task
474+
# Task 1: outer -> inner -> outer -> None
475+
assert task1_spans[0][2] == task1_outer # current_span during outer
476+
assert task1_spans[1][2] == task1_inner # current_span during inner
477+
assert task1_spans[2][2] == task1_outer # current_span after inner
478+
assert task1_spans[3][2] is None # current_span after outer
479+
480+
# Task 2: outer -> inner -> outer -> None
481+
assert task2_spans[0][2] == task2_outer # current_span during outer
482+
assert task2_spans[1][2] == task2_inner # current_span during inner
483+
assert task2_spans[2][2] == task2_outer # current_span after inner
484+
assert task2_spans[3][2] is None # current_span after outer
485+

0 commit comments

Comments
 (0)