From 4d9ef5f80b9a6c59b9fae0ff2809b30eab508027 Mon Sep 17 00:00:00 2001 From: adityamehra Date: Thu, 13 Nov 2025 10:46:29 -0800 Subject: [PATCH 1/5] use pypi translator clean-up Dockerfile --- .../traceloop/{Dockerfile.tl => Dockerfile} | 8 +- .../traceloop/Dockerfile.lc | 69 ------ .../traceloop/cronjob-tl-lc.yaml | 151 ------------- .../{cronjob-tl.yaml => cronjob.yaml} | 6 +- .../traceloop/{main_traceloop.py => main.py} | 12 ++ .../traceloop/requirements.old.txt | 84 -------- .../traceloop/requirements.traceloop.txt | 200 ------------------ .../traceloop/requirements.txt | 13 ++ 8 files changed, 32 insertions(+), 511 deletions(-) rename instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/{Dockerfile.tl => Dockerfile} (87%) delete mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.lc delete mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl-lc.yaml rename instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/{cronjob-tl.yaml => cronjob.yaml} (98%) rename instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/{main_traceloop.py => main.py} (97%) delete mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.old.txt delete mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.traceloop.txt create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.txt diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.tl b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile similarity index 87% rename from instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.tl rename to instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile index 0f2e7c4..a77757e 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.tl +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile @@ -11,13 +11,13 @@ RUN apt-get update && apt-get install -y --no-install-recommends \ && rm -rf /var/lib/apt/lists/* # Copy required packages (build context is repo root) -COPY util/opentelemetry-util-genai-traceloop-translator /app/opentelemetry-util-genai-traceloop-translator +# COPY util/opentelemetry-util-genai-traceloop-translator /app/opentelemetry-util-genai-traceloop-translator COPY instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner /app/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner WORKDIR /app/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop # Install Python dependencies (excluding local -e packages and git+ssh dependencies) -RUN pip install --no-cache-dir -r requirements.traceloop.txt +RUN pip install --no-cache-dir -r requirements.txt # Verify packages are 
installed correctly RUN python3 -c "from opentelemetry.util.genai.handler import get_telemetry_handler; print('✓ GenAI handler available')" && \ @@ -28,7 +28,7 @@ RUN python3 -c "from opentelemetry.util.genai.handler import get_telemetry_handl python3 -c "from opentelemetry.util.genai.traceloop import enable_traceloop_translator; print('✓ Traceloop translator available')" # Make the script executable -RUN chmod +x main_traceloop.py +RUN chmod +x main.py # Set default environment variables ENV OTEL_PYTHON_LOG_CORRELATION=true \ @@ -37,4 +37,4 @@ ENV OTEL_PYTHON_LOG_CORRELATION=true \ PYTHONUNBUFFERED=1 # Run the Traceloop version -CMD ["python3", "main_traceloop.py"] \ No newline at end of file +CMD ["python3", "main.py"] \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.lc b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.lc deleted file mode 100644 index 5f14b1c..0000000 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/Dockerfile.lc +++ /dev/null @@ -1,69 +0,0 @@ -# Use Python 3.12 as base image -FROM python:3.12-slim - -# Set working directory -WORKDIR /app - -# Install system dependencies -RUN apt-get update && apt-get install -y --no-install-recommends \ - git \ - && rm -rf /var/lib/apt/lists/* - -# Copy the entire instrumentation-genai and util directories to maintain package paths -# Build context is at repo root to access both instrumentation-genai/ and util/ -COPY instrumentation-genai/opentelemetry-instrumentation-langchain /app/opentelemetry-instrumentation-langchain -COPY util/opentelemetry-util-genai /app/opentelemetry-util-genai -COPY util/opentelemetry-util-genai-traceloop-translator /app/opentelemetry-util-genai-traceloop-translator -COPY util/opentelemetry-util-genai-evals /app/opentelemetry-util-genai-evals -COPY util/opentelemetry-util-genai-evals-deepeval /app/opentelemetry-util-genai-evals-deepeval -COPY util/opentelemetry-util-genai-emitters-splunk /app/opentelemetry-util-genai-emitters-splunk - -# Set working directory to the traceloop example -WORKDIR /app/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop - -# Install Python dependencies from requirements.traceloop.txt (excluding local -e packages and git+ssh) -# First, create a temporary requirements file without the local editable packages and SSH git dependencies -RUN grep -v "^-e" requirements.traceloop.txt | grep -v "git+ssh" > /tmp/requirements_external.txt && \ - pip install --no-cache-dir -r /tmp/requirements_external.txt && \ - rm /tmp/requirements_external.txt - -# Install the local packages in editable mode -# The Traceloop translator will enable zero-code instrumentation via .pth file -# splunk-otel-instrumentation-langchain IS installed -RUN cd /app/opentelemetry-util-genai && \ - pip install --no-cache-dir --no-deps -e . && \ - cd /app/opentelemetry-util-genai-evals && \ - pip install --no-cache-dir --no-deps -e . && \ - cd /app/opentelemetry-util-genai-evals-deepeval && \ - pip install --no-cache-dir --no-deps -e . && \ - cd /app/opentelemetry-util-genai-emitters-splunk && \ - pip install --no-cache-dir --no-deps -e . && \ - cd /app/opentelemetry-instrumentation-langchain && \ - pip install --no-cache-dir --no-deps -e . 
&& \ - cd /app/opentelemetry-util-genai-traceloop-translator && \ - pip install --no-cache-dir --no-deps -e . - -# Verify packages are installed correctly -RUN python3 -c "from opentelemetry.util.genai.handler import get_telemetry_handler; print('✓ GenAI handler available')" && \ - python3 -c "from opentelemetry.util.genai.evals import create_evaluation_manager; print('✓ Evaluation manager available')" && \ - python3 -c "import opentelemetry.util.genai.emitters.splunk; print('✓ Splunk emitters available')" && \ - python3 -c "import opentelemetry.util.evaluator.deepeval; print('✓ Deepeval evaluator module available')" && \ - python3 -c "import deepeval; print('✓ Deepeval SDK installed')" && \ - python3 -c "from opentelemetry.util.genai.traceloop import enable_traceloop_translator; print('✓ Traceloop translator available')" - -# Make the script executable -RUN chmod +x main_traceloop.py - -# Set default environment variables for OpenTelemetry -ENV OTEL_PYTHON_LOG_CORRELATION=true \ - OTEL_PYTHON_LOG_LEVEL=info \ - OTEL_EXPORTER_OTLP_PROTOCOL=http/protobuf \ - PYTHONUNBUFFERED=1 - -# Health check (optional) -HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ - CMD python3 -c "import sys; sys.exit(0)" - -# Run the Traceloop version -CMD ["python3", "main_traceloop.py"] - diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl-lc.yaml b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl-lc.yaml deleted file mode 100644 index 02b1067..0000000 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl-lc.yaml +++ /dev/null @@ -1,151 +0,0 @@ -apiVersion: batch/v1 -kind: CronJob -metadata: - name: trip-planner-tl - namespace: o11y-4-ai-admehra - labels: - app: trip-planner-tl - component: telemetry - annotations: - description: "Multi-agent trip planner with Traceloop translator and GenAI evaluations. 
splunk-otel-instrumentation-langchain is installed" - git-commit: "1b4045e" -spec: - # Run every 30 minutes from 8 AM to 5 PM PST on weekdays (Monday-Friday) - # Offset from travel-planner-tl by 45 minutes (runs at :15 and :45) - schedule: "15,45 8-17 * * 1-5" - timeZone: "America/Los_Angeles" - suspend: false - - # Keep last 3 successful and 1 failed job for debugging - successfulJobsHistoryLimit: 3 - failedJobsHistoryLimit: 1 - - jobTemplate: - metadata: - labels: - app: trip-planner-tl - component: telemetry - spec: - template: - metadata: - labels: - app: trip-planner-tl - component: telemetry - spec: - restartPolicy: OnFailure - - containers: - - name: trip-planner-traceloop - # Multi-platform image (amd64, arm64) with git commit hash tag - image: admehra621/trip-planner-tl-lc:1b4045e - imagePullPolicy: Always - - env: - # === GenAI Semantic Conventions (REQUIRED) === - - name: OTEL_SEMCONV_STABILITY_OPT_IN - value: "gen_ai_latest_experimental" - - # === OpenTelemetry Resource Attributes === - - name: OTEL_RESOURCE_ATTRIBUTES - value: "deployment.environment=o11y-inframon-ai,git.commit.id=1b4045e" - - # === Service name for telemetry === - - name: OTEL_SERVICE_NAME - value: "trip-planner-tl" - - # === OpenAI Configuration === - - name: OPENAI_API_KEY - valueFrom: - secretKeyRef: - name: openai-credentials - key: api-key - - - name: OPENAI_MODEL - value: "gpt-4o-mini" - - # === Deepeval Telemetry Opt-Out === - - name: DEEPEVAL_TELEMETRY_OPT_OUT - value: "1" - - # === GenAI Content Capture === - - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT - value: "true" - - - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE - value: "SPAN_AND_EVENT" - - # === GenAI Emitters Configuration === - - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS - value: "span_metric_event,splunk" - - - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION - value: "replace-category:SplunkEvaluationResults" - - # === Evaluation Settings === - # All 5 default evaluations enabled (bias, toxicity, relevance, hallucination, sentiment) - - name: OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION - value: "true" - - # === GenAI Debug Flags (disabled for production) === - # - name: OTEL_GENAI_EVAL_DEBUG_SKIPS - # value: "true" - # - name: OTEL_GENAI_EVAL_DEBUG_EACH - # value: "true" - # - name: OTEL_INSTRUMENTATION_GENAI_DEBUG - # value: "true" - - # === OpenTelemetry Logs Exporter === - - name: OTEL_LOGS_EXPORTER - value: "otlp" - - # === Get the host IP for Splunk OTEL agent === - - name: SPLUNK_OTEL_AGENT - valueFrom: - fieldRef: - fieldPath: status.hostIP - - # === OpenTelemetry OTLP endpoint using Splunk agent === - - name: OTEL_EXPORTER_OTLP_ENDPOINT - value: "http://$(SPLUNK_OTEL_AGENT):4317" - - # === OTLP Protocol (grpc) === - - name: OTEL_EXPORTER_OTLP_PROTOCOL - value: "grpc" - - # === Exclude health check URLs === - - name: OTEL_PYTHON_EXCLUDED_URLS - value: "^(https?://)?[^/]+(/)?$" - - # === Enable Python logging auto instrumentation === - - name: OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED - value: "true" - - # === Enable log correlation === - - name: OTEL_PYTHON_LOG_CORRELATION - value: "true" - - # === Enable LangChain content capture === - - name: OTEL_INSTRUMENTATION_LANGCHAIN_CAPTURE_MESSAGE_CONTENT - value: "true" - - # === Enable Splunk profiler === - - name: SPLUNK_PROFILER_ENABLED - value: "true" - - # === Unbuffered Python output === - - name: PYTHONUNBUFFERED - value: "1" - - # === GenAI evaluation sampling rate === - - name: OTEL_GENAI_EVALUATION_SAMPLING_RATE - value: "1" - 
- # === Resource limits === - resources: - requests: - memory: "512Mi" - cpu: "500m" - limits: - memory: "1Gi" - cpu: "1000m" - diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl.yaml b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob.yaml similarity index 98% rename from instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl.yaml rename to instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob.yaml index e1e3c16..fdcf79a 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob-tl.yaml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/cronjob.yaml @@ -8,7 +8,7 @@ metadata: component: telemetry annotations: description: "Multi-agent travel planner with Traceloop translator and GenAI evaluations. splunk-otel-instrumentation-langchain is NOT installed" - git-commit: "adc36a2" + git-commit: "8108b9f" spec: # Run every 30 minutes from 8 AM to 5 PM PST on weekdays (Monday-Friday) schedule: "*/30 8-17 * * 1-5" @@ -36,7 +36,7 @@ spec: containers: - name: travel-planner-traceloop # Multi-platform image (amd64, arm64) with git commit hash tag - image: admehra621/travel-planner-tl:adc36a2 + image: admehra621/travel-planner-tl:8108b9f imagePullPolicy: Always env: @@ -46,7 +46,7 @@ spec: # === OpenTelemetry Resource Attributes === - name: OTEL_RESOURCE_ATTRIBUTES - value: "deployment.environment=o11y-inframon-ai,git.commit.id=adc36a2" + value: "deployment.environment=o11y-inframon-ai,git.commit.id=8108b9f" # === Service name for telemetry === - name: OTEL_SERVICE_NAME diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main_traceloop.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py similarity index 97% rename from instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main_traceloop.py rename to instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py index e838132..e16fdc6 100755 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main_traceloop.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py @@ -127,8 +127,13 @@ def _configure_otlp_logging() -> None: This is needed for evaluation results to be emitted as OTLP log records. Traceloop SDK handles traces, but we need to explicitly configure logs. + + CRITICAL: Also configures EventLoggerProvider to use the same LoggerProvider, + since Events are just LogRecords and need the same exporter. 
""" from opentelemetry._logs import get_logger_provider + from opentelemetry import _events + from opentelemetry.sdk._events import EventLoggerProvider # Check if already configured try: @@ -160,6 +165,13 @@ def _configure_otlp_logging() -> None: logger_provider.add_log_record_processor(log_processor) set_logger_provider(logger_provider) print(f"[INIT] OTLP logging configured, endpoint={log_endpoint}") + + # CRITICAL FIX: Configure EventLoggerProvider to use the same LoggerProvider + # Events are just LogRecords under the hood, so they need to go through the + # same LoggerProvider with the OTLP exporter. Without this, events go to + # a default/NoOp provider and never reach the collector! + _events.set_event_logger_provider(EventLoggerProvider(logger_provider)) + print(f"[INIT] EventLoggerProvider configured (uses same OTLP exporter)") # Configure logging for evaluation results diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.old.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.old.txt deleted file mode 100644 index c8c2080..0000000 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.old.txt +++ /dev/null @@ -1,84 +0,0 @@ -opentelemetry-util-http==0.59b0 -orjson==3.11.4 -ormsgpack==1.11.0 -packaging==25.0 -platformdirs==4.5.0 -pluggy==1.6.0 -portalocker==3.2.0 -posthog==3.25.0 -pre-commit==3.7.0 -propcache==0.4.1 -protobuf==6.33.0 -pyasn1==0.6.1 -pyasn1_modules==0.4.2 -pydantic==2.12.3 -pydantic-settings==2.11.0 -pydantic_core==2.41.4 -pyfiglet==1.0.4 -Pygments==2.19.2 -pylint==3.0.2 -pyright==1.1.404 -pytest==8.4.2 -pytest-asyncio==1.2.0 -pytest-cov==4.1.0 -pytest-repeat==0.9.4 -pytest-rerunfailures==12.0 -pytest-xdist==3.8.0 -python-dateutil==2.9.0.post0 -python-dotenv==1.2.1 -PyYAML==6.0.3 -readme-renderer==42.0 -regex==2025.10.23 -requests==2.32.5 -requests-toolbelt==1.0.0 -rich==14.2.0 -rsa==4.9.1 -ruamel.yaml==0.17.21 -ruff==0.6.9 -sentry-sdk==2.43.0 -setuptools==80.9.0 -shellingham==1.5.4 -six==1.17.0 -sniffio==1.3.1 -snowballstemmer==3.0.1 -Sphinx==7.1.2 -sphinx-autodoc-typehints==1.25.2 -sphinx-rtd-theme==2.0.0rc4 -sphinxcontrib-applehelp==2.0.0 -sphinxcontrib-devhelp==2.0.0 -sphinxcontrib-htmlhelp==2.1.0 -sphinxcontrib-jquery==4.1 -sphinxcontrib-jsmath==1.0.1 -sphinxcontrib-qthelp==2.0.0 -sphinxcontrib-serializinghtml==2.0.0 -# Editable install with no version control (splunk-otel-genai-emitters-splunk==0.1b0.dev0) --e /app/opentelemetry-util-genai-emitters-splunk -# Editable install with no version control (splunk-otel-genai-evals-deepeval==0.1b0.dev0) --e /app/opentelemetry-util-genai-evals-deepeval -# Editable install with no version control (splunk-otel-util-genai==0.1b0.dev0) --e /app/opentelemetry-util-genai -# Editable install with no version control (splunk-otel-util-genai-evals==0.1b0.dev0) --e /app/opentelemetry-util-genai-evals -# Editable install with no version control (splunk-otel-util-genai-translator-traceloop==0.0.0.dev0) --e /app/opentelemetry-util-genai-traceloop-translator -tabulate==0.9.0 -tenacity==9.1.2 -tiktoken==0.12.0 -tokenizers==0.22.1 -tomlkit==0.13.3 -tqdm==4.67.1 -traceloop-sdk==0.47.5 -typer==0.20.0 -typer-slim==0.20.0 -typing-inspection==0.4.2 -typing_extensions==4.15.0 -urllib3==2.5.0 -virtualenv==20.35.4 -webencodings==0.5.1 -websockets==15.0.1 -wheel==0.45.1 -wrapt==1.17.3 -xxhash==3.6.0 -yarl==1.22.0 
-zipp==3.23.0 -zstandard==0.25.0 \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.traceloop.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.traceloop.txt deleted file mode 100644 index 1e0ca0d..0000000 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.traceloop.txt +++ /dev/null @@ -1,200 +0,0 @@ -aiohappyeyeballs==2.6.1 -aiohttp==3.13.2 -aiosignal==1.4.0 -alabaster==0.7.16 -annotated-types==0.7.0 -anthropic==0.72.0 -anyio==4.11.0 -asgiref==3.10.0 -astroid==3.0.3 -attrs==25.4.0 -babel==2.17.0 -backoff==2.2.1 -bleach==4.1.0 -cachetools==6.2.1 -certifi==2025.10.5 -cfgv==3.4.0 -charset-normalizer==3.4.4 -click==8.2.1 -codespell==2.1.0 -colorama==0.4.6 -coverage==7.11.0 -cuid==0.4 -deepeval==3.3.9 -Deprecated==1.3.1 -dill==0.4.0 -distlib==0.4.0 -distro==1.9.0 -docstring_parser==0.17.0 -docutils==0.20.1 -execnet==2.1.1 -filelock==3.20.0 -flaky==3.8.1 -frozenlist==1.8.0 -fsspec==2025.10.0 -google-auth==2.42.1 -google-genai==1.47.0 -googleapis-common-protos==1.71.0 -grpcio==1.76.0 -h11==0.16.0 -hf-xet==1.2.0 -httpcore==1.0.9 -httpretty==1.1.4 -httpx==0.28.1 -huggingface-hub==1.0.1 -identify==2.6.15 -idna==3.11 -imagesize==1.4.1 -importlib_metadata==8.7.0 -inflection==0.5.1 -iniconfig==2.3.0 -isort==5.13.2 -Jinja2==3.1.6 -jiter==0.11.1 -jsonpatch==1.33 -jsonpointer==3.0.0 -langchain==1.0.3 -langchain-core==1.0.2 -langchain-openai==1.0.1 -langgraph==1.0.2 -langgraph-checkpoint==3.0.0 -langgraph-prebuilt==1.0.2 -langgraph-sdk==0.2.9 -langsmith==0.4.39 -markdown-it-py==4.0.0 -MarkupSafe==3.0.3 -mccabe==0.7.0 -mdurl==0.1.2 -monotonic==1.6 -multidict==6.7.0 -nest-asyncio==1.6.0 -nh3==0.3.2 -nodeenv==1.9.1 -ollama==0.6.0 -openai==2.6.1 -opentelemetry-api @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=opentelemetry-api -opentelemetry-exporter-otlp==1.38.0 -opentelemetry-exporter-otlp-proto-common @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=exporter/opentelemetry-exporter-otlp-proto-common -opentelemetry-exporter-otlp-proto-grpc @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=exporter/opentelemetry-exporter-otlp-proto-grpc -opentelemetry-exporter-otlp-proto-http==1.38.0 -opentelemetry-instrumentation @ git+https://github.com/open-telemetry/opentelemetry-python-contrib.git@071f68697432e7e57b31238a5998dc4c1325855c#subdirectory=opentelemetry-instrumentation -# OpenTelemetry instrumentation packages - version must match traceloop-sdk version (0.47.5) -opentelemetry-instrumentation-alephalpha==0.47.5 -opentelemetry-instrumentation-anthropic==0.47.5 -opentelemetry-instrumentation-bedrock==0.47.5 -opentelemetry-instrumentation-chromadb==0.47.5 -opentelemetry-instrumentation-cohere==0.47.5 -opentelemetry-instrumentation-crewai==0.47.5 -opentelemetry-instrumentation-google-generativeai==0.47.5 -opentelemetry-instrumentation-groq==0.47.5 -opentelemetry-instrumentation-haystack==0.47.5 -opentelemetry-instrumentation-lancedb==0.47.5 -opentelemetry-instrumentation-llamaindex==0.47.5 -opentelemetry-instrumentation-logging==0.59b0 -opentelemetry-instrumentation-marqo==0.47.5 -opentelemetry-instrumentation-mcp==0.47.5 
-opentelemetry-instrumentation-milvus==0.47.5 -opentelemetry-instrumentation-mistralai==0.47.5 -opentelemetry-instrumentation-ollama==0.47.5 -opentelemetry-instrumentation-openai==0.47.5 -opentelemetry-instrumentation-openai-agents==0.47.5 -opentelemetry-instrumentation-pinecone==0.47.5 -opentelemetry-instrumentation-qdrant==0.47.5 -opentelemetry-instrumentation-redis==0.59b0 -opentelemetry-instrumentation-replicate==0.47.5 -opentelemetry-instrumentation-requests==0.59b0 -opentelemetry-instrumentation-sagemaker==0.47.5 -opentelemetry-instrumentation-sqlalchemy==0.59b0 -opentelemetry-instrumentation-threading==0.59b0 -opentelemetry-instrumentation-together==0.47.5 -opentelemetry-instrumentation-transformers==0.47.5 -opentelemetry-instrumentation-urllib3==0.59b0 -opentelemetry-instrumentation-vertexai==0.47.5 -opentelemetry-instrumentation-watsonx==0.47.5 -opentelemetry-instrumentation-weaviate==0.47.5 -opentelemetry-instrumentation-writer==0.47.5 -opentelemetry-proto @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=opentelemetry-proto -opentelemetry-sdk @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=opentelemetry-sdk -opentelemetry-semantic-conventions @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=opentelemetry-semantic-conventions -opentelemetry-semantic-conventions-ai==0.4.13 -opentelemetry-test-utils @ git+https://github.com/open-telemetry/opentelemetry-python.git@1f68134481c46e476a127b61a36dc69889275d15#subdirectory=tests/opentelemetry-test-utils -opentelemetry-util-http==0.59b0 -orjson==3.11.4 -ormsgpack==1.11.0 -packaging==25.0 -platformdirs==4.5.0 -pluggy==1.6.0 -portalocker==3.2.0 -posthog==3.25.0 -pre-commit==3.7.0 -propcache==0.4.1 -protobuf==6.33.0 -pyasn1==0.6.1 -pyasn1_modules==0.4.2 -pydantic==2.12.3 -pydantic-settings==2.11.0 -pydantic_core==2.41.4 -pyfiglet==1.0.4 -Pygments==2.19.2 -pylint==3.0.2 -pyright==1.1.404 -pytest==8.4.2 -pytest-asyncio==1.2.0 -pytest-cov==4.1.0 -pytest-repeat==0.9.4 -pytest-rerunfailures==12.0 -pytest-xdist==3.8.0 -python-dateutil==2.9.0.post0 -python-dotenv==1.2.1 -PyYAML==6.0.3 -readme-renderer==42.0 -regex==2025.10.23 -requests==2.32.5 -requests-toolbelt==1.0.0 -rich==14.2.0 -rsa==4.9.1 -ruamel.yaml==0.17.21 -ruff==0.6.9 -sentry-sdk==2.43.0 -setuptools==80.9.0 -shellingham==1.5.4 -six==1.17.0 -sniffio==1.3.1 -snowballstemmer==3.0.1 -Sphinx==7.1.2 -sphinx-autodoc-typehints==1.25.2 -sphinx-rtd-theme==2.0.0rc4 -sphinxcontrib-applehelp==2.0.0 -sphinxcontrib-devhelp==2.0.0 -sphinxcontrib-htmlhelp==2.1.0 -sphinxcontrib-jquery==4.1 -sphinxcontrib-jsmath==1.0.1 -sphinxcontrib-qthelp==2.0.0 -sphinxcontrib-serializinghtml==2.0.0 -splunk-otel-genai-evals-deepeval>=0.1.6 -splunk-otel-util-genai -splunk-otel-util-genai-evals -splunk-otel-genai-emitters-splunk -# Editable install with no version control (splunk-otel-util-genai-translator-traceloop==0.0.0.dev0) --e /app/opentelemetry-util-genai-traceloop-translator -tabulate==0.9.0 -tenacity==9.1.2 -tiktoken==0.12.0 -tokenizers==0.22.1 -tomlkit==0.13.3 -tqdm==4.67.1 -traceloop-sdk==0.47.5 -typer==0.20.0 -typer-slim==0.20.0 -typing-inspection==0.4.2 -typing_extensions==4.15.0 -urllib3==2.5.0 -virtualenv==20.35.4 -webencodings==0.5.1 -websockets==15.0.1 -wheel==0.45.1 -wrapt==1.17.3 -xxhash==3.6.0 -yarl==1.22.0 -zipp==3.23.0 -zstandard==0.25.0 diff --git 
a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.txt new file mode 100644 index 0000000..c5e96e2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/requirements.txt @@ -0,0 +1,13 @@ +langchain>=1.0.0 +langchain-openai>=1.0.0 +langgraph>=1.0.0 +python-dotenv>=1.0.0 +traceloop-sdk==0.47.5 +deepeval +splunk-otel-genai-evals-deepeval>=0.1.6 +splunk-otel-util-genai +splunk-otel-util-genai-evals +splunk-otel-genai-emitters-splunk +splunk-otel-util-genai-translator-traceloop +python-dotenv +openai From dc14a6b02cf450d462a77cafcab57899f91cc47e Mon Sep 17 00:00:00 2001 From: adityamehra Date: Fri, 14 Nov 2025 14:12:31 -0800 Subject: [PATCH 2/5] Merge branch 'main' into feature/use-translator-pypi --- CHANGELOG.md | 20 - .../CHANGELOG.md | 7 +- .../client_server_version/client.py | 63 +-- .../client_server_version/main.py | 284 ++++++----- .../in_house_version/Dockerfile | 13 +- .../in_house_version/deployment.yaml | 13 +- .../in_house_version/main_in_house.py | 45 +- .../multi_agent_travel_planner/main.py | 13 +- .../CHANGELOG.md | 7 + .../CHANGELOG.md | 7 + .../CHANGELOG.md | 7 + .../opentelemetry/util/genai/evals/manager.py | 12 +- .../CHANGELOG.md | 7 + .../genai/processor/content_normalizer.py | 64 ++- .../processor/traceloop_span_processor.py | 443 +++++++++++------- .../util/genai/traceloop/__init__.py | 6 +- .../tests/test_args_wrapper_format.py | 233 +++++---- .../tests/test_message_serialization.py | 118 +++-- .../test_nested_traceloop_reconstruction.py | 307 +++++++----- util/opentelemetry-util-genai/CHANGELOG.md | 15 +- 20 files changed, 1013 insertions(+), 671 deletions(-) delete mode 100644 CHANGELOG.md create mode 100644 util/opentelemetry-util-genai-emitters-splunk/CHANGELOG.md create mode 100644 util/opentelemetry-util-genai-evals-deepeval/CHANGELOG.md create mode 100644 util/opentelemetry-util-genai-evals/CHANGELOG.md create mode 100644 util/opentelemetry-util-genai-traceloop-translator/CHANGELOG.md diff --git a/CHANGELOG.md b/CHANGELOG.md deleted file mode 100644 index d097989..0000000 --- a/CHANGELOG.md +++ /dev/null @@ -1,20 +0,0 @@ -# Changelog - -All notable changes to this repository are documented in this file. - -The format is based on the [Splunk GDI specification](https://github.com/signalfx/gdi-specification/blob/v1.0.0/specification/repository.md), -and this repository adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). - -## Unreleased - ---- - -## Version 0.1.0 - 2025-11-07 - -- Initial 0.1.0 release of all GenAI packages: - - splunk-otel-util-genai - - splunk-otel-util-genai-evals - - splunk-otel-genai-emitters-splunk - - splunk-otel-genai-evals-deepeval - - splunk-otel-instrumentation-langchain - - splunk-otel-util-genai-translator-traceloop diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md index 6209a70..0a4736f 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/CHANGELOG.md @@ -1,8 +1,7 @@ # Changelog -All notable changes to this project will be documented in this file. +All notable changes to this repository are documented in this file. 
-The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Version 0.1.4 - 2025-11-07 -## Unreleased \ No newline at end of file +- Initial 0.1.4 release of splunk-otel-instrumentation-langchain diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/client.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/client.py index b5688e2..e0a29d8 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/client.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/client.py @@ -48,14 +48,14 @@ def generate_random_poison_config() -> dict: """Generate a random poison configuration for testing.""" # Random probability between 0.5 and 1.0 prob = round(random.uniform(0.5, 1.0), 2) - + # Random subset of poison types num_types = random.randint(1, len(POISON_TYPES)) types = random.sample(POISON_TYPES, num_types) - + # Random max snippets max_snippets = random.randint(1, 3) - + return { "prob": prob, "types": types, @@ -88,12 +88,12 @@ def run_client( # Select random or custom cities origin = custom_origin or random.choice(ORIGINS) destination = custom_destination or random.choice(DESTINATIONS) - + print("🌍 Travel Planner HTTP Client") print("=" * 60) print(f"📍 Origin: {origin}") print(f"🎯 Destination: {destination}") - + # Generate poison config if requested poison_config = None if use_poison: @@ -103,18 +103,18 @@ def run_client( print(f" Types: {', '.join(poison_config['types'])}") print(f" Max snippets: {poison_config['max']}") print(f" Seed: {poison_config['seed']}") - + # Generate user request user_request = generate_travel_request(origin, destination) print("\n✉️ User Request:") print(f" {user_request}") - + # Get server URL from environment or default to localhost server_url = os.getenv("SERVER_URL", "http://localhost:8080") - + print("\n🔌 Connecting to Flask server...") print(f" URL: {server_url}") - + # Prepare request data data = { "origin": origin, @@ -122,56 +122,56 @@ def run_client( "user_request": user_request, "travellers": random.randint(1, 4), } - + if poison_config: data["poison_config"] = poison_config - + print("\n📤 Sending request to server...") - + try: # Make HTTP POST request to /travel/plan endpoint response = requests.post( f"{server_url}/travel/plan", json=data, timeout=300, # 5 minutes timeout for long-running travel planning - headers={"Content-Type": "application/json"} + headers={"Content-Type": "application/json"}, ) response.raise_for_status() - + result = response.json() - + print("\n✅ Received response from server!") print("=" * 60) - + # Display the result print(f"\n📋 Session ID: {result['session_id']}") print(f"📅 Travel Dates: {result['departure']} → {result['return_date']}") print(f"👥 Travellers: {result['travellers']}") - - if result.get('poison_events'): + + if result.get("poison_events"): print("\n💉 Poison Events Triggered:") - for event in result['poison_events']: + for event in result["poison_events"]: print(f" - {event}") - + print("\n✈️ Flight Summary:") print(f" {result['flight_summary']}") - + print("\n🏨 Hotel Summary:") print(f" {result['hotel_summary']}") - + print("\n🎭 Activities Summary:") print(f" {result['activities_summary']}") - + 
print("\n🎉 Final Itinerary:") print("─" * 60) - print(result['final_itinerary']) + print(result["final_itinerary"]) print("─" * 60) - - if result.get('agent_steps'): + + if result.get("agent_steps"): print("\n🤖 Agent Steps:") - for step in result['agent_steps']: + for step in result["agent_steps"]: print(f" - {step['agent']}: {step['status']}") - + except requests.exceptions.Timeout: print("\n❌ Error: Request timed out after 5 minutes") sys.exit(1) @@ -192,7 +192,7 @@ def run_client( def main(): """Main entry point for the client.""" import argparse - + parser = argparse.ArgumentParser( description="Travel Planner MCP Client - Request travel itineraries with optional quality evaluation" ) @@ -211,9 +211,9 @@ def main(): type=str, help=f"Destination city (default: random from {DESTINATIONS})", ) - + args = parser.parse_args() - + try: run_client( use_poison=not args.no_poison, @@ -226,6 +226,7 @@ def main(): except Exception as e: print(f"\n\n❌ Error: {e}") import traceback + traceback.print_exc() sys.exit(1) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/main.py index e3911a2..fe02490 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/client_server_version/main.py @@ -27,7 +27,7 @@ [Coord] --> [Flight] --> [Hotel] --> [Act.] --> [Synth] --> END | | | | | └──────────┼──────────┼──────────┼──────────┘ - | | | + | | | (OTEL Spans/Metrics) @@ -236,9 +236,7 @@ # Configure tracing/metrics/logging once per process so exported data goes to OTLP. trace.set_tracer_provider(TracerProvider()) -trace.get_tracer_provider().add_span_processor( - BatchSpanProcessor(OTLPSpanExporter()) -) +trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) demo_tracer = trace.get_tracer("instrumentation.langchain.demo") @@ -365,9 +363,7 @@ def _model_name() -> str: return os.getenv("OPENAI_MODEL", "gpt-5-nano") -def _create_llm( - agent_name: str, *, temperature: float, session_id: str -) -> ChatOpenAI: +def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> ChatOpenAI: """Create an LLM instance decorated with tags/metadata for tracing.""" model = _model_name() tags = [f"agent:{agent_name}", "travel-planner"] @@ -392,7 +388,9 @@ def _create_llm( # --------------------------------------------------------------------------- -def _poison_config(custom_config: Optional[Dict[str, object]] = None) -> Dict[str, object]: +def _poison_config( + custom_config: Optional[Dict[str, object]] = None, +) -> Dict[str, object]: """Read environment variables or custom config controlling prompt poisoning. TRAVEL_POISON_PROB: Base probability (0-1) that a given agent step is poisoned. 
@@ -404,13 +402,16 @@ def _poison_config(custom_config: Optional[Dict[str, object]] = None) -> Dict[st """ if custom_config: prob = float(custom_config.get("prob", 0.8)) - types = custom_config.get("types", [ - "hallucination", - "bias", - "irrelevance", - "negative_sentiment", - "toxicity", - ]) + types = custom_config.get( + "types", + [ + "hallucination", + "bias", + "irrelevance", + "negative_sentiment", + "toxicity", + ], + ) max_snippets = int(custom_config.get("max", 2)) seed = custom_config.get("seed") if seed: @@ -425,7 +426,9 @@ def _poison_config(custom_config: Optional[Dict[str, object]] = None) -> Dict[st "hallucination,bias,irrelevance,negative_sentiment,toxicity", ) # Proper comma split - types = [t.strip() for t in types_raw.split(",") if t.strip()] if types_raw else [] + types = ( + [t.strip() for t in types_raw.split(",") if t.strip()] if types_raw else [] + ) if not types: # fallback if split logic above produced empty list types = [ "hallucination", @@ -475,25 +478,26 @@ def _generate_poison_snippet(kind: str, agent_name: str) -> str: "could easily disappoint them." # negative framing ) if kind == "toxicity": - return ( - "Remark: The coordination process feels messy and slightly chaotic." # mild critical tone - ) + return "Remark: The coordination process feels messy and slightly chaotic." # mild critical tone return f"(poison:{kind})" def maybe_add_quality_noise( - agent_name: str, base_prompt: str, state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None + agent_name: str, + base_prompt: str, + state: PlannerState, + custom_poison_config: Optional[Dict[str, object]] = None, ) -> str: """Randomly inject one or more poisoning snippets into the prompt. Records injected types into state['poison_events'] for later tracing context. - + If custom_poison_config is explicitly None, no poisoning is applied. 
""" # If explicitly None, disable poisoning if custom_poison_config is None: return base_prompt - + cfg = _poison_config(custom_poison_config) if random.random() > cfg["prob"]: return base_prompt @@ -502,13 +506,9 @@ def maybe_add_quality_noise( random.shuffle(available) count = random.randint(1, min(cfg["max"], len(available))) chosen = available[:count] - snippets = [ - _generate_poison_snippet(kind, agent_name) for kind in chosen - ] + snippets = [_generate_poison_snippet(kind, agent_name) for kind in chosen] # Record events - state["poison_events"].extend( - [f"{agent_name}:{kind}" for kind in chosen] - ) + state["poison_events"].extend([f"{agent_name}:{kind}" for kind in chosen]) injected = base_prompt + "\n\n" + "\n".join(snippets) + "\n" return injected @@ -517,6 +517,7 @@ def maybe_add_quality_noise( # Pretty Printing Utilities # --------------------------------------------------------------------------- + def pretty_print_message(message, indent=False): """Pretty print a single langchain message.""" try: @@ -524,7 +525,7 @@ def pretty_print_message(message, indent=False): if not indent: print(pretty_message, file=sys.stderr, flush=True) return - + indented = "\n".join("\t" + c for c in pretty_message.split("\n")) print(indented, file=sys.stderr, flush=True) except Exception: @@ -540,30 +541,34 @@ def pretty_print_messages(update, last_message=False): # skip parent graph updates in the printouts if len(ns) == 0: return - + graph_id = ns[-1].split(":")[0] print(f"\n🔹 Update from subgraph {graph_id}:", file=sys.stderr, flush=True) is_subgraph = True - + for node_name, node_update in update.items(): update_label = f"📍 Update from node {node_name}:" if is_subgraph: update_label = "\t" + update_label - + print(f"\n{update_label}", file=sys.stderr, flush=True) - + # Check if node_update has messages if "messages" in node_update: try: messages = convert_to_messages(node_update["messages"]) if last_message: messages = messages[-1:] - + for m in messages: pretty_print_message(m, indent=is_subgraph) except Exception as e: - print(f" (Could not pretty print messages: {e})", file=sys.stderr, flush=True) - + print( + f" (Could not pretty print messages: {e})", + file=sys.stderr, + flush=True, + ) + print("", file=sys.stderr, flush=True) @@ -604,22 +609,19 @@ def _http_root_attributes(state: PlannerState) -> Dict[str, str]: # --------------------------------------------------------------------------- -def coordinator_node(state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None) -> PlannerState: - llm = _create_llm( - "coordinator", temperature=0.2, session_id=state["session_id"] - ) - agent = ( - _create_react_agent(llm, tools=[]) - .with_config( - { - "run_name": "coordinator", - "tags": ["agent", "agent:coordinator"], - "metadata": { - "agent_name": "coordinator", - "session_id": state["session_id"], - }, - } - ) +def coordinator_node( + state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None +) -> PlannerState: + llm = _create_llm("coordinator", temperature=0.2, session_id=state["session_id"]) + agent = _create_react_agent(llm, tools=[]).with_config( + { + "run_name": "coordinator", + "tags": ["agent", "agent:coordinator"], + "metadata": { + "agent_name": "coordinator", + "session_id": state["session_id"], + }, + } ) system_message = SystemMessage( content=( @@ -643,27 +645,29 @@ def coordinator_node(state: PlannerState, custom_poison_config: Optional[Dict[st return state -def flight_specialist_node(state: PlannerState, custom_poison_config: 
Optional[Dict[str, object]] = None) -> PlannerState: +def flight_specialist_node( + state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None +) -> PlannerState: llm = _create_llm( "flight_specialist", temperature=0.4, session_id=state["session_id"] ) - agent = ( - _create_react_agent(llm, tools=[mock_search_flights]).with_config( - { - "run_name": "flight_specialist", - "tags": ["agent", "agent:flight_specialist"], - "metadata": { - "agent_name": "flight_specialist", - "session_id": state["session_id"], - }, - } - ) + agent = _create_react_agent(llm, tools=[mock_search_flights]).with_config( + { + "run_name": "flight_specialist", + "tags": ["agent", "agent:flight_specialist"], + "metadata": { + "agent_name": "flight_specialist", + "session_id": state["session_id"], + }, + } ) step = ( f"Find an appealing flight from {state['origin']} to {state['destination']} " f"departing {state['departure']} for {state['travellers']} travellers." ) - step = maybe_add_quality_noise("flight_specialist", step, state, custom_poison_config) + step = maybe_add_quality_noise( + "flight_specialist", step, state, custom_poison_config + ) result = agent.invoke({"messages": [HumanMessage(content=step)]}) final_message = result["messages"][-1] state["flight_summary"] = ( @@ -680,27 +684,29 @@ def flight_specialist_node(state: PlannerState, custom_poison_config: Optional[D return state -def hotel_specialist_node(state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None) -> PlannerState: +def hotel_specialist_node( + state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None +) -> PlannerState: llm = _create_llm( "hotel_specialist", temperature=0.5, session_id=state["session_id"] ) - agent = ( - _create_react_agent(llm, tools=[mock_search_hotels]).with_config( - { - "run_name": "hotel_specialist", - "tags": ["agent", "agent:hotel_specialist"], - "metadata": { - "agent_name": "hotel_specialist", - "session_id": state["session_id"], - }, - } - ) + agent = _create_react_agent(llm, tools=[mock_search_hotels]).with_config( + { + "run_name": "hotel_specialist", + "tags": ["agent", "agent:hotel_specialist"], + "metadata": { + "agent_name": "hotel_specialist", + "session_id": state["session_id"], + }, + } ) step = ( f"Recommend a boutique hotel in {state['destination']} between {state['departure']} " f"and {state['return_date']} for {state['travellers']} travellers." 
) - step = maybe_add_quality_noise("hotel_specialist", step, state, custom_poison_config) + step = maybe_add_quality_noise( + "hotel_specialist", step, state, custom_poison_config + ) result = agent.invoke({"messages": [HumanMessage(content=step)]}) final_message = result["messages"][-1] state["hotel_summary"] = ( @@ -717,24 +723,26 @@ def hotel_specialist_node(state: PlannerState, custom_poison_config: Optional[Di return state -def activity_specialist_node(state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None) -> PlannerState: +def activity_specialist_node( + state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None +) -> PlannerState: llm = _create_llm( "activity_specialist", temperature=0.6, session_id=state["session_id"] ) - agent = ( - _create_react_agent(llm, tools=[mock_search_activities]).with_config( - { - "run_name": "activity_specialist", - "tags": ["agent", "agent:activity_specialist"], - "metadata": { - "agent_name": "activity_specialist", - "session_id": state["session_id"], - }, - } - ) + agent = _create_react_agent(llm, tools=[mock_search_activities]).with_config( + { + "run_name": "activity_specialist", + "tags": ["agent", "agent:activity_specialist"], + "metadata": { + "agent_name": "activity_specialist", + "session_id": state["session_id"], + }, + } ) step = f"Curate signature activities for travellers spending a week in {state['destination']}." - step = maybe_add_quality_noise("activity_specialist", step, state, custom_poison_config) + step = maybe_add_quality_noise( + "activity_specialist", step, state, custom_poison_config + ) result = agent.invoke({"messages": [HumanMessage(content=step)]}) final_message = result["messages"][-1] state["activities_summary"] = ( @@ -751,7 +759,9 @@ def activity_specialist_node(state: PlannerState, custom_poison_config: Optional return state -def plan_synthesizer_node(state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None) -> PlannerState: +def plan_synthesizer_node( + state: PlannerState, custom_poison_config: Optional[Dict[str, object]] = None +) -> PlannerState: llm = _create_llm( "plan_synthesizer", temperature=0.3, session_id=state["session_id"] ) @@ -801,14 +811,30 @@ def should_continue(state: PlannerState) -> str: return mapping.get(state["current_agent"], END) -def build_workflow(custom_poison_config: Optional[Dict[str, object]] = None) -> StateGraph: +def build_workflow( + custom_poison_config: Optional[Dict[str, object]] = None, +) -> StateGraph: graph = StateGraph(PlannerState) # Wrap nodes to pass custom_poison_config - graph.add_node("coordinator", lambda state: coordinator_node(state, custom_poison_config)) - graph.add_node("flight_specialist", lambda state: flight_specialist_node(state, custom_poison_config)) - graph.add_node("hotel_specialist", lambda state: hotel_specialist_node(state, custom_poison_config)) - graph.add_node("activity_specialist", lambda state: activity_specialist_node(state, custom_poison_config)) - graph.add_node("plan_synthesizer", lambda state: plan_synthesizer_node(state, custom_poison_config)) + graph.add_node( + "coordinator", lambda state: coordinator_node(state, custom_poison_config) + ) + graph.add_node( + "flight_specialist", + lambda state: flight_specialist_node(state, custom_poison_config), + ) + graph.add_node( + "hotel_specialist", + lambda state: hotel_specialist_node(state, custom_poison_config), + ) + graph.add_node( + "activity_specialist", + lambda state: activity_specialist_node(state, custom_poison_config), + ) + 
graph.add_node( + "plan_synthesizer", + lambda state: plan_synthesizer_node(state, custom_poison_config), + ) graph.add_conditional_edges(START, should_continue) graph.add_conditional_edges("coordinator", should_continue) graph.add_conditional_edges("flight_specialist", should_continue) @@ -875,15 +901,13 @@ def plan_travel_internal( ], } ] - + with tracer.start_as_current_span( name="POST /travel/plan", kind=SpanKind.SERVER, attributes=attributes, ) as root_span: - root_span.set_attribute( - "gen_ai.input.messages", json.dumps(root_input) - ) + root_span.set_attribute("gen_ai.input.messages", json.dumps(root_input)) config = { "configurable": {"thread_id": session_id}, @@ -896,10 +920,7 @@ def plan_travel_internal( for step in compiled_app.stream(initial_state, config): node_name, node_state = next(iter(step.items())) final_state = node_state - agent_steps.append({ - "agent": node_name, - "status": "completed" - }) + agent_steps.append({"agent": node_name, "status": "completed"}) if not final_state: final_plan = "" @@ -907,9 +928,7 @@ def plan_travel_internal( final_plan = final_state.get("final_itinerary") or "" if final_plan: - preview = final_plan[:500] + ( - "..." if len(final_plan) > 500 else "" - ) + preview = final_plan[:500] + ("..." if len(final_plan) > 500 else "") root_span.set_attribute("travel.plan.preview", preview) if final_state and final_state.get("poison_events"): root_span.set_attribute( @@ -946,35 +965,41 @@ def plan_travel_internal( "travellers": travellers, "flight_summary": final_state.get("flight_summary") if final_state else None, "hotel_summary": final_state.get("hotel_summary") if final_state else None, - "activities_summary": final_state.get("activities_summary") if final_state else None, + "activities_summary": final_state.get("activities_summary") + if final_state + else None, "final_itinerary": final_plan, "poison_events": final_state.get("poison_events") if final_state else [], "agent_steps": agent_steps, } -@app.route('/travel/plan', methods=['POST']) +@app.route("/travel/plan", methods=["POST"]) def plan(): """Handle travel planning requests via HTTP POST.""" try: data = request.get_json() - + origin = data.get("origin", "Seattle") destination = data.get("destination", "Paris") user_request = data.get( "user_request", f"Planning a week-long trip from {origin} to {destination}. " - "Looking for boutique hotel, flights and unique experiences." 
+ "Looking for boutique hotel, flights and unique experiences.", ) travellers = int(data.get("travellers", 2)) - + # Parse poison config from client poison_config = None if "poison_config" in data: poison_config = data["poison_config"] - print(f"[SERVER] Processing travel plan: {origin} -> {destination}", file=sys.stderr, flush=True) - + print( + f"[SERVER] Processing travel plan: {origin} -> {destination}", + file=sys.stderr, + flush=True, + ) + result = plan_travel_internal( origin=origin, destination=destination, @@ -982,28 +1007,37 @@ def plan(): travellers=travellers, poison_config=poison_config, ) - - print("[SERVER] Travel plan completed successfully", file=sys.stderr, flush=True) - print("\n" + "="*80, file=sys.stderr) + + print( + "[SERVER] Travel plan completed successfully", file=sys.stderr, flush=True + ) + print("\n" + "=" * 80, file=sys.stderr) print("TRAVEL PLAN RESULT:", file=sys.stderr) pprint(result, stream=sys.stderr) - print("="*80 + "\n", file=sys.stderr, flush=True) + print("=" * 80 + "\n", file=sys.stderr, flush=True) return jsonify(result), 200 - + except Exception as e: - print(f"[SERVER] Error processing travel plan: {e}", file=sys.stderr, flush=True) + print( + f"[SERVER] Error processing travel plan: {e}", file=sys.stderr, flush=True + ) import traceback + traceback.print_exc(file=sys.stderr) return jsonify({"error": str(e)}), 500 -@app.route('/health', methods=['GET']) +@app.route("/health", methods=["GET"]) def health(): """Health check endpoint for k8s.""" return jsonify({"status": "healthy", "service": "travel-planner-flask"}), 200 if __name__ == "__main__": - print("[INFO] Starting Flask server on http://0.0.0.0:8080", file=sys.stderr, flush=True) - app.run(host="0.0.0.0", port=8080, debug=False) \ No newline at end of file + print( + "[INFO] Starting Flask server on http://0.0.0.0:8080", + file=sys.stderr, + flush=True, + ) + app.run(host="0.0.0.0", port=8080, debug=False) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/Dockerfile b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/Dockerfile index af6e4c5..a4ab268 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/Dockerfile +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/Dockerfile @@ -1,6 +1,9 @@ # Use Python 3.12 slim image FROM python:3.12-slim +# Set environment variables +ENV PYTHONUNBUFFERED=1 + # Set working directory WORKDIR /app @@ -13,11 +16,11 @@ RUN apt-get update && apt-get install -y \ # Install Splunk OpenTelemetry packages from PyPI RUN pip install --no-cache-dir \ - splunk-otel-util-genai \ - splunk-otel-genai-emitters-splunk \ - splunk-otel-util-genai-evals \ - splunk-otel-genai-evals-deepeval \ - splunk-otel-instrumentation-langchain + splunk-otel-util-genai==0.1.4 \ + splunk-otel-genai-emitters-splunk==0.1.4 \ + splunk-otel-util-genai-evals==0.1.4 \ + splunk-otel-genai-evals-deepeval==0.1.6 \ + splunk-otel-instrumentation-langchain==0.1.4 # Copy requirements file (for other dependencies like langchain, langgraph, etc.) COPY instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/requirements.txt . 
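Note on the Dockerfile hunk above: the in_house image now pins the Splunk OTel packages to exact PyPI versions instead of installing whatever resolves as latest. If a build-time guard is wanted, a check in the spirit of the repo's existing `RUN python3 -c "..."` verification steps could look like the minimal sketch below. This is hedged: the `PINS` mapping copies the versions pinned in the hunk above, but the `check_pins` helper is hypothetical and not something this patch adds.

    # Illustrative sketch: verify the pinned distributions resolve at build time.
    # PINS mirrors the versions in the Dockerfile hunk above; check_pins is a
    # hypothetical helper, not part of this patch.
    from importlib.metadata import PackageNotFoundError, version

    PINS = {
        "splunk-otel-util-genai": "0.1.4",
        "splunk-otel-genai-emitters-splunk": "0.1.4",
        "splunk-otel-util-genai-evals": "0.1.4",
        "splunk-otel-genai-evals-deepeval": "0.1.6",
        "splunk-otel-instrumentation-langchain": "0.1.4",
    }

    def check_pins() -> None:
        for name, expected in PINS.items():
            try:
                installed = version(name)
            except PackageNotFoundError:
                raise SystemExit(f"missing package: {name}")
            if installed != expected:
                raise SystemExit(f"{name}: expected {expected}, got {installed}")
            print(f"ok: {name}=={installed}")

    if __name__ == "__main__":
        check_pins()

Run as an extra RUN layer after the pip install, a drifted resolution would fail the image build rather than the CronJob at runtime.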
diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/deployment.yaml b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/deployment.yaml index 3b2469a..0210c44 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/deployment.yaml +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/deployment.yaml @@ -17,12 +17,15 @@ spec: spec: containers: - name: travel-planner - image: shuniche855/travel-planner:0.0.4 + image: shuniche855/travel-planner:0.0.5 imagePullPolicy: Always ports: - containerPort: 8080 name: http env: + # Python unbuffered output for real-time logging + - name: PYTHONUNBUFFERED + value: "1" # Load OpenAI API key from secret - name: OPENAI_API_KEY valueFrom: @@ -66,7 +69,7 @@ spec: - name: OTEL_GENAI_EVAL_DEBUG_EACH value: "false" - name: OTEL_INSTRUMENTATION_GENAI_DEBUG - value: "false" + value: "true" - name: SPLUNK_PROFILER_ENABLED value: "true" # Set evaluation wait time to 10 seconds (short enough to avoid health check timeout) @@ -92,9 +95,9 @@ spec: path: /health port: 8080 initialDelaySeconds: 30 - periodSeconds: 10 - timeoutSeconds: 5 - failureThreshold: 6 + periodSeconds: 30 + timeoutSeconds: 10 + failureThreshold: 10 readinessProbe: httpGet: path: /health diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/main_in_house.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/main_in_house.py index 130362a..d4fc093 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/main_in_house.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/in_house_version/main_in_house.py @@ -58,7 +58,6 @@ ) from opentelemetry.sdk._logs import LoggerProvider # type: ignore[attr-defined] -from opentelemetry._logs import set_logger_provider # type: ignore[attr-defined] from opentelemetry.sdk._logs.export import BatchLogRecordProcessor # type: ignore[attr-defined] from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor @@ -80,6 +79,7 @@ # Health check endpoint for Kubernetes # --------------------------------------------------------------------------- + class HealthHandler(BaseHTTPRequestHandler): protocol_version = "HTTP/1.1" # Use HTTP/1.1 instead of HTTP/1.0 @@ -271,7 +271,9 @@ def run_travel_planner( "final_itinerary": final_plan, "flight_summary": final_state.get("flight_summary") if final_state else None, "hotel_summary": final_state.get("hotel_summary") if final_state else None, - "activities_summary": final_state.get("activities_summary") if final_state else None, + "activities_summary": final_state.get("activities_summary") + if final_state + else None, } @@ -368,6 +370,7 @@ def _coerce_content(raw: Any) -> str: # LLM metadata extraction helper # --------------------------------------------------------------------------- + def _apply_llm_response_metadata(message: Any, llm_invocation: LLMInvocation) -> None: """Populate LLMInvocation from a LangChain response message. 
@@ -413,11 +416,15 @@ def _apply_llm_response_metadata(message: Any, llm_invocation: LLMInvocation) -> token_usage: Any = resp_meta.get("token_usage") or resp_meta.get("usage") if isinstance(token_usage, dict): if input_tokens is None: - prompt_val = token_usage.get("prompt_tokens") or token_usage.get("input_tokens") + prompt_val = token_usage.get("prompt_tokens") or token_usage.get( + "input_tokens" + ) if isinstance(prompt_val, int): input_tokens = prompt_val if output_tokens is None: - completion_val = token_usage.get("completion_tokens") or token_usage.get("output_tokens") + completion_val = token_usage.get( + "completion_tokens" + ) or token_usage.get("output_tokens") if isinstance(completion_val, int): output_tokens = completion_val @@ -517,26 +524,15 @@ def _create_llm(agent_name: str, *, temperature: float, session_id: str) -> Chat def _configure_manual_instrumentation() -> None: """Initialise trace and metric providers that export to the configured OTLP endpoint.""" """Configure tracing/metrics/logging manually once per process so exported data goes to OTLP.""" - from opentelemetry.sdk.trace import TracerProvider - from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.trace import SpanKind - from opentelemetry import _events, _logs, metrics, trace + from opentelemetry import _events, _logs, trace from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter - from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import ( - OTLPMetricExporter, - ) - from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import ( - OTLPSpanExporter, - ) from opentelemetry.sdk._events import EventLoggerProvider - from opentelemetry.sdk._logs import LoggerProvider - from opentelemetry.sdk._logs.export import BatchLogRecordProcessor - from opentelemetry.sdk.metrics import MeterProvider - from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - + trace.set_tracer_provider(TracerProvider()) - trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter()) + ) metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter()) metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) @@ -547,6 +543,7 @@ def _configure_manual_instrumentation() -> None: ) _events.set_event_logger_provider(EventLoggerProvider()) + # --------------------------------------------------------------------------- # LangGraph nodes # --------------------------------------------------------------------------- @@ -618,7 +615,7 @@ def flight_specialist_node(state: PlannerState) -> PlannerState: ), ) handler.start_agent(agent_invocation) - + llm_invocation = LLMInvocation( request_model=_model_name(), operation="chat", @@ -673,7 +670,7 @@ def flight_specialist_node(state: PlannerState) -> PlannerState: ) ) state["current_agent"] = "hotel_specialist" - # Attach agent output messages + # Attach agent output messages # No direct output messages attribute assignment; output_result contains summary handler.stop_agent(agent_invocation) @@ -980,7 +977,9 @@ def main(argv: Optional[List[str]] = None) -> None: "and a few unique experiences." 
) print(f"🚀 Running single request: {user_request}") - result = run_travel_planner(user_request=user_request, session_id="fake-session-001") + result = run_travel_planner( + user_request=user_request, session_id="fake-session-001" + ) print(f"✅ Planning completed for session: {result['session_id']}") wait_seconds = int(os.getenv("EVAL_WAIT_SECONDS", "300")) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py index e32aeff..ac65cc0 100644 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/main.py @@ -17,11 +17,12 @@ Coordinates a set of LangChain agents (coordinator, flight, hotel, activities, plan synthesizer) to build a travel itinerary to demonstrate OpenTelemetry LangChain -instrumentation. +instrumentation. See README.md for more information. """ + import argparse import os import random @@ -291,6 +292,7 @@ def maybe_add_quality_noise( injected = base_prompt + "\n\n" + "\n".join(snippets) + "\n" return injected + # --------------------------------------------------------------------------- # LangGraph nodes # --------------------------------------------------------------------------- @@ -504,11 +506,11 @@ def build_workflow() -> StateGraph: # instrumentation is useful for debugging and development of instrumentation in IDE # --------------------------------------------------------------------------- + def _configure_manual_instrumentation() -> None: """Configure tracing/metrics/logging manually once per process so exported data goes to OTLP.""" from opentelemetry.sdk.trace import TracerProvider from opentelemetry.sdk.trace.export import BatchSpanProcessor - from opentelemetry.trace import SpanKind from opentelemetry import _events, _logs, metrics, trace from opentelemetry.exporter.otlp.proto.grpc._log_exporter import OTLPLogExporter @@ -524,9 +526,11 @@ def _configure_manual_instrumentation() -> None: from opentelemetry.sdk._logs.export import BatchLogRecordProcessor from opentelemetry.sdk.metrics import MeterProvider from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader - + trace.set_tracer_provider(TracerProvider()) - trace.get_tracer_provider().add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + trace.get_tracer_provider().add_span_processor( + BatchSpanProcessor(OTLPSpanExporter()) + ) metric_reader = PeriodicExportingMetricReader(OTLPMetricExporter()) metrics.set_meter_provider(MeterProvider(metric_readers=[metric_reader])) @@ -615,6 +619,7 @@ def main(manual_instrumentation: bool = False) -> None: time.sleep(300) + if __name__ == "__main__": parser = argparse.ArgumentParser( description="Multi-agent travel planner with optional manual instrumentation" diff --git a/util/opentelemetry-util-genai-emitters-splunk/CHANGELOG.md b/util/opentelemetry-util-genai-emitters-splunk/CHANGELOG.md new file mode 100644 index 0000000..f1d8714 --- /dev/null +++ b/util/opentelemetry-util-genai-emitters-splunk/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +All notable changes to this repository are documented in this file.
+ +## Version 0.1.4 - 2025-11-07 + +- Initial 0.1.4 release of splunk-otel-genai-emitters-splunk diff --git a/util/opentelemetry-util-genai-evals-deepeval/CHANGELOG.md b/util/opentelemetry-util-genai-evals-deepeval/CHANGELOG.md new file mode 100644 index 0000000..028eb49 --- /dev/null +++ b/util/opentelemetry-util-genai-evals-deepeval/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +All notable changes to this repository are documented in this file. + +## Version 0.1.6 - 2025-11-07 + +- Initial 0.1.6 release of splunk-otel-genai-evals-deepeval \ No newline at end of file diff --git a/util/opentelemetry-util-genai-evals/CHANGELOG.md b/util/opentelemetry-util-genai-evals/CHANGELOG.md new file mode 100644 index 0000000..25c6e2d --- /dev/null +++ b/util/opentelemetry-util-genai-evals/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +All notable changes to this repository are documented in this file. + +## Version 0.1.4 - 2025-11-07 + +- Initial 0.1.4 release of splunk-otel-util-genai-evals \ No newline at end of file diff --git a/util/opentelemetry-util-genai-evals/src/opentelemetry/util/genai/evals/manager.py b/util/opentelemetry-util-genai-evals/src/opentelemetry/util/genai/evals/manager.py index c492da7..902f729 100644 --- a/util/opentelemetry-util-genai-evals/src/opentelemetry/util/genai/evals/manager.py +++ b/util/opentelemetry-util-genai-evals/src/opentelemetry/util/genai/evals/manager.py @@ -113,9 +113,9 @@ def on_completion(self, invocation: GenAI) -> None: return # Only evaluate LLMInvocation or AgentInvocation if ( - not isinstance(invocation, LLMInvocation) - and not isinstance(invocation, AgentInvocation) - and not isinstance(invocation, Workflow) + not isinstance(invocation, LLMInvocation) + and not isinstance(invocation, AgentInvocation) + and not isinstance(invocation, Workflow) ): return @@ -127,9 +127,9 @@ def on_completion(self, invocation: GenAI) -> None: if msgs: first = msgs[0] if ( - first.parts - and first.parts[0] == "ToolCall" - and first.finish_reason == "tool_calls" + first.parts + and first.parts[0] == "ToolCall" + and first.finish_reason == "tool_calls" ): offer = False diff --git a/util/opentelemetry-util-genai-traceloop-translator/CHANGELOG.md b/util/opentelemetry-util-genai-traceloop-translator/CHANGELOG.md new file mode 100644 index 0000000..9200e8a --- /dev/null +++ b/util/opentelemetry-util-genai-traceloop-translator/CHANGELOG.md @@ -0,0 +1,7 @@ +# Changelog + +All notable changes to this repository are documented in this file. + +## Version 0.1.5 - 2025-11-07 + +- Initial 0.1.5 release of splunk-otel-util-genai-translator-traceloop \ No newline at end of file diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/content_normalizer.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/content_normalizer.py index b357921..aee862d 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/content_normalizer.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/content_normalizer.py @@ -30,15 +30,15 @@ def _coerce_text_part(content: Any) -> Dict[str, Any]: def _extract_langchain_messages(content_val: Any) -> List[Dict[str, Any]]: """ Extract actual message content from nested LangChain message objects. 
- + Handles formats like: - {"messages": [{"lc": 1, "kwargs": {"content": "text", "type": "human"}}]} - {"outputs": {"messages": [{"lc": 1, "kwargs": {"content": "text"}}]}} - + Returns list of extracted messages with their content and role. """ extracted = [] - + try: # Parse if it's a JSON string if isinstance(content_val, str): @@ -46,31 +46,33 @@ def _extract_langchain_messages(content_val: Any) -> List[Dict[str, Any]]: content_val = json.loads(content_val) except Exception: return [] # Not JSON, let caller handle it - + if not isinstance(content_val, dict): return [] - + # Check for "outputs" wrapper (common in workflow outputs) - if "outputs" in content_val and isinstance(content_val["outputs"], dict): + if "outputs" in content_val and isinstance( + content_val["outputs"], dict + ): content_val = content_val["outputs"] - + # Look for "messages" array messages = content_val.get("messages", []) if not isinstance(messages, list): return [] - + # Extract content from each LangChain message for msg in messages: if not isinstance(msg, dict): continue - + # Check if this is a LangChain message (has "lc": 1 and "kwargs") if msg.get("lc") == 1 and "kwargs" in msg: kwargs = msg["kwargs"] if isinstance(kwargs, dict): msg_content = kwargs.get("content") msg_type = kwargs.get("type", "unknown") - + if msg_content: # Map LangChain types to roles if msg_type == "human": @@ -82,14 +84,13 @@ def _extract_langchain_messages(content_val: Any) -> List[Dict[str, Any]]: else: # Infer from message position role = "user" if not extracted else "assistant" - - extracted.append({ - "content": msg_content, - "role": role - }) - + + extracted.append( + {"content": msg_content, "role": role} + ) + return extracted - + except Exception: return [] @@ -118,19 +119,26 @@ def normalize_traceloop_content( if k not in ("role", "finish_reason", "finishReason") } content_val = temp or "" - + # CRITICAL FIX: Check if content contains nested LangChain messages # This handles the format where Traceloop serializes workflow inputs/outputs # with LangChain message objects embedded in JSON langchain_messages = _extract_langchain_messages(content_val) - + if langchain_messages: # We found nested LangChain messages - extract their content for lc_msg in langchain_messages: parts = [_coerce_text_part(lc_msg["content"])] - msg: Dict[str, Any] = {"role": lc_msg["role"], "parts": parts} + msg: Dict[str, Any] = { + "role": lc_msg["role"], + "parts": parts, + } if direction == "output": - fr = m.get("finish_reason") or m.get("finishReason") or "stop" + fr = ( + m.get("finish_reason") + or m.get("finishReason") + or "stop" + ) msg["finish_reason"] = fr normalized.append(msg) else: @@ -138,10 +146,14 @@ def normalize_traceloop_content( parts = [_coerce_text_part(content_val)] msg: Dict[str, Any] = {"role": role, "parts": parts} if direction == "output": - fr = m.get("finish_reason") or m.get("finishReason") or "stop" + fr = ( + m.get("finish_reason") + or m.get("finishReason") + or "stop" + ) msg["finish_reason"] = fr normalized.append(msg) - + return normalized # Dict variants @@ -221,7 +233,11 @@ def normalize_traceloop_content( if "messages" in raw and isinstance(raw["messages"], list): return normalize_traceloop_content(raw["messages"], direction) # wrapper args (LangGraph/Traceloop format with function call args) - if "args" in raw and isinstance(raw["args"], list) and len(raw["args"]) > 0: + if ( + "args" in raw + and isinstance(raw["args"], list) + and len(raw["args"]) > 0 + ): # Extract first arg (usually contains messages and 
other params) first_arg = raw["args"][0] if isinstance(first_arg, dict): diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py index b834b7c..a381589 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/processor/traceloop_span_processor.py @@ -86,7 +86,7 @@ class TransformationRule: traceloop_attributes: Dict[str, Any] = field(default_factory=dict) def matches( - self, span: ReadableSpan + self, span: ReadableSpan ) -> bool: # pragma: no cover - simple logic if self.match_name: if not fnmatch.fnmatch(span.name, self.match_name): @@ -114,7 +114,7 @@ def matches( if k not in span.attributes: return False if expected is not None and str(span.attributes.get(k)) != str( - expected + expected ): return False return True @@ -147,11 +147,11 @@ def _load_rules_from_env() -> List[TransformationRule]: attribute_transformations=r.get( "attribute_transformations", {} ) - or {}, + or {}, name_transformations=r.get("name_transformations", {}) - or {}, + or {}, traceloop_attributes=r.get("traceloop_attributes", {}) - or {}, + or {}, ) ) return rules @@ -171,15 +171,15 @@ class TraceloopSpanProcessor(SpanProcessor): """ def __init__( - self, - attribute_transformations: Optional[Dict[str, Any]] = None, - name_transformations: Optional[Dict[str, str]] = None, - traceloop_attributes: Optional[Dict[str, Any]] = None, - span_filter: Optional[Callable[[ReadableSpan], bool]] = None, - rules: Optional[List[TransformationRule]] = None, - load_env_rules: bool = True, - telemetry_handler: Optional[TelemetryHandler] = None, - mutate_original_span: bool = True, + self, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, + span_filter: Optional[Callable[[ReadableSpan], bool]] = None, + rules: Optional[List[TransformationRule]] = None, + load_env_rules: bool = True, + telemetry_handler: Optional[TelemetryHandler] = None, + mutate_original_span: bool = True, ): """ Initialize the Traceloop span processor. 
@@ -263,8 +263,8 @@ def _default_span_filter(self, span: ReadableSpan) -> bool: if span.attributes: # Check for traceloop entity attributes if ( - "traceloop.entity.input" in span.attributes - or "traceloop.entity.output" in span.attributes + "traceloop.entity.input" in span.attributes + or "traceloop.entity.output" in span.attributes ): # We already filtered task/workflow spans above, so if we get here # it means it has model data @@ -273,14 +273,14 @@ def _default_span_filter(self, span: ReadableSpan) -> bool: for attr_key in span.attributes.keys(): attr_key_lower = str(attr_key).lower() if any( - marker in attr_key_lower - for marker in ["llm", "ai", "gen_ai", "model"] + marker in attr_key_lower + for marker in ["llm", "ai", "gen_ai", "model"] ): return True return False def on_start( - self, span: Span, parent_context: Optional[Context] = None + self, span: Span, parent_context: Optional[Context] = None ) -> None: """Called when a span is started.""" pass @@ -355,7 +355,7 @@ def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: if invocation is None: logger.debug( "[TL_PROCESSOR] Skipping span translation - invocation creation returned None: %s", - span.name + span.name, ) return None @@ -369,9 +369,9 @@ def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: if span.parent: parent_span_id = getattr(span.parent, "span_id", None) if ( - parent_span_id - and parent_span_id - in self._original_to_translated_invocation + parent_span_id + and parent_span_id + in self._original_to_translated_invocation ): # We found the translated invocation of the parent - use its span translated_parent_invocation = ( @@ -381,9 +381,9 @@ def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: translated_parent_invocation, "span", None ) if ( - translated_parent_span - and hasattr(translated_parent_span, "is_recording") - and translated_parent_span.is_recording() + translated_parent_span + and hasattr(translated_parent_span, "is_recording") + and translated_parent_span.is_recording() ): from opentelemetry.trace import set_span_in_context @@ -407,35 +407,50 @@ def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: try: if hasattr(synthetic_span, "get_span_context"): span_ctx = synthetic_span.get_span_context() - synthetic_span_id = span_ctx.span_id if span_ctx else None + synthetic_span_id = ( + span_ctx.span_id if span_ctx else None + ) except Exception: pass if not synthetic_span_id: # Try alternative way to get span ID try: - from opentelemetry.util.genai.span_context import extract_span_context + from opentelemetry.util.genai.span_context import ( + extract_span_context, + ) + span_ctx = extract_span_context(synthetic_span) - synthetic_span_id = span_ctx.span_id if span_ctx else None + synthetic_span_id = ( + span_ctx.span_id if span_ctx else None + ) except Exception: pass if synthetic_span_id: self._synthetic_span_ids.add(synthetic_span_id) - logger.debug("[TL_PROCESSOR] Marked synthetic span ID=%s for skipping", synthetic_span_id) + logger.debug( + "[TL_PROCESSOR] Marked synthetic span ID=%s for skipping", + synthetic_span_id, + ) # Also set attribute as defense-in-depth - if hasattr(synthetic_span, "set_attribute") and synthetic_span.is_recording(): + if ( + hasattr(synthetic_span, "set_attribute") + and synthetic_span.is_recording() + ): try: - synthetic_span.set_attribute("_traceloop_translated", True) + synthetic_span.set_attribute( + "_traceloop_translated", True + ) except Exception: pass # Store the mapping from original 
span_id to translated INVOCATION (we'll close it later) if original_span_id: - self._original_to_translated_invocation[ - original_span_id - ] = invocation + self._original_to_translated_invocation[original_span_id] = ( + invocation + ) # DON'T call stop_llm yet - we'll do that after processing all children return invocation except Exception as emit_err: # pragma: no cover - defensive @@ -444,7 +459,9 @@ def _process_span_translation(self, span: ReadableSpan) -> Optional[Any]: ) return None - def _should_skip_span(self, span: ReadableSpan, span_id: Optional[int] = None) -> bool: + def _should_skip_span( + self, span: ReadableSpan, span_id: Optional[int] = None + ) -> bool: """ Check if a span should be skipped from processing. @@ -457,17 +474,25 @@ def _should_skip_span(self, span: ReadableSpan, span_id: Optional[int] = None) - # Skip synthetic spans we created (check span ID in set) if span_id and span_id in self._synthetic_span_ids: - _logger.debug("[TL_PROCESSOR] Skipping synthetic span (ID in set): %s", span.name) + _logger.debug( + "[TL_PROCESSOR] Skipping synthetic span (ID in set): %s", + span.name, + ) return True # Fallback: Also check attributes for defense-in-depth if span.attributes and "_traceloop_translated" in span.attributes: - _logger.debug("[TL_PROCESSOR] Skipping synthetic span (attribute): %s", span.name) + _logger.debug( + "[TL_PROCESSOR] Skipping synthetic span (attribute): %s", + span.name, + ) return True # Skip already processed spans if span.attributes and "_traceloop_processed" in span.attributes: - _logger.debug("[TL_PROCESSOR] Skipping already processed span: %s", span.name) + _logger.debug( + "[TL_PROCESSOR] Skipping already processed span: %s", span.name + ) return True return False @@ -499,20 +524,25 @@ def on_end(self, span: ReadableSpan) -> None: if exclude_pattern.lower() in span_name.lower(): _logger.debug( "[TL_PROCESSOR] Span excluded (will not export): pattern='%s', span=%s", - exclude_pattern, span_name + exclude_pattern, + span_name, ) # CRITICAL: Mark span as non-sampled to prevent export # This prevents the span from being sent to the backend - if hasattr(span, "_context") and hasattr(span._context, "_trace_flags"): # type: ignore + if hasattr(span, "_context") and hasattr( + span._context, "_trace_flags" + ): # type: ignore try: # Set trace flags to 0 (not sampled) span._context._trace_flags = 0 # type: ignore _logger.debug( - "[TL_PROCESSOR] Marked span as non-sampled: %s", span_name + "[TL_PROCESSOR] Marked span as non-sampled: %s", + span_name, ) except Exception as e: _logger.debug( - "[TL_PROCESSOR] Could not mark span as non-sampled: %s", e + "[TL_PROCESSOR] Could not mark span as non-sampled: %s", + e, ) return @@ -526,21 +556,31 @@ def on_end(self, span: ReadableSpan) -> None: invocation = self._process_span_translation(span) if invocation: # DEBUG: Verify messages are present before calling stop_llm - input_count = len(invocation.input_messages) if invocation.input_messages else 0 - output_count = len(invocation.output_messages) if invocation.output_messages else 0 + input_count = ( + len(invocation.input_messages) + if invocation.input_messages + else 0 + ) + output_count = ( + len(invocation.output_messages) + if invocation.output_messages + else 0 + ) _logger.debug( "[TL_PROCESSOR] Calling stop_llm with messages: input=%d, output=%d, span=%s", - input_count, output_count, span.name + input_count, + output_count, + span.name, ) if input_count == 0 and output_count == 0: _logger.warning( "[TL_PROCESSOR] WARNING: No messages on 
invocation before stop_llm! span=%s", - span.name + span.name, ) else: _logger.info( "[TL_PROCESSOR] Skipped LLM span (no invocation created - missing messages): %s", - span.name + span.name, ) return # Exit early, don't try to process further @@ -581,8 +621,14 @@ def on_end(self, span: ReadableSpan) -> None: invocations_to_close = [] for buffered_span in spans_to_process: # Skip spans that should not be processed - buffered_span_id = getattr(getattr(buffered_span, "context", None), "span_id", None) - if self._should_skip_span(buffered_span, buffered_span_id): + buffered_span_id = getattr( + getattr(buffered_span, "context", None), + "span_id", + None, + ) + if self._should_skip_span( + buffered_span, buffered_span_id + ): continue result_invocation = self._process_span_translation( @@ -592,7 +638,7 @@ def on_end(self, span: ReadableSpan) -> None: invocations_to_close.append(result_invocation) handler = ( - self.telemetry_handler or get_telemetry_handler() + self.telemetry_handler or get_telemetry_handler() ) for invocation in reversed(invocations_to_close): try: @@ -612,7 +658,7 @@ def on_end(self, span: ReadableSpan) -> None: logging.warning("[TL_PROCESSOR] Span transformation failed: %s", e) def _sort_spans_by_hierarchy( - self, spans: List[ReadableSpan] + self, spans: List[ReadableSpan] ) -> List[ReadableSpan]: """Sort spans so parents come before children.""" # Build a map of span_id to span @@ -688,7 +734,8 @@ def _is_llm_span(self, span: ReadableSpan) -> bool: if exclude_pattern.lower() in span_name.lower(): _logger.debug( "[TL_PROCESSOR] Span excluded (matches pattern '%s'): name=%s", - exclude_pattern, span_name + exclude_pattern, + span_name, ) return False @@ -699,26 +746,37 @@ def _is_llm_span(self, span: ReadableSpan) -> bool: operation_name = span.attributes.get("gen_ai.operation.name") if operation_name: # Only trigger on actual LLM operations: chat, completion, embedding - if any(op in str(operation_name).lower() for op in _LLM_OPERATIONS): + if any( + op in str(operation_name).lower() for op in _LLM_OPERATIONS + ): _logger.debug( "[TL_PROCESSOR] LLM span detected (gen_ai.operation.name=%s): name=%s", - operation_name, span.name + operation_name, + span.name, ) return True else: # Has operation name but not an LLM operation (e.g., "workflow", "task", "tool") _logger.debug( "[TL_PROCESSOR] Non-LLM operation (gen_ai.operation.name=%s): name=%s", - operation_name, span.name + operation_name, + span.name, ) return False # No gen_ai.operation.name means it wasn't transformed or doesn't match our rules - _logger.debug("[TL_PROCESSOR] Span skipped (no gen_ai.operation.name): name=%s", span.name) + _logger.debug( + "[TL_PROCESSOR] Span skipped (no gen_ai.operation.name): name=%s", + span.name, + ) return False def _reconstruct_and_set_messages( - self, original_attrs: dict, mutated_attrs: dict, span_name: str, span_id: Optional[int] = None + self, + original_attrs: dict, + mutated_attrs: dict, + span_name: str, + span_id: Optional[int] = None, ) -> Optional[tuple]: """ Reconstruct messages from Traceloop format and set them as gen_ai.* attributes. 
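on_end() above replays buffered child spans only after ordering them parent-first, so each child can attach to its already-translated parent invocation. An illustrative standalone version of that ordering (not the library's _sort_spans_by_hierarchy, which operates on ReadableSpans):

from typing import Dict, List, Optional


class FakeSpan:
    # Stand-in for ReadableSpan: just a span id and an optional parent id.
    def __init__(self, span_id: int, parent_id: Optional[int] = None):
        self.span_id = span_id
        self.parent_id = parent_id


def sort_parents_first(spans: List[FakeSpan]) -> List[FakeSpan]:
    by_id: Dict[int, FakeSpan] = {s.span_id: s for s in spans}

    def depth(span: FakeSpan) -> int:
        # Distance to the nearest ancestor outside the buffered batch.
        d, parent = 0, span.parent_id
        while parent is not None and parent in by_id:
            d += 1
            parent = by_id[parent].parent_id
        return d

    return sorted(spans, key=depth)  # stable: roots, then children, then grandchildren


batch = [FakeSpan(3, parent_id=2), FakeSpan(2, parent_id=1), FakeSpan(1)]
assert [s.span_id for s in sort_parents_first(batch)] == [1, 2, 3]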
@@ -731,14 +789,12 @@ def _reconstruct_and_set_messages( _logger = logging.getLogger(__name__) # Extract Traceloop serialized data - original_input_data = ( - original_attrs.get("traceloop.entity.input") or - mutated_attrs.get("gen_ai.input.messages") - ) - original_output_data = ( - original_attrs.get("traceloop.entity.output") or - mutated_attrs.get("gen_ai.output.messages") - ) + original_input_data = original_attrs.get( + "traceloop.entity.input" + ) or mutated_attrs.get("gen_ai.input.messages") + original_output_data = original_attrs.get( + "traceloop.entity.output" + ) or mutated_attrs.get("gen_ai.output.messages") if not original_input_data and not original_output_data: return None # Nothing to reconstruct @@ -751,47 +807,66 @@ def _reconstruct_and_set_messages( # Convert to GenAI SDK format (with .parts containing Text objects) # This is the format DeepEval expects: InputMessage/OutputMessage with Text objects - input_messages = self._convert_langchain_to_genai_messages(lc_input, "input") - output_messages = self._convert_langchain_to_genai_messages(lc_output, "output") + input_messages = self._convert_langchain_to_genai_messages( + lc_input, "input" + ) + output_messages = self._convert_langchain_to_genai_messages( + lc_output, "output" + ) # Serialize to JSON and store as gen_ai.* attributes (for span export) if input_messages: # Convert to OTel format: list of dicts with role and parts - input_json = json.dumps([ - { - "role": msg.role, - "parts": [{"type": "text", "content": part.content} for part in msg.parts] - } - for msg in input_messages - ]) + input_json = json.dumps( + [ + { + "role": msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in msg.parts + ], + } + for msg in input_messages + ] + ) mutated_attrs["gen_ai.input.messages"] = input_json if output_messages: - output_json = json.dumps([ - { - "role": msg.role, - "parts": [{"type": "text", "content": part.content} for part in msg.parts], - "finish_reason": getattr(msg, "finish_reason", "stop") - } - for msg in output_messages - ]) + output_json = json.dumps( + [ + { + "role": msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in msg.parts + ], + "finish_reason": getattr( + msg, "finish_reason", "stop" + ), + } + for msg in output_messages + ] + ) mutated_attrs["gen_ai.output.messages"] = output_json _logger.debug( "[TL_PROCESSOR] Messages reconstructed in mutation: input=%d, output=%d, span=%s", len(input_messages) if input_messages else 0, len(output_messages) if output_messages else 0, - span_name + span_name, ) # Cache the Python message objects for later use (avoid second reconstruction) if span_id is not None: - self._message_cache[span_id] = (input_messages, output_messages) + self._message_cache[span_id] = ( + input_messages, + output_messages, + ) _logger.debug( "[TL_PROCESSOR] Cached messages for span_id=%s: input=%d, output=%d", span_id, len(input_messages) if input_messages else 0, - len(output_messages) if output_messages else 0 + len(output_messages) if output_messages else 0, ) return (input_messages, output_messages) @@ -799,7 +874,8 @@ def _reconstruct_and_set_messages( except Exception as e: _logger.debug( "[TL_PROCESSOR] Message reconstruction in mutation failed: %s, span=%s", - e, span_name + e, + span_name, ) return None @@ -861,18 +937,35 @@ def _mutate_span_if_needed(self, span: ReadableSpan) -> None: # Check gen_ai.operation.name (set during transformation) to determine if this is an LLM span operation_name = 
mutated.get("gen_ai.operation.name", "") # Check span_kind from both transformed and original attributes (fallback for safety) - span_kind = mutated.get("gen_ai.span.kind", "") or original.get("traceloop.span.kind", "") + span_kind = mutated.get( + "gen_ai.span.kind", "" + ) or original.get("traceloop.span.kind", "") # Fallback: infer from span name if operation name not set if not operation_name and span.name: span_name_lower = span.name.lower() - for pattern in ["openai.chat", "anthropic.chat", ".chat", "chat ", "completion", "embed"]: + for pattern in [ + "openai.chat", + "anthropic.chat", + ".chat", + "chat ", + "completion", + "embed", + ]: if pattern in span_name_lower: - operation_name = "chat" if "chat" in pattern else ( - "embedding" if "embed" in pattern else "completion") + operation_name = ( + "chat" + if "chat" in pattern + else ( + "embedding" + if "embed" in pattern + else "completion" + ) + ) _logger.debug( "[TL_PROCESSOR] Inferred operation from span name: %s → %s", - span.name, operation_name + span.name, + operation_name, ) break @@ -882,28 +975,37 @@ def _mutate_span_if_needed(self, span: ReadableSpan) -> None: ) is_agent_operation = any( - op in str(span_kind).lower() - for op in ["agent"] + op in str(span_kind).lower() for op in ["agent"] ) is_task_operation = any( - op in str(span_kind).lower() - for op in ["task"] + op in str(span_kind).lower() for op in ["task"] ) - if is_llm_operation or is_agent_operation or is_task_operation: + if ( + is_llm_operation + or is_agent_operation + or is_task_operation + ): # This is an LLM span - reconstruct messages once and cache them - span_id = getattr(getattr(span, "context", None), "span_id", None) - self._reconstruct_and_set_messages(original, mutated, span.name, span_id) + span_id = getattr( + getattr(span, "context", None), "span_id", None + ) + self._reconstruct_and_set_messages( + original, mutated, span.name, span_id + ) _logger.debug( "[TL_PROCESSOR] Messages reconstructed for LLM span: operation=%s, span=%s, span_id=%s", - operation_name, span.name, span_id + operation_name, + span.name, + span_id, ) else: # Not an LLM span - skip message reconstruction _logger.debug( "[TL_PROCESSOR] Skipping message reconstruction for non-LLM span: operation=%s, span=%s", - operation_name, span.name + operation_name, + span.name, ) # Mark as processed @@ -947,7 +1049,7 @@ def _mutate_span_if_needed(self, span: ReadableSpan) -> None: ) def _apply_attribute_transformations( - self, base: Dict[str, Any], transformations: Optional[Dict[str, Any]] + self, base: Dict[str, Any], transformations: Optional[Dict[str, Any]] ) -> Dict[str, Any]: if not transformations: return base @@ -960,8 +1062,8 @@ def _apply_attribute_transformations( value = base.pop(old) # Special handling for entity input/output - normalize and serialize if old in ( - "traceloop.entity.input", - "traceloop.entity.output", + "traceloop.entity.input", + "traceloop.entity.output", ): try: direction = "input" if "input" in old else "output" @@ -989,9 +1091,9 @@ def _apply_attribute_transformations( return base def _derive_new_name( - self, - original_name: str, - name_transformations: Optional[Dict[str, str]], + self, + original_name: str, + name_transformations: Optional[Dict[str, str]], ) -> Optional[str]: if not name_transformations: return None @@ -1006,7 +1108,7 @@ def _derive_new_name( return None def _convert_langchain_to_genai_messages( - self, langchain_messages: Optional[List], direction: str + self, langchain_messages: Optional[List], direction: str ) -> List: """ 
Convert LangChain messages to GenAI SDK message format. @@ -1014,7 +1116,11 @@ def _convert_langchain_to_genai_messages( LangChain messages have .content directly, but GenAI SDK expects messages with .parts containing Text/ToolCall objects. """ - from opentelemetry.util.genai.types import InputMessage, OutputMessage, Text + from opentelemetry.util.genai.types import ( + InputMessage, + OutputMessage, + Text, + ) if not langchain_messages: return [] @@ -1042,11 +1148,19 @@ def _convert_langchain_to_genai_messages( # CRITICAL 1: Check if content is a JSON string with LangChain serialization format # Basically only use the "content" of the incoming traceloop entity input/output - if isinstance(content, str) and content.startswith("{") and '"lc"' in content: + if ( + isinstance(content, str) + and content.startswith("{") + and '"lc"' in content + ): try: parsed = json.loads(content) # LangChain serialization format: {"lc": 1, "kwargs": {"content": "..."}} - if isinstance(parsed, dict) and "kwargs" in parsed and "content" in parsed["kwargs"]: + if ( + isinstance(parsed, dict) + and "kwargs" in parsed + and "content" in parsed["kwargs"] + ): content = parsed["kwargs"]["content"] logging.getLogger(__name__).debug( "[TL_PROCESSOR] Extracted content from LangChain serialization format" @@ -1054,7 +1168,7 @@ def _convert_langchain_to_genai_messages( except (json.JSONDecodeError, KeyError, TypeError) as e: logging.getLogger(__name__).warning( "[TL_PROCESSOR] Failed to parse LangChain serialization: %s", - str(e) + str(e), ) # CRITICAL 2: Ensure content is a string, not a dict or other object @@ -1063,16 +1177,22 @@ def _convert_langchain_to_genai_messages( # Try to extract the actual text from it if "content" in content: content = content["content"] - elif "parts" in content and isinstance(content["parts"], list): + elif "parts" in content and isinstance( + content["parts"], list + ): # Extract from parts structure - text_parts = [p.get("content", "") for p in content["parts"] if isinstance(p, dict)] + text_parts = [ + p.get("content", "") + for p in content["parts"] + if isinstance(p, dict) + ] content = " ".join(text_parts) else: # Fallback: serialize to JSON string (not ideal) content = json.dumps(content) logging.getLogger(__name__).warning( "[TL_PROCESSOR] Content is dict, serializing: %s", - str(content)[:100] + str(content)[:100], ) parts = [Text(content=str(content))] if content else [] @@ -1081,15 +1201,10 @@ def _convert_langchain_to_genai_messages( if direction == "output": finish_reason = getattr(lc_msg, "finish_reason", "stop") genai_msg = OutputMessage( - role=role, - parts=parts, - finish_reason=finish_reason + role=role, parts=parts, finish_reason=finish_reason ) else: - genai_msg = InputMessage( - role=role, - parts=parts - ) + genai_msg = InputMessage(role=role, parts=parts) genai_messages.append(genai_msg) except Exception as e: @@ -1101,12 +1216,12 @@ def _convert_langchain_to_genai_messages( return genai_messages def _build_invocation( - self, - existing_span: ReadableSpan, - *, - attribute_transformations: Optional[Dict[str, Any]] = None, - name_transformations: Optional[Dict[str, str]] = None, - traceloop_attributes: Optional[Dict[str, Any]] = None, + self, + existing_span: ReadableSpan, + *, + attribute_transformations: Optional[Dict[str, Any]] = None, + name_transformations: Optional[Dict[str, str]] = None, + traceloop_attributes: Optional[Dict[str, Any]] = None, ) -> LLMInvocation: base_attrs: Dict[str, Any] = ( dict(existing_span.attributes) if existing_span.attributes else 
{} @@ -1117,14 +1232,14 @@ def _build_invocation( # Try both old format (traceloop.entity.*) and new format (gen_ai.*) # Support both singular and plural attribute names original_input_data = ( - base_attrs.get("gen_ai.input.messages") or - base_attrs.get("gen_ai.input.message") or - base_attrs.get("traceloop.entity.input") + base_attrs.get("gen_ai.input.messages") + or base_attrs.get("gen_ai.input.message") + or base_attrs.get("traceloop.entity.input") ) original_output_data = ( - base_attrs.get("gen_ai.output.messages") or - base_attrs.get("gen_ai.output.message") or - base_attrs.get("traceloop.entity.output") + base_attrs.get("gen_ai.output.messages") + or base_attrs.get("gen_ai.output.message") + or base_attrs.get("traceloop.entity.output") ) # Apply attribute transformations @@ -1152,10 +1267,10 @@ def _build_invocation( # Try to get model from various attribute sources request_model = ( - base_attrs.get("gen_ai.request.model") - or base_attrs.get("gen_ai.response.model") - or base_attrs.get("llm.request.model") - or base_attrs.get("ai.model.name") + base_attrs.get("gen_ai.request.model") + or base_attrs.get("gen_ai.response.model") + or base_attrs.get("llm.request.model") + or base_attrs.get("ai.model.name") ) # Infer model from original span name pattern like "chat gpt-4" if not found @@ -1179,10 +1294,10 @@ def _build_invocation( "traceloop.span.kind" ) if not request_model and span_kind in ( - "task", - "workflow", - "agent", - "tool", + "task", + "workflow", + "agent", + "tool", ): # Use the original span name to avoid "chat unknown" if not new_name: @@ -1203,7 +1318,9 @@ def _build_invocation( base_attrs.setdefault("gen_ai.override.span_name", new_name) # Get messages from cache (reconstructed during mutation, no need to reconstruct again) - span_id = getattr(getattr(existing_span, "context", None), "span_id", None) + span_id = getattr( + getattr(existing_span, "context", None), "span_id", None + ) cached_messages = self._message_cache.get(span_id) _logger = logging.getLogger(__name__) @@ -1212,7 +1329,7 @@ def _build_invocation( span_id, span_id in self._message_cache if span_id else False, len(self._message_cache), - existing_span.name + existing_span.name, ) if cached_messages: @@ -1223,7 +1340,7 @@ def _build_invocation( len(input_messages) if input_messages else 0, len(output_messages) if output_messages else 0, existing_span.name, - span_id + span_id, ) else: # Fallback: try to reconstruct if not in cache (shouldn't happen for LLM spans) @@ -1235,40 +1352,51 @@ def _build_invocation( span_id, existing_span.name, original_input_data is not None, - original_output_data is not None + original_output_data is not None, ) if original_input_data or original_output_data: try: _logger.debug( "[TL_PROCESSOR] Attempting fallback reconstruction: input_len=%d, output_len=%d", - len(str(original_input_data)) if original_input_data else 0, - len(str(original_output_data)) if original_output_data else 0 + len(str(original_input_data)) + if original_input_data + else 0, + len(str(original_output_data)) + if original_output_data + else 0, ) lc_input, lc_output = reconstruct_messages_from_traceloop( original_input_data, original_output_data ) # Convert LangChain messages to GenAI SDK format for evaluations - input_messages = self._convert_langchain_to_genai_messages(lc_input, "input") - output_messages = self._convert_langchain_to_genai_messages(lc_output, "output") + input_messages = self._convert_langchain_to_genai_messages( + lc_input, "input" + ) + output_messages = ( + 
self._convert_langchain_to_genai_messages( + lc_output, "output" + ) + ) _logger.debug( "[TL_PROCESSOR] Fallback: reconstructed messages for invocation: input=%d, output=%d, span=%s", len(input_messages) if input_messages else 0, len(output_messages) if output_messages else 0, - existing_span.name + existing_span.name, ) except Exception as e: _logger.warning( "[TL_PROCESSOR] Message reconstruction failed: %s, span=%s", - e, existing_span.name + e, + existing_span.name, ) else: _logger.error( "[TL_PROCESSOR] ERROR: No message data available! span_id=%s, span=%s, attrs_keys=%s", span_id, existing_span.name, - list(base_attrs.keys())[:20] + list(base_attrs.keys())[:20], ) # Create invocation with reconstructed messages @@ -1278,7 +1406,7 @@ def _build_invocation( len(input_messages) if input_messages else 0, len(output_messages) if output_messages else 0, existing_span.name, - span_id + span_id, ) # CRITICAL: Don't create invocation if we don't have messages @@ -1289,9 +1417,10 @@ def _build_invocation( "span=%s, span_id=%s, is_llm=%s, is_agent=%s, is_task=%s", existing_span.name, span_id, - "llm" in str(base_attrs.get("gen_ai.operation.name", "")).lower(), + "llm" + in str(base_attrs.get("gen_ai.operation.name", "")).lower(), "agent" in str(base_attrs.get("gen_ai.span.kind", "")).lower(), - "task" in str(base_attrs.get("gen_ai.span.kind", "")).lower() + "task" in str(base_attrs.get("gen_ai.span.kind", "")).lower(), ) return None @@ -1303,7 +1432,7 @@ def _build_invocation( "span=%s, span_id=%s, output_messages=%s", existing_span.name, span_id, - output_messages + output_messages, ) return None diff --git a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py index 16503cf..08c1147 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py +++ b/util/opentelemetry-util-genai-traceloop-translator/src/opentelemetry/util/genai/traceloop/__init__.py @@ -45,7 +45,7 @@ "traceloop.span.kind": "gen_ai.span.kind", "llm.request.type": "gen_ai.operation.name", "gen_ai.completion.0.content": "gen_ai.output.messages", - "gen_ai.prompt.0.content": "gen_ai.input.messages" + "gen_ai.prompt.0.content": "gen_ai.input.messages", } } @@ -85,7 +85,7 @@ def enable_traceloop_translator( "TraceloopSpanProcessor already registered (global flag); skipping duplicate" ) return False - + # Import here to avoid circular imports from ..processor.traceloop_span_processor import TraceloopSpanProcessor @@ -206,7 +206,7 @@ def wrapped_set_tracer_provider(tracer_provider): "TraceloopSpanProcessor already registered (global flag); skipping deferred registration" ) return result - + processor = TraceloopSpanProcessor( attribute_transformations=_DEFAULT_ATTR_TRANSFORMATIONS, name_transformations=_DEFAULT_NAME_TRANSFORMATIONS, diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_args_wrapper_format.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_args_wrapper_format.py index cb4e57f..a048507 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/tests/test_args_wrapper_format.py +++ b/util/opentelemetry-util-genai-traceloop-translator/tests/test_args_wrapper_format.py @@ -1,9 +1,10 @@ """Test handling of args wrapper format from LangGraph/Traceloop.""" -import json import pytest -from opentelemetry.util.genai.processor.content_normalizer import normalize_traceloop_content +from 
opentelemetry.util.genai.processor.content_normalizer import ( + normalize_traceloop_content, +) class TestArgsWrapperFormat: @@ -13,31 +14,40 @@ def test_args_wrapper_with_messages(self): """Test the actual format shown in debugger.""" # This is the EXACT format from the debugger screenshot input_data = { - "args": [{ - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", - "type": "human", - "id": "8bb38518-7561-40e0-9c3a-682b825ca00d" - } - }], - "user_request": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", - "session_id": "f158b070-5e18-43f7-99f0-095364ed1211", - "origin": "Seattle", - "destination": "Paris", - "departure": "2025-12-07", - "return_date": "2025-12-14", - "travellers": 2, - "flight_summary": None, - "hotel_summary": None, - "activities_summary": None, - "final_itinerary": None, - "current_agent": "start" - }], - "kwargs": {} + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": { + "content": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", + "type": "human", + "id": "8bb38518-7561-40e0-9c3a-682b825ca00d", + }, + } + ], + "user_request": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", + "session_id": "f158b070-5e18-43f7-99f0-095364ed1211", + "origin": "Seattle", + "destination": "Paris", + "departure": "2025-12-07", + "return_date": "2025-12-14", + "travellers": 2, + "flight_summary": None, + "hotel_summary": None, + "activities_summary": None, + "final_itinerary": None, + "current_agent": "start", + } + ], + "kwargs": {}, } # Normalize @@ -45,90 +55,109 @@ def test_args_wrapper_with_messages(self): # Verify assert len(result) == 1, f"Should have 1 message, got {len(result)}" - + message = result[0] - assert message["role"] == "user", f"Role should be 'user', got {message['role']}" - assert len(message["parts"]) == 1, f"Should have 1 part, got {len(message['parts'])}" - + assert ( + message["role"] == "user" + ), f"Role should be 'user', got {message['role']}" + assert ( + len(message["parts"]) == 1 + ), f"Should have 1 part, got {len(message['parts'])}" + part = message["parts"][0] - assert part["type"] == "text", f"Part type should be 'text', got {part['type']}" - assert "Paris" in part["content"], f"Content should mention Paris" - assert "Seattle" in part["content"], f"Content should mention Seattle" - assert "boutique hotel" in part["content"], f"Content should mention boutique hotel" + assert ( + part["type"] == "text" + ), f"Part type should be 'text', got {part['type']}" + assert "Paris" in part["content"], "Content should mention Paris" + assert "Seattle" in part["content"], "Content should mention Seattle" + assert ( + "boutique hotel" in part["content"] + ), "Content should mention boutique hotel" def test_args_wrapper_with_multiple_messages(self): """Test args wrapper with conversation history.""" input_data = { - "args": [{ - "messages": [ - { - "lc": 1, - "type": "constructor", - "id": 
["langchain", "schema", "messages", "SystemMessage"], - "kwargs": { - "content": "You are a helpful assistant.", - "type": "system" - } - }, - { - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "Hello!", - "type": "human" - } - } - ] - }], - "kwargs": {} + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "SystemMessage", + ], + "kwargs": { + "content": "You are a helpful assistant.", + "type": "system", + }, + }, + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": {"content": "Hello!", "type": "human"}, + }, + ] + } + ], + "kwargs": {}, } result = normalize_traceloop_content(input_data, "input") assert len(result) == 2, f"Should have 2 messages, got {len(result)}" - + # System message assert result[0]["role"] == "system" - assert result[0]["parts"][0]["content"] == "You are a helpful assistant." - + assert ( + result[0]["parts"][0]["content"] == "You are a helpful assistant." + ) + # Human message assert result[1]["role"] == "user" assert result[1]["parts"][0]["content"] == "Hello!" def test_args_wrapper_empty_messages(self): """Test args wrapper with empty messages array.""" - input_data = { - "args": [{ - "messages": [] - }], - "kwargs": {} - } + input_data = {"args": [{"messages": []}], "kwargs": {}} result = normalize_traceloop_content(input_data, "input") - assert result == [], f"Should return empty list for empty messages" + assert result == [], "Should return empty list for empty messages" def test_args_wrapper_output_format(self): """Test args wrapper for output (response) format.""" output_data = { - "args": [{ - "messages": [ - { - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "AIMessage"], - "kwargs": { - "content": "I can help you plan your trip to Paris!", - "type": "ai", - "response_metadata": { - "finish_reason": "stop" - } + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "AIMessage", + ], + "kwargs": { + "content": "I can help you plan your trip to Paris!", + "type": "ai", + "response_metadata": {"finish_reason": "stop"}, + }, } - } - ] - }], - "kwargs": {} + ] + } + ], + "kwargs": {}, } result = normalize_traceloop_content(output_data, "output") @@ -143,15 +172,19 @@ def test_nested_inputs_still_works(self): # Old format with "inputs" wrapper old_format = { "inputs": { - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "Test message", - "type": "human" + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": {"content": "Test message", "type": "human"}, } - }] + ] } } @@ -165,15 +198,14 @@ def test_direct_messages_still_works(self): """Ensure direct messages format still works.""" # Direct format (no wrapper) direct_format = { - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "Direct message", - "type": "human" + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": ["langchain", "schema", "messages", "HumanMessage"], + "kwargs": {"content": "Direct message", "type": "human"}, } - }] + ] } result = normalize_traceloop_content(direct_format, "input") @@ -185,4 +217,3 @@ def 
test_direct_messages_still_works(self): if __name__ == "__main__": pytest.main([__file__, "-v"]) - diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_message_serialization.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_message_serialization.py index 4bab81d..72f20dd 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/tests/test_message_serialization.py +++ b/util/opentelemetry-util-genai-traceloop-translator/tests/test_message_serialization.py @@ -5,6 +5,7 @@ """ import json + import pytest from opentelemetry.util.genai.types import InputMessage, OutputMessage, Text @@ -17,14 +18,21 @@ def test_input_message_not_double_encoded(self): """Test that InputMessage content is not double-encoded.""" msg = InputMessage( role="user", - parts=[Text(content="Hello, how are you?", type="text")] + parts=[Text(content="Hello, how are you?", type="text")], ) # Serialize as we do in the processor - serialized = json.dumps([{ - "role": msg.role, - "parts": [{"type": "text", "content": part.content} for part in msg.parts] - }]) + serialized = json.dumps( + [ + { + "role": msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in msg.parts + ], + } + ] + ) # Parse back parsed = json.loads(serialized) @@ -35,11 +43,13 @@ def test_input_message_not_double_encoded(self): assert len(parsed[0]["parts"]) == 1 assert parsed[0]["parts"][0]["type"] == "text" assert parsed[0]["parts"][0]["content"] == "Hello, how are you?" - + # CRITICAL: Content should be a STRING, not nested JSON content = parsed[0]["parts"][0]["content"] assert isinstance(content, str), "Content must be string" - assert not content.startswith('{"'), "Content should NOT be JSON string" + assert not content.startswith( + '{"' + ), "Content should NOT be JSON string" assert content == "Hello, how are you?", "Content should be plain text" def test_output_message_not_double_encoded(self): @@ -47,15 +57,22 @@ def test_output_message_not_double_encoded(self): msg = OutputMessage( role="assistant", parts=[Text(content="I'm doing great, thanks!", type="text")], - finish_reason="stop" + finish_reason="stop", ) # Serialize as we do in the processor - serialized = json.dumps([{ - "role": msg.role, - "parts": [{"type": "text", "content": part.content} for part in msg.parts], - "finish_reason": msg.finish_reason - }]) + serialized = json.dumps( + [ + { + "role": msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in msg.parts + ], + "finish_reason": msg.finish_reason, + } + ] + ) # Parse back parsed = json.loads(serialized) @@ -65,36 +82,53 @@ def test_output_message_not_double_encoded(self): assert parsed[0]["role"] == "assistant" assert parsed[0]["finish_reason"] == "stop" assert len(parsed[0]["parts"]) == 1 - + # CRITICAL: Content should be plain text, not JSON content = parsed[0]["parts"][0]["content"] assert isinstance(content, str), "Content must be string" - assert not content.startswith('{"'), "Content should NOT be JSON string" - assert content == "I'm doing great, thanks!", "Content should be plain text" + assert not content.startswith( + '{"' + ), "Content should NOT be JSON string" + assert ( + content == "I'm doing great, thanks!" 
+ ), "Content should be plain text" def test_deepeval_can_parse_serialized_messages(self): """Test that DeepEval can parse our serialized format.""" # Create messages input_msg = InputMessage( - role="user", - parts=[Text(content="Test input", type="text")] + role="user", parts=[Text(content="Test input", type="text")] ) output_msg = OutputMessage( role="assistant", parts=[Text(content="Test output", type="text")], - finish_reason="stop" + finish_reason="stop", ) # Serialize to JSON string (as stored in span attributes) - input_json = json.dumps([{ - "role": input_msg.role, - "parts": [{"type": "text", "content": part.content} for part in input_msg.parts] - }]) - output_json = json.dumps([{ - "role": output_msg.role, - "parts": [{"type": "text", "content": part.content} for part in output_msg.parts], - "finish_reason": output_msg.finish_reason - }]) + input_json = json.dumps( + [ + { + "role": input_msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in input_msg.parts + ], + } + ] + ) + output_json = json.dumps( + [ + { + "role": output_msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in output_msg.parts + ], + "finish_reason": output_msg.finish_reason, + } + ] + ) # Simulate what DeepEval does: parse JSON and extract text input_parsed = json.loads(input_json) @@ -118,31 +152,39 @@ def extract_text(messages): def test_complex_content_not_double_encoded(self): """Test that complex content with special characters is not double-encoded.""" - complex_content = 'I found a flight:\n- Airline: AeroJet\n- Price: $1044\nWould you like more information?' - + complex_content = "I found a flight:\n- Airline: AeroJet\n- Price: $1044\nWould you like more information?" + msg = OutputMessage( role="assistant", parts=[Text(content=complex_content, type="text")], - finish_reason="stop" + finish_reason="stop", ) # Serialize - serialized = json.dumps([{ - "role": msg.role, - "parts": [{"type": "text", "content": part.content} for part in msg.parts], - "finish_reason": msg.finish_reason - }]) + serialized = json.dumps( + [ + { + "role": msg.role, + "parts": [ + {"type": "text", "content": part.content} + for part in msg.parts + ], + "finish_reason": msg.finish_reason, + } + ] + ) # Parse back parsed = json.loads(serialized) content = parsed[0]["parts"][0]["content"] # Verify content is unchanged - assert content == complex_content, "Complex content should be preserved" + assert ( + content == complex_content + ), "Complex content should be preserved" assert "\n" in content, "Newlines should be preserved" assert "$" in content, "Special characters should be preserved" if __name__ == "__main__": pytest.main([__file__, "-v"]) - diff --git a/util/opentelemetry-util-genai-traceloop-translator/tests/test_nested_traceloop_reconstruction.py b/util/opentelemetry-util-genai-traceloop-translator/tests/test_nested_traceloop_reconstruction.py index 4f840cf..886319d 100644 --- a/util/opentelemetry-util-genai-traceloop-translator/tests/test_nested_traceloop_reconstruction.py +++ b/util/opentelemetry-util-genai-traceloop-translator/tests/test_nested_traceloop_reconstruction.py @@ -5,9 +5,15 @@ """ import json + import pytest -from opentelemetry.util.genai.processor.content_normalizer import normalize_traceloop_content -from opentelemetry.util.genai.processor.message_reconstructor import reconstruct_messages_from_traceloop + +from opentelemetry.util.genai.processor.content_normalizer import ( + normalize_traceloop_content, +) +from 
opentelemetry.util.genai.processor.message_reconstructor import ( + reconstruct_messages_from_traceloop, +) class TestNestedTraceloopReconstruction: @@ -17,42 +23,61 @@ def test_reconstruct_nested_langchain_message(self): """Test reconstruction of nested LangChain message from Traceloop format.""" # This is the actual format from Traceloop when serializing workflow inputs # The content field contains an escaped JSON string with LangChain message objects - traceloop_input = json.dumps([{ - "role": "user", - "parts": [{ - "type": "text", - "content": json.dumps({ - "args": [{ - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", - "type": "human", - "id": "1a8d19f3-f45f-476d-a3cf-35a0b6ddaf00" - } - }], - "user_request": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", - "session_id": "ea8a14ca-0c6a-43f8-a725-c2441b00254b", - "origin": "Seattle", - "destination": "Paris", - "departure": "2025-12-07", - "return_date": "2025-12-14", - "travellers": 2, - "flight_summary": None, - "hotel_summary": None, - "activities_summary": None, - "final_itinerary": None, - "current_agent": "start" - }], - "kwargs": {} - }) - }] - }]) + traceloop_input = json.dumps( + [ + { + "role": "user", + "parts": [ + { + "type": "text", + "content": json.dumps( + { + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": { + "content": "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.", + "type": "human", + "id": "1a8d19f3-f45f-476d-a3cf-35a0b6ddaf00", + }, + } + ], + "user_request": "We're planning a romantic long-week trip to Paris from Seattle next month. 
We'd love a boutique hotel, business-class flights and a few unique experiences.", + "session_id": "ea8a14ca-0c6a-43f8-a725-c2441b00254b", + "origin": "Seattle", + "destination": "Paris", + "departure": "2025-12-07", + "return_date": "2025-12-14", + "travellers": 2, + "flight_summary": None, + "hotel_summary": None, + "activities_summary": None, + "final_itinerary": None, + "current_agent": "start", + } + ], + "kwargs": {}, + } + ), + } + ], + } + ] + ) # Reconstruct messages - input_messages, _ = reconstruct_messages_from_traceloop(traceloop_input, None) + input_messages, _ = reconstruct_messages_from_traceloop( + traceloop_input, None + ) # Verify reconstruction succeeded assert input_messages is not None, "Should reconstruct input messages" @@ -66,35 +91,51 @@ def test_reconstruct_nested_langchain_message(self): assert "Paris" in content, "Should contain destination" assert "Seattle" in content, "Should contain origin" assert "romantic" in content, "Should contain user request text" - + # Should NOT contain escaped JSON artifacts assert '\\"' not in content, "Should not have escaped quotes" - assert "lc\": 1" not in content, "Should not contain LangChain metadata" - assert "kwargs" not in content or "romantic" in content, \ - "Should extract actual content, not just wrapper metadata" + assert 'lc": 1' not in content, "Should not contain LangChain metadata" + assert ( + "kwargs" not in content or "romantic" in content + ), "Should extract actual content, not just wrapper metadata" def test_normalize_deeply_nested_content(self): """Test that normalize_traceloop_content handles deeply nested structures.""" # Raw nested structure - raw_input = [{ - "role": "user", - "parts": [{ - "type": "text", - "content": json.dumps({ - "args": [{ - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "Plan a trip to Paris", - "type": "human" + raw_input = [ + { + "role": "user", + "parts": [ + { + "type": "text", + "content": json.dumps( + { + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": { + "content": "Plan a trip to Paris", + "type": "human", + }, + } + ] + } + ] } - }] - }] - }) - }] - }] + ), + } + ], + } + ] # Normalize normalized = normalize_traceloop_content(raw_input, "input") @@ -107,7 +148,7 @@ def test_normalize_deeply_nested_content(self): # Verify content extraction parts = normalized[0]["parts"] assert len(parts) > 0, "Should have at least one part" - + content = parts[0].get("content", "") # The content should ideally be the actual message text, not nested JSON # If it's still nested JSON, we need to improve the normalizer @@ -117,20 +158,29 @@ def test_extract_langchain_message_from_nested_json(self): """Test extracting actual LangChain message content from nested JSON.""" # This is what we receive from Traceloop nested_content = { - "args": [{ - "messages": [{ - "lc": 1, - "type": "constructor", - "id": ["langchain", "schema", "messages", "HumanMessage"], - "kwargs": { - "content": "Book a flight from Seattle to Paris", - "type": "human", - "id": "test-id-123" - } - }], - "additional_context": "More data here" - }], - "kwargs": {} + "args": [ + { + "messages": [ + { + "lc": 1, + "type": "constructor", + "id": [ + "langchain", + "schema", + "messages", + "HumanMessage", + ], + "kwargs": { + "content": "Book a flight from Seattle to Paris", + "type": "human", + "id": "test-id-123", + }, + } + ], 
+ "additional_context": "More data here", + } + ], + "kwargs": {}, } # This is what we want to extract @@ -139,14 +189,15 @@ def test_extract_langchain_message_from_nested_json(self): # Parse the structure to extract the actual message content # This logic should be in the normalizer or reconstructor extracted = self._extract_message_content(nested_content) - - assert extracted == expected_content, \ - f"Should extract actual message content, got: {extracted}" + + assert ( + extracted == expected_content + ), f"Should extract actual message content, got: {extracted}" def _extract_message_content(self, nested_structure): """ Helper to extract actual message content from nested Traceloop structure. - + This logic should be incorporated into the content normalizer. """ # Try to find LangChain message in args @@ -164,83 +215,113 @@ def _extract_message_content(self, nested_structure): content = kwargs.get("content") if content: return content - + # Fallback: return as-is return json.dumps(nested_structure) def test_coordinator_agent_input_format(self): """Test the actual format seen in coordinator_agent.task spans - REAL DATA.""" # Real data from production traces (gen_ai.input.messages) - traceloop_input = json.dumps([{ - "role": "user", - "parts": [{ - "type": "text", - "content": "{\"messages\": [{\"lc\": 1, \"type\": \"constructor\", \"id\": [\"langchain\", \"schema\", \"messages\", \"HumanMessage\"], \"kwargs\": {\"content\": \"We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.\", \"type\": \"human\", \"id\": \"b9d7a38c-1704-4df3-95c4-d0225cbe1cc7\"}}], \"user_request\": \"We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.\", \"session_id\": \"6b777204-14d1-429c-9fba-28a2bfced313\", \"origin\": \"Seattle\", \"destination\": \"Paris\", \"departure\": \"2025-12-08\", \"return_date\": \"2025-12-15\", \"travellers\": 2, \"flight_summary\": null, \"hotel_summary\": null, \"activities_summary\": null, \"final_itinerary\": null, \"current_agent\": \"start\"}" - }] - }]) + traceloop_input = json.dumps( + [ + { + "role": "user", + "parts": [ + { + "type": "text", + "content": '{"messages": [{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "We\'re planning a romantic long-week trip to Paris from Seattle next month. We\'d love a boutique hotel, business-class flights and a few unique experiences.", "type": "human", "id": "b9d7a38c-1704-4df3-95c4-d0225cbe1cc7"}}], "user_request": "We\'re planning a romantic long-week trip to Paris from Seattle next month. We\'d love a boutique hotel, business-class flights and a few unique experiences.", "session_id": "6b777204-14d1-429c-9fba-28a2bfced313", "origin": "Seattle", "destination": "Paris", "departure": "2025-12-08", "return_date": "2025-12-15", "travellers": 2, "flight_summary": null, "hotel_summary": null, "activities_summary": null, "final_itinerary": null, "current_agent": "start"}', + } + ], + } + ] + ) # Expected: Clean, readable content expected_content = "We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences." 
# Reconstruct - input_messages, _ = reconstruct_messages_from_traceloop(traceloop_input, None) + input_messages, _ = reconstruct_messages_from_traceloop( + traceloop_input, None + ) assert input_messages is not None, "Should reconstruct messages" assert len(input_messages) > 0, "Should have messages" - + # Check if content is clean actual_content = input_messages[0].content - + # The content should be the clean user request, not nested JSON # If this fails, we need to enhance the content normalizer if expected_content not in actual_content: print(f"Expected: {expected_content}") print(f"Actual: {actual_content}") - + # For now, just verify it's not completely broken assert "Paris" in actual_content, "Should at least contain Paris" - assert "Seattle" in actual_content, "Should at least contain Seattle" - + assert ( + "Seattle" in actual_content + ), "Should at least contain Seattle" def test_output_message_with_nested_parts(self): """Test output messages with nested parts structure - REAL DATA.""" # Real data from production traces (gen_ai.output.messages) # This contains the coordinator's response with LangChain AIMessage - traceloop_output = json.dumps([{ - "role": "assistant", - "parts": [{ - "type": "text", - "content": "{\"outputs\": {\"messages\": [{\"lc\": 1, \"type\": \"constructor\", \"id\": [\"langchain\", \"schema\", \"messages\", \"HumanMessage\"], \"kwargs\": {\"content\": \"We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.\", \"type\": \"human\", \"id\": \"b9d7a38c-1704-4df3-95c4-d0225cbe1cc7\"}}, {\"lc\": 1, \"type\": \"constructor\", \"id\": [\"langchain\", \"schema\", \"messages\", \"AIMessage\"], \"kwargs\": {\"content\": \"**Travel Plan for Paris Trip**\\n\\n**Traveler Details:**\\n- Departure City: Seattle\\n- Destination: Paris\\n- Trip Duration: Long weekend (exact dates to be confirmed)\\n- Travel Class: Business Class\\n- Accommodation Preference: Boutique hotel\\n- Experience Preference: Unique experiences\\n\\n**Action Items for Specialist Agents:**\\n\\n1. **Flight Arrangements:**\\n - Research and book business-class flights from Seattle to Paris for the specified dates next month.\\n - Ensure flights have convenient departure and arrival times, considering potential layovers.\\n\\n2. **Accommodation:**\\n - Identify and recommend boutique hotels in Paris that offer a romantic atmosphere and excellent amenities.\\n - Consider locations that are central and provide easy access to popular attractions.\\n - Check for availability and special packages for couples.\\n\\n3. **Unique Experiences:**\\n - Curate a list of unique experiences that align with a romantic theme, such as:\\n - Private Seine River dinner cruise.\\n - Wine tasting tours in local vineyards.\\n - Cooking classes focusing on French cuisine.\\n - Private guided tours of iconic landmarks (e.g., Eiffel Tower, Louvre).\\n - Spa day or couples massage at a luxury spa.\\n\\n4. **Itinerary Planning:**\\n - Draft a suggested itinerary that balances leisure and exploration, incorporating the unique experiences.\\n - Include recommendations for romantic dining options and local attractions.\\n\\n5. 
**Additional Considerations:**\\n - Check for any travel restrictions or requirements for entry into France.\\n - Provide information on transportation options within Paris (e.g., metro, taxis, car rentals).\\n - Offer travel insurance options for peace of mind.\\n\\n**Next Steps:**\\n- Confirm the exact travel dates with the traveler.\\n- Proceed with bookings once the traveler approves the proposed options.\", \"additional_kwargs\": {\"refusal\": null}, \"response_metadata\": {\"token_usage\": {\"completion_tokens\": 356, \"prompt_tokens\": 65, \"total_tokens\": 421, \"completion_tokens_details\": {\"accepted_prediction_tokens\": 0, \"audio_tokens\": 0, \"reasoning_tokens\": 0, \"rejected_prediction_tokens\": 0}, \"prompt_tokens_details\": {\"audio_tokens\": 0, \"cached_tokens\": 0}}, \"model_provider\": \"openai\", \"model_name\": \"gpt-4o-mini-2024-07-18\", \"system_fingerprint\": \"fp_560af6e559\", \"id\": \"chatcmpl-CZRbToSens9vQKBUB2FWF9QobFAQM\", \"service_tier\": \"default\", \"finish_reason\": \"stop\", \"logprobs\": null}, \"type\": \"ai\", \"id\": \"lc_run--32afa4c0-bdfb-4450-8f37-bb65f216cbac-0\", \"usage_metadata\": {\"input_tokens\": 65, \"output_tokens\": 356, \"total_tokens\": 421, \"input_token_details\": {\"audio\": 0, \"cache_read\": 0}, \"output_token_details\": {\"audio\": 0, \"reasoning\": 0}}, \"tool_calls\": [], \"invalid_tool_calls\": []}}], \"user_request\": \"We're planning a romantic long-week trip to Paris from Seattle next month. We'd love a boutique hotel, business-class flights and a few unique experiences.\", \"session_id\": \"6b777204-14d1-429c-9fba-28a2bfced313\", \"origin\": \"Seattle\", \"destination\": \"Paris\", \"departure\": \"2025-12-08\", \"return_date\": \"2025-12-15\", \"travellers\": 2, \"flight_summary\": null, \"hotel_summary\": null, \"activities_summary\": null, \"final_itinerary\": null, \"current_agent\": \"flight_specialist\"}, \"kwargs\": {\"tags\": [\"graph:step:1\"]}}" - }], - "finish_reason": "stop" - }]) + traceloop_output = json.dumps( + [ + { + "role": "assistant", + "parts": [ + { + "type": "text", + "content": '{"outputs": {"messages": [{"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "HumanMessage"], "kwargs": {"content": "We\'re planning a romantic long-week trip to Paris from Seattle next month. We\'d love a boutique hotel, business-class flights and a few unique experiences.", "type": "human", "id": "b9d7a38c-1704-4df3-95c4-d0225cbe1cc7"}}, {"lc": 1, "type": "constructor", "id": ["langchain", "schema", "messages", "AIMessage"], "kwargs": {"content": "**Travel Plan for Paris Trip**\\n\\n**Traveler Details:**\\n- Departure City: Seattle\\n- Destination: Paris\\n- Trip Duration: Long weekend (exact dates to be confirmed)\\n- Travel Class: Business Class\\n- Accommodation Preference: Boutique hotel\\n- Experience Preference: Unique experiences\\n\\n**Action Items for Specialist Agents:**\\n\\n1. **Flight Arrangements:**\\n - Research and book business-class flights from Seattle to Paris for the specified dates next month.\\n - Ensure flights have convenient departure and arrival times, considering potential layovers.\\n\\n2. **Accommodation:**\\n - Identify and recommend boutique hotels in Paris that offer a romantic atmosphere and excellent amenities.\\n - Consider locations that are central and provide easy access to popular attractions.\\n - Check for availability and special packages for couples.\\n\\n3. 
**Unique Experiences:**\\n - Curate a list of unique experiences that align with a romantic theme, such as:\\n - Private Seine River dinner cruise.\\n - Wine tasting tours in local vineyards.\\n - Cooking classes focusing on French cuisine.\\n - Private guided tours of iconic landmarks (e.g., Eiffel Tower, Louvre).\\n - Spa day or couples massage at a luxury spa.\\n\\n4. **Itinerary Planning:**\\n - Draft a suggested itinerary that balances leisure and exploration, incorporating the unique experiences.\\n - Include recommendations for romantic dining options and local attractions.\\n\\n5. **Additional Considerations:**\\n - Check for any travel restrictions or requirements for entry into France.\\n - Provide information on transportation options within Paris (e.g., metro, taxis, car rentals).\\n - Offer travel insurance options for peace of mind.\\n\\n**Next Steps:**\\n- Confirm the exact travel dates with the traveler.\\n- Proceed with bookings once the traveler approves the proposed options.", "additional_kwargs": {"refusal": null}, "response_metadata": {"token_usage": {"completion_tokens": 356, "prompt_tokens": 65, "total_tokens": 421, "completion_tokens_details": {"accepted_prediction_tokens": 0, "audio_tokens": 0, "reasoning_tokens": 0, "rejected_prediction_tokens": 0}, "prompt_tokens_details": {"audio_tokens": 0, "cached_tokens": 0}}, "model_provider": "openai", "model_name": "gpt-4o-mini-2024-07-18", "system_fingerprint": "fp_560af6e559", "id": "chatcmpl-CZRbToSens9vQKBUB2FWF9QobFAQM", "service_tier": "default", "finish_reason": "stop", "logprobs": null}, "type": "ai", "id": "lc_run--32afa4c0-bdfb-4450-8f37-bb65f216cbac-0", "usage_metadata": {"input_tokens": 65, "output_tokens": 356, "total_tokens": 421, "input_token_details": {"audio": 0, "cache_read": 0}, "output_token_details": {"audio": 0, "reasoning": 0}}, "tool_calls": [], "invalid_tool_calls": []}}], "user_request": "We\'re planning a romantic long-week trip to Paris from Seattle next month. 
We\'d love a boutique hotel, business-class flights and a few unique experiences.", "session_id": "6b777204-14d1-429c-9fba-28a2bfced313", "origin": "Seattle", "destination": "Paris", "departure": "2025-12-08", "return_date": "2025-12-15", "travellers": 2, "flight_summary": null, "hotel_summary": null, "activities_summary": null, "final_itinerary": null, "current_agent": "flight_specialist"}, "kwargs": {"tags": ["graph:step:1"]}}', + } + ], + "finish_reason": "stop", + } + ] + ) # Reconstruct messages - _, output_messages = reconstruct_messages_from_traceloop(None, traceloop_output) + _, output_messages = reconstruct_messages_from_traceloop( + None, traceloop_output + ) - assert output_messages is not None, "Should reconstruct output messages" + assert ( + output_messages is not None + ), "Should reconstruct output messages" assert len(output_messages) > 0, "Should have messages" - + # Get the content - should be the AIMessage content, not the wrapper JSON - content = output_messages[0].content if len(output_messages) == 1 else output_messages[-1].content - + content = ( + output_messages[0].content + if len(output_messages) == 1 + else output_messages[-1].content + ) + # The content should be the actual travel plan, not nested JSON - assert "Travel Plan for Paris Trip" in content or "Paris" in content, \ - "Should contain the actual AI response content" - assert "Accommodation" in content or "Flight" in content or "Paris" in content, \ - "Should contain travel planning content" - + assert ( + "Travel Plan for Paris Trip" in content or "Paris" in content + ), "Should contain the actual AI response content" + assert ( + "Accommodation" in content + or "Flight" in content + or "Paris" in content + ), "Should contain travel planning content" + # Should NOT contain escaped quotes or JSON metadata # Note: The actual content has \\n which is fine (markdown formatting) # but should not have \\" (escaped JSON quotes) if '\\"' in content: - print(f"WARNING: Content still has escaped quotes: {content[:200]}") - + print( + f"WARNING: Content still has escaped quotes: {content[:200]}" + ) + # Should not contain LangChain metadata in the final content if '"lc": 1' in content or '"kwargs"' in content: - print(f"WARNING: Content still contains LangChain metadata: {content[:200]}") + print( + f"WARNING: Content still contains LangChain metadata: {content[:200]}" + ) if __name__ == "__main__": pytest.main([__file__, "-v", "-s"]) - diff --git a/util/opentelemetry-util-genai/CHANGELOG.md b/util/opentelemetry-util-genai/CHANGELOG.md index f243620..0f94723 100644 --- a/util/opentelemetry-util-genai/CHANGELOG.md +++ b/util/opentelemetry-util-genai/CHANGELOG.md @@ -1,16 +1,7 @@ # Changelog -All notable changes to this project will be documented in this file. +All notable changes to this repository are documented in this file. -The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), -and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Version 0.1.4 - 2025-11-07 -## Unreleased - -- Add a utility to parse the `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` environment variable. - Add `gen_ai_latest_experimental` as a new value to the Sem Conv stability flag ([#3716](https://github.com/open-telemetry/opentelemetry-python-contrib/pull/3716)). 
- -### Added - -- Generate Spans for LLM invocations -- Helper functions for starting and finishing LLM invocations +- Initial 0.1.4 release of splunk-otel-util-genai From 32e28a509bcc57d2239df2953d3eec571cf55214 Mon Sep 17 00:00:00 2001 From: adityamehra Date: Fri, 14 Nov 2025 14:15:22 -0800 Subject: [PATCH 3/5] fix linting --- .../examples/multi_agent_travel_planner/traceloop/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py index e16fdc6..b3e1c2b 100755 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py @@ -171,7 +171,7 @@ def _configure_otlp_logging() -> None: # same LoggerProvider with the OTLP exporter. Without this, events go to # a default/NoOp provider and never reach the collector! _events.set_event_logger_provider(EventLoggerProvider(logger_provider)) - print(f"[INIT] EventLoggerProvider configured (uses same OTLP exporter)") + print("[INIT] EventLoggerProvider configured (uses same OTLP exporter)") # Configure logging for evaluation results From f116be3e4f6892c4812481f5e196858302dbebd4 Mon Sep 17 00:00:00 2001 From: adityamehra Date: Fri, 14 Nov 2025 14:37:11 -0800 Subject: [PATCH 4/5] fix ruff lint --- .../examples/multi_agent_travel_planner/traceloop/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py index b3e1c2b..8e3684b 100755 --- a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py +++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/main.py @@ -127,7 +127,7 @@ def _configure_otlp_logging() -> None: This is needed for evaluation results to be emitted as OTLP log records. Traceloop SDK handles traces, but we need to explicitly configure logs. - + CRITICAL: Also configures EventLoggerProvider to use the same LoggerProvider, since Events are just LogRecords and need the same exporter. """ @@ -165,7 +165,7 @@ def _configure_otlp_logging() -> None: logger_provider.add_log_record_processor(log_processor) set_logger_provider(logger_provider) print(f"[INIT] OTLP logging configured, endpoint={log_endpoint}") - + # CRITICAL FIX: Configure EventLoggerProvider to use the same LoggerProvider # Events are just LogRecords under the hood, so they need to go through the # same LoggerProvider with the OTLP exporter. 
Without this, events go to

From 5356bd19ba1380b00dd6f813eba2fb5a7a1b3e26 Mon Sep 17 00:00:00 2001
From: adityamehra
Date: Mon, 17 Nov 2025 12:13:50 -0800
Subject: [PATCH 5/5] Add README.md

---
 .../traceloop/README.md | 138 ++++++++++++++++++
 1 file changed, 138 insertions(+)
 create mode 100644 instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/README.md

diff --git a/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/README.md b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/README.md
new file mode 100644
index 0000000..38b2899
--- /dev/null
+++ b/instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop/README.md
@@ -0,0 +1,138 @@
+# Multi-Agent Travel Planner (Traceloop translator + OpenTelemetry)
+
+This example shows a small team of LangChain agents connected by a LangGraph state machine to produce a week-long city-break itinerary (flights, hotel, activities, synthesis). It demonstrates GenAI observability (traces, metrics, logs, evaluation metrics) using the Traceloop SDK.
+
+For more information, see [Collect data from Traceloop-instrumented AI applications](https://help.splunk.com/en/splunk-observability-cloud/observability-for-ai/instrument-an-ai-application/collect-data-from-traceloop-instrumented-ai-applications).
+
+## 1. Architecture Overview
+
+Agents (ReAct-style):
+
+1. `coordinator` – Interprets the traveller request and outlines a plan.
+2. `flight_specialist` – Suggests a flight option (tool: `mock_search_flights`).
+3. `hotel_specialist` – Recommends a hotel (tool: `mock_search_hotels`).
+4. `activity_specialist` – Curates activities (tool: `mock_search_activities`).
+5. `plan_synthesizer` – Produces the final structured itinerary.
+
+Note: for a LangGraph node to be recognized as an `AgentInvocation` by the instrumentation, it must carry the following configuration:
+
+- `agent_name` metadata in the LangGraph config, e.g.
+
+```python
+_create_react_agent(llm, tools=[]).with_config(
+    {
+        "metadata": {
+            "agent_name": "coordinator",
+        },
+    }
+)
+```
+
+- an `agent:` tag in the config, e.g.
+
+```python
+_create_react_agent(llm, tools=[]).with_config(
+    {
+        "tags": ["agent:coordinator"],
+    }
+)
+```
+
+See `main.py` for complete examples.
+
+The LangGraph `StateGraph` drives transitions; `should_continue` moves through the sequence until `END`. State (`PlannerState`) accumulates messages, per-agent summaries, poison events, and the final itinerary.
+
+```text
+[User Request] --> Pre-Parse (origin/dest/dates) --> START
+                                |
+                                v
+                       LangGraph Workflow
+   +-------------+----------+-------------+---------------+
+   |             |          |             |               |
+[Coordinator] -> [Flight] -> [Hotel] -> [Activities] -> [Synthesizer] -> END
+   |             |          |             |               |
+   +-------------+----------+-------------+---------------+
+```
+
+## 2. Prompt Poisoning (Quality Noise)
+
+To exercise evaluation dimensions, the app can inject mild quality-degrading snippets into agent prompts (hallucination, bias, irrelevance, negative sentiment, toxicity). Injection is controlled by the following environment variables (see the example run at the end of this README):
+
+| Variable | Default | Purpose |
+|----------|---------|---------|
+| `TRAVEL_POISON_PROB` | `0.8` | Probability that a step is poisoned |
+| `TRAVEL_POISON_TYPES` | `hallucination,bias,irrelevance,negative_sentiment,toxicity` | Allowed poison types |
+| `TRAVEL_POISON_MAX` | `2` | Max snippets injected per step |
+| `TRAVEL_POISON_SEED` | (unset) | Deterministic seed |
+
+Injected snippet kinds are recorded in `state['poison_events']` and can surface in the evaluation metrics (bias, relevance, hallucination, sentiment, toxicity). Snippets are intentionally mild and safe.
+
+## 3. Local Setup
+
+```bash
+cd instrumentation-genai/opentelemetry-instrumentation-langchain/examples/multi_agent_travel_planner/traceloop
+python3 -m venv .venv
+source .venv/bin/activate
+pip install -r requirements.txt
+```
+
+Set `OPENAI_API_KEY` in `.env` (or export it in your shell), then run the app with `python3 main.py`.
+
+Evaluation metrics appear on `model/LLM` spans, allowing correlation with injected poison events. These synthetic spans are added in the span's `on_end()` lifecycle hook by the `TelemetryHandler` in `splunk-otel-util-genai-translator-traceloop`.
+
+## 4. Docker Usage
+
+The `Dockerfile` installs dependencies and sets `CMD ["python3", "main.py"]`.
+
+Build the image:
+
+```bash
+docker build -t travel-planner-tl:latest .
+```
+
+Run in manual mode:
+
+```bash
+docker run --rm -e OPENAI_API_KEY=$OPENAI_API_KEY \
+  -e OTEL_EXPORTER_OTLP_ENDPOINT=http://host.docker.internal:4317 \
+  travel-planner-tl:latest python main.py
+```
+
+## 5. Kubernetes Deployment
+
+The example `cronjob.yaml` manifest runs this demo as a CronJob workload.
+
+## 6. Core Environment Variables
+
+| Variable | Purpose |
+|----------|---------|
+| `OPENAI_API_KEY` | Authenticates ChatOpenAI |
+| `OPENAI_MODEL` | Chooses the model (default fallback in code) |
+| `OTEL_EXPORTER_OTLP_ENDPOINT` | Collector endpoint (gRPC) |
+| `OTEL_EXPORTER_OTLP_PROTOCOL` | Usually `grpc` |
+| `OTEL_SERVICE_NAME` | Service identity |
+| `OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT` | Capture prompt & output content |
+| `TRAVEL_POISON_*` | Controls prompt noise injection |
+
+## 7. Observing Evaluation Metrics
+
+Metrics & logs exported (names may include operation suffixes):
+
+- `gen_ai.evaluation.hallucination`
+- `gen_ai.evaluation.bias`
+- `gen_ai.evaluation.relevance`
+- `gen_ai.evaluation.sentiment`
+- `gen_ai.evaluation.toxicity`
+
+Higher scores often correlate with injected poison snippets.
+
+## 8. Key Files
+
+| File | Purpose |
+|------|---------|
+| `main.py` | Workflow, poisoning, optional manual instrumentation setup |
+| `Dockerfile` | Container build (editable installs + example requirements) |
+| `cronjob.yaml` | Example CronJob manifest (modify for two flavors) |
+| `requirements.txt` | Python dependencies for the sample |
+
+## 9. Disclaimer
+
+Poison snippets are intentionally mild, non-harmful, and used solely to trigger evaluation telemetry for demonstration. This is not production travel advice.
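+
+## 10. Example Run with Deterministic Poisoning
+
+As a minimal sketch of the knobs from Section 2, the snippet below forces poisoning on every step with a fixed seed so the injected snippets (and the resulting evaluation metrics) are reproducible. The API key and endpoint values are placeholders, and treating `TRAVEL_POISON_PROB` as a 0-1 float is an assumption based on its `0.8` default:
+
+```bash
+# Assumes the virtualenv from Section 3 is active and a collector is listening locally.
+export OPENAI_API_KEY=sk-...                               # placeholder key
+export OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317   # assumed local collector
+export TRAVEL_POISON_PROB=1.0                              # poison every step
+export TRAVEL_POISON_TYPES=bias,toxicity                   # restrict injected kinds
+export TRAVEL_POISON_SEED=42                               # deterministic snippet choice
+python3 main.py
+```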