diff --git a/autobot-backend/orchestration/__init__.py b/autobot-backend/orchestration/__init__.py index 5769b24cb..5b10f081b 100644 --- a/autobot-backend/orchestration/__init__.py +++ b/autobot-backend/orchestration/__init__.py @@ -16,9 +16,17 @@ - dag_executor: DAG-based execution with condition/branch routing (#2140) - error_handler: Step-level error handling and workflow checkpointing (#2154) - execution_modes: Dry-run validation and step-by-step debug mode (#2148) +- sub_workflow: Sub-workflow composition — workflows as reusable building blocks (#2143) """ from .agent_registry import AgentRegistry, get_default_agents +from .sub_workflow import ( + MAX_NESTING_DEPTH, + SubWorkflowExecutor, + SubWorkflowStep, + extract_sub_workflow_step, + is_sub_workflow_step, +) from .dag_executor import DAGExecutor, NodeType, WorkflowDAG, build_dag, workflow_has_condition_nodes from .error_handler import ( BackoffStrategy, @@ -61,6 +69,12 @@ "WorkflowExecutor", "WorkflowMemory", "WorkflowPlanner", + # Sub-workflow composition (#2143) + "MAX_NESTING_DEPTH", + "SubWorkflowExecutor", + "SubWorkflowStep", + "extract_sub_workflow_step", + "is_sub_workflow_step", # DAG execution (#2140) "DAGExecutor", "NodeType", diff --git a/autobot-backend/orchestration/sub_workflow.py b/autobot-backend/orchestration/sub_workflow.py new file mode 100644 index 000000000..1bae95ae7 --- /dev/null +++ b/autobot-backend/orchestration/sub_workflow.py @@ -0,0 +1,315 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +""" +Sub-Workflow Composition — workflows as reusable building blocks. + +Issue #2143: Allow a workflow step to invoke another stored workflow by ID, +mapping parent variables into the child's input context and capturing child +outputs under a named key in the parent step_outputs registry. + +Key classes +----------- +SubWorkflowStep + Typed description of a sub-workflow invocation embedded inside a parent + workflow step dict. + +SubWorkflowExecutor + Resolves input mappings, executes the child workflow through a provided + WorkflowExecutor instance (recursive composition), captures outputs, and + enforces a nesting depth limit to prevent infinite recursion. + +Module-level helpers +-------------------- +is_sub_workflow_step(step) + Return True when *step* carries ``type="sub_workflow"`` and a non-empty + ``workflow_id``. + +extract_sub_workflow_step(step) + Parse a raw step dict into a SubWorkflowStep dataclass. +""" + +import logging +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional + +from .variable_resolver import StepOutput, VariableResolver + +if TYPE_CHECKING: + # Avoid a circular import at runtime — WorkflowExecutor imports this module. + from .workflow_executor import WorkflowExecutor + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Constants +# --------------------------------------------------------------------------- + +#: Hard limit on how deeply sub-workflows may nest to prevent runaway recursion. +MAX_NESTING_DEPTH: int = 5 + +# --------------------------------------------------------------------------- +# Data model +# --------------------------------------------------------------------------- + + +@dataclass +class SubWorkflowStep: + """ + Typed representation of a sub-workflow invocation step. + + Attributes: + workflow_id: ID of the child workflow to execute. + input_mapping: Dict mapping child input key → parent variable expression. + Expressions follow the ``${steps..}`` syntax + resolved by VariableResolver before execution. + output_key: Key under which the child's outputs are stored in the + parent's ``step_outputs`` registry after execution. + step_id: Step identifier in the parent workflow (from ``step["id"]``). + """ + + workflow_id: str + input_mapping: Dict[str, str] = field(default_factory=dict) + output_key: str = "sub_workflow_output" + step_id: str = "" + + +# --------------------------------------------------------------------------- +# WorkflowFetcher protocol (callable type alias) +# --------------------------------------------------------------------------- + +#: Callable that retrieves a workflow definition by ID. +#: Returns a dict with at minimum ``{"steps": [...]}``; may include ``"edges"``. +#: Returns None when the workflow is not found. +WorkflowFetcher = Callable[[str], Optional[Dict[str, Any]]] + + +# --------------------------------------------------------------------------- +# SubWorkflowExecutor +# --------------------------------------------------------------------------- + + +class SubWorkflowExecutor: + """ + Executes a sub-workflow step within a parent workflow execution. + + Responsibilities + ---------------- + 1. Enforce a maximum nesting depth (``MAX_NESTING_DEPTH``) so recursive + or accidentally cyclic compositions fail loudly rather than overflowing. + 2. Apply ``input_mapping`` — resolve parent variable expressions and build + the child workflow's input context dict. + 3. Delegate execution to the parent ``WorkflowExecutor`` instance (which + already holds all agent callbacks and supporting infrastructure). + 4. Wrap the child execution context in a ``StepOutput`` keyed under + ``output_key`` so parent steps can reference child results via the + standard ``${steps..output.*}`` syntax. + + Args: + workflow_executor: The parent ``WorkflowExecutor`` instance. Reused + for child execution so circuit breaker, retry, and + checkpoint machinery applies uniformly. + workflow_fetcher: Callable ``(workflow_id: str) → workflow_dict | None``. + Injected to avoid coupling this module to any + particular storage backend. + """ + + def __init__( + self, + workflow_executor: "WorkflowExecutor", + workflow_fetcher: WorkflowFetcher, + ) -> None: + self._executor = workflow_executor + self._fetch_workflow = workflow_fetcher + + async def execute( + self, + sub_step: SubWorkflowStep, + parent_context: Dict[str, Any], + parent_step_outputs: Dict[str, StepOutput], + current_depth: int = 0, + ) -> Dict[str, Any]: + """ + Execute a sub-workflow and return the step result dict. + + The result follows the same shape used by ``_execute_coordinated_step``: + ``{"success": True/False, "step_id": ..., "sub_workflow_result": {...}}``. + + Args: + sub_step: Parsed sub-workflow step descriptor. + parent_context: Parent workflow's context dict (forwarded to child). + parent_step_outputs: Completed step outputs from the parent execution, + used to resolve ``${steps.…}`` references in + ``input_mapping`` values. + current_depth: Current nesting depth (0 = top-level call). + + Returns: + Step result dict with ``success`` bool and ``sub_workflow_result`` key. + """ + if current_depth >= MAX_NESTING_DEPTH: + logger.error( + "Sub-workflow '%s' (step %s): maximum nesting depth %d reached — aborting", + sub_step.workflow_id, + sub_step.step_id, + MAX_NESTING_DEPTH, + ) + raise RecursionError( + f"Sub-workflow '{sub_step.workflow_id}' exceeds maximum nesting depth " + f"of {MAX_NESTING_DEPTH}. Check for circular workflow references." + ) + + logger.info( + "Sub-workflow step %s: starting child workflow '%s' (depth=%d)", + sub_step.step_id, + sub_step.workflow_id, + current_depth, + ) + + workflow_def = self._fetch_workflow(sub_step.workflow_id) + if workflow_def is None: + logger.error( + "Sub-workflow step %s: workflow '%s' not found", + sub_step.step_id, + sub_step.workflow_id, + ) + raise ValueError( + f"Sub-workflow step '{sub_step.step_id}': " + f"workflow '{sub_step.workflow_id}' not found." + ) + + child_steps: List[Dict[str, Any]] = workflow_def.get("steps", []) + child_edges: List[Dict[str, Any]] = workflow_def.get("edges", []) + + child_context = self._build_child_context( + sub_step, parent_context, parent_step_outputs + ) + + child_result = await self._executor.execute_coordinated_workflow( + workflow_id=sub_step.workflow_id, + steps=child_steps, + context=child_context, + edges=child_edges or None, + ) + + success = child_result.get("status") == "completed" + logger.info( + "Sub-workflow step %s: child '%s' finished — status=%s", + sub_step.step_id, + sub_step.workflow_id, + child_result.get("status"), + ) + + return { + "success": success, + "step_id": sub_step.step_id, + "sub_workflow_result": child_result, + "output_key": sub_step.output_key, + } + + # ------------------------------------------------------------------ + # Private helpers + # ------------------------------------------------------------------ + + def _build_child_context( + self, + sub_step: SubWorkflowStep, + parent_context: Dict[str, Any], + parent_step_outputs: Dict[str, StepOutput], + ) -> Dict[str, Any]: + """ + Build the context dict passed to the child workflow. + + Starts from a shallow copy of *parent_context*, then applies + ``input_mapping`` to inject resolved parent variable values under + the child's expected input keys. + + Args: + sub_step: The sub-workflow step descriptor. + parent_context: Parent execution context. + parent_step_outputs: Parent step outputs for variable resolution. + + Returns: + Child context dict with mapped inputs merged in. + """ + child_context: Dict[str, Any] = dict(parent_context) + child_context["_sub_workflow_inputs"] = {} + + if not sub_step.input_mapping: + return child_context + + resolver = VariableResolver() + resolved_inputs: Dict[str, Any] = {} + + for child_key, parent_expr in sub_step.input_mapping.items(): + resolved = resolver.resolve(parent_expr, parent_step_outputs) + if resolved == parent_expr and "${steps." in parent_expr: + logger.warning( + "Sub-workflow step %s: input_mapping key '%s' — " + "expression '%s' could not be resolved; passing raw expression", + sub_step.step_id, + child_key, + parent_expr, + ) + resolved_inputs[child_key] = resolved + logger.debug( + "Sub-workflow step %s: mapped input '%s' = %r", + sub_step.step_id, + child_key, + resolved, + ) + + child_context["_sub_workflow_inputs"] = resolved_inputs + return child_context + + +# --------------------------------------------------------------------------- +# Module-level helpers +# --------------------------------------------------------------------------- + + +def is_sub_workflow_step(step: Dict[str, Any]) -> bool: + """ + Return True when *step* is a sub-workflow invocation step. + + A step qualifies when it carries ``type="sub_workflow"`` and a + non-empty ``workflow_id`` field. + + Issue #2143. + """ + return step.get("type") == "sub_workflow" and bool(step.get("workflow_id")) + + +def extract_sub_workflow_step(step: Dict[str, Any]) -> SubWorkflowStep: + """ + Parse a raw step dict into a ``SubWorkflowStep`` dataclass. + + Expected step dict shape:: + + { + "id": "invoke_child", + "type": "sub_workflow", + "workflow_id": "wf-data-pipeline", + "input_mapping": { + "dataset_path": "${steps.fetch.output.path}", + "threshold": "0.8" + }, + "output_key": "pipeline_result" + } + + Raises: + ValueError: when ``workflow_id`` is absent or empty. + + Issue #2143. + """ + workflow_id: str = step.get("workflow_id", "") + if not workflow_id: + raise ValueError( + f"Sub-workflow step '{step.get('id', '')}' is missing 'workflow_id'." + ) + + return SubWorkflowStep( + workflow_id=workflow_id, + input_mapping=step.get("input_mapping", {}), + output_key=step.get("output_key", "sub_workflow_output"), + step_id=step.get("id", ""), + ) diff --git a/autobot-backend/orchestration/sub_workflow_test.py b/autobot-backend/orchestration/sub_workflow_test.py new file mode 100644 index 000000000..b606a86d2 --- /dev/null +++ b/autobot-backend/orchestration/sub_workflow_test.py @@ -0,0 +1,293 @@ +# AutoBot - AI-Powered Automation Platform +# Copyright (c) 2025 mrveiss +# Author: mrveiss +"""Unit tests for sub-workflow composition. Issue #2143.""" + +from typing import Any, Dict, Optional +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from orchestration.sub_workflow import ( + MAX_NESTING_DEPTH, + SubWorkflowExecutor, + SubWorkflowStep, + extract_sub_workflow_step, + is_sub_workflow_step, +) +from orchestration.variable_resolver import StepOutput + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_workflow_executor() -> MagicMock: + """Return a MagicMock that satisfies WorkflowExecutor's interface.""" + executor = MagicMock() + executor.execute_coordinated_workflow = AsyncMock( + return_value={"status": "completed", "step_results": {}} + ) + return executor + + +def _make_step_output(data: Dict[str, Any], status: str = "completed") -> StepOutput: + import json + + stdout = json.dumps(data) + return StepOutput(status=status, stdout=stdout, parsed_json=data) + + +# --------------------------------------------------------------------------- +# is_sub_workflow_step +# --------------------------------------------------------------------------- + + +class TestIsSubWorkflowStep: + def test_valid_sub_workflow_step(self): + step = {"id": "s1", "type": "sub_workflow", "workflow_id": "wf-child"} + assert is_sub_workflow_step(step) is True + + def test_regular_step_returns_false(self): + assert is_sub_workflow_step({"id": "s1", "type": "step", "action": "run"}) is False + + def test_missing_type_returns_false(self): + assert is_sub_workflow_step({"id": "s1", "workflow_id": "wf-child"}) is False + + def test_empty_workflow_id_returns_false(self): + assert is_sub_workflow_step({"id": "s1", "type": "sub_workflow", "workflow_id": ""}) is False + + def test_missing_workflow_id_returns_false(self): + assert is_sub_workflow_step({"id": "s1", "type": "sub_workflow"}) is False + + +# --------------------------------------------------------------------------- +# extract_sub_workflow_step +# --------------------------------------------------------------------------- + + +class TestExtractSubWorkflowStep: + def test_minimal_valid_step(self): + step = {"id": "invoke", "type": "sub_workflow", "workflow_id": "wf-abc"} + sub = extract_sub_workflow_step(step) + assert sub.workflow_id == "wf-abc" + assert sub.step_id == "invoke" + assert sub.input_mapping == {} + assert sub.output_key == "sub_workflow_output" + + def test_full_step_mapping(self): + step = { + "id": "run-child", + "type": "sub_workflow", + "workflow_id": "wf-pipeline", + "input_mapping": {"path": "${steps.fetch.output.path}", "threshold": "0.8"}, + "output_key": "pipeline_result", + } + sub = extract_sub_workflow_step(step) + assert sub.workflow_id == "wf-pipeline" + assert sub.input_mapping == {"path": "${steps.fetch.output.path}", "threshold": "0.8"} + assert sub.output_key == "pipeline_result" + assert sub.step_id == "run-child" + + def test_missing_workflow_id_raises(self): + with pytest.raises(ValueError, match="missing 'workflow_id'"): + extract_sub_workflow_step({"id": "bad", "type": "sub_workflow"}) + + def test_empty_workflow_id_raises(self): + with pytest.raises(ValueError, match="missing 'workflow_id'"): + extract_sub_workflow_step({"id": "bad", "type": "sub_workflow", "workflow_id": ""}) + + +# --------------------------------------------------------------------------- +# SubWorkflowExecutor — basic execution +# --------------------------------------------------------------------------- + + +class TestSubWorkflowExecutorBasic: + def _make_executor(self, workflow_def: Optional[Dict[str, Any]]) -> SubWorkflowExecutor: + wf_executor = _make_workflow_executor() + fetcher = MagicMock(return_value=workflow_def) + return SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + + @pytest.mark.asyncio + async def test_basic_execution_returns_success(self): + workflow_def = {"steps": [{"id": "child_step", "type": "step", "action": "do"}]} + executor = self._make_executor(workflow_def) + sub_step = SubWorkflowStep(workflow_id="wf-child", step_id="invoke") + + result = await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + assert result["success"] is True + assert result["step_id"] == "invoke" + assert "sub_workflow_result" in result + + @pytest.mark.asyncio + async def test_failed_child_returns_success_false(self): + workflow_def = {"steps": []} + wf_executor = _make_workflow_executor() + wf_executor.execute_coordinated_workflow = AsyncMock( + return_value={"status": "failed", "step_results": {}} + ) + fetcher = MagicMock(return_value=workflow_def) + executor = SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + sub_step = SubWorkflowStep(workflow_id="wf-fail", step_id="invoke") + + result = await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + assert result["success"] is False + + @pytest.mark.asyncio + async def test_missing_workflow_id_raises_value_error(self): + executor = self._make_executor(None) + sub_step = SubWorkflowStep(workflow_id="wf-missing", step_id="invoke") + + with pytest.raises(ValueError, match="not found"): + await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + @pytest.mark.asyncio + async def test_child_workflow_called_with_correct_id(self): + workflow_def = {"steps": []} + wf_executor = _make_workflow_executor() + fetcher = MagicMock(return_value=workflow_def) + executor = SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + sub_step = SubWorkflowStep(workflow_id="wf-target", step_id="invoke") + + await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + wf_executor.execute_coordinated_workflow.assert_awaited_once() + call_kwargs = wf_executor.execute_coordinated_workflow.call_args + assert call_kwargs.kwargs["workflow_id"] == "wf-target" + + +# --------------------------------------------------------------------------- +# SubWorkflowExecutor — variable mapping +# --------------------------------------------------------------------------- + + +class TestSubWorkflowExecutorVariableMapping: + def _make_executor(self, workflow_def: Dict[str, Any]) -> tuple: + wf_executor = _make_workflow_executor() + fetcher = MagicMock(return_value=workflow_def) + executor = SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + return executor, wf_executor + + @pytest.mark.asyncio + async def test_literal_value_passed_as_child_input(self): + workflow_def = {"steps": []} + executor, wf_executor = self._make_executor(workflow_def) + sub_step = SubWorkflowStep( + workflow_id="wf-child", + step_id="invoke", + input_mapping={"threshold": "0.9"}, + ) + + await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + call_kwargs = wf_executor.execute_coordinated_workflow.call_args + child_ctx = call_kwargs.kwargs["context"] + assert child_ctx["_sub_workflow_inputs"]["threshold"] == "0.9" + + @pytest.mark.asyncio + async def test_variable_reference_resolved_from_parent_outputs(self): + workflow_def = {"steps": []} + executor, wf_executor = self._make_executor(workflow_def) + parent_outputs = {"fetch": _make_step_output({"path": "/data/file.csv"})} + sub_step = SubWorkflowStep( + workflow_id="wf-child", + step_id="invoke", + input_mapping={"dataset_path": "${steps.fetch.output.path}"}, + ) + + await executor.execute(sub_step, parent_context={}, parent_step_outputs=parent_outputs) + + call_kwargs = wf_executor.execute_coordinated_workflow.call_args + child_ctx = call_kwargs.kwargs["context"] + assert child_ctx["_sub_workflow_inputs"]["dataset_path"] == "/data/file.csv" + + @pytest.mark.asyncio + async def test_unresolvable_reference_passed_as_raw_expression(self): + """Unresolvable ${steps.…} tokens are passed through unchanged (with a warning).""" + workflow_def = {"steps": []} + executor, wf_executor = self._make_executor(workflow_def) + sub_step = SubWorkflowStep( + workflow_id="wf-child", + step_id="invoke", + input_mapping={"key": "${steps.missing_step.output.value}"}, + ) + + await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + call_kwargs = wf_executor.execute_coordinated_workflow.call_args + child_ctx = call_kwargs.kwargs["context"] + # Unresolvable token is left as-is + assert child_ctx["_sub_workflow_inputs"]["key"] == "${steps.missing_step.output.value}" + + @pytest.mark.asyncio + async def test_no_input_mapping_produces_empty_inputs(self): + workflow_def = {"steps": []} + executor, wf_executor = self._make_executor(workflow_def) + sub_step = SubWorkflowStep(workflow_id="wf-child", step_id="invoke") + + await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + call_kwargs = wf_executor.execute_coordinated_workflow.call_args + child_ctx = call_kwargs.kwargs["context"] + assert child_ctx["_sub_workflow_inputs"] == {} + + @pytest.mark.asyncio + async def test_output_key_stored_in_result(self): + workflow_def = {"steps": []} + executor, _ = self._make_executor(workflow_def) + sub_step = SubWorkflowStep( + workflow_id="wf-child", + step_id="invoke", + output_key="my_custom_key", + ) + + result = await executor.execute(sub_step, parent_context={}, parent_step_outputs={}) + + assert result["output_key"] == "my_custom_key" + + +# --------------------------------------------------------------------------- +# SubWorkflowExecutor — max depth guard +# --------------------------------------------------------------------------- + + +class TestSubWorkflowExecutorMaxDepth: + @pytest.mark.asyncio + async def test_max_depth_raises_recursion_error(self): + wf_executor = _make_workflow_executor() + fetcher = MagicMock(return_value={"steps": []}) + executor = SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + sub_step = SubWorkflowStep(workflow_id="wf-deep", step_id="invoke") + + with pytest.raises(RecursionError, match="maximum nesting depth"): + await executor.execute( + sub_step, + parent_context={}, + parent_step_outputs={}, + current_depth=MAX_NESTING_DEPTH, + ) + + @pytest.mark.asyncio + async def test_depth_just_below_max_executes_normally(self): + """Depth of MAX_NESTING_DEPTH - 1 should not raise.""" + wf_executor = _make_workflow_executor() + fetcher = MagicMock(return_value={"steps": []}) + executor = SubWorkflowExecutor(workflow_executor=wf_executor, workflow_fetcher=fetcher) + sub_step = SubWorkflowStep(workflow_id="wf-near-limit", step_id="invoke") + + result = await executor.execute( + sub_step, + parent_context={}, + parent_step_outputs={}, + current_depth=MAX_NESTING_DEPTH - 1, + ) + assert result["success"] is True + + def test_max_nesting_depth_constant(self): + """Sanity-check: the constant is a positive integer.""" + assert isinstance(MAX_NESTING_DEPTH, int) + assert MAX_NESTING_DEPTH > 0 diff --git a/autobot-backend/orchestration/workflow_executor.py b/autobot-backend/orchestration/workflow_executor.py index 957826cd9..d2a61fbff 100644 --- a/autobot-backend/orchestration/workflow_executor.py +++ b/autobot-backend/orchestration/workflow_executor.py @@ -14,6 +14,9 @@ each step executes; completed step results stored as StepOutput. Issue #2154: Step-level error handlers (retry/skip/fallback/pause/abort) and workflow resume-from-checkpoint via Redis. +Issue #2143: Sub-workflow composition — a step with type="sub_workflow" delegates + to SubWorkflowExecutor, which executes the child workflow and stores + its result as a StepOutput under the caller's step_outputs registry. """ import asyncio @@ -39,6 +42,7 @@ WorkflowCheckpointManager, ) from .execution_modes import DebugController, DryRunValidator, ExecutionMode +from .sub_workflow import SubWorkflowExecutor, extract_sub_workflow_step, is_sub_workflow_step from .types import AgentInteraction, AgentProfile from .variable_resolver import StepOutput, VariableResolver from .workflow_memory import WorkflowMemory @@ -64,16 +68,21 @@ def __init__( reserve_agent_callback: Callable[[str], None], release_agent_callback: Callable[[str], None], update_performance_callback: Callable[[str, bool, float], None], + workflow_fetcher: Optional[Callable[[str], Optional[Dict[str, Any]]]] = None, ): """ Initialize the workflow executor. Args: - agent_registry: Registry of available agents - agent_interactions: List to track agent interactions - reserve_agent_callback: Function to reserve an agent - release_agent_callback: Function to release an agent - update_performance_callback: Function to update agent performance + agent_registry: Registry of available agents. + agent_interactions: List to track agent interactions. + reserve_agent_callback: Function to reserve an agent. + release_agent_callback: Function to release an agent. + update_performance_callback: Function to update agent performance. + workflow_fetcher: Optional callable ``(workflow_id) → workflow_dict`` + used to load child workflows for sub-workflow + composition (Issue #2143). When None, sub-workflow + steps raise ``ValueError`` at execution time. """ self.agent_registry = agent_registry self.agent_interactions = agent_interactions @@ -85,6 +94,12 @@ def __init__( # Issue #2154: checkpoint manager and error handler self._checkpoint_manager = WorkflowCheckpointManager() self._error_handler = StepErrorHandler() + # Issue #2143: sub-workflow executor (None when fetcher not provided) + self._sub_workflow_executor: Optional[SubWorkflowExecutor] = ( + SubWorkflowExecutor(workflow_executor=self, workflow_fetcher=workflow_fetcher) + if workflow_fetcher is not None + else None + ) def _group_steps_by_dependency( self, steps: List[Dict[str, Any]] @@ -208,6 +223,11 @@ async def _execute_step_with_agent( step_outputs: Dict[str, StepOutput] = execution_context.get("step_outputs", {}) self._resolve_step_variables(step, step_outputs) + # Issue #2143: sub-workflow steps are handled before agent reservation. + if is_sub_workflow_step(step): + await self._execute_sub_workflow_step(step, execution_context, context) + return + if agent_id: self._reserve_agent(agent_id) @@ -243,6 +263,88 @@ async def _execute_step_with_agent( if agent_id: self._release_agent(agent_id) + async def _execute_sub_workflow_step( + self, + step: Dict[str, Any], + execution_context: Dict[str, Any], + context: Dict[str, Any], + ) -> None: + """ + Execute a sub-workflow invocation step and store its result. + + Delegates to SubWorkflowExecutor, then stores the child execution + context as a StepOutput in ``execution_context["step_outputs"]`` under + the step's ID, so parent steps can reference child results via the + standard ``${steps..output.*}`` variable syntax. + + Issue #2143. + + Args: + step: The sub-workflow step dict (type="sub_workflow"). + execution_context: Parent workflow's execution context (mutated in-place). + context: Parent workflow's input context forwarded to the child. + """ + step_id = step["id"] + step_start_time = time.time() + + if self._sub_workflow_executor is None: + logger.error( + "Sub-workflow step %s: no workflow_fetcher configured — cannot execute", step_id + ) + step["status"] = "failed" + step["result"] = { + "success": False, + "step_id": step_id, + "error": "WorkflowExecutor was not initialised with a workflow_fetcher; " + "sub-workflow steps cannot be executed.", + } + execution_context["step_results"][step_id] = step["result"] + return + + sub_step = extract_sub_workflow_step(step) + parent_step_outputs: Dict[str, StepOutput] = execution_context.get("step_outputs", {}) + + try: + step_result = await self._sub_workflow_executor.execute( + sub_step=sub_step, + parent_context=context, + parent_step_outputs=parent_step_outputs, + ) + except Exception as exc: + elapsed = time.time() - step_start_time + logger.error("Sub-workflow step %s failed: %s", step_id, exc) + step["status"] = "failed" + step["execution_time"] = elapsed + step["result"] = {"success": False, "step_id": step_id, "error": str(exc)} + execution_context["step_results"][step_id] = step["result"] + return + + elapsed = time.time() - step_start_time + step["status"] = "completed" if step_result.get("success") else "failed" + step["execution_time"] = elapsed + step["result"] = step_result + execution_context["step_results"][step_id] = step_result + + # Store the child's execution context as a StepOutput for variable piping. + if "step_outputs" in execution_context: + child_ctx = step_result.get("sub_workflow_result", {}) + stdout = "" + execution_context["step_outputs"][step_id] = StepOutput( + status="completed" if step_result.get("success") else "failed", + stdout=stdout, + parsed_json=child_ctx if isinstance(child_ctx, dict) else None, + metadata={"output_key": sub_step.output_key, "execution_time": elapsed}, + ) + logger.debug( + "Sub-workflow step %s: stored child result under step_outputs[%s]", + step_id, + step_id, + ) + + # Issue #2154: checkpoint after successful completion. + if step_result.get("success"): + self._save_checkpoint(execution_context.get("workflow_id", ""), step_id, step_result) + async def _execute_step_with_retry( self, step: Dict[str, Any],