diff --git a/modules/tool-recipes/amplifier_module_tool_recipes/__init__.py b/modules/tool-recipes/amplifier_module_tool_recipes/__init__.py index 2d301f7..7ca7ac4 100644 --- a/modules/tool-recipes/amplifier_module_tool_recipes/__init__.py +++ b/modules/tool-recipes/amplifier_module_tool_recipes/__init__.py @@ -242,6 +242,7 @@ def description(self) -> str: - approvals: List pending approvals across sessions - approve: Approve a stage to continue execution - deny: Deny a stage to stop execution +- cancel: Cancel a running recipe session (graceful or immediate) Example: Execute recipe: {{"operation": "execute", "recipe_path": "@recipes:examples/code-review.yaml", "context": {{"file_path": "src/auth.py"}}}} @@ -250,7 +251,8 @@ def description(self) -> str: Validate recipe: {{"operation": "validate", "recipe_path": "@recipes:examples/my-recipe.yaml"}} List approvals: {{"operation": "approvals"}} Approve stage: {{"operation": "approve", "session_id": "...", "stage_name": "planning"}} - Deny stage: {{"operation": "deny", "session_id": "...", "stage_name": "planning", "reason": "needs revision"}}""" + Deny stage: {{"operation": "deny", "session_id": "...", "stage_name": "planning", "reason": "needs revision"}} + Cancel recipe: {{"operation": "cancel", "session_id": "...", "immediate": false}}""" @property def input_schema(self) -> dict: @@ -267,6 +269,7 @@ def input_schema(self) -> dict: "approvals", "approve", "deny", + "cancel", ], "description": "Operation to perform", }, @@ -280,7 +283,7 @@ def input_schema(self) -> dict: }, "session_id": { "type": "string", - "description": "Session ID (required for 'resume', 'approve', 'deny' operations)", + "description": "Session ID (required for 'resume', 'approve', 'deny', 'cancel' operations)", }, "stage_name": { "type": "string", @@ -290,6 +293,10 @@ def input_schema(self) -> dict: "type": "string", "description": "Reason for denial (optional for 'deny' operation)", }, + "immediate": { + "type": "boolean", + "description": "If true, request immediate cancellation (don't wait for current step). For 'cancel' operation.", + }, }, "required": ["operation"], } @@ -321,6 +328,8 @@ async def execute(self, input: dict[str, Any]) -> ToolResult: return await self._approve_stage(input) if operation == "deny": return await self._deny_stage(input) + if operation == "cancel": + return await self._cancel_recipe(input) return ToolResult( success=False, error={"message": f"Unknown operation: {operation}"}, @@ -750,3 +759,79 @@ async def _deny_stage(self, input: dict[str, Any]) -> ToolResult: success=False, error={"message": f"Failed to deny stage: {str(e)}"}, ) + + async def _cancel_recipe(self, input: dict[str, Any]) -> ToolResult: + """Cancel a running recipe session. + + First cancellation request triggers graceful cancellation (complete current step). + Second request (or immediate=True) triggers immediate cancellation. + Cancelled sessions can be resumed later. + """ + session_id = input.get("session_id") + immediate = input.get("immediate", False) + + if not session_id: + return ToolResult( + success=False, + error={"message": "session_id is required for cancel operation"}, + ) + + project_path = Path.cwd() + + # Verify session exists + if not self.session_manager.session_exists(session_id, project_path): + return ToolResult( + success=False, + error={"message": f"Session not found: {session_id}"}, + ) + + # Check current cancellation status + from .session import CancellationStatus + + current_status = self.session_manager.get_cancellation_status( + session_id, project_path + ) + + if current_status == CancellationStatus.CANCELLED: + return ToolResult( + success=False, + error={ + "message": f"Session already cancelled: {session_id}. Use 'resume' to restart.", + }, + ) + + # Request cancellation + success, message = self.session_manager.request_cancellation( + session_id, project_path, immediate=immediate + ) + + if not success: + return ToolResult( + success=False, + error={"message": message}, + ) + + # Determine the cancellation level + new_status = self.session_manager.get_cancellation_status( + session_id, project_path + ) + level = ( + "immediate" if new_status == CancellationStatus.IMMEDIATE else "graceful" + ) + + return ToolResult( + success=True, + output={ + "status": "cancellation_requested", + "session_id": session_id, + "level": level, + "message": message, + "next_steps": ( + "Recipe will stop immediately." + if level == "immediate" + else "Recipe will stop after current step completes. " + "Send another cancel request (or use immediate=true) for immediate cancellation." + ), + "resume_info": "Use 'resume' operation to restart the recipe from where it stopped.", + }, + ) diff --git a/modules/tool-recipes/amplifier_module_tool_recipes/executor.py b/modules/tool-recipes/amplifier_module_tool_recipes/executor.py index 42053d2..b5a1beb 100644 --- a/modules/tool-recipes/amplifier_module_tool_recipes/executor.py +++ b/modules/tool-recipes/amplifier_module_tool_recipes/executor.py @@ -52,6 +52,30 @@ def __init__(self, session_id: str, stage_name: str, approval_prompt: str): super().__init__(f"Execution paused at stage '{stage_name}' awaiting approval") +class CancellationRequestedError(Exception): + """Raised when cancellation is requested and execution should stop. + + This is similar to ApprovalGatePausedError - it signals that execution + has been interrupted, but in this case due to a cancellation request. + The recipe can be resumed later from the last checkpoint. + """ + + def __init__( + self, + session_id: str, + is_immediate: bool, + current_step: str | None = None, + message: str | None = None, + ): + self.session_id = session_id + self.is_immediate = is_immediate + self.current_step = current_step + level = "immediate" if is_immediate else "graceful" + step_info = f" at step '{current_step}'" if current_step else "" + self.message = message or f"Recipe {session_id} cancellation ({level}){step_info}" + super().__init__(self.message) + + @dataclass class RecursionState: """Track recursion across nested recipe executions.""" @@ -222,6 +246,70 @@ def _show_progress(self, message: str, level: str = "info") -> None: if display_system is not None: display_system.show_message(message=message, level=level, source="recipe") + def _check_cancellation( + self, + session_id: str, + project_path: Path, + current_step: str | None = None, + allow_graceful_completion: bool = False, + ) -> None: + """Check if cancellation requested and raise if so. + + This method should be called at loop boundaries (before each step, + before each loop iteration, etc.) to enable responsive cancellation. + + Args: + session_id: Current session identifier + project_path: Project path for session lookup + current_step: Current step ID for error context + allow_graceful_completion: If True, only raise on IMMEDIATE cancellation. + Use this when a step is in progress and should + be allowed to complete for graceful cancellation. + + Raises: + CancellationRequestedError: If cancellation has been requested + """ + if not self.session_manager.is_cancellation_requested(session_id, project_path): + return + + is_immediate = self.session_manager.is_immediate_cancellation(session_id, project_path) + + # Graceful cancellation allows current step to complete + if allow_graceful_completion and not is_immediate: + return + + raise CancellationRequestedError( + session_id=session_id, + is_immediate=is_immediate, + current_step=current_step, + ) + + def _check_coordinator_cancellation( + self, + session_id: str, + project_path: Path, + ) -> None: + """Check if coordinator has cancellation requested (e.g., from SIGINT). + + This integrates with amplifier-core's CancellationToken, allowing + cancellation signals from the CLI (Ctrl+C) to propagate to recipes. + + Args: + session_id: Current session identifier + project_path: Project path for session lookup + """ + # Check if coordinator has a cancellation token + cancellation = getattr(self.coordinator, "cancellation", None) + if cancellation is None: + return + + if not cancellation.is_cancelled: + return + + # Propagate coordinator cancellation to session state + is_immediate = cancellation.is_immediate + self.session_manager.request_cancellation(session_id, project_path, immediate=is_immediate) + async def execute_recipe( self, recipe: Recipe, @@ -232,6 +320,7 @@ async def execute_recipe( recursion_state: RecursionState | None = None, rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + parent_session_id: str | None = None, ) -> dict[str, Any]: """ Execute recipe with checkpointing and resumption. @@ -245,6 +334,7 @@ async def execute_recipe( recursion_state: Optional recursion tracking state (for nested recipes) rate_limiter: Optional rate limiter (inherited from parent recipe) orchestrator_config: Optional orchestrator config (inherited from parent recipe) + parent_session_id: Parent session ID for cancellation checks in sub-recipes Returns: Final context dict with all step outputs @@ -328,6 +418,10 @@ async def execute_recipe( completed_steps = [] session_started = datetime.datetime.now().isoformat() + # Effective session ID for cancellation checks + # For sub-recipes (session_id=None), use parent_session_id to inherit cancellation state + cancellation_session_id = session_id or parent_session_id + # Show recipe start progress total_steps = len(recipe.steps) self._show_progress(f"📋 Starting recipe: {recipe.name} ({total_steps} steps)") @@ -353,6 +447,12 @@ async def execute_recipe( for i in range(current_step_index, len(recipe.steps)): step = recipe.steps[i] + # Check for cancellation before starting each step + # Use cancellation_session_id to support both root recipes and sub-recipes + if cancellation_session_id: + self._check_coordinator_cancellation(cancellation_session_id, project_path) + self._check_cancellation(cancellation_session_id, project_path, current_step=step.id) + # Add step metadata to context context["step"] = {"id": step.id, "index": i} @@ -378,7 +478,10 @@ async def execute_recipe( # Handle foreach loops if step.foreach: try: - await self._execute_loop(step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config) + await self._execute_loop( + step, context, project_path, recursion_state, recipe_path, + rate_limiter, orchestrator_config, session_id=cancellation_session_id + ) # Update completed steps and session state after loop completes completed_steps.append(step.id) state = { @@ -400,7 +503,8 @@ async def execute_recipe( try: if step.type == "recipe": result = await self._execute_recipe_step( - step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config + step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, + parent_session_id=cancellation_session_id ) elif step.type == "bash": # Bash steps don't count against agent recursion limits @@ -412,7 +516,10 @@ async def execute_recipe( else: # Agent step - track for recursion limits recursion_state.increment_steps() - result = await self.execute_step_with_retry(step, context, rate_limiter, orchestrator_config) + result = await self.execute_step_with_retry( + step, context, rate_limiter, orchestrator_config, + session_id=cancellation_session_id, project_path=project_path + ) # Process result: unwrap spawn() output and optionally parse JSON result = self._process_step_result(result, step) @@ -441,6 +548,20 @@ async def execute_recipe( except SkipRemainingError: # Skip remaining steps break + except CancellationRequestedError: + # Cancellation requested - save state and re-raise + raise + + except CancellationRequestedError as e: + # Mark session as cancelled and save state for later resumption + self.session_manager.mark_cancelled( + session_id, project_path, + cancelled_at_step=e.current_step, + ) + if state is not None: + self.session_manager.save_state(session_id, project_path, state) + self._show_progress(f"⚠️ Recipe cancelled at step: {e.current_step or 'unknown'}", level="warning") + raise except Exception: # Save state even on error for resumption @@ -531,6 +652,10 @@ async def _execute_staged_recipe( for stage_idx in range(current_stage_index, len(recipe.stages)): stage = recipe.stages[stage_idx] + # Check for cancellation before starting each stage + self._check_coordinator_cancellation(session_id, project_path) + self._check_cancellation(session_id, project_path, current_step=f"stage:{stage.name}") + # Show stage progress self._show_progress(f"📦 Stage {stage_idx + 1}/{total_stages}: {stage.name}") @@ -547,6 +672,10 @@ async def _execute_staged_recipe( for step_idx in range(start_step, len(stage.steps)): step = stage.steps[step_idx] + # Check for cancellation before starting each step + self._check_coordinator_cancellation(session_id, project_path) + self._check_cancellation(session_id, project_path, current_step=step.id) + # Add step metadata to context context["step"] = {"id": step.id, "index": step_idx, "stage": stage.name} @@ -566,7 +695,10 @@ async def _execute_staged_recipe( # Handle foreach loops if step.foreach: try: - await self._execute_loop(step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config) + await self._execute_loop( + step, context, project_path, recursion_state, recipe_path, + rate_limiter, orchestrator_config, session_id=session_id + ) completed_steps.append(step.id) self._save_staged_state( session_id, @@ -586,7 +718,8 @@ async def _execute_staged_recipe( try: if step.type == "recipe": result = await self._execute_recipe_step( - step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config + step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, + parent_session_id=session_id ) elif step.type == "bash": # Bash steps don't count against agent recursion limits @@ -598,7 +731,10 @@ async def _execute_staged_recipe( else: # Agent step - track for recursion limits recursion_state.increment_steps() - result = await self.execute_step_with_retry(step, context, rate_limiter, orchestrator_config) + result = await self.execute_step_with_retry( + step, context, rate_limiter, orchestrator_config, + session_id=session_id, project_path=project_path + ) # Process result: unwrap spawn() output and optionally parse JSON result = self._process_step_result(result, step) @@ -620,6 +756,9 @@ async def _execute_staged_recipe( except SkipRemainingError: break + except CancellationRequestedError: + # Cancellation requested - re-raise to outer handler + raise # Stage completed - check for approval gate completed_stages.append(stage.name) @@ -656,6 +795,24 @@ async def _execute_staged_recipe( except ApprovalGatePausedError: # Re-raise approval pause (not an error) raise + except CancellationRequestedError as e: + # Mark session as cancelled and save state for later resumption + self.session_manager.mark_cancelled( + session_id, project_path, + cancelled_at_step=e.current_step, + ) + self._save_staged_state( + session_id, + project_path, + recipe, + context, + current_stage_index, + current_step_in_stage, + completed_stages, + completed_steps, + ) + self._show_progress(f"⚠️ Recipe cancelled at step: {e.current_step or 'unknown'}", level="warning") + raise except Exception: # Save state for resumption on error self._save_staged_state( @@ -711,6 +868,8 @@ async def execute_step_with_retry( context: dict[str, Any], rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + session_id: str | None = None, + project_path: Path | None = None, ) -> Any: """ Execute step with retry logic. @@ -720,6 +879,8 @@ async def execute_step_with_retry( context: Current context variables rate_limiter: Optional rate limiter for pacing orchestrator_config: Optional orchestrator config for spawned sessions + session_id: Session identifier for cancellation checks + project_path: Project path for cancellation checks Returns: Step result @@ -727,6 +888,7 @@ async def execute_step_with_retry( Raises: Exception if all retries fail and on_error='fail' SkipRemainingError if on_error='skip_remaining' + CancellationRequestedError if cancellation requested """ retry_config = step.retry or {} max_attempts = retry_config.get("max_attempts", 1) @@ -737,6 +899,10 @@ async def execute_step_with_retry( last_error = None for attempt in range(max_attempts): + # Check for cancellation before each attempt + if session_id and project_path: + self._check_coordinator_cancellation(session_id, project_path) + self._check_cancellation(session_id, project_path, current_step=step.id) try: # Acquire rate limiter slot if configured if rate_limiter: @@ -977,6 +1143,7 @@ async def _execute_loop( recipe_path: Path | None = None, rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + session_id: str | None = None, ) -> None: """ Execute a step with foreach iteration. @@ -993,10 +1160,12 @@ async def _execute_loop( project_path: Current project directory recursion_state: Recursion tracking state orchestrator_config: Optional orchestrator config for spawned sessions + session_id: Session identifier for cancellation checks Raises: ValueError: If foreach variable invalid or iteration fails SkipRemainingError: If on_error='skip_remaining' and iteration fails + CancellationRequestedError: If cancellation requested """ # Resolve foreach variable (step.foreach is guaranteed non-None by caller) assert step.foreach is not None @@ -1027,12 +1196,12 @@ async def _execute_loop( if step.parallel: # Parallel execution: run all iterations concurrently results = await self._execute_loop_parallel( - step, context, items, loop_var, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config + step, context, items, loop_var, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, session_id ) else: # Sequential execution: run iterations one at a time results = await self._execute_loop_sequential( - step, context, items, loop_var, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config + step, context, items, loop_var, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, session_id ) # Store results @@ -1052,18 +1221,27 @@ async def _execute_loop_sequential( recipe_path: Path | None = None, rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + session_id: str | None = None, ) -> list[Any]: """Execute loop iterations sequentially.""" results = [] for idx, item in enumerate(items): + # Check for cancellation before each iteration + if session_id and project_path: + self._check_coordinator_cancellation(session_id, project_path) + self._check_cancellation(session_id, project_path, current_step=f"{step.id}[{idx}]") + # Set loop variable in context context[loop_var] = item try: # Execute based on step type (agent, recipe, or bash) if step.type == "recipe": - result = await self._execute_recipe_step(step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config) + result = await self._execute_recipe_step( + step, context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, + parent_session_id=session_id + ) elif step.type == "bash": # Bash steps don't count against agent recursion limits bash_result = await self._execute_bash_step(step, context, project_path) @@ -1074,7 +1252,10 @@ async def _execute_loop_sequential( else: # Agent step - track for recursion limits recursion_state.increment_steps() - result = await self.execute_step_with_retry(step, context, rate_limiter, orchestrator_config) + result = await self.execute_step_with_retry( + step, context, rate_limiter, orchestrator_config, + session_id=session_id, project_path=project_path + ) # Process result: unwrap spawn() output and optionally parse JSON result = self._process_step_result(result, step) @@ -1082,6 +1263,9 @@ async def _execute_loop_sequential( except SkipRemainingError: # Propagate skip_remaining raise + except CancellationRequestedError: + # Propagate cancellation + raise except Exception as e: # Fail fast - no partial completion in MVP raise ValueError(f"Step '{step.id}' iteration {idx} failed: {e}") from e @@ -1103,6 +1287,7 @@ async def _execute_loop_parallel( recipe_path: Path | None = None, rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + session_id: str | None = None, ) -> list[Any]: """ Execute loop iterations in parallel using asyncio.gather. @@ -1117,6 +1302,11 @@ async def _execute_loop_parallel( Rate limiting is applied via the rate_limiter if configured. """ + # Check for cancellation before starting parallel execution + if session_id and project_path: + self._check_coordinator_cancellation(session_id, project_path) + self._check_cancellation(session_id, project_path, current_step=f"{step.id}[parallel]") + # For agent steps, pre-check total steps limit (all will run in parallel) if step.type == "agent": if recursion_state.total_steps + len(items) > recursion_state.max_total_steps: @@ -1149,7 +1339,8 @@ async def execute_iteration(idx: int, item: Any) -> Any: # Execute based on step type (agent, recipe, or bash) if step.type == "recipe": result = await self._execute_recipe_step( - step, iter_context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config + step, iter_context, project_path, recursion_state, recipe_path, rate_limiter, orchestrator_config, + parent_session_id=session_id ) elif step.type == "bash": # Bash steps don't count against agent recursion limits @@ -1160,12 +1351,17 @@ async def execute_iteration(idx: int, item: Any) -> Any: result = bash_result.stdout else: # Agent step - rate limiting handled inside execute_step_with_retry - result = await self.execute_step_with_retry(step, iter_context, rate_limiter, orchestrator_config) + result = await self.execute_step_with_retry( + step, iter_context, rate_limiter, orchestrator_config, + session_id=session_id, project_path=project_path + ) # Process result: unwrap spawn() output and optionally parse JSON return self._process_step_result(result, step) except SkipRemainingError: raise + except CancellationRequestedError: + raise except Exception as e: raise ValueError(f"Step '{step.id}' iteration {idx} failed: {e}") from e @@ -1194,6 +1390,7 @@ async def _execute_recipe_step( parent_recipe_path: Path | None = None, rate_limiter: RateLimiter | None = None, orchestrator_config: OrchestratorConfig | None = None, + parent_session_id: str | None = None, ) -> dict[str, Any]: """ Execute a recipe composition step by loading and running a sub-recipe. @@ -1206,6 +1403,7 @@ async def _execute_recipe_step( parent_recipe_path: Path to parent recipe file (for relative resolution) rate_limiter: Optional rate limiter (inherited from parent recipe) orchestrator_config: Optional orchestrator config (inherited from parent recipe) + parent_session_id: Parent's session ID for cancellation checks Returns: Sub-recipe's final context dict @@ -1253,6 +1451,7 @@ async def _execute_recipe_step( # Execute sub-recipe recursively # Note: rate_limiter and orchestrator_config are inherited from parent (sub-recipes cannot override) + # parent_session_id is passed so sub-recipes can check for cancellation result = await self.execute_recipe( recipe=sub_recipe, context_vars=sub_context, @@ -1262,6 +1461,7 @@ async def _execute_recipe_step( recursion_state=child_state, rate_limiter=rate_limiter, # Inherit parent's rate limiter orchestrator_config=orchestrator_config, # Inherit parent's orchestrator config + parent_session_id=parent_session_id, # For cancellation checks ) # Propagate total steps back to parent state diff --git a/modules/tool-recipes/amplifier_module_tool_recipes/session.py b/modules/tool-recipes/amplifier_module_tool_recipes/session.py index ea411df..77ed661 100644 --- a/modules/tool-recipes/amplifier_module_tool_recipes/session.py +++ b/modules/tool-recipes/amplifier_module_tool_recipes/session.py @@ -21,6 +21,21 @@ class ApprovalStatus(str, Enum): TIMEOUT = "timeout" # Timed out waiting for approval +class CancellationStatus(str, Enum): + """Cancellation status for a recipe session. + + State machine: + NONE -> REQUESTED (first cancel request, graceful) + REQUESTED -> IMMEDIATE (second cancel request, stop now) + REQUESTED/IMMEDIATE -> CANCELLED (execution stopped) + """ + + NONE = "none" # Running normally + REQUESTED = "requested" # Graceful cancellation requested (complete current step) + IMMEDIATE = "immediate" # Immediate cancellation requested (stop ASAP) + CANCELLED = "cancelled" # Execution stopped due to cancellation + + def generate_session_id() -> str: """Generate unique session ID following W3C Trace Context pattern. @@ -427,3 +442,136 @@ def check_approval_timeout(self, session_id: str, project_path: Path) -> Approva return ApprovalStatus.TIMEOUT return None + + # === Cancellation Methods === + + def get_cancellation_status(self, session_id: str, project_path: Path) -> CancellationStatus: + """Get current cancellation status for a session. + + Args: + session_id: Session identifier + project_path: Project directory + + Returns: + CancellationStatus for the session (NONE if session doesn't exist) + """ + if not self.session_exists(session_id, project_path): + return CancellationStatus.NONE + + try: + state = self.load_state(session_id, project_path) + status_str = state.get("cancellation_status", CancellationStatus.NONE.value) + return CancellationStatus(status_str) + except (FileNotFoundError, json.JSONDecodeError): + return CancellationStatus.NONE + + def is_cancellation_requested(self, session_id: str, project_path: Path) -> bool: + """Check if cancellation has been requested (graceful or immediate). + + Args: + session_id: Session identifier + project_path: Project directory + + Returns: + True if cancellation requested (REQUESTED or IMMEDIATE status) + """ + status = self.get_cancellation_status(session_id, project_path) + return status in (CancellationStatus.REQUESTED, CancellationStatus.IMMEDIATE) + + def is_immediate_cancellation(self, session_id: str, project_path: Path) -> bool: + """Check if immediate cancellation has been requested. + + Args: + session_id: Session identifier + project_path: Project directory + + Returns: + True if immediate cancellation requested + """ + return self.get_cancellation_status(session_id, project_path) == CancellationStatus.IMMEDIATE + + def request_cancellation( + self, + session_id: str, + project_path: Path, + immediate: bool = False, + ) -> tuple[bool, str]: + """Request cancellation of a running recipe session. + + First request triggers graceful cancellation (complete current step). + Second request (or immediate=True) triggers immediate cancellation. + + Args: + session_id: Session identifier + project_path: Project directory + immediate: If True, request immediate cancellation + + Returns: + Tuple of (success, message) - success is True if request was processed + """ + state = self.load_state(session_id, project_path) + current_status = CancellationStatus(state.get("cancellation_status", CancellationStatus.NONE.value)) + + if current_status == CancellationStatus.CANCELLED: + return False, "Session already cancelled" + + now = datetime.datetime.now().isoformat() + + if immediate or current_status == CancellationStatus.REQUESTED: + # Escalate to immediate + state["cancellation_status"] = CancellationStatus.IMMEDIATE.value + state["cancellation_requested_at"] = state.get("cancellation_requested_at", now) + state["cancellation_escalated_at"] = now + self.save_state(session_id, project_path, state) + + if current_status == CancellationStatus.REQUESTED: + return True, "Escalated to immediate cancellation" + return True, "Immediate cancellation requested" + + # First request - graceful + state["cancellation_status"] = CancellationStatus.REQUESTED.value + state["cancellation_requested_at"] = now + self.save_state(session_id, project_path, state) + return True, "Graceful cancellation requested (will complete current step)" + + def mark_cancelled( + self, + session_id: str, + project_path: Path, + cancelled_at_step: str | None = None, + error: str | None = None, + ) -> None: + """Mark a session as cancelled (execution stopped). + + Args: + session_id: Session identifier + project_path: Project directory + cancelled_at_step: Step ID where execution stopped + error: Optional error message + """ + state = self.load_state(session_id, project_path) + + was_immediate = state.get("cancellation_status") == CancellationStatus.IMMEDIATE.value + state["cancellation_status"] = CancellationStatus.CANCELLED.value + state["cancelled_at"] = datetime.datetime.now().isoformat() + state["cancelled_at_step"] = cancelled_at_step + state["cancellation_was_immediate"] = was_immediate + if error: + state["cancellation_error"] = error + + self.save_state(session_id, project_path, state) + + def clear_cancellation(self, session_id: str, project_path: Path) -> None: + """Clear cancellation status (for resuming a cancelled session). + + Args: + session_id: Session identifier + project_path: Project directory + """ + state = self.load_state(session_id, project_path) + + # Only clear if actually cancelled (not if in progress) + if state.get("cancellation_status") == CancellationStatus.CANCELLED.value: + state["cancellation_status"] = CancellationStatus.NONE.value + # Keep history fields for debugging + self.save_state(session_id, project_path, state) diff --git a/modules/tool-recipes/tests/test_cancellation.py b/modules/tool-recipes/tests/test_cancellation.py new file mode 100644 index 0000000..46dff37 --- /dev/null +++ b/modules/tool-recipes/tests/test_cancellation.py @@ -0,0 +1,474 @@ +"""Tests for recipe cancellation functionality.""" + +import asyncio +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from amplifier_module_tool_recipes.executor import ( + CancellationRequestedError, + RecipeExecutor, +) +from amplifier_module_tool_recipes.models import Recipe, Step +from amplifier_module_tool_recipes.session import CancellationStatus, SessionManager + + +class TestCancellationStatus: + """Tests for CancellationStatus enum.""" + + def test_all_statuses_exist(self): + """All expected statuses are defined.""" + assert CancellationStatus.NONE.value == "none" + assert CancellationStatus.REQUESTED.value == "requested" # Graceful cancellation + assert CancellationStatus.IMMEDIATE.value == "immediate" + assert CancellationStatus.CANCELLED.value == "cancelled" + + def test_status_from_string(self): + """Status can be created from string value.""" + assert CancellationStatus("none") == CancellationStatus.NONE + assert CancellationStatus("requested") == CancellationStatus.REQUESTED + assert CancellationStatus("immediate") == CancellationStatus.IMMEDIATE + assert CancellationStatus("cancelled") == CancellationStatus.CANCELLED + + +class TestSessionManagerCancellation: + """Tests for SessionManager cancellation methods.""" + + def test_request_graceful_cancellation(self, session_manager: SessionManager, temp_dir: Path): + """Request graceful cancellation sets correct status.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[Step(id="s1", agent="a", prompt="p")], + ) + session_id = session_manager.create_session(recipe, temp_dir) + + session_manager.request_cancellation(session_id, temp_dir, immediate=False) + + status = session_manager.get_cancellation_status(session_id, temp_dir) + assert status == CancellationStatus.REQUESTED # Graceful = REQUESTED + assert session_manager.is_cancellation_requested(session_id, temp_dir) + assert not session_manager.is_immediate_cancellation(session_id, temp_dir) + + def test_request_immediate_cancellation(self, session_manager: SessionManager, temp_dir: Path): + """Request immediate cancellation sets correct status.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[Step(id="s1", agent="a", prompt="p")], + ) + session_id = session_manager.create_session(recipe, temp_dir) + + session_manager.request_cancellation(session_id, temp_dir, immediate=True) + + status = session_manager.get_cancellation_status(session_id, temp_dir) + assert status == CancellationStatus.IMMEDIATE + assert session_manager.is_cancellation_requested(session_id, temp_dir) + assert session_manager.is_immediate_cancellation(session_id, temp_dir) + + def test_clear_cancellation(self, session_manager: SessionManager, temp_dir: Path): + """Clear cancellation resets status to NONE (only works after CANCELLED status).""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[Step(id="s1", agent="a", prompt="p")], + ) + session_id = session_manager.create_session(recipe, temp_dir) + + # First mark as cancelled (clear only works on CANCELLED status) + session_manager.mark_cancelled(session_id, temp_dir, cancelled_at_step="step-1") + status = session_manager.get_cancellation_status(session_id, temp_dir) + assert status == CancellationStatus.CANCELLED + + session_manager.clear_cancellation(session_id, temp_dir) + + assert not session_manager.is_cancellation_requested(session_id, temp_dir) + status = session_manager.get_cancellation_status(session_id, temp_dir) + assert status == CancellationStatus.NONE + + def test_mark_cancelled(self, session_manager: SessionManager, temp_dir: Path): + """Mark cancelled sets final CANCELLED status with step info.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[Step(id="s1", agent="a", prompt="p")], + ) + session_id = session_manager.create_session(recipe, temp_dir) + + session_manager.mark_cancelled(session_id, temp_dir, cancelled_at_step="step-5") + + status = session_manager.get_cancellation_status(session_id, temp_dir) + assert status == CancellationStatus.CANCELLED + + # Verify step info is saved + state = session_manager.load_state(session_id, temp_dir) + assert state.get("cancelled_at_step") == "step-5" + assert "cancelled_at" in state + + def test_nonexistent_session_returns_none_status( + self, session_manager: SessionManager, temp_dir: Path + ): + """Nonexistent session returns NONE status without error.""" + status = session_manager.get_cancellation_status("nonexistent-session", temp_dir) + assert status == CancellationStatus.NONE + assert not session_manager.is_cancellation_requested("nonexistent-session", temp_dir) + + def test_upgrade_graceful_to_immediate(self, session_manager: SessionManager, temp_dir: Path): + """Can upgrade from graceful to immediate cancellation.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[Step(id="s1", agent="a", prompt="p")], + ) + session_id = session_manager.create_session(recipe, temp_dir) + + # Request graceful first + session_manager.request_cancellation(session_id, temp_dir, immediate=False) + assert not session_manager.is_immediate_cancellation(session_id, temp_dir) + + # Upgrade to immediate + session_manager.request_cancellation(session_id, temp_dir, immediate=True) + assert session_manager.is_immediate_cancellation(session_id, temp_dir) + + +class TestCancellationRequestedError: + """Tests for CancellationRequestedError exception.""" + + def test_exception_attributes(self): + """Exception stores all attributes correctly.""" + error = CancellationRequestedError( + session_id="test-session", + is_immediate=True, + current_step="analyze-code", + ) + + assert error.session_id == "test-session" + assert error.is_immediate is True + assert error.current_step == "analyze-code" + + def test_exception_message_graceful(self): + """Graceful cancellation message is formatted correctly.""" + error = CancellationRequestedError( + session_id="test-session", + is_immediate=False, + current_step="step-1", + ) + + assert "test-session" in str(error) + assert "graceful" in str(error) + assert "step-1" in str(error) + + def test_exception_message_immediate(self): + """Immediate cancellation message is formatted correctly.""" + error = CancellationRequestedError( + session_id="test-session", + is_immediate=True, + current_step="step-1", + ) + + assert "test-session" in str(error) + assert "immediate" in str(error) + + def test_exception_is_an_exception(self): + """CancellationRequestedError is a proper Exception subclass.""" + assert issubclass(CancellationRequestedError, Exception) + assert issubclass(CancellationRequestedError, BaseException) + + +class TestExecutorCancellation: + """Tests for RecipeExecutor cancellation behavior.""" + + @pytest.fixture + def mock_coordinator(self): + """Create a mock coordinator.""" + coordinator = MagicMock() + coordinator.session = MagicMock() + coordinator.config = {"agents": {}} + coordinator.get_capability.return_value = AsyncMock(return_value="result") + # No cancellation token by default + coordinator.cancellation = None + return coordinator + + @pytest.fixture + def real_session_manager(self, temp_dir: Path): + """Create a real session manager for cancellation tests.""" + return SessionManager(base_dir=temp_dir, auto_cleanup_days=7) + + @pytest.mark.asyncio + async def test_graceful_cancellation_before_step( + self, mock_coordinator, real_session_manager, temp_dir: Path + ): + """Graceful cancellation requested before execution raises at first step.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[ + Step(id="step-1", agent="a", prompt="First"), + Step(id="step-2", agent="a", prompt="Second"), + ], + ) + + # Create session and request graceful cancellation before execution + session_id = real_session_manager.create_session(recipe, temp_dir) + real_session_manager.request_cancellation(session_id, temp_dir, immediate=False) + + mock_coordinator.get_capability.return_value = AsyncMock(return_value="result") + executor = RecipeExecutor(mock_coordinator, real_session_manager) + + with pytest.raises(CancellationRequestedError) as exc_info: + await executor.execute_recipe(recipe, {}, temp_dir, session_id=session_id) + + # Should raise before executing any steps + assert not exc_info.value.is_immediate + assert exc_info.value.current_step == "step-1" + # Verify spawn was never called (cancelled before step execution) + mock_coordinator.get_capability.return_value.assert_not_called() + + @pytest.mark.asyncio + async def test_immediate_cancellation_stops_immediately( + self, mock_coordinator, real_session_manager, temp_dir: Path + ): + """Immediate cancellation stops without completing current step.""" + executor = RecipeExecutor(mock_coordinator, real_session_manager) + + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[ + Step(id="step-1", agent="a", prompt="First"), + Step(id="step-2", agent="a", prompt="Second"), + ], + ) + + # Pre-create session and request immediate cancellation + session_id = real_session_manager.create_session(recipe, temp_dir) + real_session_manager.request_cancellation(session_id, temp_dir, immediate=True) + + with pytest.raises(CancellationRequestedError) as exc_info: + await executor.execute_recipe(recipe, {}, temp_dir, session_id=session_id) + + assert exc_info.value.is_immediate + assert exc_info.value.current_step == "step-1" + + @pytest.mark.asyncio + async def test_cancellation_in_foreach_loop( + self, mock_coordinator, real_session_manager, temp_dir: Path + ): + """Cancellation during foreach loop stops at iteration boundary.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[ + Step( + id="loop", + agent="a", + prompt="Process {{item}}", + foreach="{{items}}", + collect="results", + ), + ], + context={"items": ["a", "b", "c", "d", "e"]}, + ) + + # Pre-create session + session_id = real_session_manager.create_session(recipe, temp_dir) + iteration_count = 0 + + async def mock_spawn(*args, **kwargs): + nonlocal iteration_count + iteration_count += 1 + if iteration_count == 2: + # Cancel after second iteration + real_session_manager.request_cancellation(session_id, temp_dir, immediate=False) + return f"result-{iteration_count}" + + mock_coordinator.get_capability.return_value = AsyncMock(side_effect=mock_spawn) + executor = RecipeExecutor(mock_coordinator, real_session_manager) + + with pytest.raises(CancellationRequestedError): + await executor.execute_recipe(recipe, {}, temp_dir, session_id=session_id) + + # Should have processed 2 items before cancellation took effect + assert iteration_count == 2 + + @pytest.mark.asyncio + async def test_cancellation_saves_state_for_resumption( + self, mock_coordinator, real_session_manager, temp_dir: Path + ): + """Cancellation saves session state including cancellation info.""" + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[ + Step(id="step-1", agent="a", prompt="First", output="r1"), + Step(id="step-2", agent="a", prompt="Second", output="r2"), + ], + ) + + # Create session and request immediate cancellation + session_id = real_session_manager.create_session(recipe, temp_dir) + real_session_manager.request_cancellation(session_id, temp_dir, immediate=True) + + mock_coordinator.get_capability.return_value = AsyncMock(return_value="result") + executor = RecipeExecutor(mock_coordinator, real_session_manager) + + with pytest.raises(CancellationRequestedError): + await executor.execute_recipe(recipe, {}, temp_dir, session_id=session_id) + + # Verify state was saved with cancellation info + state = real_session_manager.load_state(session_id, temp_dir) + assert state["cancellation_status"] == CancellationStatus.CANCELLED.value + assert "cancelled_at_step" in state + + +class TestCoordinatorCancellationIntegration: + """Tests for integration with coordinator's CancellationToken.""" + + @pytest.fixture + def mock_coordinator_with_cancellation(self): + """Create a mock coordinator with cancellation token.""" + coordinator = MagicMock() + coordinator.session = MagicMock() + coordinator.config = {"agents": {}} + coordinator.get_capability.return_value = AsyncMock(return_value="result") + + # Mock cancellation token + cancellation = MagicMock() + cancellation.is_cancelled = False + cancellation.is_immediate = False + coordinator.cancellation = cancellation + + return coordinator + + @pytest.fixture + def mock_session_manager(self): + """Create a mock session manager.""" + manager = MagicMock() + manager.create_session.return_value = "test-session-id" + manager.load_state.return_value = { + "current_step_index": 0, + "context": {}, + "completed_steps": [], + "started": "2025-01-01T00:00:00", + } + manager.is_cancellation_requested.return_value = False + manager.is_immediate_cancellation.return_value = False + return manager + + @pytest.mark.asyncio + async def test_coordinator_cancellation_propagates_to_session( + self, mock_coordinator_with_cancellation, mock_session_manager, temp_dir: Path + ): + """Coordinator cancellation token triggers session cancellation.""" + call_count = 0 + + async def mock_spawn(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + # Simulate Ctrl+C by setting coordinator cancellation + mock_coordinator_with_cancellation.cancellation.is_cancelled = True + mock_coordinator_with_cancellation.cancellation.is_immediate = False + return f"result-{call_count}" + + mock_coordinator_with_cancellation.get_capability.return_value = AsyncMock( + side_effect=mock_spawn + ) + + # After coordinator cancellation is set, session manager should report it + def check_cancellation(*args, **kwargs): + return mock_coordinator_with_cancellation.cancellation.is_cancelled + + mock_session_manager.is_cancellation_requested.side_effect = check_cancellation + + executor = RecipeExecutor(mock_coordinator_with_cancellation, mock_session_manager) + + recipe = Recipe( + name="test", + description="test", + version="1.0.0", + steps=[ + Step(id="step-1", agent="a", prompt="First"), + Step(id="step-2", agent="a", prompt="Second"), + ], + ) + + with pytest.raises(CancellationRequestedError): + await executor.execute_recipe(recipe, {}, temp_dir) + + # Verify request_cancellation was called to propagate coordinator cancellation + mock_session_manager.request_cancellation.assert_called() + + +class TestNestedRecipeCancellation: + """Tests for cancellation propagation to nested recipes.""" + + @pytest.fixture + def mock_coordinator(self): + """Create a mock coordinator.""" + coordinator = MagicMock() + coordinator.session = MagicMock() + coordinator.config = {"agents": {}} + coordinator.get_capability.return_value = AsyncMock(return_value="result") + coordinator.cancellation = None + return coordinator + + @pytest.fixture + def real_session_manager(self, temp_dir: Path): + """Create a real session manager.""" + return SessionManager(base_dir=temp_dir, auto_cleanup_days=7) + + @pytest.mark.asyncio + async def test_nested_recipe_inherits_parent_cancellation( + self, mock_coordinator, real_session_manager, temp_dir: Path + ): + """Nested recipe inherits cancellation from parent session.""" + # Create sub-recipe file + sub_recipe_content = """ +name: sub-recipe +description: A sub-recipe +version: 1.0.0 +steps: + - id: sub-step-1 + agent: test-agent + prompt: "Sub step 1" +""" + sub_recipe_path = temp_dir / "sub-recipe.yaml" + sub_recipe_path.write_text(sub_recipe_content) + + parent_recipe = Recipe( + name="parent", + description="Parent recipe", + version="1.0.0", + steps=[ + Step(id="call-sub", type="recipe", recipe="sub-recipe.yaml"), + Step(id="after-sub", agent="a", prompt="After sub"), + ], + ) + + # Create session and request cancellation before execution + session_id = real_session_manager.create_session(parent_recipe, temp_dir) + real_session_manager.request_cancellation(session_id, temp_dir, immediate=True) + + mock_coordinator.get_capability.return_value = AsyncMock(return_value="result") + executor = RecipeExecutor(mock_coordinator, real_session_manager) + + with pytest.raises(CancellationRequestedError): + await executor.execute_recipe( + parent_recipe, {}, temp_dir, + session_id=session_id, + recipe_path=temp_dir / "parent.yaml" + ) + + # Spawn should never be called since cancellation was pre-requested + mock_coordinator.get_capability.return_value.assert_not_called() diff --git a/modules/tool-recipes/tests/test_executor_composition.py b/modules/tool-recipes/tests/test_executor_composition.py index 006c98a..dc7214d 100644 --- a/modules/tool-recipes/tests/test_executor_composition.py +++ b/modules/tool-recipes/tests/test_executor_composition.py @@ -33,6 +33,9 @@ def mock_session_manager(): "completed_steps": [], "started": "2025-01-01T00:00:00", } + # Mock cancellation methods to return no cancellation by default + manager.is_cancellation_requested.return_value = False + manager.is_immediate_cancellation.return_value = False return manager diff --git a/modules/tool-recipes/tests/test_executor_conditions.py b/modules/tool-recipes/tests/test_executor_conditions.py index 6f304a7..d8c37ce 100644 --- a/modules/tool-recipes/tests/test_executor_conditions.py +++ b/modules/tool-recipes/tests/test_executor_conditions.py @@ -33,6 +33,9 @@ def mock_session_manager(temp_dir): "completed_steps": [], "started": "2025-01-01T00:00:00", } + # Mock cancellation methods to return no cancellation by default + manager.is_cancellation_requested.return_value = False + manager.is_immediate_cancellation.return_value = False return manager diff --git a/modules/tool-recipes/tests/test_executor_loops.py b/modules/tool-recipes/tests/test_executor_loops.py index 731780f..76ff7f2 100644 --- a/modules/tool-recipes/tests/test_executor_loops.py +++ b/modules/tool-recipes/tests/test_executor_loops.py @@ -33,6 +33,9 @@ def mock_session_manager(): "completed_steps": [], "started": "2025-01-01T00:00:00", } + # Mock cancellation methods to return no cancellation by default + manager.is_cancellation_requested.return_value = False + manager.is_immediate_cancellation.return_value = False return manager