
Commit 693cd3c

refactor(pipeline): make StepContext generic by moving domain fields to subclasses
Strip StepContext down to sample + metadata only; domain-specific fields (skillbook, agent_output, reflection, etc.) are added via subclassing. Update branch merge functions to inspect subclass fields via type(ctxs[0]), accept pre-built StepContext in run()/run_async() instead of raw samples, and add background_stats() for monitoring background thread progress.
1 parent c0b4f49 commit 693cd3c

4 files changed

Lines changed: 96 additions & 118 deletions


docs/PIPELINE_DESIGN.md

Lines changed: 51 additions & 73 deletions
@@ -1,7 +1,6 @@
 # Pipeline Architecture Design
 
-Design decisions for the generalized pipeline system.
-
+Design decisions for the generalized pipeline system, keeping it as generic as possible.
 ---
 
 ## Core Primitives
@@ -55,16 +54,16 @@ class StepProtocol(Protocol):
 
 ### StepContext — immutability contract
 
-`StepContext` is a frozen dataclass. Steps never mutate the incoming context — they return a new one via `.replace()`:
+`StepContext` is a frozen dataclass. Steps never mutate the incoming context — they return a new one via `.replace()`.
+
+The pipeline engine defines a minimal base with only two fields:
 
 ```python
 from types import MappingProxyType
 
 @dataclass(frozen=True)
 class StepContext:
     sample: Any
-    agent_output: str | None = None
-    reflection: str | None = None
     metadata: MappingProxyType = field(default_factory=lambda: MappingProxyType({}))
 
     def __post_init__(self):
@@ -76,6 +75,38 @@ class StepContext:
         return dataclasses.replace(self, **changes)
 ```
 
+The engine never reads anything beyond `sample` and `metadata`. All domain-specific fields are added by subclassing.
+
+#### Subclassing for domain fields
+
+Consuming applications subclass `StepContext` to add named fields for concepts shared across their pipelines:
+
+```python
+@dataclass(frozen=True)
+class ACEContext(StepContext):
+    # Shared across all ACE pipelines
+    skillbook: Skillbook | None = None
+    environment: TaskEnvironment | None = None
+
+    # Produced by steps (None until the providing step runs)
+    agent_output: AgentOutput | None = None
+    environment_result: EnvironmentResult | None = None
+    reflection: ReflectorOutput | None = None
+    skill_manager_output: UpdateBatch | None = None
+
+    # Runner bookkeeping
+    epoch: int = 1
+    total_epochs: int = 1
+    step_index: int = 0
+    total_steps: int = 0
+```
+
+The `requires`/`provides` validation works on attribute names (strings) — it checks that the field exists on the context object at runtime, so it is subclass-agnostic. A step that declares `requires = {"skillbook"}` works whether the context is `ACEContext` or any other subclass that has a `skillbook` attribute.
+
+Data that is specific to a single integration or step goes in `metadata` to prevent field accumulation on the subclass. For example, `metadata["browser_history"]` for browser-use or `metadata["transcript_path"]` for Claude Code.
+
+#### Immutable update patterns
+
 Updating metadata follows the same immutable pattern as any other field:
 
 ```python
@@ -86,14 +117,12 @@ Steps follow this pattern:
 
 ```python
 def __call__(self, ctx: StepContext) -> StepContext:
-    result = self.agent.run(ctx.sample)
-    return ctx.replace(agent_output=result)
+    result = do_work(ctx.sample)
+    return ctx.replace(result=result)
 ```
 
 `frozen=True` makes mutation a hard error at runtime rather than a subtle bug. It also makes `Branch` safe by default — since `StepContext` is immutable, all branches can receive the same object without risk; no deep copy is needed.
 
-**Field naming rule:** Named fields (`agent_output`, `reflection`) are reserved for concepts shared across all ACE pipelines. Integration-specific data always goes in `metadata`. This prevents the base class from accumulating fields over time as integrations are added.
-
 ---
 
 ## Pipeline
@@ -121,10 +150,10 @@ pipe = (
 )
 ```
 
-**Fan-out across samples:**
+**Fan-out across contexts:**
 
 ```python
-pipe.run(samples, workers=4)  # same pipeline, N samples in parallel
+pipe.run(contexts, workers=4)  # same pipeline, N contexts in parallel
 ```
 
 ### Inner pipeline as a fan-out step
@@ -135,10 +164,11 @@ A `Pipeline`-as-`Step` receives one context and must return one context — but
 class MultiSearchStep:
     """Generates N queries from one context, runs them in parallel, merges."""
     def __call__(self, ctx: StepContext) -> StepContext:
-        queries = generate_queries(ctx.sample)  # 1 → N sub-inputs
+        queries = generate_queries(ctx.sample)  # 1 → N
+        sub_ctxs = [StepContext(sample=q) for q in queries]
         sub_pipe = Pipeline().then(FetchStep())
-        results = sub_pipe.run(queries, workers=len(queries))  # parallel
-        return ctx.replace(agent_output=merge(results))  # N → 1
+        results = sub_pipe.run(sub_ctxs, workers=len(queries))  # parallel
+        return ctx.replace(agent_output=merge(results))  # N → 1
 ```
 
 `sub_pipe.run()` is a top-level runner call, so `async_boundary` and `workers` on its inner steps fire normally. From the outer pipeline's perspective, `MultiSearchStep` is a black box that takes one context and returns one context — the fan-out is an internal implementation detail.
@@ -265,7 +295,7 @@ for step in self.steps:
     ctx = await asyncio.to_thread(step, ctx)
 ```
 
-Pipeline entry points: `pipe.run(samples)` for sync contexts, `await pipe.run_async(samples)` for async contexts (e.g. inside browser-use).
+Pipeline entry points: `pipe.run(contexts)` for sync callers, `await pipe.run_async(contexts)` for async callers (e.g. inside browser-use).
 
 This type is about **not blocking**. Nothing runs in parallel — the pipeline is still sequential, it just yields the thread during waits.
 
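[Editor's note] The non-blocking loop shown in this hunk can be exercised end to end. A minimal runnable sketch — the context type and the two steps are hypothetical stand-ins, not part of this commit:

```python
import asyncio
from dataclasses import dataclass

@dataclass(frozen=True)
class Ctx:
    value: int

# Two blocking "steps": plain callables taking and returning a context
def add_one(ctx: Ctx) -> Ctx:
    return Ctx(value=ctx.value + 1)

def double(ctx: Ctx) -> Ctx:
    return Ctx(value=ctx.value * 2)

async def run_async(steps, ctx: Ctx) -> Ctx:
    for step in steps:
        # Each step runs in a worker thread via asyncio.to_thread, so the
        # pipeline stays sequential but the event loop is never blocked.
        ctx = await asyncio.to_thread(step, ctx)
    return ctx

result = asyncio.run(run_async([add_one, double], Ctx(value=1)))
print(result.value)  # 4
```

Nothing here runs in parallel — the loop awaits each step before starting the next, exactly as the doc states.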
@@ -394,7 +424,7 @@ These two knobs control different thread pools and do not interact:
 
 | Knob | Pool | Controls |
 |---|---|---|
-| `pipe.run(samples, workers=N)` | foreground pool | how many samples run through pre-boundary steps simultaneously |
+| `pipe.run(contexts, workers=N)` | foreground pool | how many contexts run through pre-boundary steps simultaneously |
 | `step.max_workers = K` | background pool per step class | how many instances of that step run in the background simultaneously |
 
 A sample leaves the foreground pool when it crosses the `async_boundary` point and enters the background step's pool. With `workers=4` and `ReflectStep.max_workers=3`, you can have 4 samples in Agent/Evaluate and 3 reflections running concurrently — two separate pools, no multiplication.
@@ -413,15 +443,14 @@ Failure semantics differ depending on which side of the `async_boundary` a step
 
 ```python
 # Pipeline runner (foreground loop)
-for sample in samples:
+for ctx in contexts:
     try:
-        ctx = initial_context(sample)
         for step in self.foreground_steps:
             ctx = step(ctx)
         self._submit_to_background(ctx)
-        results.append(SampleResult(sample=sample, output=ctx, error=None, failed_at=None))
+        results.append(SampleResult(sample=ctx.sample, output=ctx, error=None, failed_at=None))
     except Exception as e:
-        results.append(SampleResult(sample=sample, output=None, error=e, failed_at=type(step).__name__))
+        results.append(SampleResult(sample=ctx.sample, output=None, error=e, failed_at=type(step).__name__))
 ```
 
 **Background steps** (after the boundary): the caller has already moved on, so exceptions cannot propagate. Background failures are captured and attached to the `SampleResult` — nothing is dropped silently.
@@ -442,60 +471,9 @@ When a `Branch` step fails, `failed_at` is `"Branch"` and `error` is a `BranchEr
 
 Retry logic is the responsibility of individual steps, not the pipeline.
 
-**Shutdown:** `wait_for_learning(timeout=N)` raises `TimeoutError` if background steps have not drained within `N` seconds. Individual step implementations are responsible for their own per-call timeouts (e.g. LLM API call timeouts).
-
----
-
-## Integrations as Pipelines
-
-Each external framework integration (browser-use, LangChain, Claude Code) is its own `Pipeline` subclass with integration-specific steps. It is **not** embedded as a step inside `ACEPipeline`.
-
-```
-ace/integrations/
-  browser_use/
-    pipeline.py   ← BrowserPipeline
-    steps/
-      execute.py  ← BrowserExecuteStep
-  langchain/
-    pipeline.py   ← LangChainPipeline
-    steps/
-      execute.py  ← LangChainExecuteStep
-  claude_code/
-    pipeline.py   ← ClaudeCodePipeline
-    steps/
-      execute.py  ← ClaudeCodeExecuteStep
-      persist.py  ← PersistStep
-```
-
-Each integration pipeline replaces `AgentStep + EvaluateStep` with its own execute step, then reuses the shared `ReflectStep` and `UpdateStep`:
-
-```python
-BrowserPipeline:
-    [BrowserExecuteStep, ReflectStep, UpdateStep]
-
-LangChainPipeline:
-    [LangChainExecuteStep, ReflectStep, UpdateStep]
-
-ClaudeCodePipeline:
-    [ClaudeCodeExecuteStep, ReflectStep, UpdateStep, PersistStep]
-```
-
----
-
-## Generic Steps Folder
-
-`ace/pipeline/steps/` contains only steps that are reusable across any pipeline — one file per class:
-
-```
-ace/pipeline/steps/
-  __init__.py
-  agent.py     ← AgentStep
-  evaluate.py  ← EvaluateStep
-  reflect.py   ← ReflectStep
-  update.py    ← UpdateStep
-```
+**Shutdown:** `wait_for_background(timeout=N)` raises `TimeoutError` if background steps have not drained within `N` seconds. Individual step implementations are responsible for their own per-call timeouts (e.g. LLM API call timeouts).
 
-Integration-specific steps live next to their pipeline, not here.
+**Monitoring:** `background_stats()` returns a `dict` with `active` and `completed` counts for background threads. Thread-safe — can be called from any thread while the pipeline is running. This is the public API for monitoring background progress; callers should not access `_bg_lock` or `_bg_threads` directly.
 
 ---
 
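[Editor's note] The subclass-agnostic `requires` validation described in the doc changes above can be sketched as follows. `validate_requires` is a hypothetical helper — the engine's actual validation hook is not part of this diff — but it illustrates the duck-typed attribute-name check:

```python
from dataclasses import dataclass, field
from types import MappingProxyType
from typing import Any

@dataclass(frozen=True)
class StepContext:
    # Minimal base, mirroring the committed pipeline/context.py
    sample: Any = None
    metadata: MappingProxyType = field(default_factory=lambda: MappingProxyType({}))

@dataclass(frozen=True)
class ACEContext(StepContext):
    # Domain fields added by subclassing
    skillbook: Any = None
    agent_output: Any = None

def validate_requires(step_requires: set[str], ctx: StepContext) -> None:
    # Attribute-name check: satisfied by any StepContext subclass that
    # happens to carry the required fields — no isinstance on a concrete type.
    missing = {name for name in step_requires if not hasattr(ctx, name)}
    if missing:
        raise TypeError(
            f"context {type(ctx).__name__} lacks required fields: {sorted(missing)}"
        )

validate_requires({"skillbook"}, ACEContext(sample="s"))  # passes
try:
    validate_requires({"skillbook"}, StepContext(sample="s"))
except TypeError as e:
    print(e)  # base context has no skillbook field
```

The same step declaration therefore works against `ACEContext` or any other subclass exposing a `skillbook` attribute.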
pipeline/branch.py

Lines changed: 9 additions & 3 deletions
@@ -30,12 +30,14 @@ def _merge_raise_on_conflict(ctxs: list[StepContext]) -> StepContext:
 
     Metadata is always merged (union across all branches; last writer wins
     within metadata — there is no named-field semantic there).
+
+    Uses ``type(ctxs[0])`` so subclass fields are included in the comparison.
     """
     if len(ctxs) == 1:
         return ctxs[0]
 
     conflicts: set[str] = set()
-    for f in dataclasses.fields(StepContext):
+    for f in dataclasses.fields(type(ctxs[0])):
         if f.name == "metadata":
             continue
         first_val = getattr(ctxs[0], f.name)
@@ -56,15 +58,19 @@ def _merge_raise_on_conflict(ctxs: list[StepContext]) -> StepContext:
 
 
 def _merge_last_write_wins(ctxs: list[StepContext]) -> StepContext:
-    """Last branch's value wins for every conflicting field."""
+    """Last branch's value wins for every conflicting field.
+
+    Uses ``type(ctxs[0])`` so subclass fields are included in the comparison.
+    """
     if len(ctxs) == 1:
         return ctxs[0]
 
     # Start from first context, overlay with each subsequent one
     result = ctxs[0]
+    ctx_type = type(ctxs[0])
     for ctx in ctxs[1:]:
         changes: dict = {}
-        for f in dataclasses.fields(StepContext):
+        for f in dataclasses.fields(ctx_type):
             if f.name == "metadata":
                 continue
             val = getattr(ctx, f.name)

pipeline/context.py

Lines changed: 11 additions & 32 deletions
@@ -5,54 +5,33 @@
 import dataclasses
 from dataclasses import dataclass, field
 from types import MappingProxyType
-from typing import Any
+from typing import Any, Self
 
 
 @dataclass(frozen=True)
 class StepContext:
     """Frozen context object passed from step to step.
 
-    Named fields cover every concept shared across ACE pipelines.
-    Integration-specific data goes in ``metadata`` so named fields never grow
-    as integrations are added.
+    The pipeline engine only requires ``sample`` and ``metadata``. All
+    domain-specific fields are added by subclassing — the engine never reads
+    anything beyond these two fields.
+
+    Consuming applications subclass ``StepContext`` to add named fields for
+    concepts shared across their pipelines. Integration-specific data goes
+    in ``metadata`` to prevent field accumulation on the subclass.
 
     Steps never mutate the incoming context — they call ``.replace()`` to
     produce a new one.
     """
 
-    # Core input
     sample: Any = None
-    skillbook: Any = None
-    environment: Any = None
-
-    # Epoch / progress counters (set by the runner, not by steps)
-    epoch: int = 1
-    total_epochs: int = 1
-    step_index: int = 0
-    total_steps: int = 0
-
-    # Rolling window of past reflections (tuple for immutability)
-    recent_reflections: tuple = field(default_factory=tuple)
-
-    # Named outputs produced by the four ACE steps
-    agent_output: Any = None
-    environment_result: Any = None
-    reflection: Any = None
-    skill_manager_output: Any = None
-
-    # Integration-specific payload — always goes here, never as a new named field
     metadata: MappingProxyType = field(default_factory=lambda: MappingProxyType({}))
 
     def __post_init__(self) -> None:
         # Coerce plain dict → MappingProxyType so mutation is a hard runtime error
         if not isinstance(self.metadata, MappingProxyType):
             object.__setattr__(self, "metadata", MappingProxyType(self.metadata))
-        # Coerce list/other iterables → tuple for immutability
-        if not isinstance(self.recent_reflections, tuple):
-            object.__setattr__(
-                self, "recent_reflections", tuple(self.recent_reflections)
-            )
-
-    def replace(self, **changes: Any) -> "StepContext":
-        """Return a new StepContext with the given fields replaced."""
+
+    def replace(self, **changes: Any) -> Self:
+        """Return a new context with the given fields replaced."""
         return dataclasses.replace(self, **changes)