Nicole cybul/flaky test investigation3 #13265

Draft: wants to merge 64 commits into base: main
Commits (64) · changes from all commits
69d9709
add litellm apm integration
ncybul Mar 19, 2025
f6c9ead
add tests for litellm
ncybul Mar 19, 2025
7e43133
reuse cassettes that are similar across different litellm requests
ncybul Mar 20, 2025
65714e5
add test snapshots
ncybul Mar 20, 2025
462b576
trace get_llm_provider
ncybul Mar 21, 2025
a01b415
remove provider tagging for now
ncybul Mar 21, 2025
4869794
test out other models
ncybul Mar 21, 2025
0adc8e4
add global tags test
ncybul Mar 21, 2025
800fcf4
add release note
ncybul Mar 21, 2025
1cc21e1
run black
ncybul Mar 21, 2025
b95d96f
fix requirements lock file
ncybul Mar 21, 2025
9f63cee
remove unnecessary bedrock credentials in tests
ncybul Mar 21, 2025
a066c99
add documentation
ncybul Mar 23, 2025
010f85e
add llmobs base span tag method for litellm
ncybul Mar 25, 2025
528a0ec
add provider tagging and stream capturing support
ncybul Mar 25, 2025
4b16a4c
attach usage metrics to llmobs spans
ncybul Mar 25, 2025
0a6b744
finish non streamed spans
ncybul Mar 25, 2025
33aae75
use sample pc config
ncybul Mar 26, 2025
184d728
move openai message parsing utils to shared utils file
ncybul Mar 26, 2025
a241160
reuse role for litellm streamed multi choice responses
ncybul Mar 26, 2025
2d2337a
pass operation to litellm llmobs set tags
ncybul Mar 26, 2025
3891da9
wrap get_llm_provider in litellm sdk
ncybul Mar 26, 2025
df8449e
update provider map to store parsed model name and provider
ncybul Mar 27, 2025
122cf45
update model name based on model map
ncybul Mar 27, 2025
92670c5
add llmobs test file and remove usage extraction logic
ncybul Mar 27, 2025
60830c4
add more tests
ncybul Mar 28, 2025
f4d76d7
run black
ncybul Mar 28, 2025
a4a2b24
add tool call tests
ncybul Mar 30, 2025
64f48a1
add test for case where integrations are enabled
ncybul Mar 31, 2025
4dd467c
add tests for proxy requests
ncybul Mar 31, 2025
f0b8a72
add litellm tests to suitespec
ncybul Apr 1, 2025
2ed9abb
Merge branch 'main' into nicole-cybul/litellm-llmobs-tracing
ncybul Apr 8, 2025
8d5d24b
consume streams for apm tests
ncybul Apr 8, 2025
377b40d
remove unnecessary snapshot files and riot requirements
ncybul Apr 8, 2025
d4185d8
fix merge conflict bug for openai span linking
ncybul Apr 8, 2025
48e57c3
fix integrations enabled test
ncybul Apr 8, 2025
252f092
add next and anext methods to traced stream classes
ncybul Apr 8, 2025
07685a8
remove unnecessary config stuff
ncybul Apr 8, 2025
c273b52
add release note
ncybul Apr 8, 2025
5ee0f03
fix errored out streamed requests not setting llmobs tags properly
ncybul Apr 8, 2025
a5cdfa5
run black
ncybul Apr 8, 2025
c92cc9d
Merge branch 'main' into nicole-cybul/litellm-llmobs-tracing
ncybul Apr 9, 2025
1474258
style fixes
ncybul Apr 10, 2025
c9ba90a
use wrapt.ObjectProxy for traced streams
ncybul Apr 10, 2025
b2e8f59
small improvements to model name and metric extraction
ncybul Apr 10, 2025
4b14921
cleaned up configuration for litellm tests
ncybul Apr 10, 2025
73417db
style fixes
ncybul Apr 10, 2025
f163063
make test specifically for excluding usage
ncybul Apr 10, 2025
59bc4d4
style and type fixes
ncybul Apr 10, 2025
65297a3
add typing for message variable
ncybul Apr 10, 2025
dd01f97
ruff fixes
ncybul Apr 10, 2025
72a6fe5
add mock tracer to openai pin instead of litellm
ncybul Apr 11, 2025
5b2e62e
add argument for parametrized config
ncybul Apr 20, 2025
a8ec735
run black
ncybul Apr 20, 2025
bcfbd4e
patch openai manually
ncybul Apr 21, 2025
22b052d
try moving flaky test
ncybul Apr 22, 2025
e5c18f0
simplify flaky openai enabled test
ncybul Apr 22, 2025
80ee6c6
revert to passing test
ncybul Apr 22, 2025
7813ada
Revert "revert to passing test"
ncybul Apr 22, 2025
695304c
Merge branch 'main' into nicole-cybul/litellm-llmobs-tracing
ncybul Apr 23, 2025
efefbb2
use common test llmobs span writer
ncybul Apr 23, 2025
83b68fc
run black
ncybul Apr 24, 2025
e982630
run ruff
ncybul Apr 24, 2025
a6d8500
manually override tracer for litellm and openai
ncybul Apr 24, 2025
76 changes: 66 additions & 10 deletions ddtrace/contrib/internal/litellm/patch.py
@@ -3,6 +3,8 @@
import litellm

from ddtrace import config
from ddtrace.contrib.internal.litellm.utils import TracedLiteLLMAsyncStream
from ddtrace.contrib.internal.litellm.utils import TracedLiteLLMStream
from ddtrace.contrib.trace_utils import unwrap
from ddtrace.contrib.trace_utils import with_traced_module
from ddtrace.contrib.trace_utils import wrap
@@ -21,6 +23,25 @@ def get_version() -> str:

@with_traced_module
def traced_completion(litellm, pin, func, instance, args, kwargs):
return _traced_completion(litellm, pin, func, instance, args, kwargs, False)


@with_traced_module
async def traced_acompletion(litellm, pin, func, instance, args, kwargs):
return await _traced_acompletion(litellm, pin, func, instance, args, kwargs, False)


@with_traced_module
def traced_text_completion(litellm, pin, func, instance, args, kwargs):
return _traced_completion(litellm, pin, func, instance, args, kwargs, True)


@with_traced_module
async def traced_atext_completion(litellm, pin, func, instance, args, kwargs):
return await _traced_acompletion(litellm, pin, func, instance, args, kwargs, True)


def _traced_completion(litellm, pin, func, instance, args, kwargs, is_completion):
integration = litellm._datadog_integration
model = get_argument_value(args, kwargs, 0, "model", None)
host = None
@@ -31,19 +52,29 @@ def traced_completion(litellm, pin, func, instance, args, kwargs):
func.__name__,
model=model,
host=host,
submit_to_llmobs=False,
submit_to_llmobs=integration.should_submit_to_llmobs(kwargs, model),
)
stream = kwargs.get("stream", False)
resp = None
try:
return func(*args, **kwargs)
resp = func(*args, **kwargs)
if stream:
return TracedLiteLLMStream(resp, integration, span, kwargs, is_completion)
return resp
except Exception:
span.set_exc_info(*sys.exc_info())
raise
finally:
span.finish()
# streamed spans will be finished separately once the stream generator is exhausted
if not stream:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
span, args=args, kwargs=kwargs, response=resp, operation="completion" if is_completion else "chat"
)
span.finish()


@with_traced_module
async def traced_acompletion(litellm, pin, func, instance, args, kwargs):
async def _traced_acompletion(litellm, pin, func, instance, args, kwargs, is_completion):
integration = litellm._datadog_integration
model = get_argument_value(args, kwargs, 0, "model", None)
host = None
@@ -54,15 +85,36 @@ async def traced_acompletion(litellm, pin, func, instance, args, kwargs):
func.__name__,
model=model,
host=host,
submit_to_llmobs=False,
submit_to_llmobs=integration.should_submit_to_llmobs(kwargs, model),
)
stream = kwargs.get("stream", False)
resp = None
try:
return await func(*args, **kwargs)
resp = await func(*args, **kwargs)
if stream:
return TracedLiteLLMAsyncStream(resp, integration, span, kwargs, is_completion)
return resp
except Exception:
span.set_exc_info(*sys.exc_info())
raise
finally:
span.finish()
# streamed spans will be finished separately once the stream generator is exhausted
if not stream:
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
span, args=args, kwargs=kwargs, response=resp, operation="completion" if is_completion else "chat"
)
span.finish()


@with_traced_module
def traced_get_llm_provider(litellm, pin, func, instance, args, kwargs):
requested_model = get_argument_value(args, kwargs, 0, "model", None)
integration = litellm._datadog_integration
model, custom_llm_provider, dynamic_api_key, api_base = func(*args, **kwargs)
# store the model name and provider in the integration
integration._model_map[requested_model] = (model, custom_llm_provider)
return model, custom_llm_provider, dynamic_api_key, api_base


def patch():
@@ -77,8 +129,10 @@ def patch():

wrap("litellm", "completion", traced_completion(litellm))
wrap("litellm", "acompletion", traced_acompletion(litellm))
wrap("litellm", "text_completion", traced_completion(litellm))
wrap("litellm", "atext_completion", traced_acompletion(litellm))
wrap("litellm", "text_completion", traced_text_completion(litellm))
wrap("litellm", "atext_completion", traced_atext_completion(litellm))
wrap("litellm", "get_llm_provider", traced_get_llm_provider(litellm))
wrap("litellm", "main.get_llm_provider", traced_get_llm_provider(litellm))


def unpatch():
@@ -91,5 +145,7 @@ def unpatch():
unwrap(litellm, "acompletion")
unwrap(litellm, "text_completion")
unwrap(litellm, "atext_completion")
unwrap(litellm, "get_llm_provider")
unwrap(litellm.main, "get_llm_provider")

delattr(litellm, "_datadog_integration")
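
As context for the patch above, a minimal usage sketch of the traced entry points. The `litellm=True` patch flag and the model/prompt values are assumptions for illustration, not taken from the diff; provider credentials must be configured separately.

```python
# Minimal usage sketch (assumptions noted above).
import litellm

from ddtrace import patch

patch(litellm=True)  # assumed flag name for enabling this integration

# Non-streamed call: the span is tagged and finished inside the traced
# wrapper's finally block.
resp = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}],
)

# Streamed call: the raw stream is wrapped in TracedLiteLLMStream, and the
# span is finished only once the stream generator is exhausted.
stream = litellm.completion(
    model="gpt-3.5-turbo",
    messages=[{"role": "user", "content": "Hello"}],
    stream=True,
)
for _chunk in stream:
    pass  # consuming the stream lets the wrapper collect chunks and finish the span
```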
129 changes: 129 additions & 0 deletions ddtrace/contrib/internal/litellm/utils.py
@@ -0,0 +1,129 @@
import sys

import wrapt

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._integrations.utils import openai_construct_completion_from_streamed_chunks
from ddtrace.llmobs._integrations.utils import openai_construct_message_from_streamed_chunks


log = get_logger(__name__)


class BaseTracedLiteLLMStream(wrapt.ObjectProxy):
def __init__(self, wrapped, integration, span, kwargs, is_completion=False):
super().__init__(wrapped)
n = kwargs.get("n", 1) or 1
self._dd_integration = integration
self._dd_span = span
self._kwargs = kwargs
self._streamed_chunks = [[] for _ in range(n)]
self._is_completion = is_completion


class TracedLiteLLMStream(BaseTracedLiteLLMStream):
def __enter__(self):
self.__wrapped__.__enter__()
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.__wrapped__.__exit__(exc_type, exc_val, exc_tb)

def __iter__(self):
try:
for chunk in self.__wrapped__:
yield chunk
_loop_handler(chunk, self._streamed_chunks)
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
_process_finished_stream(
self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
)
self._dd_span.finish()

def __next__(self):
try:
chunk = self.__wrapped__.__next__()
_loop_handler(chunk, self._streamed_chunks)
return chunk
except StopIteration:
raise
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
_process_finished_stream(
self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
)
self._dd_span.finish()


class TracedLiteLLMAsyncStream(BaseTracedLiteLLMStream):
async def __aenter__(self):
await self.__wrapped__.__aenter__()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.__wrapped__.__aexit__(exc_type, exc_val, exc_tb)

async def __aiter__(self):
try:
async for chunk in self.__wrapped__:
yield chunk
_loop_handler(chunk, self._streamed_chunks)
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
_process_finished_stream(
self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
)
self._dd_span.finish()

async def __anext__(self):
try:
chunk = await self.__wrapped__.__anext__()
_loop_handler(chunk, self._streamed_chunks)
return chunk
except StopAsyncIteration:
raise
except Exception:
self._dd_span.set_exc_info(*sys.exc_info())
raise
finally:
_process_finished_stream(
self._dd_integration, self._dd_span, self._kwargs, self._streamed_chunks, self._is_completion
)
self._dd_span.finish()


def _loop_handler(chunk, streamed_chunks):
"""Appends the chunk to the correct index in the streamed_chunks list.

When handling a streamed chat/completion response, this function is called for each chunk in the streamed response.
"""
for choice in chunk.choices:
streamed_chunks[choice.index].append(choice)
if getattr(chunk, "usage", None):
streamed_chunks[0].insert(0, chunk)


def _process_finished_stream(integration, span, kwargs, streamed_chunks, is_completion=False):
try:
if is_completion:
formatted_completions = [
openai_construct_completion_from_streamed_chunks(choice) for choice in streamed_chunks
]
else:
formatted_completions = [
openai_construct_message_from_streamed_chunks(choice) for choice in streamed_chunks
]
operation = "completion" if is_completion else "chat"
if integration.is_pc_sampled_llmobs(span):
integration.llmobs_set_tags(
span, args=[], kwargs=kwargs, response=formatted_completions, operation=operation
)
except Exception:
log.warning("Error processing streamed completion/chat response.", exc_info=True)
89 changes: 8 additions & 81 deletions ddtrace/contrib/internal/openai/utils.py
@@ -1,14 +1,13 @@
import re
import sys
from typing import Any
from typing import AsyncGenerator
from typing import Dict
from typing import Generator
from typing import List

import wrapt

from ddtrace.internal.logger import get_logger
from ddtrace.llmobs._integrations.utils import openai_construct_completion_from_streamed_chunks
from ddtrace.llmobs._integrations.utils import openai_construct_message_from_streamed_chunks
from ddtrace.llmobs._utils import _get_attr


@@ -265,9 +264,13 @@ def _process_finished_stream(integration, span, kwargs, streamed_chunks, is_comp
request_messages = kwargs.get("messages", None)
try:
if is_completion:
formatted_completions = [_construct_completion_from_streamed_chunks(choice) for choice in streamed_chunks]
formatted_completions = [
openai_construct_completion_from_streamed_chunks(choice) for choice in streamed_chunks
]
else:
formatted_completions = [_construct_message_from_streamed_chunks(choice) for choice in streamed_chunks]
formatted_completions = [
openai_construct_message_from_streamed_chunks(choice) for choice in streamed_chunks
]
if integration.is_pc_sampled_span(span):
_tag_streamed_response(integration, span, formatted_completions)
_set_token_metrics(span, formatted_completions, prompts, request_messages, kwargs)
@@ -277,82 +280,6 @@ def _process_finished_stream(integration, span, kwargs, streamed_chunks, is_comp
log.warning("Error processing streamed completion/chat response.", exc_info=True)


def _construct_completion_from_streamed_chunks(streamed_chunks: List[Any]) -> Dict[str, str]:
"""Constructs a completion dictionary of form {"text": "...", "finish_reason": "..."} from streamed chunks."""
if not streamed_chunks:
return {"text": ""}
completion = {"text": "".join(c.text for c in streamed_chunks if getattr(c, "text", None))}
if streamed_chunks[-1].finish_reason is not None:
completion["finish_reason"] = streamed_chunks[-1].finish_reason
if hasattr(streamed_chunks[0], "usage"):
completion["usage"] = streamed_chunks[0].usage
return completion


def _construct_tool_call_from_streamed_chunk(stored_tool_calls, tool_call_chunk=None, function_call_chunk=None):
"""Builds a tool_call dictionary from streamed function_call/tool_call chunks."""
if function_call_chunk:
if not stored_tool_calls:
stored_tool_calls.append({"name": getattr(function_call_chunk, "name", ""), "arguments": ""})
stored_tool_calls[0]["arguments"] += getattr(function_call_chunk, "arguments", "")
return
if not tool_call_chunk:
return
tool_call_idx = getattr(tool_call_chunk, "index", None)
tool_id = getattr(tool_call_chunk, "id", None)
tool_type = getattr(tool_call_chunk, "type", None)
function_call = getattr(tool_call_chunk, "function", None)
function_name = getattr(function_call, "name", "")
# Find tool call index in tool_calls list, as it may potentially arrive unordered (i.e. index 2 before 0)
list_idx = next(
(idx for idx, tool_call in enumerate(stored_tool_calls) if tool_call["index"] == tool_call_idx),
None,
)
if list_idx is None:
stored_tool_calls.append(
{"name": function_name, "arguments": "", "index": tool_call_idx, "tool_id": tool_id, "type": tool_type}
)
list_idx = -1
stored_tool_calls[list_idx]["arguments"] += getattr(function_call, "arguments", "")


def _construct_message_from_streamed_chunks(streamed_chunks: List[Any]) -> Dict[str, str]:
"""Constructs a chat completion message dictionary from streamed chunks.
The resulting message dictionary is of form:
{"content": "...", "role": "...", "tool_calls": [...], "finish_reason": "..."}
"""
message = {"content": "", "tool_calls": []}
for chunk in streamed_chunks:
if getattr(chunk, "usage", None):
message["usage"] = chunk.usage
if not hasattr(chunk, "delta"):
continue
if getattr(chunk, "index", None) and not message.get("index"):
message["index"] = chunk.index
if getattr(chunk.delta, "role") and not message.get("role"):
message["role"] = chunk.delta.role
if getattr(chunk, "finish_reason", None) and not message.get("finish_reason"):
message["finish_reason"] = chunk.finish_reason
chunk_content = getattr(chunk.delta, "content", "")
if chunk_content:
message["content"] += chunk_content
continue
function_call = getattr(chunk.delta, "function_call", None)
if function_call:
_construct_tool_call_from_streamed_chunk(message["tool_calls"], function_call_chunk=function_call)
tool_calls = getattr(chunk.delta, "tool_calls", None)
if not tool_calls:
continue
for tool_call in tool_calls:
_construct_tool_call_from_streamed_chunk(message["tool_calls"], tool_call_chunk=tool_call)
if message["tool_calls"]:
message["tool_calls"].sort(key=lambda x: x.get("index", 0))
else:
message.pop("tool_calls", None)
message["content"] = message["content"].strip()
return message


def _tag_streamed_response(integration, span, completions_or_messages=None):
"""Tagging logic for streamed completions and chat completions."""
for idx, choice in enumerate(completions_or_messages):
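
The two `_construct_*` helpers removed above now live in `ddtrace.llmobs._integrations.utils` so the openai and litellm integrations can share them. Below is a small sketch of the completion helper's output shape, assuming the shared `openai_construct_completion_from_streamed_chunks` preserves the removed logic verbatim; the chunk objects are stand-ins.

```python
# Sketch only: stand-in chunk objects; expected output follows the removed
# _construct_completion_from_streamed_chunks logic shown above.
from types import SimpleNamespace

from ddtrace.llmobs._integrations.utils import openai_construct_completion_from_streamed_chunks

# Two streamed chunks for a single completion choice; only the last carries a finish_reason.
choice_chunks = [
    SimpleNamespace(text="Hello", finish_reason=None),
    SimpleNamespace(text=" world", finish_reason="stop"),
]

completion = openai_construct_completion_from_streamed_chunks(choice_chunks)
# Expected shape: {"text": "Hello world", "finish_reason": "stop"}
```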