chore(llmobs): minimize number of span/eval encoding #13273

Draft · wants to merge 1 commit into main
61 changes: 37 additions & 24 deletions ddtrace/llmobs/_writer.py
@@ -111,7 +111,7 @@ def __init__(
) -> None:
super(BaseLLMObsWriter, self).__init__(interval=interval)
self._lock = forksafe.RLock()
self._buffer: List[Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent]] = []
self._buffer: List[str] = []
self._buffer_size: int = 0
self._timeout: float = timeout
self._api_key: str = _api_key or config._dd_api_key
@@ -148,7 +148,7 @@ def stop(self, timeout=None):
def on_shutdown(self):
self.periodic()

def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], event_size: int) -> None:
def _enqueue(self, encoded_event: str, event_size: int) -> None:
"""Internal shared logic of enqueuing events to be submitted to LLM Observability."""
with self._lock:
if len(self._buffer) >= self.BUFFER_LIMIT:
@@ -160,7 +160,7 @@ def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], e
if self._buffer_size + event_size > EVP_PAYLOAD_SIZE_LIMIT:
logger.debug("manually flushing buffer because queueing next event will exceed EVP payload limit")
self.periodic()
self._buffer.append(event)
self._buffer.append(encoded_event)
self._buffer_size += event_size

def _encode(self, payload, num_events):
@@ -177,7 +177,7 @@ def periodic(self) -> None:
with self._lock:
if not self._buffer:
return
events = self._buffer
enc_events = self._buffer
self._buffer = []
self._buffer_size = 0

@@ -188,16 +188,17 @@ def periodic(self) -> None:
"`LLMObs.enable(api_key=...)` before running your application."
)
return
data = self._data(events)
enc_llm_events = self._encode(data, len(events))
if not enc_llm_events:
return
payload = self._data(enc_events)
try:
self._send_payload_with_retry(enc_llm_events, len(events))
self._send_payload_with_retry(payload, len(enc_events))
except Exception:
telemetry.record_dropped_payload(len(events), event_type=self.EVENT_TYPE, error="connection_error")
telemetry.record_dropped_payload(len(enc_events), event_type=self.EVENT_TYPE, error="connection_error")
logger.error(
"failed to send %d LLMObs %s events to %s", len(events), self.EVENT_TYPE, self._intake, exc_info=True
"failed to send %d LLMObs %s events to %s",
len(enc_events),
self.EVENT_TYPE,
self._intake,
exc_info=True,
)

def _send_payload(self, payload: bytes, num_events: int):
@@ -263,11 +264,16 @@ class LLMObsEvalMetricWriter(BaseLLMObsWriter):
ENDPOINT = EVAL_ENDPOINT

def enqueue(self, event: LLMObsEvaluationMetricEvent) -> None:
event_size = len(safe_json(event))
self._enqueue(event, event_size)
encoded_event = self._encode(event, 1)
if not encoded_event:
return
event_size = len(encoded_event)
self._enqueue(encoded_event, event_size)

def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]:
return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}}
def _data(self, events: List[str]) -> str:
metrics = ",".join(events)
enc_payload = f'{{"data": {{"type": "evaluation_metric", "attributes": {{"metrics": [{metrics}]}}}}}}'
return enc_payload
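
Aside, not part of the diff: a quick sanity check of the string-assembly approach used in _data above, with illustrative metric values. As long as every buffered entry is already a valid JSON object, comma-joining the fragments inside the literal envelope parses back to the same structure the previous dict-based _data produced.

import json

# Illustrative pre-encoded metric events; stand-ins for what enqueue() buffers.
encoded_events = [
    '{"label": "relevance", "score": 0.8}',
    '{"label": "faithfulness", "score": 0.95}',
]
metrics = ",".join(encoded_events)
payload = f'{{"data": {{"type": "evaluation_metric", "attributes": {{"metrics": [{metrics}]}}}}}}'

parsed = json.loads(payload)
assert parsed["data"]["type"] == "evaluation_metric"
assert len(parsed["data"]["attributes"]["metrics"]) == 2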


class LLMObsSpanWriter(BaseLLMObsWriter):
@@ -279,7 +285,10 @@ class LLMObsSpanWriter(BaseLLMObsWriter):
ENDPOINT = SPAN_ENDPOINT

def enqueue(self, event: LLMObsSpanEvent) -> None:
raw_event_size = len(safe_json(event))
encoded_event = self._encode(event, 1)
if not encoded_event:
return
raw_event_size = len(encoded_event)
truncated_event_size = None
should_truncate = raw_event_size >= EVP_EVENT_SIZE_LIMIT
if should_truncate:
@@ -288,16 +297,20 @@ def enqueue(self, event: LLMObsSpanEvent) -> None:
raw_event_size,
)
event = _truncate_span_event(event)
truncated_event_size = len(safe_json(event))
encoded_event = self._encode(event, 1)
truncated_event_size = len(encoded_event)
telemetry.record_span_event_raw_size(event, raw_event_size)
telemetry.record_span_event_size(event, truncated_event_size or raw_event_size)
self._enqueue(event, truncated_event_size or raw_event_size)

def _data(self, events: List[LLMObsSpanEvent]) -> List[Dict[str, Any]]:
return [
{"_dd.stage": "raw", "_dd.tracer_version": ddtrace.__version__, "event_type": "span", "spans": [event]}
for event in events
]
self._enqueue(encoded_event, truncated_event_size or raw_event_size)

def _data(self, events: List[str]) -> str:
payload = []
tracer_version = ddtrace.__version__
for event in events:
payload.append(
f'{{"_dd.stage": "raw", "_dd.tracer_version": "{tracer_version}", "event_type": "span", "spans": [{event}]}}'
)
return "[{}]".format(",".join(payload))


def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent:
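
For readers skimming the diff, here is a minimal self-contained sketch of the buffering pattern it applies; the class name, envelope shape, and size limit below are assumptions for illustration, not ddtrace internals. Each event is serialized to a JSON string once at enqueue time, the buffer stores those strings together with a running byte count, and flushing builds the payload by joining the pre-encoded fragments instead of re-encoding every event.

import json
from typing import Dict, List

# Hypothetical writer illustrating encode-once buffering; names, envelope
# structure, and the size limit are assumptions for this sketch.
PAYLOAD_SIZE_LIMIT = 5 * 1024 * 1024  # assumed intake payload cap, in bytes


class EncodeOnceWriter:
    def __init__(self) -> None:
        self._buffer: List[str] = []  # pre-encoded JSON fragments
        self._buffer_size: int = 0    # running payload size in bytes

    def enqueue(self, event: Dict) -> None:
        # Serialize exactly once, when the event is queued.
        encoded = json.dumps(event, separators=(",", ":"))
        if self._buffer_size + len(encoded) > PAYLOAD_SIZE_LIMIT:
            # A real writer would send the flushed payload; here it is dropped.
            self.flush()
        self._buffer.append(encoded)
        self._buffer_size += len(encoded)

    def _data(self, encoded_events: List[str]) -> str:
        # Build the envelope by string joining; no second json.dumps pass.
        spans = ",".join(encoded_events)
        return f'[{{"event_type": "span", "spans": [{spans}]}}]'

    def flush(self) -> str:
        encoded_events, self._buffer, self._buffer_size = self._buffer, [], 0
        return self._data(encoded_events)


# Usage: the flushed payload is valid JSON assembled from the buffered fragments.
writer = EncodeOnceWriter()
writer.enqueue({"name": "llm.request", "duration": 120})
writer.enqueue({"name": "llm.response", "duration": 80})
assert json.loads(writer.flush())[0]["spans"][0]["name"] == "llm.request"

The trade-off is that the payload envelope is assembled by hand, so the buffered fragments must already be valid JSON; in the diff that is ensured by running _encode on each event in enqueue and returning early if encoding fails, before anything reaches the buffer.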