Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/usage/mini_v.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ Useful switches:

- `f1` or `?`: Show keybinding help
- `q` (or `ctrl+q`): Quit the agent
- `i` (or `ctrl+i`): Interrupt the agent immediately (works during long-running commands)
- `c`: Switch to `confirm` mode
- `y` (or `ctrl+y`): Switch to `yolo` mode
- `h` or `LEFT`: Go to previous step of the agent
Expand Down
130 changes: 127 additions & 3 deletions src/minisweagent/agents/interactive_textual.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ def __init__(self, app: "TextualAgent", *args, **kwargs):
self.app = app
super().__init__(*args, config_class=TextualAgentConfig, **kwargs)
self._current_action_from_human = False
self._agent_thread_id = None
self._current_process = None

def add_message(self, role: str, content: str, **kwargs):
super().add_message(role, content, **kwargs)
Expand All @@ -57,9 +59,21 @@ def query(self) -> dict:
self.add_message("assistant", msg["content"])
return msg
self._current_action_from_human = False
return super().query()
try:
return super().query()
except KeyboardInterrupt:
# Handle interrupt during LLM query
interruption_message = self.app.input_container.request_input(
"[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]"
).strip()
if not interruption_message:
interruption_message = "Temporary interruption caught."
raise NonTerminatingException(f"Interrupted by user: {interruption_message}")

def run(self, task: str, **kwargs) -> tuple[str, str]:
# Store the thread ID so we can inject KeyboardInterrupt into it
self._agent_thread_id = threading.get_ident()

try:
exit_status, result = super().run(task, **kwargs)
except Exception as e:
Expand All @@ -72,6 +86,19 @@ def run(self, task: str, **kwargs) -> tuple[str, str]:
self.app.call_from_thread(self.app.action_quit)
return exit_status, result

def step(self) -> dict:
"""Override step to handle interrupts like interactive.py"""
try:
return super().step()
except KeyboardInterrupt:
# Same behavior as interactive.py
interruption_message = self.app.input_container.request_input(
"[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]"
).strip()
if not interruption_message:
interruption_message = "Temporary interruption caught."
raise NonTerminatingException(f"Interrupted by user: {interruption_message}")

def execute_action(self, action: dict) -> dict:
if self.config.mode == "human" and not self._current_action_from_human: # threading, grrrrr
raise NonTerminatingException("Command not executed because user switched to manual mode.")
Expand All @@ -83,7 +110,74 @@ def execute_action(self, action: dict) -> dict:
result = self.app.input_container.request_input("Press ENTER to confirm or provide rejection reason")
if result: # Non-empty string means rejection
raise NonTerminatingException(f"Command not executed: {result}")
return super().execute_action(action)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no, we can't do this like that,this completely bypasses the environment setup (we might not be running locally but in a docker container for example). I thought it might be possible to just catch the keyboardinterrupt at the same place? Take a look at how this is done for the non-visual interactive mode

# Use Popen to allow interrupting long-running commands
import subprocess
import time

try:
# Start the process
self._current_process = subprocess.Popen(
action["action"],
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
cwd=self.env.config.cwd
if hasattr(self.env, "config") and hasattr(self.env.config, "cwd") and self.env.config.cwd
else None,
env=dict(self.env.config.env)
if hasattr(self.env, "config") and hasattr(self.env.config, "env")
else None,
)

# Poll the process with timeout
timeout = (
self.env.config.timeout if hasattr(self.env, "config") and hasattr(self.env.config, "timeout") else 30
)
start_time = time.time()

while self._current_process.poll() is None:
if time.time() - start_time > timeout:
self._current_process.terminate()
try:
self._current_process.wait(timeout=1)
except subprocess.TimeoutExpired:
self._current_process.kill()
self._current_process.wait()

output = self._current_process.stdout.read() if self._current_process.stdout else ""
raise subprocess.TimeoutExpired(action["action"], timeout, output=output.encode())

time.sleep(0.05) # Poll every 50ms

# Get output
output = self._current_process.stdout.read() if self._current_process.stdout else ""
returncode = self._current_process.returncode

result = {"output": output, "returncode": returncode}
self.has_finished(result)
return result

except KeyboardInterrupt:
# Terminate the process if it's still running
if self._current_process and self._current_process.poll() is None:
self._current_process.terminate()
try:
self._current_process.wait(timeout=1)
except subprocess.TimeoutExpired:
self._current_process.kill()
self._current_process.wait()

# Handle interrupt during command execution
interruption_message = self.app.input_container.request_input(
"[bold yellow]Interrupted.[/bold yellow] [green]Type a comment/command[/green]"
).strip()
if not interruption_message:
interruption_message = "Temporary interruption caught."
raise NonTerminatingException(f"Interrupted by user: {interruption_message}")
finally:
self._current_process = None

def has_finished(self, output: dict[str, str]):
try:
Expand Down Expand Up @@ -248,6 +342,7 @@ class TextualAgent(App):
Binding("j,down", "scroll_down", "Scroll down", show=False),
Binding("k,up", "scroll_up", "Scroll up", show=False),
Binding("q,ctrl+q", "quit", "Quit", tooltip="Quit the agent"),
Binding("i,ctrl+i", "interrupt", "INTERRUPT", tooltip="Interrupt the agent and provide feedback"),
Binding("y,ctrl+y", "yolo", "YOLO mode", tooltip="Switch to YOLO Mode (LM actions will execute immediately)"),
Binding(
"c",
Expand Down Expand Up @@ -277,7 +372,12 @@ def __init__(self, model, env, **kwargs):
self._vscroll = VerticalScroll()

def run(self, task: str, **kwargs) -> tuple[str, str]:
threading.Thread(target=lambda: self.agent.run(task, **kwargs), daemon=True).start()
def agent_run_wrapper():
# Store the thread ID so we can inject KeyboardInterrupt into it
self.agent._agent_thread_id = threading.get_ident()
return self.agent.run(task, **kwargs)

threading.Thread(target=agent_run_wrapper, daemon=True).start()
super().run()
return self.exit_status, self.result

Expand Down Expand Up @@ -425,6 +525,30 @@ def action_confirm(self):
self.agent.config.mode = "confirm"
self.notify("Confirm mode enabled - LM proposes commands and you confirm/reject them")

def action_interrupt(self):
"""Interrupt the agent by raising KeyboardInterrupt in its thread."""
import ctypes

if self.agent_state == "RUNNING" and self.agent._agent_thread_id is not None:
# First, terminate any running subprocess
if self.agent._current_process and self.agent._current_process.poll() is None:
self.agent._current_process.terminate()

# Then inject KeyboardInterrupt into the agent thread
ret = ctypes.pythonapi.PyThreadState_SetAsyncExc(
ctypes.c_long(self.agent._agent_thread_id), ctypes.py_object(KeyboardInterrupt)
)
if ret == 0:
self.notify("Failed to interrupt - thread not found")
elif ret > 1:
# If more than one thread affected, undo it
ctypes.pythonapi.PyThreadState_SetAsyncExc(ctypes.c_long(self.agent._agent_thread_id), None)
self.notify("Failed to interrupt - multiple threads affected")
elif self.agent_state == "AWAITING_INPUT":
self.notify("Already awaiting input - use the input field to provide feedback")
else:
self.notify("Cannot interrupt - agent is not running")

def action_next_step(self) -> None:
self.i_step += 1

Expand Down
132 changes: 132 additions & 0 deletions tests/agents/test_interactive_textual.py
Original file line number Diff line number Diff line change
Expand Up @@ -870,3 +870,135 @@ async def test_system_commands_are_callable():
assert callable(command.callback), (
f"Command '{command.title}' has non-callable callback: {command.callback}"
)


@pytest.mark.slow
async def test_interrupt_agent_run():
"""Test interrupting an agent run and providing feedback using KeyboardInterrupt."""
# Use 3 sleep commands to ensure we have time to interrupt
app = TextualAgent(
model=DeterministicModel(
outputs=[
"/sleep 0.5",
"/sleep 0.5",
"/sleep 0.5",
"Step 1\n```bash\necho 'step1'\n```",
"Step 2\n```bash\necho 'COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT'\n```",
]
),
env=LocalEnvironment(),
mode="yolo",
)

async with app.run_test() as pilot:
# Start the agent with the task
threading.Thread(target=lambda: app.agent.run("Interrupt test"), daemon=True).start()

# Wait for agent to start running and have a thread ID
for _ in range(20):
await pilot.pause(0.05)
if app.agent._agent_thread_id is not None:
break

# Ensure we have a thread ID
assert app.agent._agent_thread_id is not None

# Wait a bit to ensure the agent is in the middle of sleeping
await pilot.pause(0.2)

# The agent might be RUNNING or might have already moved to next state
# Either way, we should be able to interrupt
# Request interrupt using the key binding
# This will inject KeyboardInterrupt into the agent thread
await pilot.press("i")

# Wait for the interrupt to be processed
# The KeyboardInterrupt should be raised and caught in step()
for _ in range(50):
await pilot.pause(0.1)
if app.agent_state == "AWAITING_INPUT" and "Interrupted" in get_screen_text(app):
break
else:
raise AssertionError("Agent did not show interrupt prompt within 5 seconds")

# Should show interrupt prompt
assert "Interrupted" in get_screen_text(app)
assert "Type a comment/command" in get_screen_text(app)

# Provide feedback
await type_text(pilot, "Please use a different approach")
await pilot.press("enter")
await pilot.pause(0.3)

# Navigate through steps to find the interrupt message
# The interrupt message should be in one of the earlier steps
found_interrupt_message = False
for step in range(app.n_steps):
app.i_step = step
if "Interrupted by user: Please use a different approach" in get_screen_text(app):
found_interrupt_message = True
break

assert found_interrupt_message, f"Could not find interrupt message in any of {app.n_steps} steps"
assert app.agent_state in ["RUNNING", "STOPPED", "AWAITING_INPUT"]


async def test_interrupt_when_not_running():
"""Test that interrupt shows appropriate message when agent is not running."""
app = TextualAgent(
model=DeterministicModel(outputs=["Test\n```bash\necho 'COMPLETE_TASK_AND_SUBMIT_FINAL_OUTPUT'\n```"]),
env=LocalEnvironment(),
mode="yolo",
confirm_exit=False, # Disable exit confirmation so agent stops immediately
)

async with app.run_test() as pilot:
# Start the agent
threading.Thread(target=lambda: app.agent.run("Test"), daemon=True).start()

# Wait for agent to finish
for _ in range(50):
await pilot.pause(0.1)
if app.agent_state == "STOPPED":
break

# Verify agent is stopped
assert app.agent_state == "STOPPED"

# Try to interrupt when agent is stopped
# The action should just show a notification
await pilot.press("i")
await pilot.pause(0.1)

# Agent should still be stopped (no interrupt was injected)
assert app.agent_state == "STOPPED"


async def test_interrupt_when_awaiting_input():
"""Test that interrupt shows appropriate message when already awaiting input."""
app = TextualAgent(
model=DeterministicModel(outputs=["Test\n```bash\necho 'test'\n```"]),
env=LocalEnvironment(),
mode="confirm",
)

async with app.run_test() as pilot:
# Start the agent
threading.Thread(target=lambda: app.agent.run("Test"), daemon=True).start()

# Wait for agent to request input
for _ in range(50):
await pilot.pause(0.1)
if app.agent_state == "AWAITING_INPUT":
break

# Verify agent is awaiting input
assert app.agent_state == "AWAITING_INPUT"

# Try to interrupt when already awaiting input
await pilot.press("escape") # Unfocus from input
await pilot.press("i")
await pilot.pause(0.1)

# Agent should still be awaiting input (no interrupt was injected)
assert app.agent_state == "AWAITING_INPUT"
Loading