Skip to content

Commit 8d8d3c0

Browse files
Merge pull request #85 from kayba-ai/test/David/Opik
feat(opik): Redesign Opik integration with pipeline-level tracing
2 parents 8e6e42e + f9c53b6 commit 8d8d3c0

13 files changed

Lines changed: 487 additions & 103 deletions

File tree

AGENTS.md

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -4,11 +4,13 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co
44

55
## Repository Guidelines
66

7-
### Design Document Maintenance
7+
### Documentation Maintenance
88
Before working on code in `ace/` or `ace_next/`, read `docs/ACE_DESIGN.md` to understand the current architecture.
99
Before working on code in `pipeline/` or `ace_next/core/`, read `docs/PIPELINE_DESIGN.md` to understand the pipeline engine.
1010

11-
When changes to code alter architecture, add new modules, change public APIs, rename concepts, or modify execution flow described in these documents, update the respective design doc to reflect the changes:
11+
**Docs MUST be kept in sync with code.** Any change that alters a public API, renames a concept, adds/removes a module, or changes execution flow **requires** a corresponding update to the relevant docs. Do not merge code changes that make the documentation inaccurate.
12+
13+
Key design docs:
1214
- `docs/ACE_DESIGN.md` — core ACE architecture: roles, skillbook, adaptation loops, insight levels, integration patterns
1315
- `docs/PIPELINE_DESIGN.md` — pipeline engine: steps, StepProtocol, Pipeline, SubRunner, RR pipeline
1416

@@ -20,6 +22,14 @@ When changes to code alter architecture, add new modules, change public APIs, re
2022
- `examples/` — runnable demos grouped by integration
2123
- `benchmarks/`, `scripts/` — research/evaluation tooling (not shipped to PyPI)
2224
- `docs/` — guides and reference material
25+
- `docs/getting-started/` — installation, setup, quick-start
26+
- `docs/concepts/` — core concepts: roles, skillbook, insight levels
27+
- `docs/guides/` — in-depth guides: full pipeline, integration, testing, prompts
28+
- `docs/integrations/` — per-integration docs (LiteLLM, LangChain, browser-use, Claude Code, Opik)
29+
- `docs/pipeline/` — pipeline engine docs: core concepts, custom steps, branching, error handling
30+
- `docs/api/` — API reference
31+
- `docs/ACE_DESIGN.md` — architecture design doc (keep in sync with code)
32+
- `docs/PIPELINE_DESIGN.md` — pipeline engine design doc (keep in sync with code)
2333

2434
### Commands
2535
- `uv sync` — install all dependencies

ace_next/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
LangChain,
2929
TraceAnalyser,
3030
)
31+
from .rr import RRConfig, RRStep
3132
from .steps.opik import OPIK_AVAILABLE, OpikStep, register_opik_litellm_callback
3233

3334
__all__ = [
@@ -56,6 +57,9 @@
5657
"ClaudeCode",
5758
"LangChain",
5859
"TraceAnalyser",
60+
# Recursive Reflector
61+
"RRStep",
62+
"RRConfig",
5963
# Deduplication
6064
"DeduplicationConfig",
6165
"DeduplicationManager",

ace_next/runners/ace.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from typing import Any
88

99
from pipeline import Pipeline
10-
from pipeline.protocol import SampleResult
10+
from pipeline.protocol import SampleResult, StepProtocol
1111

1212
from ..core.context import ACEStepContext, SkillbookView
1313
from ..core.environments import Sample, TaskEnvironment
@@ -48,6 +48,7 @@ def from_roles(
4848
dedup_interval: int = 10,
4949
checkpoint_dir: str | Path | None = None,
5050
checkpoint_interval: int = 10,
51+
extra_steps: list[StepProtocol] | None = None,
5152
) -> ACE:
5253
"""Construct from pre-built role instances.
5354
@@ -64,6 +65,8 @@ def from_roles(
6465
dedup_interval: Samples between deduplication runs.
6566
checkpoint_dir: Directory for checkpoint files.
6667
checkpoint_interval: Samples between checkpoint saves.
68+
extra_steps: Additional steps appended after the learning
69+
tail (e.g. ``OpikStep``).
6770
"""
6871
skillbook = skillbook or Skillbook()
6972
steps = [
@@ -79,6 +82,8 @@ def from_roles(
7982
checkpoint_interval=checkpoint_interval,
8083
),
8184
]
85+
if extra_steps:
86+
steps.extend(extra_steps)
8287
return cls(pipeline=Pipeline(steps), skillbook=skillbook)
8388

8489
def run(

ace_next/runners/litellm.py

Lines changed: 44 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -67,6 +67,9 @@ def __init__(
6767
checkpoint_dir: str | Path | None = None,
6868
checkpoint_interval: int = 10,
6969
is_learning: bool = True,
70+
opik: bool = False,
71+
opik_project: str = "ace-framework",
72+
opik_tags: list[str] | None = None,
7073
) -> None:
7174
# Resolve skillbook
7275
if skillbook_path:
@@ -98,6 +101,29 @@ def __init__(
98101
self._checkpoint_dir = checkpoint_dir
99102
self._checkpoint_interval = checkpoint_interval
100103

104+
# Opik observability (explicit opt-in — fail loudly)
105+
self._opik_step: Any = None
106+
if opik:
107+
from ..steps.opik import OPIK_AVAILABLE, OpikStep, register_opik_litellm_callback
108+
109+
if not OPIK_AVAILABLE:
110+
raise ImportError(
111+
"opik=True requires the 'opik' package. "
112+
"Install it with: pip install ace-framework[observability]"
113+
)
114+
115+
self._opik_step = OpikStep(
116+
project_name=opik_project,
117+
tags=opik_tags,
118+
)
119+
if not self._opik_step.enabled:
120+
raise RuntimeError(
121+
"OpikStep failed to initialize. Check your Opik configuration "
122+
"(~/.opik.config, OPIK_API_KEY, OPIK_WORKSPACE env vars)."
123+
)
124+
# Register LiteLLM-level callback for per-call token/cost tracking
125+
register_opik_litellm_callback(project_name=opik_project)
126+
101127
# Lazy-init caches
102128
self._ace: ACE | None = None
103129
self._analyser: TraceAnalyser | None = None
@@ -128,6 +154,9 @@ def from_model(
128154
checkpoint_dir: Optional[Union[str, Path]] = None,
129155
checkpoint_interval: int = 10,
130156
is_learning: bool = True,
157+
opik: bool = False,
158+
opik_project: str = "ace-framework",
159+
opik_tags: Optional[list[str]] = None,
131160
**llm_kwargs: Any,
132161
) -> ACELiteLLM:
133162
"""Build from a model string (creates LiteLLMClient + roles).
@@ -148,6 +177,10 @@ def from_model(
148177
checkpoint_dir: Directory for checkpoint files.
149178
checkpoint_interval: Samples between checkpoint saves.
150179
is_learning: Whether learning is enabled.
180+
opik: Enable Opik observability (pipeline traces +
181+
LiteLLM per-call token/cost tracking).
182+
opik_project: Opik project name.
183+
opik_tags: Tags applied to every Opik trace.
151184
**llm_kwargs: Extra kwargs forwarded to ``LiteLLMClient``.
152185
"""
153186
from ..providers import LiteLLMClient
@@ -170,12 +203,21 @@ def from_model(
170203
checkpoint_dir=checkpoint_dir,
171204
checkpoint_interval=checkpoint_interval,
172205
is_learning=is_learning,
206+
opik=opik,
207+
opik_project=opik_project,
208+
opik_tags=opik_tags,
173209
)
174210

175211
# ------------------------------------------------------------------
176212
# Lazy-init runners
177213
# ------------------------------------------------------------------
178214

215+
def _get_extra_steps(self) -> list[Any] | None:
216+
"""Return extra pipeline steps (e.g. OpikStep) or None."""
217+
if self._opik_step is not None:
218+
return [self._opik_step]
219+
return None
220+
179221
def _get_ace(self, environment: TaskEnvironment | None = None) -> ACE:
180222
"""Return (or build) the cached ACE runner."""
181223
env = environment or self.environment
@@ -194,6 +236,7 @@ def _get_ace(self, environment: TaskEnvironment | None = None) -> ACE:
194236
dedup_interval=self._dedup_interval,
195237
checkpoint_dir=self._checkpoint_dir,
196238
checkpoint_interval=self._checkpoint_interval,
239+
extra_steps=self._get_extra_steps(),
197240
)
198241
return self._ace
199242

@@ -208,6 +251,7 @@ def _get_analyser(self) -> TraceAnalyser:
208251
dedup_interval=self._dedup_interval,
209252
checkpoint_dir=self._checkpoint_dir,
210253
checkpoint_interval=self._checkpoint_interval,
254+
extra_steps=self._get_extra_steps(),
211255
)
212256
return self._analyser
213257

ace_next/runners/trace_analyser.py

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -7,7 +7,7 @@
77
from typing import Any
88

99
from pipeline import Pipeline
10-
from pipeline.protocol import SampleResult
10+
from pipeline.protocol import SampleResult, StepProtocol
1111

1212
from ..core.context import ACEStepContext, SkillbookView
1313
from ..protocols import (
@@ -47,6 +47,7 @@ def from_roles(
4747
dedup_interval: int = 10,
4848
checkpoint_dir: str | Path | None = None,
4949
checkpoint_interval: int = 10,
50+
extra_steps: list[StepProtocol] | None = None,
5051
) -> TraceAnalyser:
5152
"""Construct from pre-built role instances.
5253
@@ -60,6 +61,8 @@ def from_roles(
6061
checkpoint_dir: Directory for checkpoint files. Appends a
6162
``CheckpointStep`` when provided.
6263
checkpoint_interval: Samples between checkpoint saves.
64+
extra_steps: Additional steps appended after the learning
65+
tail (e.g. ``OpikStep``).
6366
"""
6467
skillbook = skillbook or Skillbook()
6568
steps = learning_tail(
@@ -71,6 +74,8 @@ def from_roles(
7174
checkpoint_dir=checkpoint_dir,
7275
checkpoint_interval=checkpoint_interval,
7376
)
77+
if extra_steps:
78+
steps.extend(extra_steps)
7479
return cls(pipeline=Pipeline(steps), skillbook=skillbook)
7580

7681
def run(

ace_next/steps/opik.py

Lines changed: 53 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -2,17 +2,30 @@
22
33
Terminal side-effect step that creates an Opik trace per sample with
44
pipeline metadata, agent output, reflection insights, and skill
5-
manager operations. Optionally registers the LiteLLM OpikLogger
6-
callback for per-LLM-call token/cost tracking.
5+
manager operations.
76
87
Place at the end of the pipeline (after ApplyStep). Gracefully
98
degrades to a no-op when Opik is not installed or is disabled.
9+
10+
**Explicit opt-in only** — constructing an ``OpikStep`` is the
11+
opt-in signal. Opik is never auto-enabled just because the package
12+
is installed.
13+
14+
Two independent tracing modes:
15+
16+
1. **Pipeline step** (this class) — client-agnostic, reads
17+
``ACEStepContext`` fields and creates one Opik trace per sample.
18+
2. **LiteLLM callback** (``register_opik_litellm_callback``) —
19+
LiteLLM-specific, registers ``OpikLogger`` on
20+
``litellm.callbacks`` for per-LLM-call token/cost tracking.
21+
Call separately when needed; ``OpikStep`` does NOT register it.
1022
"""
1123

1224
from __future__ import annotations
1325

1426
import logging
1527
import os
28+
import warnings
1629
from typing import Any
1730

1831
from ..core.context import ACEStepContext
@@ -44,7 +57,8 @@ def register_opik_litellm_callback(
4457
"""Register ``OpikLogger`` on ``litellm.callbacks`` for token/cost tracking.
4558
4659
Standalone helper — call this when you want LiteLLM-level Opik
47-
tracing without adding ``OpikStep`` to the pipeline.
60+
tracing. This is **separate** from ``OpikStep`` (pipeline-level
61+
tracing) and must be called explicitly.
4862
4963
Returns ``True`` if the callback was successfully registered.
5064
"""
@@ -54,10 +68,28 @@ def register_opik_litellm_callback(
5468
import litellm
5569
from litellm.integrations.opik.opik import OpikLogger
5670

57-
opik_logger = OpikLogger(project_name=project_name)
71+
# OpikLogger.__init__ calls asyncio.create_task() which spews
72+
# ERROR logs + RuntimeWarnings when no event loop is running.
73+
# Suppress only those specific messages during init.
74+
class _AsyncInitFilter(logging.Filter):
75+
def filter(self, record: logging.LogRecord) -> bool:
76+
return "Asynchronous processing not initialized" not in record.getMessage()
77+
78+
_ll = logging.getLogger("LiteLLM")
79+
_f = _AsyncInitFilter()
80+
_ll.addFilter(_f)
81+
try:
82+
with warnings.catch_warnings():
83+
warnings.filterwarnings(
84+
"ignore",
85+
message="coroutine.*periodic_flush.*never awaited",
86+
category=RuntimeWarning,
87+
)
88+
opik_logger = OpikLogger(project_name=project_name)
89+
finally:
90+
_ll.removeFilter(_f)
5891
already = any(
59-
getattr(cb, "__class__", None) and cb.__class__.__name__.lower() == "opik"
60-
for cb in getattr(litellm, "callbacks", [])
92+
isinstance(cb, OpikLogger) for cb in getattr(litellm, "callbacks", [])
6193
)
6294
if not already:
6395
litellm.callbacks.append(opik_logger)
@@ -77,11 +109,13 @@ class OpikStep:
77109
Pure side-effect step — reads context fields and creates an Opik
78110
trace per sample. Never mutates the context.
79111
112+
**Does NOT register the LiteLLM callback.** Call
113+
``register_opik_litellm_callback()`` separately if you also want
114+
per-LLM-call token/cost tracking.
115+
80116
Args:
81117
project_name: Opik project name.
82118
tags: Tags applied to every trace.
83-
register_litellm_callback: Also register ``OpikLogger`` on
84-
``litellm.callbacks`` for per-LLM-call token/cost tracking.
85119
"""
86120

87121
requires = frozenset({"skillbook"})
@@ -91,7 +125,6 @@ def __init__(
91125
self,
92126
project_name: str = "ace-framework",
93127
tags: list[str] | None = None,
94-
register_litellm_callback: bool = True,
95128
) -> None:
96129
self.project_name = project_name
97130
self.tags = tags or ["ace"]
@@ -100,14 +133,21 @@ def __init__(
100133

101134
if self.enabled:
102135
try:
103-
self._client = _opik.Opik(project_name=project_name)
136+
# Prefer config from env vars; unset values are passed as None,
137+
# so Opik may still fall back to the global ~/.opik.config file.
138+
api_key = os.environ.get("OPIK_API_KEY")
139+
workspace = os.environ.get("OPIK_WORKSPACE")
140+
host = os.environ.get("OPIK_URL_OVERRIDE")
141+
self._client = _opik.Opik(
142+
project_name=project_name,
143+
api_key=api_key or None,
144+
workspace=workspace or None,
145+
host=host or None,
146+
)
104147
except Exception as exc:
105148
logger.debug("OpikStep: failed to create Opik client: %s", exc)
106149
self.enabled = False
107150

108-
if self.enabled and register_litellm_callback:
109-
register_opik_litellm_callback(project_name=project_name)
110-
111151
def __call__(self, ctx: ACEStepContext) -> ACEStepContext:
112152
if not self.enabled:
113153
return ctx

0 commit comments

Comments
 (0)