diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/CHANGELOG.md b/instrumentation-genai/opentelemetry-instrumentation-crewai/CHANGELOG.md new file mode 100644 index 0000000..a58b35a --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/CHANGELOG.md @@ -0,0 +1,35 @@ +# Changelog + +All notable changes to this project will be documented in this file. + +The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), +and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). + +## [Unreleased] + +## [0.1.0] - 2025-11-25 + +### Added +- Initial release of CrewAI instrumentation +- Wrapper-based instrumentation for CrewAI workflows, agents, tasks, and tools +- Support for `Crew.kickoff()` → `Workflow` spans +- Support for `Task.execute_sync()` → `Step` spans +- Support for `Agent.execute_task()` → `AgentInvocation` spans +- Support for `BaseTool.run()` and `CrewStructuredTool.invoke()` → `ToolCall` spans +- Integration with `splunk-otel-util-genai` for standardized GenAI telemetry +- Proper trace context propagation using `contextvars` +- Rich span attributes for all CrewAI components +- Defensive instrumentation that doesn't break applications on errors + +### Documentation +- Comprehensive README with usage examples +- Compositional instrumentation patterns (CrewAI + OpenAI + Vector Stores) +- Configuration and environment variable documentation + +### Limitations +- Synchronous workflows only (async support planned for future release) +- LLM calls not instrumented (use provider-specific instrumentation) + +[Unreleased]: https://github.com/signalfx/splunk-otel-python-contrib/compare/v0.1.0...HEAD +[0.1.0]: https://github.com/signalfx/splunk-otel-python-contrib/releases/tag/v0.1.0 + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/README.rst b/instrumentation-genai/opentelemetry-instrumentation-crewai/README.rst new file mode 100644 index 0000000..7519a5d --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/README.rst @@ -0,0 +1,198 @@ +OpenTelemetry CrewAI Instrumentation +===================================== + +|pypi| + +.. |pypi| image:: https://badge.fury.io/py/splunk-otel-instrumentation-crewai.svg + :target: https://pypi.org/project/splunk-otel-instrumentation-crewai/ + +This library provides OpenTelemetry instrumentation for `CrewAI `_, +a framework for orchestrating autonomous AI agents. + +Installation +------------ + +.. code-block:: bash + + pip install splunk-otel-instrumentation-crewai + + +Usage +----- + +.. code-block:: python + + from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + from crewai import Agent, Task, Crew + + # Instrument CrewAI + CrewAIInstrumentor().instrument() + + # Create your crew + agent = Agent( + role="Research Analyst", + goal="Provide accurate research", + backstory="Expert researcher with attention to detail", + ) + + task = Task( + description="Research the latest AI trends", + expected_output="A comprehensive report on AI trends", + agent=agent, + ) + + crew = Crew(agents=[agent], tasks=[task]) + + # Run your crew - telemetry is automatically captured + result = crew.kickoff() + + +What Gets Instrumented +----------------------- + +This instrumentation captures: + +- **Crews** → Mapped to ``Workflow`` spans +- **Tasks** → Mapped to ``Step`` spans +- **Agents** → Mapped to ``AgentInvocation`` spans +- **Tool Usage** → Mapped to ``ToolCall`` spans + +All spans are properly nested with correct parent-child relationships and include +rich attributes about the operation. + + +Compositional Instrumentation +------------------------------ + +This instrumentation focuses on CrewAI's workflow orchestration. For complete observability: + +**CrewAI Only** + +.. code-block:: python + + from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + + CrewAIInstrumentor().instrument() + +Provides workflow structure but no LLM call details. + +**CrewAI + OpenAI** + +.. code-block:: python + + from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + from opentelemetry.instrumentation.openai import OpenAIInstrumentor + + CrewAIInstrumentor().instrument() + OpenAIInstrumentor().instrument() + +Adds LLM call spans with token usage, model names, and latency metrics. + +**Full Stack (CrewAI + OpenAI + Vector Store)** + +.. code-block:: python + + from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + from opentelemetry.instrumentation.openai import OpenAIInstrumentor + from opentelemetry.instrumentation.chromadb import ChromaDBInstrumentor + + CrewAIInstrumentor().instrument() + OpenAIInstrumentor().instrument() + ChromaDBInstrumentor().instrument() + +Complete RAG workflow visibility with vector store operations. + + +Configuration +------------- + +Environment Variables +~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: bash + + # Disable CrewAI's built-in telemetry (recommended) + export CREWAI_DISABLE_TELEMETRY=true + + +Instrumentation Options +~~~~~~~~~~~~~~~~~~~~~~~ + +.. code-block:: python + + from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + + # Basic instrumentation + CrewAIInstrumentor().instrument() + + # With custom tracer provider + CrewAIInstrumentor().instrument(tracer_provider=my_tracer_provider) + + # Uninstrumentation + CrewAIInstrumentor().uninstrument() + + +Requirements +------------ + +- Python >= 3.9 +- CrewAI >= 0.70.0 +- OpenTelemetry API >= 1.38 +- ``splunk-otel-util-genai`` >= 0.1.4 + + +Trace Hierarchy Example +------------------------ + +.. code-block:: + + Crew: Customer Support (Workflow) + ├── Task: inquiry_resolution (Step) + │ └── Agent: Senior Support Representative + │ ├── LLM: gpt-4o-mini (via openai-instrumentation) + │ └── Tool: docs_scrape + └── Task: quality_assurance (Step) + └── Agent: QA Specialist + └── LLM: gpt-4o-mini (via openai-instrumentation) + + +Each span includes rich attributes: + +- ``gen_ai.system`` = "crewai" +- ``gen_ai.operation.name`` = "invoke_workflow" | "invoke_agent" | "execute_tool" +- Framework-specific attributes (agent role, task description, tool names, etc.) + + +Limitations +----------- + +- **Async Support**: Currently supports synchronous workflows only. Async support (``kickoff_async()``) + is planned for a future release. +- **LLM Calls**: Not instrumented here. Use provider-specific instrumentation + (e.g., ``opentelemetry-instrumentation-openai``). + + +Contributing +------------ + +Contributions are welcome! Please ensure: + +- All tests pass +- Code follows project style guidelines +- Instrumentation is defensive (catches exceptions) +- Documentation is updated + + +Links +----- + +- `CrewAI Documentation `_ +- `OpenTelemetry Python `_ +- `Splunk GenAI Utilities `_ + + +License +------- + +Apache-2.0 + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/.env b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/.env new file mode 100644 index 0000000..a04ad24 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/.env @@ -0,0 +1,7 @@ +CREWAI_DISABLE_TELEMETRY=true +OPENAI_API_KEY= +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +PYTHONUNBUFFERED=1 +OTEL_SERVICE_NAME=crewai-examples +DEEPEVAL_TELEMETRY_OPT_OUT="YES" +OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/Dockerfile b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/Dockerfile new file mode 100644 index 0000000..4c2eae9 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/Dockerfile @@ -0,0 +1,35 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install git for pip dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Copy only the CrewAI instrumentation package and example +COPY instrumentation-genai/opentelemetry-instrumentation-crewai /app/opentelemetry-instrumentation-crewai + +# Set working directory to examples +WORKDIR /app/opentelemetry-instrumentation-crewai/examples + +# Install Python dependencies (including genai utils from PyPI) +RUN pip install --no-cache-dir -r requirements.txt + +# Install local CrewAI instrumentation package +RUN pip install --no-cache-dir /app/opentelemetry-instrumentation-crewai + +# Verify packages are installed correctly +RUN python3 -c "from opentelemetry.instrumentation.crewai import CrewAIInstrumentor; print('✓ CrewAI instrumentation available')" && \ + python3 -c "from opentelemetry.util.genai.handler import get_telemetry_handler; print('✓ GenAI handler available (from PyPI)')" + +# Set default environment variables +ENV OTEL_PYTHON_LOG_CORRELATION=true \ + OTEL_PYTHON_LOG_LEVEL=info \ + OTEL_EXPORTER_OTLP_PROTOCOL=grpc \ + PYTHONUNBUFFERED=1 \ + CREWAI_DISABLE_TELEMETRY=true + +# Run the customer support example +CMD ["python3", "customer_support.py"] + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/cronjob.yaml b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/cronjob.yaml new file mode 100644 index 0000000..e596c04 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/cronjob.yaml @@ -0,0 +1,162 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: customer-support-crew + namespace: o11y-4-ai-admehra + labels: + app: customer-support-crew + component: telemetry + annotations: + description: "Customer support CrewAI multi-agent with OpenTelemetry instrumentation and GenAI evaluations" + git-commit: "8b573f3" +spec: + # Run every 4 hours from 8 AM to 4 PM PST on weekdays (Monday-Friday) + # Times in PST: 8am, 12pm, 4pm + schedule: "0 8,12,16 * * 1-5" + timeZone: "America/Los_Angeles" + suspend: false + + # Keep last 3 successful and 1 failed job for debugging + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + + jobTemplate: + metadata: + labels: + app: customer-support-crew + component: telemetry + spec: + template: + metadata: + labels: + app: customer-support-crew + component: telemetry + spec: + restartPolicy: OnFailure + + containers: + - name: customer-support-crew + # Multi-platform image (amd64, arm64) with git commit hash tag + image: admehra621/customer-support-crew:latest + imagePullPolicy: Always + + env: + # === GenAI Semantic Conventions (REQUIRED) === + - name: OTEL_SEMCONV_STABILITY_OPT_IN + value: "gen_ai_latest_experimental" + + # === OpenTelemetry Resource Attributes === + - name: OTEL_RESOURCE_ATTRIBUTES + value: "deployment.environment=o11y-inframon-ai,git.commit.id=8b573f3" + + # === Service name for telemetry === + - name: OTEL_SERVICE_NAME + value: "customer-support-crew" + + # === OpenAI Configuration === + - name: OPENAI_API_KEY + valueFrom: + secretKeyRef: + name: openai-credentials + key: api-key + + - name: OPENAI_MODEL_NAME + value: "gpt-4o-mini" + + # === Serper API Key for web search (if available) === + # Uncomment if you add serper-api-key to the secret + # - name: SERPER_API_KEY + # valueFrom: + # secretKeyRef: + # name: openai-credentials + # key: serper-api-key + + # === CrewAI Configuration === + - name: CREWAI_DISABLE_TELEMETRY + value: "true" + + # === Deepeval Telemetry Opt-Out === + - name: DEEPEVAL_TELEMETRY_OPT_OUT + value: "YES" + + # === GenAI Content Capture === + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT + value: "true" + + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE + value: "SPAN_AND_EVENT" + + # === GenAI Emitters Configuration === + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS + value: "span_metric_event,splunk" + + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION + value: "replace-category:SplunkEvaluationResults" + + # === Evaluation Settings === + # All 5 default evaluations enabled (bias, toxicity, relevance, hallucination, sentiment) + - name: OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION + value: "true" + + # === OpenTelemetry Logs Exporter === + - name: OTEL_LOGS_EXPORTER + value: "otlp" + + # === Get the host IP for Splunk OTEL agent === + - name: SPLUNK_OTEL_AGENT + valueFrom: + fieldRef: + fieldPath: status.hostIP + + # === OpenTelemetry OTLP endpoint using Splunk agent === + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://$(SPLUNK_OTEL_AGENT):4317" + + # === OTLP Protocol (grpc) === + - name: OTEL_EXPORTER_OTLP_PROTOCOL + value: "grpc" + + # === Exclude health check URLs === + - name: OTEL_PYTHON_EXCLUDED_URLS + value: "^(https?://)?[^/]+(/)?$" + + # === Traces Sampler Configuration === + - name: OTEL_TRACES_SAMPLER + value: "parentbased_traceidratio" + + - name: OTEL_TRACES_SAMPLER_ARG + value: "1.0" + + # === Enable Python logging auto instrumentation === + - name: OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED + value: "true" + + # === Enable log correlation === + - name: OTEL_PYTHON_LOG_CORRELATION + value: "true" + + # === Enable CrewAI content capture === + - name: OTEL_INSTRUMENTATION_CREWAI_CAPTURE_MESSAGE_CONTENT + value: "true" + + # === Enable Splunk profiler === + - name: SPLUNK_PROFILER_ENABLED + value: "true" + + # === Unbuffered Python output === + - name: PYTHONUNBUFFERED + value: "1" + + # === GenAI evaluation sampling rate === + - name: OTEL_GENAI_EVALUATION_SAMPLING_RATE + value: "1" + + # === Resource limits === + resources: + requests: + memory: "512Mi" + cpu: "500m" + limits: + memory: "1Gi" + cpu: "1000m" + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/customer_support.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/customer_support.py new file mode 100644 index 0000000..c470582 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/customer_support.py @@ -0,0 +1,222 @@ +from crewai import Agent, Task, Crew + +import sys +import time +from crewai_tools import ScrapeWebsiteTool + +import os +from opentelemetry import trace, metrics +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import OTLPMetricExporter +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk import metrics as metrics_sdk +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor, BatchSpanProcessor +from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader, ConsoleMetricExporter + +from opentelemetry.instrumentation.crewai import CrewAIInstrumentor +from opentelemetry.instrumentation.openai import OpenAIInstrumentor + +# Enable console output for local debugging (set to "false" in cluster) +ENABLE_CONSOLE_OUTPUT = os.environ.get("OTEL_CONSOLE_OUTPUT", "false").lower() == "true" + +# Configure Trace Provider with OTLP exporter +tracer_provider = trace_sdk.TracerProvider() +tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) + +if ENABLE_CONSOLE_OUTPUT: + tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + +# CRITICAL: Register the tracer provider globally so it can be flushed +trace.set_tracer_provider(tracer_provider) + +# Configure Metrics Provider with OTLP exporter +metric_readers = [ + PeriodicExportingMetricReader( + OTLPMetricExporter(), + export_interval_millis=60000 # Export every 60 seconds for production + ) +] + +if ENABLE_CONSOLE_OUTPUT: + metric_readers.append( + PeriodicExportingMetricReader( + ConsoleMetricExporter(), + export_interval_millis=60000 + ) + ) + +meter_provider = metrics_sdk.MeterProvider(metric_readers=metric_readers) +metrics.set_meter_provider(meter_provider) + +# Disable CrewAI's built-in telemetry +os.environ["CREWAI_DISABLE_TELEMETRY"] = "true" +os.environ["OPENAI_MODEL_NAME"] = 'gpt-4o-mini' + +# Enable metrics in genai-util (defaults to span-only) +os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + +support_agent = Agent( + role="Senior Support Representative", + goal="Be the most friendly and helpful " + "support representative in your team", + backstory=( + "You work at crewAI (https://crewai.com) and " + " are now working on providing " + "support to {customer}, a super important customer " + " for your company." + "You need to make sure that you provide the best support!" + "Make sure to provide full complete answers, " + " and make no assumptions." + ), + allow_delegation=False, + verbose=False +) + +# By not setting allow_delegation=False, allow_delegation takes its default value of being True. +# This means the agent can delegate its work to another agent which is better suited to do a particular task. + + +support_quality_assurance_agent = Agent( + role="Support Quality Assurance Specialist", + goal="Get recognition for providing the " + "best support quality assurance in your team", + backstory=( + "You work at crewAI (https://crewai.com) and " + "are now working with your team " + "on a request from {customer} ensuring that " + "the support representative is " + "providing the best support possible.\n" + "You need to make sure that the support representative " + "is providing full" + "complete answers, and make no assumptions." + ), + verbose=False +) + +docs_scrape_tool = ScrapeWebsiteTool( + website_url="https://docs.crewai.com/en/concepts/crews" +) + +# You are passing the Tool on the Task Level +inquiry_resolution = Task( + description=( + "{customer} just reached out with a super important ask:\n" + "{inquiry}\n\n" + "{person} from {customer} is the one that reached out. " + "Make sure to use everything you know " + "to provide the best support possible." + "You must strive to provide a complete " + "and accurate response to the customer's inquiry." + ), + expected_output=( + "A detailed, informative response to the " + "customer's inquiry that addresses " + "all aspects of their question.\n" + "The response should include references " + "to everything you used to find the answer, " + "including external data or solutions. " + "Ensure the answer is complete, " + "leaving no questions unanswered, and maintain a helpful and friendly " + "tone throughout." + ), + tools=[docs_scrape_tool], + agent=support_agent, +) + +# quality_assurance_review is not using any Tool(s) +# Here the QA Agent will only review the work of the Support Agent +quality_assurance_review = Task( + description=( + "Review the response drafted by the Senior Support Representative for {customer}'s inquiry. " + "Ensure that the answer is comprehensive, accurate, and adheres to the " + "high-quality standards expected for customer support.\n" + "Verify that all parts of the customer's inquiry " + "have been addressed " + "thoroughly, with a helpful and friendly tone.\n" + "Check for references and sources used to " + " find the information, " + "ensuring the response is well-supported and " + "leaves no questions unanswered." + ), + expected_output=( + "A final, detailed, and informative response " + "ready to be sent to the customer.\n" + "This response should fully address the " + "customer's inquiry, incorporating all " + "relevant feedback and improvements.\n" + "Don't be too formal, we are a chill and cool company " + "but maintain a professional and friendly tone throughout." + ), + agent=support_quality_assurance_agent, +) + +# Setting memory=True when putting the crew together enables Memory +crew = Crew( + agents=[support_agent, support_quality_assurance_agent], + tasks=[inquiry_resolution, quality_assurance_review], + verbose=False, + memory=True +) + +inputs = { + "customer": "Splunk Olly for AI", + "person": "Aditya Mehra", + "inquiry": "I need help with setting up a Crew " + "and kicking it off, specifically " + "how can I add memory to my crew? " + "Can you provide guidance?" +} + +OpenAIInstrumentor().instrument( + tracer_provider=tracer_provider) +CrewAIInstrumentor().instrument( + tracer_provider=tracer_provider, + meter_provider=meter_provider +) + +def flush_telemetry(): + """Flush all OpenTelemetry providers before exit to ensure traces and metrics are exported.""" + print("\n[FLUSH] Starting telemetry flush", flush=True) + + # Flush traces + try: + tracer_provider = trace.get_tracer_provider() + if hasattr(tracer_provider, "force_flush"): + print("[FLUSH] Flushing traces (timeout=30s)", flush=True) + tracer_provider.force_flush(timeout_millis=30000) + except Exception as e: + print(f"[FLUSH] Warning: Could not flush traces: {e}", flush=True) + + # Flush metrics + try: + meter_provider_instance = metrics.get_meter_provider() + if hasattr(meter_provider_instance, "force_flush"): + print("[FLUSH] Flushing metrics (timeout=30s)", flush=True) + meter_provider_instance.force_flush(timeout_millis=30000) + if hasattr(meter_provider_instance, "shutdown"): + print("[FLUSH] Shutting down metrics provider", flush=True) + meter_provider_instance.shutdown() + except Exception as e: + print(f"[FLUSH] Warning: Could not flush metrics: {e}", flush=True) + + # Give batch processors time to complete final export + time.sleep(2) + print("[FLUSH] Telemetry flush complete\n", flush=True) + +if __name__ == "__main__": + exit_code = 0 + try: + result = crew.kickoff(inputs=inputs) + print("\n[SUCCESS] Crew execution completed") + except Exception as e: + print(f"\n[ERROR] Crew execution failed: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + exit_code = 1 + finally: + # CRITICAL: Always flush telemetry to ensure spans and metrics are exported + print("\n" + "="*100) + print("METRICS OUTPUT BELOW - Look for gen_ai.agent.duration and gen_ai.workflow.duration") + print("="*100 + "\n") + flush_telemetry() + sys.exit(exit_code) diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/financial_assistant.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/financial_assistant.py new file mode 100644 index 0000000..1a0a9df --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/financial_assistant.py @@ -0,0 +1,199 @@ +from crewai import Agent, Task, Crew, Process +from langchain_openai import ChatOpenAI + +import os +# Disable CrewAI's built-in telemetry +os.environ["CREWAI_DISABLE_TELEMETRY"] = "true" +os.environ["OPENAI_MODEL_NAME"] = 'gpt-3.5-turbo' +# os.environ["OPENAI_MODEL_NAME"] = 'gpt-4o-mini' + +from crewai_tools import ScrapeWebsiteTool, SerperDevTool + +search_tool = SerperDevTool() +scrape_tool = ScrapeWebsiteTool() + +data_analyst_agent = Agent( + role="Data Analyst", + goal="Monitor and analyze market data in real-time " + "to identify trends and predict market movements.", + backstory="Specializing in financial markets, this agent " + "uses statistical modeling and machine learning " + "to provide crucial insights. With a knack for data, " + "the Data Analyst Agent is the cornerstone for " + "informing trading decisions.", + verbose=True, + allow_delegation=True, + tools = [scrape_tool, search_tool] +) + +trading_strategy_agent = Agent( + role="Trading Strategy Developer", + goal="Develop and test various trading strategies based " + "on insights from the Data Analyst Agent.", + backstory="Equipped with a deep understanding of financial " + "markets and quantitative analysis, this agent " + "devises and refines trading strategies. It evaluates " + "the performance of different approaches to determine " + "the most profitable and risk-averse options.", + verbose=True, + allow_delegation=True, + tools = [scrape_tool, search_tool] +) + +execution_agent = Agent( + role="Trade Advisor", + goal="Suggest optimal trade execution strategies " + "based on approved trading strategies.", + backstory="This agent specializes in analyzing the timing, price, " + "and logistical details of potential trades. By evaluating " + "these factors, it provides well-founded suggestions for " + "when and how trades should be executed to maximize " + "efficiency and adherence to strategy.", + verbose=True, + allow_delegation=True, + tools = [scrape_tool, search_tool] +) + +risk_management_agent = Agent( + role="Risk Advisor", + goal="Evaluate and provide insights on the risks " + "associated with potential trading activities.", + backstory="Armed with a deep understanding of risk assessment models " + "and market dynamics, this agent scrutinizes the potential " + "risks of proposed trades. It offers a detailed analysis of " + "risk exposure and suggests safeguards to ensure that " + "trading activities align with the firm’s risk tolerance.", + verbose=True, + allow_delegation=True, + tools = [scrape_tool, search_tool] +) + +# Task for Data Analyst Agent: Analyze Market Data +data_analysis_task = Task( + description=( + "Continuously monitor and analyze market data for " + "the selected stock ({stock_selection}). " + "Use statistical modeling and machine learning to " + "identify trends and predict market movements." + ), + expected_output=( + "Insights and alerts about significant market " + "opportunities or threats for {stock_selection}." + ), + agent=data_analyst_agent, +) + +# Task for Trading Strategy Agent: Develop Trading Strategies +strategy_development_task = Task( + description=( + "Develop and refine trading strategies based on " + "the insights from the Data Analyst and " + "user-defined risk tolerance ({risk_tolerance}). " + "Consider trading preferences ({trading_strategy_preference})." + ), + expected_output=( + "A set of potential trading strategies for {stock_selection} " + "that align with the user's risk tolerance." + ), + agent=trading_strategy_agent, +) + +# Task for Trade Advisor Agent: Plan Trade Execution +execution_planning_task = Task( + description=( + "Analyze approved trading strategies to determine the " + "best execution methods for {stock_selection}, " + "considering current market conditions and optimal pricing." + ), + expected_output=( + "Detailed execution plans suggesting how and when to " + "execute trades for {stock_selection}." + ), + agent=execution_agent, +) + +# Task for Risk Advisor Agent: Assess Trading Risks +risk_assessment_task = Task( + description=( + "Evaluate the risks associated with the proposed trading " + "strategies and execution plans for {stock_selection}. " + "Provide a detailed analysis of potential risks " + "and suggest mitigation strategies." + ), + expected_output=( + "A comprehensive risk analysis report detailing potential " + "risks and mitigation recommendations for {stock_selection}." + ), + agent=risk_management_agent, +) + +# Define the crew with agents and tasks +financial_trading_crew = Crew( + agents=[data_analyst_agent, + trading_strategy_agent, + execution_agent, + risk_management_agent], + + tasks=[data_analysis_task, + strategy_development_task, + execution_planning_task, + risk_assessment_task], + + manager_llm=ChatOpenAI(model="gpt-3.5-turbo",temperature=0.1), + process=Process.sequential, + verbose=True +) + +# Example data for kicking off the process +financial_trading_inputs = { + 'stock_selection': 'CSCO', + 'initial_capital': '100000', + 'risk_tolerance': 'Medium', + 'trading_strategy_preference': 'Day Trading', + 'news_impact_consideration': True +} + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor, BatchSpanProcessor + +from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + +tracer_provider = trace_sdk.TracerProvider() +tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) +tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) + +# CRITICAL: Register the tracer provider globally so it can be flushed +trace.set_tracer_provider(tracer_provider) + +CrewAIInstrumentor().instrument(tracer_provider=tracer_provider) + +### this execution will take some time to run +result = financial_trading_crew.kickoff(inputs=financial_trading_inputs) + + +# ============================================================================ +# Splunk Trace Wireframe - Traces Only (No Metrics) +# ============================================================================ +# Sequential Process Trace Structure: +# gen_ai.workflow crew +# ├── gen_ai.step (Data Analysis) +# │ └── invoke_agent Data Analyst +# │ ├── chat (OpenAI) ← NEW! LLM call +# │ │ └── gen_ai.choice +# │ ├── chat (OpenAI) ← NEW! Another LLM call +# │ ├── tool Search the internet +# │ └── tool Read website content +# ├── gen_ai.step (Strategy Development) +# │ └── invoke_agent Trading Strategy Developer +# │ ├── chat (OpenAI) ← NEW! LLM calls visible +# │ └── tool Search the internet +# ├── gen_ai.step (Execution Planning) +# │ └── invoke_agent Trade Advisor +# │ └── chat (OpenAI) ← NEW! +# └── gen_ai.step (Risk Assessment) +# └── invoke_agent Risk Advisor +# ├── chat (OpenAI) ← NEW! +# └── tool Read website content +# ============================================================================ \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/requirements.txt new file mode 100644 index 0000000..d11ce6d --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/requirements.txt @@ -0,0 +1,31 @@ +# Core CrewAI dependencies +crewai>=0.70.0 +crewai-tools>=0.12.0 + +# OpenAI +openai>=1.0.0 + +# OpenTelemetry core packages +opentelemetry-api>=1.38.0 +opentelemetry-sdk>=1.38.0 +opentelemetry-exporter-otlp-proto-http>=1.38.0 +opentelemetry-exporter-otlp-proto-grpc>=1.38.0 +opentelemetry-instrumentation>=0.59b0 +opentelemetry-semantic-conventions>=0.59b0 + +# OpenTelemetry instrumentations for LLM providers +opentelemetry-instrumentation-openai>=0.30.0 + +# Splunk GenAI utilities and emitters +splunk-otel-util-genai>=0.1.4 +splunk-otel-genai-emitters-splunk +splunk-otel-util-genai-evals +splunk-otel-genai-evals-deepeval>=0.1.6 + +# DeepEval for evaluations +deepeval>=3.0.0 + +# Other dependencies +pydantic>=2.0.0 +python-dotenv>=1.0.0 + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/researcher_writer_manager.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/researcher_writer_manager.py new file mode 100644 index 0000000..3cd9335 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/manual/researcher_writer_manager.py @@ -0,0 +1,78 @@ +from crewai import Agent, Crew, Task, Process +# Disable CrewAI's built-in telemetry +import os +os.environ["CREWAI_DISABLE_TELEMETRY"] = "true" +os.environ["OPENAI_MODEL_NAME"] = 'gpt-5-mini' + +# Manager agent coordinates the team +manager = Agent( + role="Project Manager", + goal="Coordinate team efforts and ensure project success", + backstory="Experienced project manager skilled at delegation and quality control", + allow_delegation=True, + verbose=True +) + +# Specialist agents +researcher = Agent( + role="Researcher", + goal="Provide accurate research and analysis", + backstory="Expert researcher with deep analytical skills", + allow_delegation=False, # Specialists focus on their expertise + verbose=True +) + +writer = Agent( + role="Writer", + goal="Create compelling content", + backstory="Skilled writer who creates engaging content", + allow_delegation=False, + verbose=True +) + +# Manager-led task +project_task = Task( + description="Create a comprehensive market analysis report with recommendations", + expected_output="Executive summary, detailed analysis, and strategic recommendations", + agent=manager # Manager will delegate to specialists +) + +# Hierarchical crew +crew = Crew( + agents=[manager, researcher, writer], + tasks=[project_task], + process=Process.hierarchical, # Manager coordinates everything + manager_llm="gpt-4o", # Specify LLM for manager + verbose=True +) + +from opentelemetry import trace +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk import trace as trace_sdk +from opentelemetry.sdk.trace.export import ConsoleSpanExporter, SimpleSpanProcessor, BatchSpanProcessor + +tracer_provider = trace_sdk.TracerProvider() +tracer_provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter())) +tracer_provider.add_span_processor(SimpleSpanProcessor(ConsoleSpanExporter())) +# CRITICAL: Register the tracer provider globally so it can be flushed +trace.set_tracer_provider(tracer_provider) + +from opentelemetry.instrumentation.crewai import CrewAIInstrumentor + +CrewAIInstrumentor().instrument(tracer_provider=tracer_provider) + +crew.kickoff() + +# ============================================================================ +# Trace Wireframe - Hierarchical CrewAI with Manager Delegation +# ============================================================================ +# gen_ai.workflow crew 1m2.249844s +# └── gen_ai.step Create a comprehensive market analysis report with recommendations 1m2.238044s +# └── invoke_agent Crew Manager 1m2.237179s +# ├── tool Ask question to coworker 7.328951s +# │ └── invoke_agent Project Manager 7.326713s +# ├── tool Delegate work to coworker 11.406297s +# │ └── invoke_agent Project Manager 11.401578s +# └── tool Delegate work to coworker 6.136559s +# └── invoke_agent Project Manager 6.130725s +# ============================================================================ \ No newline at end of file diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/.env b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/.env new file mode 100644 index 0000000..bf7cdc2 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/.env @@ -0,0 +1,11 @@ +CREWAI_DISABLE_TELEMETRY=true +DEEPEVAL_TELEMETRY_OPT_OUT="YES" +OTEL_SERVICE_NAME=customer-support-crew-zero-code +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_TRACES_EXPORTER=otlp +OTEL_METRICS_EXPORTER=otlp +OTEL_LOG_LEVEL=info +OTEL_EXPORTER_OTLP_PROTOCOL=grpc +PYTHONUNBUFFERED=1 +SERPER_API_KEY= +OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT=true diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/Dockerfile b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/Dockerfile new file mode 100644 index 0000000..69f1520 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/Dockerfile @@ -0,0 +1,40 @@ +FROM python:3.12-slim + +WORKDIR /app + +# Install git for pip dependencies +RUN apt-get update && apt-get install -y --no-install-recommends \ + git \ + && rm -rf /var/lib/apt/lists/* + +# Copy only the CrewAI instrumentation package and zero-code example +COPY instrumentation-genai/opentelemetry-instrumentation-crewai /app/opentelemetry-instrumentation-crewai + +# Set working directory to zero-code example +WORKDIR /app/opentelemetry-instrumentation-crewai/examples/zero-code + +# Install Python dependencies (including genai utils from PyPI) +RUN pip install --no-cache-dir -r requirements.txt + +# Install local CrewAI instrumentation package with instruments extras +RUN pip install --no-cache-dir /app/opentelemetry-instrumentation-crewai[instruments] + +# Verify packages are installed correctly +RUN python3 -c "from opentelemetry.instrumentation.crewai import CrewAIInstrumentor; print('✓ CrewAI instrumentation available')" && \ + python3 -c "from opentelemetry.util.genai.handler import get_telemetry_handler; print('✓ GenAI handler available (from PyPI)')" && \ + python3 -c "import opentelemetry.instrumentation; print('✓ OpenTelemetry instrumentation available')" + +# Copy and make the run script executable +COPY instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/run.sh /app/run.sh +RUN chmod +x /app/run.sh + +# Set default environment variables +ENV OTEL_PYTHON_LOG_CORRELATION=true \ + OTEL_PYTHON_LOG_LEVEL=info \ + OTEL_EXPORTER_OTLP_PROTOCOL=grpc \ + PYTHONUNBUFFERED=1 \ + CREWAI_DISABLE_TELEMETRY=true + +# Use wrapper script for proper telemetry flushing +CMD ["/app/run.sh"] + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/README.md b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/README.md new file mode 100644 index 0000000..bf47e83 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/README.md @@ -0,0 +1,145 @@ +# CrewAI Zero-Code Instrumentation Example + +This example demonstrates **zero-code instrumentation** of CrewAI applications using `opentelemetry-instrument` with no code changes required. + +## Prerequisites + +1. **OpenAI API Key** - Required for LLM calls +2. **OTel Collector** (optional) - For sending telemetry to backends + +## Setup + +```bash +cd instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code + +# 1. Install dependencies +pip install -r requirements.txt + +# 2. Install CrewAI instrumentation (from local source during development) +pip install -e ../../[instruments] + +# 3. Configure environment variables +cp env.example .env +# Edit .env and add your OPENAI_API_KEY +``` + +## Configuration (.env) + +Create a `.env` file with: + +```bash +# OpenAI API Key (required) +OPENAI_API_KEY=your-openai-api-key-here + +# OpenTelemetry Configuration +OTEL_SERVICE_NAME=crewai-zero-code +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Enable metrics (required for gen_ai.agent.duration, gen_ai.workflow.duration) +OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + +# Disable CrewAI built-in telemetry (recommended) +CREWAI_DISABLE_TELEMETRY=true + +# OpenAI Model +OPENAI_MODEL_NAME=gpt-4o-mini +``` + +## Run with Console Output + +```bash +# Using python-dotenv to load .env file +dotenv run -- opentelemetry-instrument \ + --traces_exporter console \ + python customer_support.py +``` + +## Run with OTLP Exporter + +```bash +# Ensure OTel collector is running on localhost:4317 + +dotenv run -- opentelemetry-instrument \ + --traces_exporter otlp \ + --metrics_exporter otlp \ + python customer_support.py +``` + +## What Gets Instrumented + +✅ **CrewAI** - Workflows, tasks, agents, tools +✅ **OpenAI** - LLM calls, token usage, embeddings +✅ **ChromaDB** - Memory queries/updates (when `memory=True`) +✅ **HTTP** - Web scraping and external API calls + +## Expected Trace Structure + +``` +gen_ai.workflow crew +├── gen_ai.step Task 1 (Support Inquiry) +│ └── invoke_agent Senior Support Representative +│ ├── chroma.query (memory retrieval) +│ ├── embeddings text-embedding-3-small +│ ├── chat gpt-4o-mini (LLM reasoning) +│ └── tool Read website content +│ └── GET https://docs.crewai.com/... +└── gen_ai.step Task 2 (QA Review) + └── invoke_agent Support QA Specialist + ├── chroma.query (memory retrieval) + ├── embeddings text-embedding-3-small + └── chat gpt-4o-mini (LLM review) +``` + +## Key Environment Variables + +| Variable | Description | Default | +|----------|-------------|---------| +| `OPENAI_API_KEY` | OpenAI API key (**required**) | - | +| `OTEL_SERVICE_NAME` | Service name in traces | `unknown_service` | +| `OTEL_EXPORTER_OTLP_ENDPOINT` | OTLP endpoint URL | `http://localhost:4317` | +| `OTEL_INSTRUMENTATION_GENAI_EMITTERS` | Enable metrics (`span_metric`) | `span` | +| `CREWAI_DISABLE_TELEMETRY` | Disable CrewAI telemetry | `false` | +| `OPENAI_MODEL_NAME` | Default OpenAI model | `gpt-4o-mini` | + +## Metrics Generated + +When `OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric`: + +- `gen_ai.workflow.duration` - Total crew execution time +- `gen_ai.agent.duration` - Per-agent execution time +- `gen_ai.client.token.usage` - Token counts per LLM call +- `gen_ai.client.operation.duration` - LLM call latency + +## Troubleshooting + +**"Attempting to instrument while already instrumented"** +Normal warning, safe to ignore. Means auto-instrumentation is working correctly. + +**No traces appearing in console?** +1. Verify you're using `--traces_exporter console` +2. Check that `OPENAI_API_KEY` is set correctly +3. Enable debug logging: `export OTEL_LOG_LEVEL=debug` + +**No metrics appearing?** +Ensure `OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric` is set in your `.env` file. + +**OTel collector connection refused?** +Verify your collector is running: +```bash +docker run -p 4317:4317 otel/opentelemetry-collector +``` + +## Production Deployment + +For production, use PyPI packages: + +```bash +pip install splunk-otel-instrumentation-crewai[instruments] +pip install opentelemetry-distro opentelemetry-exporter-otlp +``` + +Then run with: +```bash +opentelemetry-instrument python your_crewai_app.py +``` diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/cronjob.yaml b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/cronjob.yaml new file mode 100644 index 0000000..56c7216 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/cronjob.yaml @@ -0,0 +1,159 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: customer-support-crew-zero-code + namespace: o11y-4-ai-admehra + labels: + app: customer-support-crew-zero-code + component: telemetry + annotations: + description: "Customer support CrewAI with zero-code OpenTelemetry instrumentation" + git-commit: "d9b2968" +spec: + # Run every 4 hours from 8 AM to 4 PM PST on weekdays (Monday-Friday) + # Times in PST: 8am, 12pm, 4pm + schedule: "0 8,12,16 * * 1-5" + timeZone: "America/Los_Angeles" + suspend: false + + # Keep last 3 successful and 1 failed job for debugging + successfulJobsHistoryLimit: 3 + failedJobsHistoryLimit: 1 + + jobTemplate: + metadata: + labels: + app: customer-support-crew-zero-code + component: telemetry + spec: + template: + metadata: + labels: + app: customer-support-crew-zero-code + component: telemetry + spec: + restartPolicy: OnFailure + + containers: + - name: customer-support-crew-zero-code + image: admehra621/customer-support-crew-zero-code:latest + imagePullPolicy: Always + + env: + # === GenAI Semantic Conventions (REQUIRED) === + - name: OTEL_SEMCONV_STABILITY_OPT_IN + value: "gen_ai_latest_experimental" + + # === OpenTelemetry Resource Attributes === + - name: OTEL_RESOURCE_ATTRIBUTES + value: "deployment.environment=o11y-inframon-ai,git.commit.id=d9b2968" + + # === Service name for telemetry === + - name: OTEL_SERVICE_NAME + value: "customer-support-crew-zero-code" + + # === OpenAI Configuration === + - name: OPENAI_API_KEY + valueFrom: + secretKeyRef: + name: openai-credentials + key: api-key + + - name: OPENAI_MODEL_NAME + value: "gpt-4o-mini" + + # === Serper API Key for web search === + - name: SERPER_API_KEY + valueFrom: + secretKeyRef: + name: openai-credentials + key: serper-api-key + + # === CrewAI Configuration === + - name: CREWAI_DISABLE_TELEMETRY + value: "true" + + # === Deepeval Telemetry Opt-Out === + - name: DEEPEVAL_TELEMETRY_OPT_OUT + value: "YES" + + # === GenAI Content Capture === + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT + value: "true" + + - name: OTEL_INSTRUMENTATION_GENAI_CAPTURE_MESSAGE_CONTENT_MODE + value: "SPAN_AND_EVENT" + + # === GenAI Emitters Configuration === + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS + value: "span_metric_event,splunk" + + - name: OTEL_INSTRUMENTATION_GENAI_EMITTERS_EVALUATION + value: "replace-category:SplunkEvaluationResults" + + # === Evaluation Settings === + - name: OTEL_INSTRUMENTATION_GENAI_EVALS_RESULTS_AGGREGATION + value: "true" + + # === OpenTelemetry Logs Exporter === + - name: OTEL_LOGS_EXPORTER + value: "otlp" + + # === Get the host IP for Splunk OTEL agent === + - name: SPLUNK_OTEL_AGENT + valueFrom: + fieldRef: + fieldPath: status.hostIP + + # === OpenTelemetry OTLP endpoint using Splunk agent === + - name: OTEL_EXPORTER_OTLP_ENDPOINT + value: "http://$(SPLUNK_OTEL_AGENT):4317" + + # === OTLP Protocol (grpc) === + - name: OTEL_EXPORTER_OTLP_PROTOCOL + value: "grpc" + + # === Exclude health check URLs === + - name: OTEL_PYTHON_EXCLUDED_URLS + value: "^(https?://)?[^/]+(/)?$" + + # === Traces Sampler Configuration === + - name: OTEL_TRACES_SAMPLER + value: "parentbased_traceidratio" + + - name: OTEL_TRACES_SAMPLER_ARG + value: "1.0" + + # === Enable Python logging auto instrumentation === + - name: OTEL_PYTHON_LOGGING_AUTO_INSTRUMENTATION_ENABLED + value: "true" + + # === Enable log correlation === + - name: OTEL_PYTHON_LOG_CORRELATION + value: "true" + + # === Enable CrewAI content capture === + - name: OTEL_INSTRUMENTATION_CREWAI_CAPTURE_MESSAGE_CONTENT + value: "true" + + # === Enable Splunk profiler === + - name: SPLUNK_PROFILER_ENABLED + value: "true" + + # === Unbuffered Python output === + - name: PYTHONUNBUFFERED + value: "1" + + # === GenAI evaluation sampling rate === + - name: OTEL_GENAI_EVALUATION_SAMPLING_RATE + value: "1" + + # === Resource limits === + resources: + requests: + memory: "512Mi" + cpu: "500m" + limits: + memory: "1Gi" + cpu: "1000m" + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/customer_support.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/customer_support.py new file mode 100644 index 0000000..1663e21 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/customer_support.py @@ -0,0 +1,124 @@ +import os + +# Set environment before any other imports +os.environ["CREWAI_DISABLE_TELEMETRY"] = "true" +os.environ["OPENAI_MODEL_NAME"] = 'gpt-4o-mini' +os.environ["OTEL_INSTRUMENTATION_GENAI_EMITTERS"] = "span_metric" + +# Now import CrewAI +from crewai import Agent, Task, Crew +from crewai_tools import ScrapeWebsiteTool + +support_agent = Agent( + role="Senior Support Representative", + goal="Be the most friendly and helpful " + "support representative in your team", + backstory=( + "You work at crewAI (https://crewai.com) and " + " are now working on providing " + "support to {customer}, a super important customer " + " for your company." + "You need to make sure that you provide the best support!" + "Make sure to provide full complete answers, " + " and make no assumptions." + ), + allow_delegation=False, + verbose=False +) + +# By not setting allow_delegation=False, allow_delegation takes its default value of being True. +# This means the agent can delegate its work to another agent which is better suited to do a particular task. + + +support_quality_assurance_agent = Agent( + role="Support Quality Assurance Specialist", + goal="Get recognition for providing the " + "best support quality assurance in your team", + backstory=( + "You work at crewAI (https://crewai.com) and " + "are now working with your team " + "on a request from {customer} ensuring that " + "the support representative is " + "providing the best support possible.\n" + "You need to make sure that the support representative " + "is providing full" + "complete answers, and make no assumptions." + ), + verbose=False +) + +docs_scrape_tool = ScrapeWebsiteTool( + website_url="https://docs.crewai.com/en/concepts/crews" +) + +# You are passing the Tool on the Task Level +inquiry_resolution = Task( + description=( + "{customer} just reached out with a super important ask:\n" + "{inquiry}\n\n" + "{person} from {customer} is the one that reached out. " + "Make sure to use everything you know " + "to provide the best support possible." + "You must strive to provide a complete " + "and accurate response to the customer's inquiry." + ), + expected_output=( + "A detailed, informative response to the " + "customer's inquiry that addresses " + "all aspects of their question.\n" + "The response should include references " + "to everything you used to find the answer, " + "including external data or solutions. " + "Ensure the answer is complete, " + "leaving no questions unanswered, and maintain a helpful and friendly " + "tone throughout." + ), + tools=[docs_scrape_tool], + agent=support_agent, +) + +# quality_assurance_review is not using any Tool(s) +# Here the QA Agent will only review the work of the Support Agent +quality_assurance_review = Task( + description=( + "Review the response drafted by the Senior Support Representative for {customer}'s inquiry. " + "Ensure that the answer is comprehensive, accurate, and adheres to the " + "high-quality standards expected for customer support.\n" + "Verify that all parts of the customer's inquiry " + "have been addressed " + "thoroughly, with a helpful and friendly tone.\n" + "Check for references and sources used to " + " find the information, " + "ensuring the response is well-supported and " + "leaves no questions unanswered." + ), + expected_output=( + "A final, detailed, and informative response " + "ready to be sent to the customer.\n" + "This response should fully address the " + "customer's inquiry, incorporating all " + "relevant feedback and improvements.\n" + "Don't be too formal, we are a chill and cool company " + "but maintain a professional and friendly tone throughout." + ), + agent=support_quality_assurance_agent, +) + +# Setting memory=True when putting the crew together enables Memory +crew = Crew( + agents=[support_agent, support_quality_assurance_agent], + tasks=[inquiry_resolution, quality_assurance_review], + verbose=False, + memory=True +) + +inputs = { + "customer": "Splunk Olly for AI", + "person": "Aditya Mehra", + "inquiry": "I need help with setting up a Crew " + "and kicking it off, specifically " + "how can I add memory to my crew? " + "Can you provide guidance?" +} + +result = crew.kickoff(inputs=inputs) diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/env.example b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/env.example new file mode 100644 index 0000000..43003fd --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/env.example @@ -0,0 +1,17 @@ +# OpenAI API Key (required) +OPENAI_API_KEY=your-openai-api-key-here + +# OpenTelemetry Configuration +OTEL_SERVICE_NAME=crewai-zero-code +OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc + +# Enable metrics (required for gen_ai.agent.duration, gen_ai.workflow.duration) +OTEL_INSTRUMENTATION_GENAI_EMITTERS=span_metric + +# Disable CrewAI built-in telemetry (recommended) +CREWAI_DISABLE_TELEMETRY=true + +# OpenAI Model +OPENAI_MODEL_NAME=gpt-4o-mini + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/requirements.txt b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/requirements.txt new file mode 100644 index 0000000..56c3765 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/requirements.txt @@ -0,0 +1,34 @@ +# Core CrewAI dependencies +crewai>=0.70.0 +crewai-tools>=0.12.0 + +# OpenAI +openai>=1.0.0 + +# OpenTelemetry core packages +opentelemetry-api>=1.38.0 +opentelemetry-sdk>=1.38.0 +opentelemetry-exporter-otlp-proto-http>=1.38.0 +opentelemetry-exporter-otlp-proto-grpc>=1.38.0 +opentelemetry-instrumentation>=0.59b0 +opentelemetry-semantic-conventions>=0.59b0 + +# Splunk GenAI utilities and emitters +splunk-otel-util-genai>=0.1.4 +splunk-otel-genai-emitters-splunk +splunk-otel-util-genai-evals +splunk-otel-genai-evals-deepeval>=0.1.6 + +# OpenTelemetry Distro (for zero-code instrumentation) +opentelemetry-distro>=0.59b0 + +# DeepEval for evaluations +deepeval>=3.0.0 + +# Other dependencies +pydantic>=2.0.0 +python-dotenv>=1.0.0 + +opentelemetry-instrumentation-openai +opentelemetry-instrumentation-chromadb +opentelemetry-instrumentation-requests diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/run.sh b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/run.sh new file mode 100644 index 0000000..ce13968 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/examples/zero-code/run.sh @@ -0,0 +1,23 @@ +#!/bin/bash +set -e + +echo "[INIT] Starting zero-code instrumented CrewAI application" +echo "[INIT] Service: $OTEL_SERVICE_NAME" +echo "[INIT] Endpoint: $OTEL_EXPORTER_OTLP_ENDPOINT" +echo "" + +# Run with opentelemetry-instrument (zero-code instrumentation) +opentelemetry-instrument python3 customer_support.py + +EXIT_CODE=$? + +# Give time for final telemetry export +echo "" +echo "[FLUSH] Waiting for telemetry export to complete..." +sleep 5 + +echo "[FLUSH] Telemetry export complete" +echo "[EXIT] Application exited with code: $EXIT_CODE" + +exit $EXIT_CODE + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/pyproject.toml b/instrumentation-genai/opentelemetry-instrumentation-crewai/pyproject.toml new file mode 100644 index 0000000..c9ab105 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/pyproject.toml @@ -0,0 +1,68 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "splunk-otel-instrumentation-crewai" +dynamic = ["version"] +description = "OpenTelemetry CrewAI instrumentation" +readme = "README.rst" +license = "Apache-2.0" +requires-python = ">=3.9" +authors = [ + { name = "Splunk", email = "o11y-gdi@splunk.com" }, +] +classifiers = [ + "Development Status :: 4 - Beta", + "Intended Audience :: Developers", + "License :: OSI Approved :: Apache Software License", + "Programming Language :: Python", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + "Programming Language :: Python :: 3.11", + "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", +] +dependencies = [ + "opentelemetry-api ~= 1.38.0.dev0", + "opentelemetry-instrumentation ~= 0.59b0.dev0", + "opentelemetry-semantic-conventions ~= 0.59b0.dev0", + "splunk-otel-util-genai>=0.1.4", + "wrapt >= 1.14.0, < 2.0.0", +] + +[project.optional-dependencies] +instruments = [ + "crewai >= 0.70.0", +] +test = [ + "crewai >= 0.70.0", + "crewai-tools >= 0.12.0", + "pytest >= 7.0.0", + "pytest-cov >= 4.0.0", +] + +[project.entry-points.opentelemetry_instrumentor] +crewai = "opentelemetry.instrumentation.crewai:CrewAIInstrumentor" + +[project.urls] +Homepage = "https://github.com/signalfx/splunk-otel-python-contrib/tree/main/instrumentation-genai/opentelemetry-instrumentation-crewai" +Repository = "https://github.com/signalfx/splunk-otel-python-contrib" + +[tool.hatch.version] +path = "src/opentelemetry/instrumentation/crewai/version.py" + +[tool.hatch.build.targets.sdist] +include = [ + "/src", + "/tests", +] + +[tool.hatch.build.targets.wheel] +packages = ["src/opentelemetry"] + +[tool.ruff] +exclude = [ + "./", +] + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/__init__.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/__init__.py new file mode 100644 index 0000000..255b63f --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/__init__.py @@ -0,0 +1,14 @@ +""" +OpenTelemetry CrewAI Instrumentation + +Wrapper-based instrumentation for CrewAI using splunk-otel-util-genai. +""" + +from opentelemetry.instrumentation.crewai.instrumentation import CrewAIInstrumentor +from opentelemetry.instrumentation.crewai.version import __version__ + +__all__ = [ + "CrewAIInstrumentor", + "__version__" +] + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py new file mode 100644 index 0000000..2d55741 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/instrumentation.py @@ -0,0 +1,401 @@ +""" +OpenTelemetry CrewAI Instrumentation + +Wrapper-based instrumentation for CrewAI using splunk-otel-util-genai. +""" + +import contextvars +from typing import Collection, Optional + +from wrapt import wrap_function_wrapper +from opentelemetry.instrumentation.utils import unwrap +from opentelemetry.instrumentation.instrumentor import BaseInstrumentor +from opentelemetry.util.genai.handler import TelemetryHandler +from opentelemetry.util.genai.types import ( + Workflow, + AgentInvocation, + Step, + ToolCall, + Error, +) + +_instruments = ("crewai >= 0.70.0",) + +# Global handler instance (singleton) +_handler: Optional[TelemetryHandler] = None + +# Context variable to track parent run IDs for nested operations +_current_run_id: contextvars.ContextVar[Optional[str]] = contextvars.ContextVar( + "crewai_current_run_id", default=None +) + + +class CrewAIInstrumentor(BaseInstrumentor): + """ + OpenTelemetry instrumentation for CrewAI using splunk-otel-util-genai. + + This instrumentor provides standardized telemetry for CrewAI workflows, + agents, tasks, and tool executions. + + Note: LLM calls are NOT instrumented here. Use opentelemetry-instrumentation-openai + or other provider-specific instrumentations for LLM observability. + """ + + def instrumentation_dependencies(self) -> Collection[str]: + return _instruments + + def _instrument(self, **kwargs): + """Apply instrumentation to CrewAI components.""" + global _handler + + # Initialize TelemetryHandler with tracer provider + tracer_provider = kwargs.get("tracer_provider") + if not tracer_provider: + from opentelemetry import trace + tracer_provider = trace.get_tracer_provider() + + meter_provider = kwargs.get("meter_provider") + if not meter_provider: + from opentelemetry import metrics + meter_provider = metrics.get_meter_provider() + + _handler = TelemetryHandler(tracer_provider=tracer_provider, meter_provider=meter_provider) + + # Crew.kickoff -> Workflow + wrap_function_wrapper( + "crewai.crew", + "Crew.kickoff", + _wrap_crew_kickoff + ) + + # Agent.execute_task -> AgentInvocation + wrap_function_wrapper( + "crewai.agent", + "Agent.execute_task", + _wrap_agent_execute_task + ) + + # Task.execute_sync -> Step + wrap_function_wrapper( + "crewai.task", + "Task.execute_sync", + _wrap_task_execute + ) + + # BaseTool.run -> ToolCall + wrap_function_wrapper( + "crewai.tools.base_tool", + "BaseTool.run", + _wrap_tool_run + ) + + # CrewStructuredTool.invoke -> ToolCall (for @tool decorated functions) + wrap_function_wrapper( + "crewai.tools.structured_tool", + "CrewStructuredTool.invoke", + _wrap_structured_tool_invoke + ) + + def _uninstrument(self, **kwargs): + """Remove instrumentation from CrewAI components.""" + unwrap("crewai.crew.Crew", "kickoff") + unwrap("crewai.agent.Agent", "execute_task") + unwrap("crewai.task.Task", "execute_sync") + unwrap("crewai.tools.base_tool.BaseTool", "run") + unwrap("crewai.tools.structured_tool.CrewStructuredTool", "invoke") + + +def _wrap_crew_kickoff(wrapped, instance, args, kwargs): + """ + Wrap Crew.kickoff to create a Workflow span. + + Maps to: Workflow type from splunk-otel-util-genai + """ + try: + handler = _handler + parent_run_id = _current_run_id.get() + + # Create workflow invocation + workflow = Workflow( + name=getattr(instance, "name", None) or "CrewAI Workflow", + workflow_type="crewai.crew", + parent_run_id=parent_run_id, + framework="crewai", + system="crewai", + ) + + inputs = kwargs.get("inputs", {}) + if inputs: + workflow.initial_input = str(inputs)[:500] + + # Start the workflow + handler.start_workflow(workflow) + + # Set as current run ID for child operations + token = _current_run_id.set(str(workflow.run_id)) + except Exception: + # If instrumentation setup fails, just run the original function + return wrapped(*args, **kwargs) + + try: + result = wrapped(*args, **kwargs) + + # Capture result information + try: + if result: + if hasattr(result, "raw"): + workflow.final_output = str(result.raw)[:1000] + + # Stop the workflow successfully + handler.stop_workflow(workflow) + except Exception: + # Ignore instrumentation errors on success path + pass + + return result + except Exception as exc: + # Wrapped function failed - record error and end span + try: + handler.fail(workflow, Error(message=str(exc), type=type(exc))) + except Exception: + pass + raise + finally: + # Restore previous run ID context + try: + _current_run_id.reset(token) + except Exception: + pass + + +def _wrap_agent_execute_task(wrapped, instance, args, kwargs): + """ + Wrap Agent.execute_task to create an AgentInvocation span. + + Maps to: AgentInvocation type from splunk-otel-util-genai + """ + try: + handler = _handler + parent_run_id = _current_run_id.get() + + # Create agent invocation + agent_invocation = AgentInvocation( + name=getattr(instance, "role", "Unknown Agent"), + parent_run_id=parent_run_id, + framework="crewai", + system="crewai", + ) + + # Capture task description as input context + task = kwargs.get("task") + if task and hasattr(task, "description"): + agent_invocation.input_context = task.description[:500] + + # Start the agent invocation + handler.start_agent(agent_invocation) + + # Set as current run ID for child operations + token = _current_run_id.set(str(agent_invocation.run_id)) + except Exception: + # If instrumentation setup fails, just run the original function + return wrapped(*args, **kwargs) + + try: + result = wrapped(*args, **kwargs) + + # Capture result and metrics + try: + if result: + agent_invocation.output_result = str(result)[:1000] + + # Extract token usage if available + if hasattr(instance, "_token_process"): + try: + token_summary = instance._token_process.get_summary() + if hasattr(token_summary, "prompt_tokens"): + agent_invocation.input_tokens = token_summary.prompt_tokens + if hasattr(token_summary, "completion_tokens"): + agent_invocation.output_tokens = token_summary.completion_tokens + except Exception: + pass # Ignore token extraction errors + + # Stop the agent invocation successfully + handler.stop_agent(agent_invocation) + except Exception: + # Ignore instrumentation errors on success path + pass + + return result + except Exception as exc: + # Wrapped function failed - record error and end span + try: + handler.fail(agent_invocation, Error(message=str(exc), type=type(exc))) + except Exception: + pass + raise + finally: + # Restore previous run ID context + try: + _current_run_id.reset(token) + except Exception: + pass + + +def _wrap_task_execute(wrapped, instance, args, kwargs): + """ + Wrap Task.execute_sync to create a Step span. + + Maps to: Step type from splunk-otel-util-genai + """ + try: + handler = _handler + parent_run_id = _current_run_id.get() + + # Create step + step = Step( + name=getattr(instance, "description", None) or "Task Execution", + parent_run_id=parent_run_id, + framework="crewai", + system="crewai", + ) + + # Set step fields from task + if hasattr(instance, "description"): + step.description = instance.description[:500] + step.input_data = instance.description[:500] + if hasattr(instance, "expected_output"): + step.objective = instance.expected_output[:500] + if hasattr(instance, "agent") and hasattr(instance.agent, "role"): + step.assigned_agent = instance.agent.role + + # Start the step + handler.start_step(step) + + # Set as current run ID for child operations + token = _current_run_id.set(str(step.run_id)) + except Exception: + # If instrumentation setup fails, just run the original function + return wrapped(*args, **kwargs) + + try: + result = wrapped(*args, **kwargs) + + # Capture result + try: + if result: + step.output_data = str(result)[:1000] + + # Stop the step successfully + handler.stop_step(step) + except Exception: + # Ignore instrumentation errors on success path + pass + + return result + except Exception as exc: + # Wrapped function failed - record error and end span + try: + handler.fail(step, Error(message=str(exc), type=type(exc))) + except Exception: + pass + raise + finally: + # Restore previous run ID context + try: + _current_run_id.reset(token) + except Exception: + pass + + +def _wrap_tool_run(wrapped, instance, args, kwargs): + """ + Wrap BaseTool.run to create a ToolCall span. + + Maps to: ToolCall type from splunk-otel-util-genai + """ + try: + handler = _handler + parent_run_id = _current_run_id.get() + + # Create tool call + tool_call = ToolCall( + name=getattr(instance, "name", "unknown_tool"), + arguments=str(kwargs) if kwargs else "{}", + id=str(id(instance)), + parent_run_id=parent_run_id, + framework="crewai", + system="crewai", + ) + + # Start the tool call + handler.start_tool_call(tool_call) + except Exception: + # If instrumentation setup fails, just run the original function + return wrapped(*args, **kwargs) + + try: + result = wrapped(*args, **kwargs) + + # Stop the tool call successfully + try: + handler.stop_tool_call(tool_call) + except Exception: + # Ignore instrumentation errors on success path + pass + + return result + except Exception as exc: + # Wrapped function failed - record error and end span + try: + handler.fail(tool_call, Error(message=str(exc), type=type(exc))) + except Exception: + pass + raise + + +def _wrap_structured_tool_invoke(wrapped, instance, args, kwargs): + """ + Wrap CrewStructuredTool.invoke to create a ToolCall span. + + This handles tools created with the @tool decorator. + Maps to: ToolCall type from splunk-otel-util-genai + """ + try: + handler = _handler + parent_run_id = _current_run_id.get() + + # Create tool call + tool_call = ToolCall( + name=getattr(instance, "name", "unknown_tool"), + arguments=str(kwargs) if kwargs else "{}", + id=str(id(instance)), + parent_run_id=parent_run_id, + framework="crewai", + system="crewai", + ) + + # Start the tool call + handler.start_tool_call(tool_call) + except Exception: + # If instrumentation setup fails, just run the original function + return wrapped(*args, **kwargs) + + try: + result = wrapped(*args, **kwargs) + + # Stop the tool call successfully + try: + handler.stop_tool_call(tool_call) + except Exception: + # Ignore instrumentation errors on success path + pass + + return result + except Exception as exc: + # Wrapped function failed - record error and end span + try: + handler.fail(tool_call, Error(message=str(exc), type=type(exc))) + except Exception: + pass + raise + diff --git a/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/version.py b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/version.py new file mode 100644 index 0000000..bdfd304 --- /dev/null +++ b/instrumentation-genai/opentelemetry-instrumentation-crewai/src/opentelemetry/instrumentation/crewai/version.py @@ -0,0 +1,4 @@ +"""Version information for opentelemetry-instrumentation-crewai.""" + +__version__ = "0.1.0" +