diff --git a/ddtrace/llmobs/_writer.py b/ddtrace/llmobs/_writer.py index b80675bcc47..e7c05b03da3 100644 --- a/ddtrace/llmobs/_writer.py +++ b/ddtrace/llmobs/_writer.py @@ -111,7 +111,7 @@ def __init__( ) -> None: super(BaseLLMObsWriter, self).__init__(interval=interval) self._lock = forksafe.RLock() - self._buffer: List[Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent]] = [] + self._buffer: List[str] = [] self._buffer_size: int = 0 self._timeout: float = timeout self._api_key: str = _api_key or config._dd_api_key @@ -148,7 +148,7 @@ def stop(self, timeout=None): def on_shutdown(self): self.periodic() - def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], event_size: int) -> None: + def _enqueue(self, encoded_event: str, event_size: int) -> None: """Internal shared logic of enqueuing events to be submitted to LLM Observability.""" with self._lock: if len(self._buffer) >= self.BUFFER_LIMIT: @@ -160,7 +160,7 @@ def _enqueue(self, event: Union[LLMObsSpanEvent, LLMObsEvaluationMetricEvent], e if self._buffer_size + event_size > EVP_PAYLOAD_SIZE_LIMIT: logger.debug("manually flushing buffer because queueing next event will exceed EVP payload limit") self.periodic() - self._buffer.append(event) + self._buffer.append(encoded_event) self._buffer_size += event_size def _encode(self, payload, num_events): @@ -177,7 +177,7 @@ def periodic(self) -> None: with self._lock: if not self._buffer: return - events = self._buffer + enc_events = self._buffer self._buffer = [] self._buffer_size = 0 @@ -188,16 +188,17 @@ def periodic(self) -> None: "`LLMObs.enable(api_key=...)` before running your application." ) return - data = self._data(events) - enc_llm_events = self._encode(data, len(events)) - if not enc_llm_events: - return + payload = self._data(enc_events) try: - self._send_payload_with_retry(enc_llm_events, len(events)) + self._send_payload_with_retry(payload, len(enc_events)) except Exception: - telemetry.record_dropped_payload(len(events), event_type=self.EVENT_TYPE, error="connection_error") + telemetry.record_dropped_payload(len(enc_events), event_type=self.EVENT_TYPE, error="connection_error") logger.error( - "failed to send %d LLMObs %s events to %s", len(events), self.EVENT_TYPE, self._intake, exc_info=True + "failed to send %d LLMObs %s events to %s", + len(enc_events), + self.EVENT_TYPE, + self._intake, + exc_info=True, ) def _send_payload(self, payload: bytes, num_events: int): @@ -263,11 +264,16 @@ class LLMObsEvalMetricWriter(BaseLLMObsWriter): ENDPOINT = EVAL_ENDPOINT def enqueue(self, event: LLMObsEvaluationMetricEvent) -> None: - event_size = len(safe_json(event)) - self._enqueue(event, event_size) + encoded_event = self._encode(event, 1) + if not encoded_event: + return + event_size = len(encoded_event) + self._enqueue(encoded_event, event_size) - def _data(self, events: List[LLMObsEvaluationMetricEvent]) -> Dict[str, Any]: - return {"data": {"type": "evaluation_metric", "attributes": {"metrics": events}}} + def _data(self, events: List[str]) -> str: + metrics = ",".join(events) + enc_payload = f'{{"data": {{"type": "evaluation_metric", "attributes": {{"metrics": [{metrics}]}}}}}}' + return enc_payload class LLMObsSpanWriter(BaseLLMObsWriter): @@ -279,7 +285,10 @@ class LLMObsSpanWriter(BaseLLMObsWriter): ENDPOINT = SPAN_ENDPOINT def enqueue(self, event: LLMObsSpanEvent) -> None: - raw_event_size = len(safe_json(event)) + encoded_event = self._encode(event, 1) + if not encoded_event: + return + raw_event_size = len(encoded_event) truncated_event_size = None should_truncate = raw_event_size >= EVP_EVENT_SIZE_LIMIT if should_truncate: @@ -288,16 +297,20 @@ def enqueue(self, event: LLMObsSpanEvent) -> None: raw_event_size, ) event = _truncate_span_event(event) - truncated_event_size = len(safe_json(event)) + encoded_event = self._encode(event, 1) + truncated_event_size = len(encoded_event) telemetry.record_span_event_raw_size(event, raw_event_size) telemetry.record_span_event_size(event, truncated_event_size or raw_event_size) - self._enqueue(event, truncated_event_size or raw_event_size) - - def _data(self, events: List[LLMObsSpanEvent]) -> List[Dict[str, Any]]: - return [ - {"_dd.stage": "raw", "_dd.tracer_version": ddtrace.__version__, "event_type": "span", "spans": [event]} - for event in events - ] + self._enqueue(encoded_event, truncated_event_size or raw_event_size) + + def _data(self, events: List[str]) -> str: + payload = [] + tracer_version = ddtrace.__version__ + for event in events: + payload.append( + f'{{"_dd.stage": "raw", "_dd.tracer_version": "{tracer_version}", "event_type": "span", "spans": [{event}]}}' + ) + return "[{}]".format(",".join(payload)) def _truncate_span_event(event: LLMObsSpanEvent) -> LLMObsSpanEvent: