Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import asyncio
from collections.abc import Generator
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -45,12 +46,20 @@ def client(self) -> httpx.AsyncClient:
self._client = client
return client

async def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
async def _execute(self, generator: Generator[dict[str, Any], Any, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general Any is not great, should this be httpx.Response | None?

try:
kwargs = next(generator)
while True:
response = await self.client.request(**kwargs)
kwargs = generator.send(response)
# Check if this is a sleep request
if "_sleep" in kwargs:
sleep_duration = kwargs["_sleep"]
await asyncio.sleep(sleep_duration)
# Send None back to the generator after sleeping
kwargs = generator.send(None)
else:
# Regular HTTP request
response = await self.client.request(**kwargs)
kwargs = generator.send(response)
except StopIteration as e:
return e.value

Expand Down
15 changes: 12 additions & 3 deletions openhands-sdk/openhands/sdk/workspace/remote/base.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from collections.abc import Generator
from pathlib import Path
from typing import Any
Expand Down Expand Up @@ -60,12 +61,20 @@ def client(self) -> httpx.Client:
self._client = client
return client

def _execute(self, generator: Generator[dict[str, Any], httpx.Response, Any]):
def _execute(self, generator: Generator[dict[str, Any], Any, Any]):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto

try:
kwargs = next(generator)
while True:
response = self.client.request(**kwargs)
kwargs = generator.send(response)
# Check if this is a sleep request
if "_sleep" in kwargs:
sleep_duration = kwargs["_sleep"]
time.sleep(sleep_duration)
# Send None back to the generator after sleeping
kwargs = generator.send(None)
else:
# Regular HTTP request
response = self.client.request(**kwargs)
kwargs = generator.send(response)
except StopIteration as e:
return e.value

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@

_logger = logging.getLogger(__name__)

# Polling configuration
POLL_INTERVAL_SECONDS = 0.1


class RemoteWorkspaceMixin(BaseModel):
"""Mixin providing remote workspace operations.
Expand Down Expand Up @@ -43,12 +46,16 @@ def _execute_command_generator(
command: str,
cwd: str | Path | None,
timeout: float,
) -> Generator[dict[str, Any], httpx.Response, CommandResult]:
) -> Generator[dict[str, Any], httpx.Response | None, CommandResult]:
"""Execute a bash command on the remote system.

This method starts a bash command via the remote agent server API,
then polls for the output until the command completes.

The generator yields either:
- HTTP request dicts (with "method", "url", etc.) for making requests
- Sleep request dicts (with "_sleep" key) for waiting between polls

Args:
command: The bash command to execute
cwd: Working directory (optional)
Expand All @@ -69,13 +76,14 @@ def _execute_command_generator(

try:
# Start the command
response: httpx.Response = yield {
response: httpx.Response | None = yield {
"method": "POST",
"url": f"{self.host}/api/bash/start_bash_command",
"json": payload,
"headers": self._headers,
"timeout": timeout + 5.0, # Add buffer to HTTP timeout
}
assert response is not None
response.raise_for_status()
bash_command = response.json()
command_id = bash_command["id"]
Expand All @@ -84,9 +92,9 @@ def _execute_command_generator(

# Step 2: Poll for output until command completes
start_time = time.time()
stdout_parts = []
stderr_parts = []
exit_code = None
stdout_parts: list[str] = []
stderr_parts: list[str] = []
exit_code: int | None = None

while time.time() - start_time < timeout:
# Search for all events
Expand All @@ -101,6 +109,7 @@ def _execute_command_generator(
"headers": self._headers,
"timeout": timeout,
}
assert response is not None
response.raise_for_status()
search_result = response.json()

Expand All @@ -118,8 +127,9 @@ def _execute_command_generator(
if exit_code is not None:
break

# Wait a bit before polling again
time.sleep(0.1)
# Yield a sleep request - the executor will handle this appropriately
# (sync sleep for sync executor, async sleep for async executor)
yield {"_sleep": POLL_INTERVAL_SECONDS}

# If we timed out waiting for completion
if exit_code is None:
Expand Down
30 changes: 30 additions & 0 deletions tests/sdk/workspace/remote/test_async_remote_workspace.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,36 @@ def test_generator():
mock_client.request.assert_called_once_with(method="GET", url="http://test.com")


@pytest.mark.asyncio
@patch("openhands.sdk.workspace.remote.async_remote_workspace.asyncio.sleep")
async def test_async_execute_handles_sleep_requests(mock_async_sleep):
"""Test _execute method handles sleep requests with asyncio.sleep."""
workspace = AsyncRemoteWorkspace(
host="http://localhost:8000", working_dir="workspace"
)

# Mock async client
mock_client = AsyncMock()
mock_response = Mock()
mock_client.request.return_value = mock_response
workspace._client = mock_client

# Create a generator that yields a sleep request between HTTP requests
def test_generator():
yield {"method": "GET", "url": "http://test1.com"}
yield {"_sleep": 0.1} # Sleep request
yield {"method": "GET", "url": "http://test2.com"}
return "test_result"

result = await workspace._execute(test_generator())

assert result == "test_result"
# Verify asyncio.sleep was called with the correct duration
mock_async_sleep.assert_called_once_with(0.1)
# Verify HTTP requests were made
assert mock_client.request.call_count == 2


@pytest.mark.asyncio
@patch(
"openhands.sdk.workspace.remote.async_remote_workspace.AsyncRemoteWorkspace._execute"
Expand Down
42 changes: 25 additions & 17 deletions tests/sdk/workspace/remote/test_remote_workspace_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -141,16 +141,15 @@ def test_execute_command_generator_with_path_cwd():
assert start_kwargs["json"]["cwd"] == "/tmp/test"


@patch("time.sleep")
@patch("time.time")
def test_execute_command_generator_polling_loop(mock_time, mock_sleep):
"""Test _execute_command_generator polling loop behavior."""
@patch("openhands.sdk.workspace.remote.remote_workspace_mixin.time")
def test_execute_command_generator_polling_loop(mock_time):
"""Test _execute_command_generator polling loop behavior with sleep requests."""
mixin = RemoteWorkspaceMixinHelper(
host="http://localhost:8000", working_dir="workspace"
)

# Mock time progression
mock_time.side_effect = [0, 0.1, 0.2, 0.3] # Simulate time passing
mock_time.time.side_effect = [0, 0.1, 0.2, 0.3] # Simulate time passing

# Mock responses
start_response = Mock()
Expand Down Expand Up @@ -185,13 +184,20 @@ def test_execute_command_generator_polling_loop(mock_time, mock_sleep):
# Start command
next(generator)

# First poll
generator.send(start_response)
# First poll request
poll_kwargs_1 = generator.send(start_response)
assert poll_kwargs_1["method"] == "GET"

# First poll response - no exit code, should yield sleep request
sleep_request = generator.send(poll_response_1)
assert "_sleep" in sleep_request
assert sleep_request["_sleep"] == 0.1

# Second poll
generator.send(poll_response_1)
# After sleep, should yield second poll request
poll_kwargs_2 = generator.send(None) # Send None after sleep
assert poll_kwargs_2["method"] == "GET"

# Final result
# Second poll response - has exit code, should complete
try:
generator.send(poll_response_2)
assert False, "Generator should have stopped"
Expand All @@ -200,9 +206,6 @@ def test_execute_command_generator_polling_loop(mock_time, mock_sleep):
assert result.stdout == "processing...\ndone\n"
assert result.exit_code == 0

# Verify sleep was called between polls
mock_sleep.assert_called_with(0.1)


@patch("openhands.sdk.workspace.remote.remote_workspace_mixin.time")
def test_execute_command_generator_timeout(mock_time):
Expand All @@ -211,7 +214,8 @@ def test_execute_command_generator_timeout(mock_time):
host="http://localhost:8000", working_dir="workspace"
)

# Mock time to simulate timeout
# Mock time to simulate timeout after first poll
# time.time() is called: at start, before first poll, after first poll (timeout)
mock_time.time.side_effect = [
0,
0,
Expand Down Expand Up @@ -241,12 +245,16 @@ def test_execute_command_generator_timeout(mock_time):
# Start command
next(generator)

# Poll once
# First poll request
generator.send(start_response)

# Send poll response and get timeout result
# First poll response - no exit code, yields sleep request
sleep_request = generator.send(poll_response)
assert "_sleep" in sleep_request

# After sleep, time check shows timeout - should return timeout result
try:
generator.send(poll_response)
generator.send(None) # Send None after sleep
assert False, "Generator should have stopped"
except StopIteration as e:
result = e.value
Expand Down
Loading