Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies = [
# go/keep-sorted start
"PyYAML>=6.0.2, <7.0.0", # For APIHubToolset.
"aiosqlite>=0.21.0", # For SQLite database
"agentic_sandbox @ git+https://github.com/kubernetes-sigs/agent-sandbox.git@dbac66ecba5497ac493ca5e4ab5e0fcb1c945134#subdirectory=clients/python/agentic-sandbox-client", # For Agent Sandboxed Code Execution
"anyio>=4.9.0, <5.0.0", # For MCP Session Manager
"authlib>=1.5.1, <2.0.0", # For RestAPI Tool
"click>=8.1.8, <9.0.0", # For CLI tools
Expand Down
54 changes: 47 additions & 7 deletions src/google/adk/code_executors/gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import logging
import uuid

from agentic_sandbox import SandboxClient

import kubernetes as k8s
from kubernetes.watch import Watch

Expand Down Expand Up @@ -70,6 +72,8 @@ class GkeCodeExecutor(BaseCodeExecutor):
namespace: str = "default"
image: str = "python:3.11-slim"
timeout_seconds: int = 300
executor_type: str = "job" # "job" or "sandbox"
sandbox_gateway_name: str | None = None
cpu_requested: str = "200m"
mem_requested: str = "256Mi"
# The maximum CPU the container can use, in "millicores". 1000m is 1 full CPU core.
Expand All @@ -78,12 +82,17 @@ class GkeCodeExecutor(BaseCodeExecutor):

kubeconfig_path: str | None = None
kubeconfig_context: str | None = None

# Sandbox constants
python_sandbox_template: str = "python-sandbox-template"

_batch_v1: k8s.client.BatchV1Api
_core_v1: k8s.client.CoreV1Api

def __init__(
self,
executor_type: str = "job",
sandbox_gateway_name: str | None = None,
kubeconfig_path: str | None = None,
kubeconfig_context: str | None = None,
**data,
Expand All @@ -96,6 +105,8 @@ def __init__(
3. Automatically via the default local kubeconfig file (~/.kube/config).
"""
super().__init__(**data)
self.executor_type = executor_type
self.sandbox_gateway_name = sandbox_gateway_name
self.kubeconfig_path = kubeconfig_path
self.kubeconfig_context = kubeconfig_context

Expand Down Expand Up @@ -136,11 +147,26 @@ def __init__(
self._batch_v1 = client.BatchV1Api()
self._core_v1 = client.CoreV1Api()

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:
def _execute_in_sandbox(self, code: str) -> CodeExecutionResult:
"""Executes code using Agent Sandbox Client."""
try:
with SandboxClient(
template_name=self.python_sandbox_template,
gateway_name=self.sandbox_gateway_name,
namespace=self.namespace
) as sandbox:
# Execute the code as a python script
logger.debug("Executing code in sandbox:\n```\n%s\n```", code)
sandbox.write("script.py", code)
result = sandbox.run("python3 script.py")

return CodeExecutionResult(stdout=result.stdout)
except Exception as e:
return CodeExecutionResult(
stderr=f"Sandbox execution failed: {str(e)}",
)

def _execute_as_job(self, code: str, invocation_context: InvocationContext) -> CodeExecutionResult:
"""Orchestrates the secure execution of a code snippet on GKE."""
job_name = f"adk-exec-{uuid.uuid4().hex[:10]}"
configmap_name = f"code-src-{job_name}"
Expand All @@ -150,7 +176,7 @@ def execute_code(
# 1. Create a ConfigMap to mount LLM-generated code into the Pod.
# 2. Create a Job that runs the code from the ConfigMap.
# 3. Set the Job as the ConfigMap's owner for automatic cleanup.
self._create_code_configmap(configmap_name, code_execution_input.code)
self._create_code_configmap(configmap_name, code)
job_manifest = self._create_job_manifest(
job_name, configmap_name, invocation_context
)
Expand All @@ -162,7 +188,7 @@ def execute_code(
logger.info(
f"Submitted Job '{job_name}' to namespace '{self.namespace}'."
)
logger.debug("Executing code:\n```\n%s\n```", code_execution_input.code)
logger.debug("Executing code:\n```\n%s\n```", code)
return self._watch_job_completion(job_name)

except ApiException as e:
Expand All @@ -186,6 +212,20 @@ def execute_code(
stderr=f"An unexpected executor error occurred: {e}"
)

def execute_code(
self,
invocation_context: InvocationContext,
code_execution_input: CodeExecutionInput,
) -> CodeExecutionResult:

"""Overrides the base method to route execution based on executor_type."""
code = code_execution_input.code
if self.executor_type == "sandbox":
return self._execute_in_sandbox(code)
else:
# Fallback to existing GKE Job logic
return self._execute_as_job(code, invocation_context)

def _create_job_manifest(
self,
job_name: str,
Expand Down
77 changes: 77 additions & 0 deletions tests/unittests/code_executors/test_gke_code_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def test_init_defaults(self):
assert executor.timeout_seconds == 300
assert executor.cpu_requested == "200m"
assert executor.mem_limit == "512Mi"
assert executor.executor_type == "job"

def test_init_with_overrides(self):
"""Tests that class attributes can be overridden at instantiation."""
Expand All @@ -79,11 +80,13 @@ def test_init_with_overrides(self):
image="custom-python:latest",
timeout_seconds=60,
cpu_limit="1000m",
executor_type="sandbox",
)
assert executor.namespace == "test-ns"
assert executor.image == "custom-python:latest"
assert executor.timeout_seconds == 60
assert executor.cpu_limit == "1000m"
assert executor.executor_type == "sandbox"

@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_success(
Expand Down Expand Up @@ -225,3 +228,77 @@ def test_create_job_manifest_structure(self, mock_invocation_context):
assert sec_context.allow_privilege_escalation is False
assert sec_context.read_only_root_filesystem is True
assert sec_context.capabilities.drop == ["ALL"]

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
def test_execute_code_forks_to_sandbox(
self,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses SandboxClient when executor_type='sandbox'."""
# Setup Sandbox mock
mock_sandbox_instance = (
mock_sandbox_client.return_value.__enter__.return_value
)
mock_run_result = MagicMock()
mock_run_result.stdout = "sandbox stdout"
mock_run_result.stderr = None
mock_sandbox_instance.run.return_value = mock_run_result

# Instantiate with sandbox type
executor = GkeCodeExecutor(executor_type="sandbox")
code_input = CodeExecutionInput(code='print("sandbox")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "sandbox stdout"

# Verify SandboxClient was used
mock_sandbox_client.assert_called_once()
mock_sandbox_instance.run.assert_called_once()

# Verify Job path was NOT taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_not_called()

@patch("google.adk.code_executors.gke_code_executor.SandboxClient")
@patch("google.adk.code_executors.gke_code_executor.Watch")
def test_execute_code_forks_to_job(
self,
mock_watch,
mock_sandbox_client,
mock_invocation_context,
mock_k8s_clients,
):
"""Tests that execute_code uses K8s Job when executor_type='job'."""
# Setup K8s Job mocks (success path)
mock_job = MagicMock()
mock_job.status.succeeded = True
mock_watch.return_value.stream.return_value = [{"object": mock_job}]

mock_pod = MagicMock()
mock_pod.metadata.name = "pod-1"
mock_k8s_clients["core_v1"].list_namespaced_pod.return_value.items = [
mock_pod
]
mock_k8s_clients["core_v1"].read_namespaced_pod_log.return_value = (
"job stdout"
)

# Instantiate with job type
executor = GkeCodeExecutor(executor_type="job")
code_input = CodeExecutionInput(code='print("job")')

# Execute
result = executor.execute_code(mock_invocation_context, code_input)

# Assertions
assert result.stdout == "job stdout"

# Verify Job path WAS taken
mock_k8s_clients["batch_v1"].create_namespaced_job.assert_called_once()

# Verify SandboxClient was NOT used
mock_sandbox_client.assert_not_called()