diff --git a/.gitignore b/.gitignore index 99c30f52f..d6620fd4b 100644 --- a/.gitignore +++ b/.gitignore @@ -21,3 +21,6 @@ dev/ # Results file results.tsv + +# Observability module state (persisted locally, not tracked) +.autoresearch/ diff --git a/analysis.ipynb b/analysis.ipynb index 8455ea4e3..136c7fd8d 100644 --- a/analysis.ipynb +++ b/analysis.ipynb @@ -213,6 +213,48 @@ "print(f\"\\n{'':>4} {hits['delta'].sum():+.6f} {'':>10} TOTAL improvement over baseline\")" ] }, + { + "cell_type": "markdown", + "id": "d9amjr8fb09", + "source": "## Change Category Effectiveness (Hebbian Memory Analysis)\n\nAnalyzes which types of changes (architecture, learning_rate, optimizer, etc.) tend to produce improvements vs. regressions. Powered by the experiment memory system's auto-tagging and Hebbian association tracking.", + "metadata": {} + }, + { + "cell_type": "code", + "id": "b8d79w8w3rl", + "source": "from memory import ExperimentMemory, CHANGE_CATEGORIES\n\n# Auto-tag each experiment from its description and build category stats\nmemory = ExperimentMemory()\nvalid_df = df[df[\"status\"] != \"CRASH\"].copy()\nvalid_df = valid_df.reset_index(drop=True)\n\n# Compute delta_bpb for each experiment (vs previous kept baseline)\nbaseline = valid_df.iloc[0][\"val_bpb\"]\ncurrent_best = baseline\nfor i, row in valid_df.iterrows():\n delta = row[\"val_bpb\"] - current_best\n memory.store_experiment(\n commit=str(row.get(\"commit\", f\"exp_{i}\"))[:7],\n description=str(row[\"description\"]),\n val_bpb=row[\"val_bpb\"],\n delta_bpb=delta,\n status=row[\"status\"].lower(),\n peak_vram_mb=row.get(\"memory_gb\", 0) * 1024,\n )\n if row[\"status\"] == \"KEEP\":\n current_best = row[\"val_bpb\"]\n\n# Get Hebbian associations\nassociations = memory.get_associations()\nif associations:\n assoc_df = pd.DataFrame(associations)\n assoc_df = assoc_df.sort_values(\"weight\", ascending=True)\n\n fig, axes = plt.subplots(1, 2, figsize=(16, max(6, len(assoc_df) * 0.4)))\n\n # Left: Hebbian weight (bar chart)\n colors = [\"#2ecc71\" if w > 0 else \"#e74c3c\" for w in assoc_df[\"weight\"]]\n axes[0].barh(assoc_df[\"category\"], assoc_df[\"weight\"], color=colors, edgecolor=\"black\", linewidth=0.3)\n axes[0].set_xlabel(\"Hebbian Weight (positive = promising)\")\n axes[0].set_title(\"Change Category Associations\")\n axes[0].axvline(x=0, color=\"black\", linewidth=0.5)\n axes[0].grid(True, alpha=0.2, axis=\"x\")\n\n # Right: Success rate (bar chart)\n axes[1].barh(assoc_df[\"category\"], assoc_df[\"success_rate\"], color=\"#3498db\", edgecolor=\"black\", linewidth=0.3)\n axes[1].set_xlabel(\"Success Rate (fraction of experiments kept)\")\n axes[1].set_title(\"Keep Rate by Category\")\n axes[1].set_xlim(0, 1)\n axes[1].grid(True, alpha=0.2, axis=\"x\")\n\n # Annotate with experiment counts\n for idx, row in assoc_df.iterrows():\n axes[1].annotate(f\"n={row['total_experiments']}\", \n (row[\"success_rate\"] + 0.02, row[\"category\"]),\n fontsize=8, va=\"center\")\n\n plt.tight_layout()\n plt.savefig(\"category_effectiveness.png\", dpi=150, bbox_inches=\"tight\")\n plt.show()\n print(\"Saved to category_effectiveness.png\")\nelse:\n print(\"No experiment associations found.\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "dp12riz23e", + "source": "## Monitoring Dashboard & Anomaly Timeline\n\nLoads session metrics from the monitor and resilience modules (if available) to show experiment velocity, alert timeline, and VRAM pressure trends.", + "metadata": {} + }, + { + "cell_type": "code", + "id": "l5a00nogegs", + "source": "import json\nfrom pathlib import Path\n\n# --- Load monitor session data if available ---\nmetrics_path = Path(\".autoresearch/metrics/session.json\")\nif metrics_path.exists():\n with open(metrics_path) as f:\n metrics = json.load(f)\n\n session = metrics.get(\"session\", {})\n experiments = metrics.get(\"experiments\", [])\n alerts = metrics.get(\"alerts\", [])\n\n print(\"=== MONITOR SESSION SUMMARY ===\")\n print(f\" Duration: {session.get('total_training_seconds', 0) / 3600:.1f} hours training\")\n print(f\" Experiments: {session.get('total_experiments', 0)}\")\n print(f\" Best BPB: {session.get('best_val_bpb', 'N/A')}\")\n print(f\" Velocity: {session.get('improvement_velocity', 0):.6f} BPB/hour\")\n print()\n\n if experiments:\n # Plot VRAM usage over experiments\n vram_data = [(i, e[\"peak_vram_mb\"]) for i, e in enumerate(experiments)\n if e.get(\"peak_vram_mb\", 0) > 0]\n if vram_data:\n fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 5))\n\n exp_nums, vram_vals = zip(*vram_data)\n ax1.plot(exp_nums, [v / 1024 for v in vram_vals], \"o-\", color=\"#e67e22\", markersize=4)\n ax1.set_xlabel(\"Experiment #\")\n ax1.set_ylabel(\"Peak VRAM (GB)\")\n ax1.set_title(\"VRAM Usage Trend\")\n ax1.grid(True, alpha=0.2)\n\n # Plot experiment duration\n durations = [(i, e[\"training_seconds\"]) for i, e in enumerate(experiments)\n if e.get(\"training_seconds\", 0) > 0]\n if durations:\n exp_nums_d, dur_vals = zip(*durations)\n ax2.bar(exp_nums_d, dur_vals, color=\"#9b59b6\", alpha=0.7)\n ax2.axhline(y=300, color=\"red\", linestyle=\"--\", alpha=0.5, label=\"5min budget\")\n ax2.set_xlabel(\"Experiment #\")\n ax2.set_ylabel(\"Training Seconds\")\n ax2.set_title(\"Training Duration per Experiment\")\n ax2.legend()\n ax2.grid(True, alpha=0.2)\n\n plt.tight_layout()\n plt.savefig(\"monitoring_dashboard.png\", dpi=150, bbox_inches=\"tight\")\n plt.show()\n print(\"Saved to monitoring_dashboard.png\")\n\n # Alert timeline\n if alerts:\n print(f\"\\n=== ALERTS ({len(alerts)} total) ===\")\n for a in alerts[-15:]:\n sev = a.get(\"severity\", \"?\")\n icon = {\"info\": \"[i]\", \"warning\": \"[!]\", \"critical\": \"[X]\"}.get(sev, \"[?]\")\n cat = a.get(\"category\", \"?\")\n msg = a.get(\"message\", \"?\")\n print(f\" {icon} [{cat}] {msg}\")\nelse:\n print(\"No monitor session data found at .autoresearch/metrics/session.json\")\n print(\"Run experiments with monitor.py integration to generate session data.\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, + { + "cell_type": "markdown", + "id": "ibv01zaj1z", + "source": "## Guidance Report\n\nGenerates an experiment guidance report using the advisor module, showing suggested next experiments, contradictions, and strategy assessment.", + "metadata": {} + }, + { + "cell_type": "code", + "id": "l80c9cwb87d", + "source": "from guidance import ExperimentAdvisor\n\n# Build advisor from the memory we populated above\nadvisor = ExperimentAdvisor(memory=memory)\nguidance = advisor.get_guidance()\n\n# Print the formatted guidance report\nprint(guidance[\"formatted\"])\n\n# Show top suggestions as a table\nif guidance[\"suggestions\"]:\n sugg_df = pd.DataFrame(guidance[\"suggestions\"])\n print(\"\\nSuggestion details:\")\n print(sugg_df[[\"priority\", \"category\", \"confidence\", \"description\"]].to_string(index=False))\n\n# Show contradictions if any\nif guidance[\"contradictions\"]:\n print(f\"\\n{len(guidance['contradictions'])} contradictions found:\")\n for c in guidance[\"contradictions\"]:\n print(f\" [{c['description']}]\")\n print(f\" {c['explanation']}\")", + "metadata": {}, + "execution_count": null, + "outputs": [] + }, { "cell_type": "code", "execution_count": null, @@ -243,4 +285,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/docs/AGENT_HANDOFF.md b/docs/AGENT_HANDOFF.md new file mode 100644 index 000000000..d83b59d97 --- /dev/null +++ b/docs/AGENT_HANDOFF.md @@ -0,0 +1,191 @@ +# Agent Handoff: autoresearch Observability & Intelligence Enhancement + +**Date:** 2026-03-18 +**Author:** reh3376 +**Branch:** `feat/mdemg-observability-and-memory` +**PR:** https://github.com/karpathy/autoresearch/pull/329 +**Target:** `karpathy/autoresearch:master` + +--- + +## Project Context + +**autoresearch** is Andrej Karpathy's framework for autonomous AI-driven LLM hyperparameter research. An AI agent modifies `train.py`, runs 5-minute GPU experiments, and keeps improvements — compounding gains overnight with zero human intervention. + +This work adds production-grade observability, learning, and resilience infrastructure inspired by **mdemg** — a persistent memory system for AI coding agents built on Neo4j, Hebbian learning, and Prometheus metrics. + +--- + +## Work Completed + +### Research & Analysis Phase + +1. **Deep dive into mdemg codebase** (~165K LOC, Go) + - Analyzed 38 internal packages across monitoring, learning, resilience, and autonomy + - Mapped 4 git submodules (homebrew, windows, menubar, autoresearch) + - Identified key patterns: Hebbian learning with tanh soft-capping, circuit breakers with half-open recovery, Jiminy proactive guidance with 4-source parallel fan-out, CMS surprise-weighted storage, RSIC self-improvement cycle + +2. **Deep dive into autoresearch codebase** + - Full analysis of `prepare.py` (read-only data/eval), `train.py` (agent-editable model/optimizer), `program.md` (agent instructions) + - Documented GPT architecture (RoPE, Flash Attention 3, MuonAdamW dual optimizer, value embeddings, sliding window attention) + - Mapped the complete experiment lifecycle and agent autonomy constraints + +3. **Gap analysis: mdemg patterns applicable to autoresearch** + - Identified that autoresearch had no monitoring, no cross-session memory, no anomaly detection, no learning from experiment patterns, and only basic crash handling (NaN/loss > 100) + +### Implementation Phase + +Four new Python modules created (2,193 LOC total, zero new dependencies): + +| Module | Lines | mdemg Inspiration | Key Classes | +|--------|-------|-------------------|-------------| +| `monitor.py` | 535 | `internal/metrics/` | `ExperimentTracker`, `AlertThresholds`, `format_dashboard()` | +| `memory.py` | 566 | `internal/conversation/`, `internal/learning/` | `ExperimentMemory`, `HebbianAssociation`, `HebbianConfig` | +| `resilience.py` | 571 | `internal/circuitbreaker/`, `internal/anomaly/`, `internal/backpressure/` | `CircuitBreaker`, `AnomalyDetector`, `BackpressureMonitor`, `ExperimentGuard` | +| `guidance.py` | 521 | `internal/jiminy/`, `internal/ape/` | `ExperimentAdvisor`, `StrategyAssessment`, `ExperimentSuggestion` | + +### Documentation & Integration Phase + +- **program.md**: +205 lines with full module documentation, usage examples, env var reference, and recommended integration pattern +- **analysis.ipynb**: +6 cells for Hebbian category effectiveness charts, monitoring dashboard with VRAM trends, and guidance report generation +- **.gitignore**: added `.autoresearch/` state directory exclusion + +### Key Technical Decisions Made + +1. **Zero new dependencies** — all modules use Python stdlib only (json, time, math, dataclasses) +2. **Non-intrusive design** — modules never modify train.py or prepare.py; they observe and advise +3. **Opt-in architecture** — each module is independently usable +4. **Tanh soft-capping** for Hebbian weights (from mdemg) instead of hard clamping — continuous learning without saturation walls +5. **Circuit breaker with exponential backoff** — CLOSED → OPEN → HALF_OPEN state machine with configurable cooldown multiplier + +--- + +## Suggested Future Work + +### High Priority + +#### 1. Train.py Integration Hook +The modules are implemented but not yet wired into `train.py`. A lightweight integration wrapper that the agent can optionally call from the training loop would close the gap: + +```python +# Suggested: add to train.py training loop +from monitor import ExperimentTracker +tracker = ExperimentTracker() +# ... call tracker.record_step() each step +``` + +This should remain optional — the agent decides whether to use it based on program.md guidance. + +#### 2. Automated Test Suite +The modules have no unit tests yet. Recommended coverage: + +- `test_monitor.py`: ExperimentTracker lifecycle, alert threshold triggering, Prometheus text format validation, JSON export round-trip +- `test_memory.py`: Hebbian weight updates (positive/negative signals), tanh soft-capping bounds, temporal decay, auto-tagging accuracy, persistence round-trip +- `test_resilience.py`: Circuit breaker state transitions (CLOSED→OPEN→HALF_OPEN→CLOSED), anomaly detection for each pattern (plateau, VRAM creep, regression, crash cluster), backpressure levels +- `test_guidance.py`: Suggestion ranking, contradiction detection, strategy phase classification + +#### 3. Agent Instruction Enhancement for Module Usage +Update `program.md` to make the agent more opinionated about *when* to consult guidance vs. when to just run experiments. Current docs explain *how* but not *when* — e.g., "consult guidance every 5th experiment" or "check guidance after 3 consecutive discards." + +### Medium Priority + +#### 4. Loss Curve Comparison Visualization +The monitor captures per-step loss curves but analysis.ipynb doesn't yet overlay them for comparison. A cell that overlays the last N experiments' loss curves (color-coded by keep/discard) would help identify training dynamic patterns. + +#### 5. Experiment Embedding Space +Rather than keyword auto-tagging, compute semantic embeddings of experiment descriptions (using the existing tokenizer or a lightweight model) and cluster experiments in embedding space. This would: +- Improve category assignment accuracy +- Reveal unexpected category structure +- Enable similarity-based "try something like experiment X" suggestions + +#### 6. Multi-GPU / Multi-Agent Coordination +autoresearch currently runs on a single GPU. The monitoring and memory modules could be extended with: +- Shared memory store (SQLite or shared JSON) for multiple agents exploring in parallel +- Distributed circuit breaker (aggregate crash rates across agents) +- Cross-agent contradiction detection (agent A found X helps, agent B found X hurts) + +#### 7. Prometheus + Grafana Dashboard Template +The `get_prometheus_text()` method is implemented but there's no accompanying Grafana dashboard JSON. A pre-built dashboard template with panels for: +- Experiment throughput (experiments/hour) +- BPB frontier progression +- VRAM pressure gauge +- Alert timeline +- Category effectiveness heatmap + +Would make the monitoring immediately actionable for teams running autoresearch at scale. + +### Lower Priority / Exploratory + +#### 8. RSIC Full Implementation +The current guidance module implements only the *assess* phase of mdemg's RSIC (Recursive Self-Improvement Cycle). The full cycle is: + +- **Assess** (implemented): evaluate strategy effectiveness +- **Reflect** (not implemented): identify why certain experiments succeeded/failed +- **Plan** (not implemented): generate a multi-experiment research plan +- **Speculate** (not implemented): predict outcomes of proposed experiments +- **Execute** (not implemented): autonomous plan execution with rollback + +Full RSIC would make the agent significantly more strategic, moving from reactive (try something → evaluate) to proactive (plan a research trajectory → execute → adjust). + +#### 9. Hebbian Network Visualization +The memory module stores pairwise associations between change categories but doesn't visualize the network. A graph visualization (networkx or d3.js) showing: +- Nodes = change categories, sized by experiment count +- Edges = co-occurrence in experiments, colored by joint success rate +- Node color = Hebbian weight (green = promising, red = dead end) + +Would provide intuitive insight into the experiment search landscape. + +#### 10. Historical Experiment Replay +Import historical `results.tsv` files into the memory system to bootstrap Hebbian associations for new sessions. This would let the agent start with learned priors rather than exploring from scratch — analogous to mdemg's space transfer feature. + +#### 11. Adaptive Alert Thresholds +Current alert thresholds are static (configurable via env vars but fixed during a session). Implementing adaptive thresholds that learn from the experiment distribution — e.g., loss spike threshold based on rolling loss variance — would reduce false positives in early (volatile) vs. late (stable) experiment phases. + +--- + +## Architecture Reference + +``` +autoresearch/ +├── train.py # Agent-modifiable (unchanged by this work) +├── prepare.py # Read-only data/eval (unchanged) +├── program.md # Agent instructions (updated with module docs) +├── analysis.ipynb # Post-hoc analysis (enhanced with new cells) +├── monitor.py # NEW: metrics, alerting, Prometheus export +├── memory.py # NEW: Hebbian learning, cross-session memory +├── resilience.py # NEW: circuit breakers, anomaly detection +├── guidance.py # NEW: proactive suggestions, strategy assessment +├── .gitignore # Updated: excludes .autoresearch/ +├── .autoresearch/ # Runtime state (gitignored) +│ ├── metrics/ # monitor session data + JSON exports +│ ├── memory/ # experiment memory + Hebbian associations +│ └── resilience/ # circuit breaker + anomaly state +└── docs/ + └── AGENT_HANDOFF.md # This file +``` + +## Module Dependency Graph + +``` +guidance.py ──depends on──> memory.py (Hebbian associations) + ──depends on──> monitor.py (session stats, alerts) + ──depends on──> resilience.py (safety status) + +resilience.py (standalone — no module dependencies) +monitor.py (standalone — no module dependencies) +memory.py (standalone — no module dependencies) +``` + +All four modules are independently usable. `guidance.py` optionally integrates with the other three but gracefully handles `None` for any missing module. + +--- + +## Key Files for Onboarding + +If picking up this work, read in this order: + +1. `program.md` — the "Observability & Intelligence Modules" section at the bottom +2. `monitor.py` — simplest module, good entry point for understanding the pattern +3. `memory.py` — core Hebbian learning logic, most novel contribution +4. `resilience.py` — circuit breaker state machine, anomaly detection patterns +5. `guidance.py` — synthesis layer, depends on understanding the other three diff --git a/guidance.py b/guidance.py new file mode 100644 index 000000000..4d288c6da --- /dev/null +++ b/guidance.py @@ -0,0 +1,521 @@ +""" +Proactive experiment suggestion engine for autoresearch. + +Inspired by mdemg's Jiminy inner voice system and RSIC (Recursive +Self-Improvement Cycle), this module provides: + +- Experiment suggestions based on Hebbian memory associations +- Plateau detection and radical change recommendations +- Frontier detection (unexplored hyperparameter regions) +- Contradiction surfacing (conflicting results that need investigation) +- Meta-cognitive assessment of experiment strategy effectiveness + +Architecture from mdemg's Jiminy: + Jiminy fires on every prompt with 4 parallel knowledge sources + (consulting suggestions, correction vectors, contradiction edges, + frontier detection) merged with a 6-second timeout. Results are + deduplicated, confidence-filtered, and injected into the agent's + context. + +Here we adapt that pattern for experiment guidance: before each +experiment, the advisor assembles context from memory, resilience +status, and experiment history to suggest the most promising next +experiment. + +From mdemg's RSIC: + The Recursive Self-Improvement Cycle runs: assess -> reflect -> + plan -> speculate -> execute. We simplify this to: + assess (how are we doing?) -> suggest (what to try next?) -> + with optional radical mode when plateaued. + +Usage: + from memory import ExperimentMemory + from monitor import ExperimentTracker + from resilience import ExperimentGuard + + advisor = ExperimentAdvisor(memory, tracker, guard) + guidance = advisor.get_guidance() + print(guidance["formatted"]) +""" + +import time +from dataclasses import dataclass, field +from typing import Optional + +# Import types for type hints (these modules are in the same package) +# At runtime, actual instances are passed to the constructor +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from memory import ExperimentMemory + from monitor import ExperimentTracker + from resilience import ExperimentGuard + + +# --------------------------------------------------------------------------- +# Suggestion types +# --------------------------------------------------------------------------- + +@dataclass +class ExperimentSuggestion: + """A suggested next experiment with rationale.""" + category: str # change category tag + description: str # human-readable suggestion + rationale: str # why this is suggested + confidence: float # 0-1, how confident we are this will help + priority: int # 1=highest priority + source: str # where this suggestion came from + + +@dataclass +class Contradiction: + """A pair of conflicting experiment results.""" + description: str + experiment_a: str # description of first experiment + experiment_b: str # description of second experiment + bpb_a: float + bpb_b: float + explanation: str # why these conflict + + +@dataclass +class StrategyAssessment: + """Meta-cognitive assessment of the experiment strategy. + + Inspired by mdemg's RSIC assess phase which evaluates the + effectiveness of the current approach and recommends adjustments. + """ + phase: str # exploring | exploiting | plateaued | recovering + effectiveness: float # 0-1, how well the current strategy is working + total_experiments: int + improvements_found: int + improvement_rate: float + velocity_trend: str # accelerating | stable | decelerating | stalled + recommendation: str # high-level strategy recommendation + + +# --------------------------------------------------------------------------- +# ExperimentAdvisor +# --------------------------------------------------------------------------- + +class ExperimentAdvisor: + """Proactive guidance engine for autonomous experiment research. + + Assembles context from memory (Hebbian associations, experiment + history), monitoring (metrics, alerts), and resilience (circuit + breaker, anomalies) to suggest the most promising next experiment. + + Inspired by mdemg's Jiminy inner voice which: + 1. Fans out to 4 knowledge sources in parallel + 2. Merges results with deduplication + 3. Filters by confidence threshold + 4. Formats for injection into agent context + + Here we adapt that to experiment guidance: + 1. Memory associations -> promising categories + 2. Experiment history -> what's been tried recently + 3. Anomaly status -> safety constraints + 4. Frontier analysis -> unexplored regions + 5. Contradiction detection -> conflicting results + """ + + def __init__(self, memory: Optional["ExperimentMemory"] = None, + tracker: Optional["ExperimentTracker"] = None, + guard: Optional["ExperimentGuard"] = None): + self.memory = memory + self.tracker = tracker + self.guard = guard + + def get_guidance(self) -> dict: + """Generate comprehensive guidance for the next experiment. + + This is the main entry point. Returns a dict with: + - suggestions: ranked list of experiment suggestions + - contradictions: conflicting results to investigate + - assessment: meta-cognitive strategy assessment + - warnings: safety warnings from resilience systems + - formatted: human-readable formatted guidance string + + Inspired by mdemg's Jiminy which assembles guidance from + multiple sources and formats it for injection into prompts. + """ + suggestions = self._generate_suggestions() + contradictions = self._detect_contradictions() + assessment = self._assess_strategy() + warnings = self._collect_warnings() + + guidance = { + "suggestions": [ + { + "category": s.category, + "description": s.description, + "rationale": s.rationale, + "confidence": s.confidence, + "priority": s.priority, + } + for s in suggestions + ], + "contradictions": [ + { + "description": c.description, + "experiment_a": c.experiment_a, + "experiment_b": c.experiment_b, + "explanation": c.explanation, + } + for c in contradictions + ], + "assessment": { + "phase": assessment.phase, + "effectiveness": assessment.effectiveness, + "velocity_trend": assessment.velocity_trend, + "recommendation": assessment.recommendation, + }, + "warnings": warnings, + "formatted": self._format_guidance( + suggestions, contradictions, assessment, warnings + ), + "generated_at": time.time(), + } + return guidance + + # -- Suggestion generation (Jiminy source 1: consulting) ---------------- + + def _generate_suggestions(self) -> list[ExperimentSuggestion]: + """Generate ranked experiment suggestions. + + Combines multiple signal sources: + 1. Hebbian associations from memory (what has worked) + 2. Unexplored frontiers (what hasn't been tried) + 3. Dead end avoidance (what to skip) + 4. Plateau breakers (radical changes when stuck) + """ + suggestions = [] + priority = 1 + + # Source 1: Promising directions from Hebbian memory + if self.memory: + promising = self.memory.get_promising_directions(top_k=3) + for p in promising: + cat = p["category"] + score = p["score"] + reason = p["reason"] + + if p["experiments"] == 0: + desc = f"Try a {cat} change — this category is unexplored" + rationale = "Frontier: no experiments in this category yet" + else: + desc = f"Try a {cat} change — historically effective" + rationale = f"Hebbian association: {reason}" + + suggestions.append(ExperimentSuggestion( + category=cat, description=desc, + rationale=rationale, confidence=min(score, 1.0), + priority=priority, source="hebbian_memory", + )) + priority += 1 + + # Source 2: Plateau breakers + if self.memory: + plateau = self.memory.get_plateaus() + if plateau.get("plateau_detected"): + suggestions.append(ExperimentSuggestion( + category="radical", + description="Try a radical architectural change to break plateau", + rationale=plateau.get("reason", "No improvements recently"), + confidence=0.3, + priority=priority, + source="plateau_detection", + )) + priority += 1 + + # Also suggest combining prior improvements + suggestions.append(ExperimentSuggestion( + category="combination", + description="Combine the top 2-3 near-miss experiments", + rationale="Near-misses may compound when combined", + confidence=0.4, + priority=priority, + source="plateau_detection", + )) + priority += 1 + + # Source 3: Surprise-driven exploration + if self.memory: + surprises = self.memory.get_surprise_highlights(top_k=2) + for s in surprises: + if s["status"] == "discard" and s["surprise_score"] > 1.5: + # Surprisingly bad result — the inverse might work + suggestions.append(ExperimentSuggestion( + category=s["change_tags"][0] if s["change_tags"] else "misc", + description=f"Revisit '{s['description']}' — " + f"surprisingly bad result, try the opposite", + rationale=f"Surprise score {s['surprise_score']:.2f} — " + f"high surprise suggests unexplored dynamics", + confidence=0.35, + priority=priority, + source="surprise_analysis", + )) + priority += 1 + + # Source 4: Dead end warnings (negative suggestions) + if self.memory: + dead_ends = self.memory.get_dead_ends() + for de in dead_ends[:2]: + suggestions.append(ExperimentSuggestion( + category=de["category"], + description=f"AVOID {de['category']} changes — " + f"consistently unpromising", + rationale=f"weight={de['weight']:.3f}, " + f"success={de['success_rate']:.0%} over " + f"{de['experiments']} experiments", + confidence=min(de["experiments"] / 10, 1.0), + priority=99, # Low priority (avoidance, not action) + source="dead_end_detection", + )) + + # Sort by priority + suggestions.sort(key=lambda s: s.priority) + return suggestions + + # -- Contradiction detection (Jiminy source 3) -------------------------- + + def _detect_contradictions(self) -> list[Contradiction]: + """Detect contradicting experiment results. + + Finds pairs of experiments where similar changes produced + opposite outcomes. These contradictions suggest that the + outcome depends on context (other hyperparameters, model + state, etc.) and warrant investigation. + + Inspired by mdemg's CONTRADICTS edges which link nodes + with conflicting information. + """ + contradictions = [] + if not self.memory: + return contradictions + + history = self.memory.get_experiment_history(last_n=50) + if len(history) < 4: + return contradictions + + # Group experiments by their primary change tag + by_tag: dict[str, list[dict]] = {} + for exp in history: + if exp.get("status") == "crash": + continue + for tag in exp.get("change_tags", []): + by_tag.setdefault(tag, []).append(exp) + + # Find contradictions: same category, opposite outcomes + for tag, exps in by_tag.items(): + kept = [e for e in exps if e.get("status") == "keep"] + discarded = [e for e in exps if e.get("status") == "discard"] + + if kept and discarded: + # Find the best kept and worst discarded + best_kept = min(kept, key=lambda e: e.get("val_bpb", float("inf"))) + worst_disc = max(discarded, key=lambda e: e.get("val_bpb", 0)) + + if (worst_disc.get("val_bpb", 0) > + best_kept.get("val_bpb", float("inf")) + 0.001): + contradictions.append(Contradiction( + description=f"Contradicting results for '{tag}' changes", + experiment_a=best_kept.get("description", "?"), + experiment_b=worst_disc.get("description", "?"), + bpb_a=best_kept.get("val_bpb", 0), + bpb_b=worst_disc.get("val_bpb", 0), + explanation=( + f"Same category '{tag}' produced both improvement " + f"({best_kept.get('val_bpb', 0):.6f}) and regression " + f"({worst_disc.get('val_bpb', 0):.6f}). " + f"The outcome likely depends on context — investigate " + f"what differs between these experiments." + ), + )) + + return contradictions[:5] # Limit to top 5 contradictions + + # -- Strategy assessment (RSIC assess phase) ---------------------------- + + def _assess_strategy(self) -> StrategyAssessment: + """Meta-cognitive assessment of experiment strategy. + + Evaluates the overall effectiveness of the current research + approach and recommends strategic adjustments. + + Inspired by mdemg's RSIC which cycles through: + assess -> reflect -> plan -> speculate -> execute + + Here we implement the assess phase, providing the agent with + a high-level understanding of where it stands. + """ + total = 0 + improvements = 0 + recent_improvements = 0 + velocity_trend = "stable" + + if self.tracker: + summary = self.tracker.get_summary() + total = summary.get("total_experiments", 0) + improvements = summary.get("kept", 0) + + # Determine phase + if total < 5: + phase = "exploring" + recommendation = ("Early exploration phase — try diverse changes " + "across different categories to map the landscape") + elif self.memory: + plateau = self.memory.get_plateaus() + if plateau.get("plateau_detected"): + phase = "plateaued" + recommendation = ("Plateau detected — shift to radical changes: " + "different architectures, unusual hyperparameter " + "ranges, or combine multiple near-miss improvements") + elif improvements / max(total, 1) > 0.25: + phase = "exploiting" + recommendation = ("High hit rate — continue exploiting promising " + "directions, try finer-grained variations of " + "what's working") + else: + phase = "exploring" + recommendation = ("Low hit rate — broaden search to new categories, " + "try changes you haven't attempted yet") + else: + phase = "exploring" + recommendation = "Continue exploring — no memory system active" + + # Improvement rate + improvement_rate = improvements / max(total, 1) + + # Velocity trend from tracker + if self.tracker and len(self.tracker.experiments) >= 10: + recent = self.tracker.experiments[-10:] + earlier = self.tracker.experiments[-20:-10] if len( + self.tracker.experiments) >= 20 else [] + + recent_keeps = sum(1 for e in recent if e.status == "keep") + if earlier: + earlier_keeps = sum(1 for e in earlier if e.status == "keep") + if recent_keeps > earlier_keeps: + velocity_trend = "accelerating" + elif recent_keeps < earlier_keeps: + velocity_trend = "decelerating" + if recent_keeps == 0 and earlier_keeps == 0: + velocity_trend = "stalled" + recent_improvements = recent_keeps + + # Effectiveness score (0-1) + effectiveness = min(improvement_rate * 2, 1.0) # 50% keep rate = 1.0 + if velocity_trend == "stalled": + effectiveness *= 0.5 + elif velocity_trend == "decelerating": + effectiveness *= 0.75 + + return StrategyAssessment( + phase=phase, + effectiveness=round(effectiveness, 3), + total_experiments=total, + improvements_found=improvements, + improvement_rate=round(improvement_rate, 3), + velocity_trend=velocity_trend, + recommendation=recommendation, + ) + + # -- Warning collection ------------------------------------------------- + + def _collect_warnings(self) -> list[str]: + """Collect safety warnings from all resilience systems.""" + warnings = [] + + if self.guard: + status = self.guard.get_status() + cb = status.get("circuit_breaker", {}) + if cb.get("state") != "closed": + warnings.append( + f"Circuit breaker {cb.get('state', '?')} — " + f"{cb.get('consecutive_failures', 0)} consecutive failures") + + bp = status.get("backpressure", {}) + if bp.get("level") in ("warning", "critical"): + warnings.append( + f"VRAM pressure {bp.get('level', '?')} — " + f"{bp.get('utilization', 0):.0%} utilized, " + f"trend: {bp.get('trend', '?')}") + + for anomaly in status.get("recent_anomalies", []): + if anomaly.get("severity") in ("warning", "critical"): + warnings.append(anomaly.get("message", "Unknown anomaly")) + + if self.tracker: + alerts = self.tracker.get_recent_alerts(5) + for alert in alerts: + if alert.get("severity") in ("warning", "critical"): + warnings.append(alert.get("message", "Unknown alert")) + + # Deduplicate + seen = set() + unique = [] + for w in warnings: + if w not in seen: + seen.add(w) + unique.append(w) + + return unique + + # -- Formatting --------------------------------------------------------- + + def _format_guidance(self, suggestions: list[ExperimentSuggestion], + contradictions: list[Contradiction], + assessment: StrategyAssessment, + warnings: list[str]) -> str: + """Format guidance as a human-readable string. + + This formatted output is designed to be injected into the agent's + context before each experiment decision, similar to how mdemg's + Jiminy injects proactive guidance into every Claude prompt. + """ + lines = [ + "=" * 60, + " EXPERIMENT GUIDANCE (auto-generated)", + "=" * 60, + "", + f" Phase: {assessment.phase.upper()}", + f" Effectiveness: {assessment.effectiveness:.0%} " + f"({assessment.improvements_found}/{assessment.total_experiments} kept)", + f" Velocity: {assessment.velocity_trend}", + f" Strategy: {assessment.recommendation}", + ] + + if warnings: + lines += ["", " WARNINGS:"] + for w in warnings: + lines.append(f" [!] {w}") + + if suggestions: + # Separate actionable suggestions from avoidance + actionable = [s for s in suggestions if s.priority < 99] + avoidances = [s for s in suggestions if s.priority >= 99] + + if actionable: + lines += ["", " SUGGESTED NEXT EXPERIMENTS:"] + for s in actionable[:5]: + lines.append( + f" {s.priority}. [{s.category}] {s.description}") + lines.append( + f" Rationale: {s.rationale}") + lines.append( + f" Confidence: {s.confidence:.0%}") + + if avoidances: + lines += ["", " AVOID:"] + for s in avoidances: + lines.append(f" [-] {s.description}") + lines.append(f" {s.rationale}") + + if contradictions: + lines += ["", " CONTRADICTIONS TO INVESTIGATE:"] + for c in contradictions[:3]: + lines.append(f" [?] {c.description}") + lines.append(f" {c.explanation}") + + lines += ["", "=" * 60] + return "\n".join(lines) diff --git a/memory.py b/memory.py new file mode 100644 index 000000000..3f5b9bcf0 --- /dev/null +++ b/memory.py @@ -0,0 +1,566 @@ +""" +Cross-session experiment memory with Hebbian learning for autoresearch. + +Inspired by mdemg's Conversation Memory System (CMS) and Hebbian learning +engine, this module provides: + +- Persistent experiment knowledge base across research sessions +- Hebbian association tracking: strengthens connections between change + categories and positive/negative outcomes +- Temporal decay: older experiments contribute less to association weights, + keeping the system responsive to recent findings +- Surprise-weighted storage: unexpected results (large improvements or + regressions from seemingly minor changes) are weighted more heavily +- Pattern extraction: identifies which categories of changes tend to + improve BPB and which are dead ends + +Architecture note from mdemg: + mdemg uses a multi-layer graph (L0->Ln) with CO_ACTIVATED_WITH edges + that strengthen via Hebbian learning (tanh soft-capping, multi-rate eta). + We adapt this for experiment associations: when a change category leads + to improvement, the association weight is strengthened. When it leads to + regression, the weight is weakened. Temporal decay ensures the system + doesn't get stuck on stale patterns. + +Usage: + memory = ExperimentMemory() + memory.store_experiment( + commit="a1b2c3d", + description="increase matrix LR to 0.06", + val_bpb=0.993, + delta_bpb=-0.004, + status="keep", + change_tags=["learning_rate", "optimizer"], + ) + directions = memory.get_promising_directions() + memory.save() +""" + +import json +import math +import os +import time +from collections import defaultdict +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Optional + + +# --------------------------------------------------------------------------- +# Change category taxonomy +# --------------------------------------------------------------------------- + +# Standardized tags for categorizing experiment changes. The agent (or a +# classifier in guidance.py) assigns one or more of these to each experiment. +CHANGE_CATEGORIES = [ + "architecture", # model depth, width, layer structure + "attention", # attention mechanism changes (heads, windows, etc.) + "activation", # activation function changes (ReLU, GeLU, etc.) + "optimizer", # optimizer algorithm changes + "learning_rate", # learning rate tuning + "schedule", # warmup/cooldown/decay schedule changes + "batch_size", # batch size or gradient accumulation changes + "initialization", # weight initialization changes + "regularization", # weight decay, dropout, etc. + "normalization", # layer norm, RMS norm, etc. + "embedding", # embedding changes (value embeds, positional, etc.) + "numerical", # precision, softcap, numerical stability + "simplification", # code removal or simplification + "combination", # combining multiple prior improvements + "radical", # fundamental architectural departures +] + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class ExperimentEntry: + """A single experiment stored in memory.""" + commit: str + description: str + val_bpb: float + delta_bpb: float # negative = improvement + status: str # keep | discard | crash + change_tags: list[str] = field(default_factory=list) + timestamp: float = field(default_factory=time.time) + surprise_score: float = 0.0 # how unexpected the result was + peak_vram_mb: float = 0.0 + session_id: str = "" + + +@dataclass +class HebbianAssociation: + """Tracks the association between a change category and outcomes. + + Inspired by mdemg's CO_ACTIVATED_WITH edges which strengthen when + nodes are co-activated and decay over time. Here, "co-activation" + means a change category was present in an experiment that improved BPB. + + Uses tanh soft-capping (from mdemg) instead of hard clamping to allow + continuous learning without a hard saturation wall. + """ + category: str + # Positive outcomes (improvements) + positive_count: int = 0 + positive_total_delta: float = 0.0 # sum of BPB improvements (positive values) + # Negative outcomes (regressions) + negative_count: int = 0 + negative_total_delta: float = 0.0 # sum of BPB regressions (positive values) + # Current association weight (positive = promising, negative = unpromising) + weight: float = 0.0 + # Last update timestamp (for temporal decay) + last_updated: float = field(default_factory=time.time) + # Total experiments involving this category + total_experiments: int = 0 + + +# --------------------------------------------------------------------------- +# Hebbian learning parameters (inspired by mdemg's learning engine) +# --------------------------------------------------------------------------- + +@dataclass +class HebbianConfig: + """Configuration for Hebbian association learning. + + Mirrors mdemg's learning rate configuration with: + - eta: base learning rate for weight updates + - wmax: soft saturation limit (used with tanh capping) + - decay_rate: temporal decay applied to weights over time + - surprise_multiplier: extra weight for surprising results + """ + eta: float = 0.1 # base learning rate + wmax: float = 1.0 # soft saturation limit for tanh capping + decay_rate: float = 0.02 # per-hour decay rate + decay_floor: float = 0.01 # minimum weight magnitude before zeroing + surprise_multiplier: float = 2.0 # bonus for surprising results + cautious_window_hours: float = 1.0 # recently-reinforced edges skip decay + + +# --------------------------------------------------------------------------- +# ExperimentMemory +# --------------------------------------------------------------------------- + +class ExperimentMemory: + """Persistent cross-session experiment knowledge base. + + Stores experiment results, tracks Hebbian associations between change + categories and outcomes, and provides pattern extraction for guiding + future experiments. + + Inspired by mdemg's CMS which provides: + - Surprise-weighted learning (novel info retained longer) + - Observation types (decision, correction, learning, etc.) + - Resume/observe/correct/recall/consolidate APIs + - Cross-session persistence + + Here we adapt those patterns for experiment research: + - Surprise = unexpected BPB delta given the change category's history + - Observation types = keep/discard/crash + - Resume = load from disk at start of new session + - Recall = query associations for promising directions + - Consolidate = extract meta-patterns across experiments + """ + + def __init__(self, memory_dir: str = ".autoresearch/memory", + config: Optional[HebbianConfig] = None): + self.memory_dir = Path(memory_dir) + self.memory_dir.mkdir(parents=True, exist_ok=True) + self.config = config or HebbianConfig() + + self.experiments: list[ExperimentEntry] = [] + self.associations: dict[str, HebbianAssociation] = {} + self.session_id = f"session_{int(time.time())}" + + # Initialize associations for all known categories + for cat in CHANGE_CATEGORIES: + self.associations[cat] = HebbianAssociation(category=cat) + + # Load prior state + self.load() + + # -- Core operations ---------------------------------------------------- + + def store_experiment(self, commit: str, description: str, + val_bpb: float, delta_bpb: float, status: str, + change_tags: Optional[list[str]] = None, + peak_vram_mb: float = 0.0): + """Store an experiment result and update Hebbian associations. + + Args: + commit: Git commit hash (short) + description: What the experiment tried + val_bpb: Validation BPB achieved + delta_bpb: Change from previous best (negative = improvement) + status: keep | discard | crash + change_tags: Categories of changes made + peak_vram_mb: Peak VRAM usage + """ + tags = change_tags or self._auto_tag(description) + + # Compute surprise score + surprise = self._compute_surprise(tags, delta_bpb, status) + + entry = ExperimentEntry( + commit=commit, description=description, + val_bpb=val_bpb, delta_bpb=delta_bpb, status=status, + change_tags=tags, surprise_score=surprise, + peak_vram_mb=peak_vram_mb, session_id=self.session_id, + ) + self.experiments.append(entry) + + # Update Hebbian associations (skip crashes — no signal) + if status != "crash": + self._hebbian_update(tags, delta_bpb, surprise) + + self.save() + + def get_associations(self) -> list[dict]: + """Return all Hebbian associations sorted by weight (most promising first). + + Returns a list of dicts with category, weight, positive/negative counts, + average improvement, and a confidence score based on sample size. + """ + results = [] + for cat, assoc in sorted(self.associations.items(), + key=lambda x: x[1].weight, reverse=True): + if assoc.total_experiments == 0: + continue + avg_improvement = (assoc.positive_total_delta / + max(assoc.positive_count, 1)) + results.append({ + "category": cat, + "weight": round(assoc.weight, 4), + "positive_count": assoc.positive_count, + "negative_count": assoc.negative_count, + "total_experiments": assoc.total_experiments, + "avg_improvement": round(avg_improvement, 6), + "success_rate": round(assoc.positive_count / + max(assoc.total_experiments, 1), 3), + "confidence": min(assoc.total_experiments / 10, 1.0), + }) + return results + + def get_promising_directions(self, top_k: int = 5) -> list[dict]: + """Return the top-K most promising change categories. + + Combines Hebbian weight with confidence (sample size) to rank + categories. Categories with high weight but low confidence are + ranked lower (they need more experiments to be trusted). + + Inspired by mdemg's spreading activation which computes transient + scores per query rather than relying solely on stored weights. + """ + scored = [] + for cat, assoc in self.associations.items(): + if assoc.total_experiments == 0: + # Unexplored categories get a small exploration bonus + scored.append({ + "category": cat, + "score": 0.1, # exploration bonus + "reason": "unexplored — worth trying", + "experiments": 0, + }) + continue + + confidence = min(assoc.total_experiments / 10, 1.0) + # Blend weight with exploration bonus for low-sample categories + exploration_bonus = 0.05 * (1 - confidence) + score = assoc.weight * confidence + exploration_bonus + + if score > 0: + reason = (f"weight={assoc.weight:.3f}, " + f"success_rate={assoc.positive_count}/{assoc.total_experiments}, " + f"confidence={confidence:.1f}") + scored.append({ + "category": cat, + "score": round(score, 4), + "reason": reason, + "experiments": assoc.total_experiments, + }) + + scored.sort(key=lambda x: x["score"], reverse=True) + return scored[:top_k] + + def get_dead_ends(self, min_experiments: int = 3) -> list[dict]: + """Return categories that consistently fail to improve BPB. + + These are categories with negative Hebbian weight and enough + experiments to be statistically meaningful. The agent should + avoid these unless combining with other promising changes. + """ + dead_ends = [] + for cat, assoc in self.associations.items(): + if (assoc.total_experiments >= min_experiments and + assoc.weight < -0.1): + dead_ends.append({ + "category": cat, + "weight": round(assoc.weight, 4), + "experiments": assoc.total_experiments, + "success_rate": round(assoc.positive_count / + max(assoc.total_experiments, 1), 3), + }) + dead_ends.sort(key=lambda x: x["weight"]) + return dead_ends + + def get_plateaus(self, window: int = 10) -> dict: + """Detect improvement plateaus by analyzing recent experiment history. + + Returns analysis of whether improvements are slowing down, + inspired by mdemg's anomaly detection for stale nodes. + """ + if len(self.experiments) < window: + return {"plateau_detected": False, "reason": "insufficient data"} + + recent = self.experiments[-window:] + kept = [e for e in recent if e.status == "keep"] + avg_delta = 0.0 + if kept: + avg_delta = sum(e.delta_bpb for e in kept) / len(kept) + + # Compare to earlier improvement rate + if len(self.experiments) > 2 * window: + earlier = self.experiments[-2 * window:-window] + earlier_kept = [e for e in earlier if e.status == "keep"] + earlier_avg = 0.0 + if earlier_kept: + earlier_avg = sum(e.delta_bpb for e in earlier_kept) / len(earlier_kept) + + velocity_ratio = abs(avg_delta) / max(abs(earlier_avg), 1e-8) + if velocity_ratio < 0.3: + return { + "plateau_detected": True, + "reason": f"improvement velocity dropped to {velocity_ratio:.0%} of earlier rate", + "recent_avg_delta": round(avg_delta, 6), + "earlier_avg_delta": round(earlier_avg, 6), + "kept_in_window": len(kept), + } + + return { + "plateau_detected": len(kept) == 0, + "reason": "no improvements in window" if len(kept) == 0 + else f"{len(kept)} improvements in last {window} experiments", + "recent_avg_delta": round(avg_delta, 6), + "kept_in_window": len(kept), + } + + def decay(self): + """Apply temporal decay to all Hebbian association weights. + + Inspired by mdemg's temporal decay system which uses exponential + weight decay with cautious skipping of recently-reinforced edges. + + Called periodically (e.g., once per hour or after N experiments) + to keep the system responsive to recent findings rather than + being dominated by old results. + """ + now = time.time() + cfg = self.config + for assoc in self.associations.values(): + hours_since_update = (now - assoc.last_updated) / 3600 + # Skip recently reinforced associations (cautious decay) + if hours_since_update < cfg.cautious_window_hours: + continue + # Exponential decay + decay_factor = math.exp(-cfg.decay_rate * hours_since_update) + assoc.weight *= decay_factor + # Floor: zero out negligible weights + if abs(assoc.weight) < cfg.decay_floor: + assoc.weight = 0.0 + self.save() + + def get_experiment_history(self, last_n: int = 20) -> list[dict]: + """Return recent experiment history for context.""" + return [asdict(e) for e in self.experiments[-last_n:]] + + def get_surprise_highlights(self, top_k: int = 5) -> list[dict]: + """Return the most surprising experiments (highest surprise scores). + + Surprise-weighted storage is inspired by mdemg's CMS which + retains novel observations longer than routine ones. Surprising + experiments often reveal important dynamics in the search space. + """ + sorted_exps = sorted(self.experiments, + key=lambda e: e.surprise_score, reverse=True) + return [ + { + "commit": e.commit, + "description": e.description, + "val_bpb": e.val_bpb, + "delta_bpb": round(e.delta_bpb, 6), + "surprise_score": round(e.surprise_score, 4), + "status": e.status, + "change_tags": e.change_tags, + } + for e in sorted_exps[:top_k] + ] + + # -- Persistence -------------------------------------------------------- + + def save(self, path: Optional[str] = None): + """Persist memory state to disk.""" + out = Path(path) if path else self.memory_dir / "memory.json" + data = { + "experiments": [asdict(e) for e in self.experiments], + "associations": {k: asdict(v) for k, v in self.associations.items()}, + "session_id": self.session_id, + "saved_at": time.time(), + } + with open(out, "w") as f: + json.dump(data, f, indent=2, default=str) + + def load(self, path: Optional[str] = None): + """Load memory state from disk.""" + src = Path(path) if path else self.memory_dir / "memory.json" + if not src.exists(): + return + try: + with open(src) as f: + data = json.load(f) + + self.experiments = [ + ExperimentEntry(**e) for e in data.get("experiments", []) + ] + for cat, assoc_data in data.get("associations", {}).items(): + self.associations[cat] = HebbianAssociation(**assoc_data) + except (json.JSONDecodeError, TypeError, KeyError): + pass # Corrupted memory — start fresh + + # -- Internal: Hebbian learning ----------------------------------------- + + def _hebbian_update(self, tags: list[str], delta_bpb: float, + surprise: float): + """Update Hebbian association weights based on experiment outcome. + + Uses tanh soft-capping from mdemg's learning engine: + w_new = wmax * tanh((w_old + eta * signal) / wmax) + + This provides smooth saturation instead of hard clamping, + allowing continuous learning even near the weight limits. + + The learning signal is: + - Positive (strengthening) when BPB improved (delta_bpb < 0) + - Negative (weakening) when BPB regressed (delta_bpb > 0) + - Scaled by surprise for unexpected results + """ + cfg = self.config + now = time.time() + + for tag in tags: + if tag not in self.associations: + self.associations[tag] = HebbianAssociation(category=tag) + + assoc = self.associations[tag] + assoc.total_experiments += 1 + assoc.last_updated = now + + if delta_bpb < 0: + # Improvement: strengthen association + assoc.positive_count += 1 + assoc.positive_total_delta += abs(delta_bpb) + signal = abs(delta_bpb) * 100 # Scale up small BPB deltas + else: + # Regression: weaken association + assoc.negative_count += 1 + assoc.negative_total_delta += abs(delta_bpb) + signal = -abs(delta_bpb) * 100 + + # Apply surprise multiplier + if surprise > 1.0: + signal *= cfg.surprise_multiplier + + # Tanh soft-capped update (mdemg's learning rule) + raw = assoc.weight + cfg.eta * signal + assoc.weight = cfg.wmax * math.tanh(raw / cfg.wmax) + + def _compute_surprise(self, tags: list[str], delta_bpb: float, + status: str) -> float: + """Compute how surprising this experiment result is. + + Surprise is high when: + 1. The outcome contradicts the Hebbian association (e.g., a + "dead end" category produces a big improvement) + 2. The BPB delta is unusually large + 3. A crash occurs in a previously stable category + + Inspired by mdemg's CMS surprise-weighted learning where novel + observations are retained longer than routine ones. + """ + if status == "crash": + return 1.5 # crashes are moderately surprising + + if not tags: + return 1.0 # no prior to compare against + + # Average expected direction from associations + avg_weight = 0.0 + count = 0 + for tag in tags: + if tag in self.associations and self.associations[tag].total_experiments > 0: + avg_weight += self.associations[tag].weight + count += 1 + + if count == 0: + return 1.0 # unknown categories are moderately surprising + + avg_weight /= count + + # Surprise = contradiction between expected and actual + if avg_weight > 0 and delta_bpb > 0: + # Expected improvement, got regression + return 1.5 + abs(delta_bpb) * 100 + elif avg_weight < 0 and delta_bpb < 0: + # Expected regression, got improvement + return 2.0 + abs(delta_bpb) * 100 + else: + # Outcome matched expectation — low surprise + return 0.5 + abs(delta_bpb) * 50 + + def _auto_tag(self, description: str) -> list[str]: + """Automatically tag an experiment based on its description. + + Simple keyword matching to assign change categories. The agent + can also provide explicit tags for more accuracy. + """ + desc_lower = description.lower() + tags = [] + + keyword_map = { + "architecture": ["layer", "depth", "width", "block", "head", + "dim", "model size", "n_layer", "n_embd"], + "attention": ["attention", "window", "flash", "causal", "kv", + "gqa", "sliding", "sssl", "rope", "rotary"], + "activation": ["relu", "gelu", "swish", "silu", "activation", + "squared", "tanh"], + "optimizer": ["optimizer", "muon", "adamw", "adam", "sgd", + "momentum", "nesterov"], + "learning_rate": ["lr", "learning rate", "learning_rate", + "matrix_lr", "embedding_lr", "scalar_lr"], + "schedule": ["warmup", "cooldown", "warmdown", "schedule", + "cosine", "linear decay", "final_lr"], + "batch_size": ["batch", "grad_accum", "accumulation", + "total_batch"], + "initialization": ["init", "initialization", "weight init", + "xavier", "kaiming"], + "regularization": ["decay", "weight_decay", "dropout", + "regulariz"], + "normalization": ["norm", "layernorm", "rmsnorm", "prenorm", + "postnorm"], + "embedding": ["embedding", "embed", "value embed", "wte", + "positional"], + "numerical": ["precision", "softcap", "bfloat", "float16", + "numerical", "nan", "clamp"], + "simplification": ["remov", "delet", "simplif", "clean", + "strip", "drop"], + "combination": ["combin", "merge", "together", "plus", + "along with"], + "radical": ["radical", "fundamental", "rewrite", "replace", + "entirely", "from scratch"], + } + + for category, keywords in keyword_map.items(): + if any(kw in desc_lower for kw in keywords): + tags.append(category) + + return tags if tags else ["misc"] diff --git a/monitor.py b/monitor.py new file mode 100644 index 000000000..336918f68 --- /dev/null +++ b/monitor.py @@ -0,0 +1,535 @@ +""" +Experiment monitoring and observability for autoresearch. + +Inspired by mdemg's internal/metrics/ package, this module provides: +- Per-experiment and per-step metrics collection +- Prometheus-compatible text exposition format +- JSON export for external dashboards (e.g. Grafana) +- Real-time alerting hooks with configurable thresholds +- Aggregate statistics across experiment sessions + +The monitor is designed to be non-intrusive: it collects data passively +and never interferes with the training loop. All state persists to disk +so metrics survive process restarts and can be analyzed across sessions. + +Architecture note: Uses only stdlib (json, time, statistics) to avoid +adding dependencies beyond what autoresearch already requires. +""" + +import json +import os +import time +import statistics +from dataclasses import dataclass, field, asdict +from pathlib import Path +from typing import Optional + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +@dataclass +class StepMetrics: + """Metrics captured at each training step.""" + step: int + timestamp: float + train_loss: float + learning_rate: float + vram_mb: float = 0.0 + tokens_per_sec: float = 0.0 + grad_norm: float = 0.0 + progress: float = 0.0 # fraction of TIME_BUDGET elapsed + + +@dataclass +class ExperimentRecord: + """Complete record of a single experiment run.""" + experiment_id: int + commit: str + description: str + start_time: float + end_time: float = 0.0 + val_bpb: float = 0.0 + peak_vram_mb: float = 0.0 + status: str = "running" # running | keep | discard | crash + num_steps: int = 0 + total_tokens_m: float = 0.0 + mfu_percent: float = 0.0 + training_seconds: float = 0.0 + # Per-step loss curve (sampled every N steps to bound memory) + loss_curve: list = field(default_factory=list) + # Smoothed loss at end of training (pre-eval) + final_train_loss: float = 0.0 + # Change category tags for this experiment (used by memory.py) + change_tags: list = field(default_factory=list) + + +@dataclass +class SessionStats: + """Aggregate statistics for the current monitoring session.""" + session_start: float = 0.0 + total_experiments: int = 0 + kept: int = 0 + discarded: int = 0 + crashed: int = 0 + best_val_bpb: float = float("inf") + best_commit: str = "" + total_training_seconds: float = 0.0 + # Rolling improvement velocity (BPB improvement per hour) + improvement_velocity: float = 0.0 + + +# --------------------------------------------------------------------------- +# Alert thresholds (configurable via environment variables) +# --------------------------------------------------------------------------- + +@dataclass +class AlertThresholds: + """Configurable alerting thresholds. + + Set via environment variables prefixed with AUTORESEARCH_ALERT_. + For example: AUTORESEARCH_ALERT_LOSS_SPIKE=5.0 + """ + # If training loss exceeds this multiple of the smoothed loss, flag it + loss_spike_ratio: float = 3.0 + # If VRAM exceeds this (MB), warn about potential OOM + vram_warning_mb: float = 75_000.0 + # If N consecutive experiments crash, trigger critical alert + consecutive_crash_limit: int = 3 + # If no improvement in N experiments, flag plateau + plateau_window: int = 15 + # Minimum tokens/sec to consider training healthy + min_tokens_per_sec: float = 1000.0 + + @classmethod + def from_env(cls) -> "AlertThresholds": + """Load thresholds from environment variables, falling back to defaults.""" + t = cls() + prefix = "AUTORESEARCH_ALERT_" + for fld in ["loss_spike_ratio", "vram_warning_mb", + "consecutive_crash_limit", "plateau_window", + "min_tokens_per_sec"]: + env_key = prefix + fld.upper() + val = os.environ.get(env_key) + if val is not None: + cast_fn = type(getattr(t, fld)) + setattr(t, fld, cast_fn(val)) + return t + + +# --------------------------------------------------------------------------- +# Alert types +# --------------------------------------------------------------------------- + +@dataclass +class Alert: + """A monitoring alert emitted when a threshold is breached.""" + severity: str # info | warning | critical + category: str # loss_spike | vram_pressure | crash_streak | plateau | throughput + message: str + timestamp: float = field(default_factory=time.time) + experiment_id: int = -1 + value: float = 0.0 + threshold: float = 0.0 + + +# --------------------------------------------------------------------------- +# ExperimentTracker - the main monitoring class +# --------------------------------------------------------------------------- + +class ExperimentTracker: + """Tracks experiment metrics across an autonomous research session. + + Collects per-step training metrics, per-experiment summaries, and + session-level aggregate statistics. Exports to JSON and Prometheus + text format for integration with external dashboards. + + Inspired by mdemg's Prometheus metrics pipeline which provides + 10-panel overview dashboards with request rate, latency percentiles, + error rate, and cache hit tracking. Here we adapt that pattern for + experiment-level observability. + + Usage: + tracker = ExperimentTracker() + tracker.start_experiment("a1b2c3d", "increase LR to 0.06") + for step in training_loop: + tracker.record_step(step, loss, lr, vram_mb, tokens_per_sec) + tracker.end_experiment(val_bpb=0.995, status="keep", peak_vram_mb=44000) + tracker.export_json("metrics.json") + """ + + def __init__(self, metrics_dir: str = ".autoresearch/metrics", + thresholds: Optional[AlertThresholds] = None, + loss_sample_interval: int = 10): + self.metrics_dir = Path(metrics_dir) + self.metrics_dir.mkdir(parents=True, exist_ok=True) + self.thresholds = thresholds or AlertThresholds.from_env() + self.loss_sample_interval = loss_sample_interval + + self.session = SessionStats(session_start=time.time()) + self.experiments: list[ExperimentRecord] = [] + self.current: Optional[ExperimentRecord] = None + self.alerts: list[Alert] = [] + + # Track consecutive crashes for circuit-breaker alerting + self._consecutive_crashes = 0 + # Smoothed training loss for spike detection + self._smooth_loss = 0.0 + self._smooth_alpha = 0.05 + + # Load prior session data if available + self._load_session() + + # -- Experiment lifecycle ----------------------------------------------- + + def start_experiment(self, commit: str, description: str, + change_tags: Optional[list[str]] = None) -> int: + """Begin tracking a new experiment. Returns the experiment ID.""" + exp_id = self.session.total_experiments + self.current = ExperimentRecord( + experiment_id=exp_id, + commit=commit, + description=description, + start_time=time.time(), + change_tags=change_tags or [], + ) + self._smooth_loss = 0.0 + return exp_id + + def record_step(self, step: int, train_loss: float, + learning_rate: float = 0.0, vram_mb: float = 0.0, + tokens_per_sec: float = 0.0, grad_norm: float = 0.0, + progress: float = 0.0): + """Record metrics for a single training step. + + Called from within the training loop. Only stores every Nth step + to the loss curve to bound memory usage. + """ + if self.current is None: + return + + # Update smoothed loss for spike detection + if self._smooth_loss == 0.0: + self._smooth_loss = train_loss + else: + self._smooth_loss = (self._smooth_alpha * train_loss + + (1 - self._smooth_alpha) * self._smooth_loss) + + # Check for loss spike + if (self._smooth_loss > 0 and + train_loss > self.thresholds.loss_spike_ratio * self._smooth_loss): + self._emit_alert("warning", "loss_spike", + f"Loss spike: {train_loss:.4f} vs smoothed {self._smooth_loss:.4f}", + value=train_loss, threshold=self._smooth_loss) + + # Check VRAM pressure + if vram_mb > self.thresholds.vram_warning_mb: + self._emit_alert("warning", "vram_pressure", + f"VRAM {vram_mb:.0f}MB exceeds threshold {self.thresholds.vram_warning_mb:.0f}MB", + value=vram_mb, threshold=self.thresholds.vram_warning_mb) + + # Check throughput + if tokens_per_sec > 0 and tokens_per_sec < self.thresholds.min_tokens_per_sec: + self._emit_alert("info", "throughput", + f"Low throughput: {tokens_per_sec:.0f} tok/s", + value=tokens_per_sec, + threshold=self.thresholds.min_tokens_per_sec) + + # Sample loss curve at intervals + if step % self.loss_sample_interval == 0: + self.current.loss_curve.append( + StepMetrics( + step=step, timestamp=time.time(), + train_loss=train_loss, learning_rate=learning_rate, + vram_mb=vram_mb, tokens_per_sec=tokens_per_sec, + grad_norm=grad_norm, progress=progress, + ) + ) + + self.current.final_train_loss = self._smooth_loss + self.current.num_steps = step + 1 + + def end_experiment(self, val_bpb: float = 0.0, status: str = "discard", + peak_vram_mb: float = 0.0, training_seconds: float = 0.0, + total_tokens_m: float = 0.0, mfu_percent: float = 0.0): + """Finalize the current experiment and update session stats.""" + if self.current is None: + return + + self.current.end_time = time.time() + self.current.val_bpb = val_bpb + self.current.status = status + self.current.peak_vram_mb = peak_vram_mb + self.current.training_seconds = training_seconds + self.current.total_tokens_m = total_tokens_m + self.current.mfu_percent = mfu_percent + + # Convert StepMetrics in loss_curve to dicts for serialization + self.current.loss_curve = [asdict(s) if isinstance(s, StepMetrics) else s + for s in self.current.loss_curve] + + self.experiments.append(self.current) + + # Update session stats + self.session.total_experiments += 1 + self.session.total_training_seconds += training_seconds + if status == "keep": + self.session.kept += 1 + self._consecutive_crashes = 0 + if val_bpb < self.session.best_val_bpb: + self.session.best_val_bpb = val_bpb + self.session.best_commit = self.current.commit + elif status == "discard": + self.session.discarded += 1 + self._consecutive_crashes = 0 + elif status == "crash": + self.session.crashed += 1 + self._consecutive_crashes += 1 + if self._consecutive_crashes >= self.thresholds.consecutive_crash_limit: + self._emit_alert( + "critical", "crash_streak", + f"{self._consecutive_crashes} consecutive crashes detected", + value=self._consecutive_crashes, + threshold=self.thresholds.consecutive_crash_limit) + + # Check for plateau + self._check_plateau() + + # Update improvement velocity + self._update_velocity() + + self.current = None + self._save_session() + + # -- Analysis helpers --------------------------------------------------- + + def get_summary(self) -> dict: + """Return a summary dict of the current session.""" + s = self.session + return { + "session_duration_hours": (time.time() - s.session_start) / 3600, + "total_experiments": s.total_experiments, + "kept": s.kept, + "discarded": s.discarded, + "crashed": s.crashed, + "keep_rate": s.kept / max(s.kept + s.discarded, 1), + "best_val_bpb": s.best_val_bpb if s.best_val_bpb < float("inf") else None, + "best_commit": s.best_commit, + "improvement_velocity_per_hour": s.improvement_velocity, + "total_training_hours": s.total_training_seconds / 3600, + "active_alerts": len([a for a in self.alerts + if a.severity in ("warning", "critical")]), + } + + def get_recent_alerts(self, n: int = 10) -> list[dict]: + """Return the N most recent alerts.""" + return [asdict(a) for a in self.alerts[-n:]] + + def get_loss_curves(self, last_n: int = 5) -> dict: + """Return loss curves for the last N experiments. + + Useful for comparing training dynamics across experiments. + """ + curves = {} + for exp in self.experiments[-last_n:]: + curves[exp.commit] = { + "description": exp.description, + "status": exp.status, + "val_bpb": exp.val_bpb, + "loss_curve": exp.loss_curve, + } + return curves + + # -- Export formats ------------------------------------------------------ + + def export_json(self, path: str = ".autoresearch/metrics/session.json"): + """Export full session data to JSON for dashboard consumption.""" + data = { + "session": asdict(self.session), + "experiments": [asdict(e) for e in self.experiments], + "alerts": [asdict(a) for a in self.alerts], + "exported_at": time.time(), + } + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + with open(out, "w") as f: + json.dump(data, f, indent=2, default=str) + + def get_prometheus_text(self) -> str: + """Generate Prometheus text exposition format. + + Compatible with Prometheus /metrics endpoint or node_exporter + textfile collector. Inspired by mdemg's 10-panel Grafana dashboard + which tracks request rate, latency percentiles, error rate, and + cache hit ratios. Here we adapt those patterns for experiment metrics. + + Metrics exposed: + - autoresearch_experiments_total (counter, by status) + - autoresearch_best_val_bpb (gauge) + - autoresearch_improvement_velocity (gauge, BPB/hour) + - autoresearch_session_duration_seconds (gauge) + - autoresearch_alerts_total (counter, by severity) + - autoresearch_last_experiment_val_bpb (gauge) + - autoresearch_last_experiment_vram_mb (gauge) + - autoresearch_last_experiment_steps (gauge) + """ + s = self.session + lines = [ + "# HELP autoresearch_experiments_total Total experiments by status", + "# TYPE autoresearch_experiments_total counter", + f'autoresearch_experiments_total{{status="keep"}} {s.kept}', + f'autoresearch_experiments_total{{status="discard"}} {s.discarded}', + f'autoresearch_experiments_total{{status="crash"}} {s.crashed}', + "", + "# HELP autoresearch_best_val_bpb Best validation BPB achieved", + "# TYPE autoresearch_best_val_bpb gauge", + f"autoresearch_best_val_bpb {s.best_val_bpb if s.best_val_bpb < float('inf') else 0}", + "", + "# HELP autoresearch_improvement_velocity BPB improvement per hour", + "# TYPE autoresearch_improvement_velocity gauge", + f"autoresearch_improvement_velocity {s.improvement_velocity:.8f}", + "", + "# HELP autoresearch_session_duration_seconds Session uptime", + "# TYPE autoresearch_session_duration_seconds gauge", + f"autoresearch_session_duration_seconds {time.time() - s.session_start:.1f}", + "", + "# HELP autoresearch_alerts_total Alerts by severity", + "# TYPE autoresearch_alerts_total counter", + ] + for sev in ("info", "warning", "critical"): + count = sum(1 for a in self.alerts if a.severity == sev) + lines.append(f'autoresearch_alerts_total{{severity="{sev}"}} {count}') + + # Last experiment metrics + if self.experiments: + last = self.experiments[-1] + lines += [ + "", + "# HELP autoresearch_last_experiment_val_bpb Last experiment val BPB", + "# TYPE autoresearch_last_experiment_val_bpb gauge", + f"autoresearch_last_experiment_val_bpb {last.val_bpb}", + "", + "# HELP autoresearch_last_experiment_vram_mb Last experiment peak VRAM", + "# TYPE autoresearch_last_experiment_vram_mb gauge", + f"autoresearch_last_experiment_vram_mb {last.peak_vram_mb}", + "", + "# HELP autoresearch_last_experiment_steps Last experiment step count", + "# TYPE autoresearch_last_experiment_steps gauge", + f"autoresearch_last_experiment_steps {last.num_steps}", + ] + + lines.append("") + return "\n".join(lines) + + # -- Internal helpers --------------------------------------------------- + + def _emit_alert(self, severity: str, category: str, message: str, + value: float = 0.0, threshold: float = 0.0): + """Create and store an alert.""" + alert = Alert( + severity=severity, category=category, message=message, + experiment_id=self.current.experiment_id if self.current else -1, + value=value, threshold=threshold, + ) + self.alerts.append(alert) + + def _check_plateau(self): + """Detect if we're on an improvement plateau. + + A plateau is defined as N consecutive non-keep experiments + (excluding crashes). Inspired by mdemg's anomaly detection + which flags empty-recall and stale-node patterns. + """ + window = self.thresholds.plateau_window + if len(self.experiments) < window: + return + recent = self.experiments[-window:] + if all(e.status != "keep" for e in recent): + self._emit_alert( + "warning", "plateau", + f"No improvements in last {window} experiments — consider radical changes", + value=window, threshold=window) + + def _update_velocity(self): + """Calculate improvement velocity (BPB improvement per hour). + + Uses the kept experiments to compute the rate of improvement, + weighted toward recent results. Inspired by mdemg's learning + rate scheduling which adapts based on maturity. + """ + kept_exps = [e for e in self.experiments if e.status == "keep"] + if len(kept_exps) < 2: + self.session.improvement_velocity = 0.0 + return + + first_kept = kept_exps[0] + last_kept = kept_exps[-1] + bpb_delta = first_kept.val_bpb - last_kept.val_bpb # positive = improvement + time_delta = last_kept.end_time - first_kept.end_time + if time_delta > 0: + self.session.improvement_velocity = bpb_delta / (time_delta / 3600) + + def _save_session(self): + """Persist session state to disk for crash recovery.""" + path = self.metrics_dir / "session_state.json" + data = { + "session": asdict(self.session), + "experiment_count": len(self.experiments), + "consecutive_crashes": self._consecutive_crashes, + "saved_at": time.time(), + } + with open(path, "w") as f: + json.dump(data, f, indent=2, default=str) + + def _load_session(self): + """Restore session state from disk if available.""" + path = self.metrics_dir / "session_state.json" + if not path.exists(): + return + try: + with open(path) as f: + data = json.load(f) + # Restore crash streak counter + self._consecutive_crashes = data.get("consecutive_crashes", 0) + except (json.JSONDecodeError, KeyError): + pass # Corrupted state file — start fresh + + +# --------------------------------------------------------------------------- +# Convenience: format a quick text dashboard for terminal output +# --------------------------------------------------------------------------- + +def format_dashboard(tracker: ExperimentTracker) -> str: + """Format a compact text dashboard for terminal display. + + Provides a quick-glance summary similar to mdemg's per-space + statistics view showing nodes, edges, layers, and health scores. + Here we show experiment counts, keep rate, best BPB, velocity, + and active alerts. + """ + s = tracker.get_summary() + lines = [ + "=" * 60, + " AUTORESEARCH MONITOR", + "=" * 60, + f" Experiments: {s['total_experiments']:>5} " + f"(keep={s['kept']} discard={s['discarded']} crash={s['crashed']})", + f" Keep rate: {s['keep_rate']:>5.1%}", + f" Best BPB: {s['best_val_bpb'] or 'N/A':>10} " + f"(commit: {s['best_commit'] or 'N/A'})", + f" Velocity: {s['improvement_velocity_per_hour']:>10.6f} BPB/hour", + f" Duration: {s['session_duration_hours']:>5.1f} hours " + f"({s['total_training_hours']:.1f}h training)", + ] + + alerts = tracker.get_recent_alerts(3) + if alerts: + lines.append(f" Alerts: {s['active_alerts']} active") + for a in alerts: + icon = {"info": "[i]", "warning": "[!]", "critical": "[X]"}.get( + a["severity"], "[?]") + lines.append(f" {icon} {a['message']}") + + lines.append("=" * 60) + return "\n".join(lines) diff --git a/program.md b/program.md index dea9bcc01..86a6ca2d2 100644 --- a/program.md +++ b/program.md @@ -112,3 +112,208 @@ The idea is that you are a completely autonomous researcher trying things out. I **NEVER STOP**: Once the experiment loop has begun (after the initial setup), do NOT pause to ask the human if you should continue. Do NOT ask "should I keep going?" or "is this a good stopping point?". The human might be asleep, or gone from a computer and expects you to continue working *indefinitely* until you are manually stopped. You are autonomous. If you run out of ideas, think harder — read papers referenced in the code, re-read the in-scope files for new angles, try combining previous near-misses, try more radical architectural changes. The loop runs until the human interrupts you, period. As an example use case, a user might leave you running while they sleep. If each experiment takes you ~5 minutes then you can run approx 12/hour, for a total of about 100 over the duration of the average human sleep. The user then wakes up to experimental results, all completed by you while they slept! + +--- + +## Observability & Intelligence Modules (Optional) + +Four optional modules enhance the experiment loop with monitoring, memory, +resilience, and proactive guidance. These are **non-breaking additions** — +the core `train.py` loop works without them. They help you make smarter +experiment decisions and help humans understand what happened overnight. + +All modules use only stdlib (json, time, math, statistics) — no new +dependencies beyond what's already in `pyproject.toml`. + +### monitor.py — Experiment Metrics & Observability + +Tracks per-experiment and per-step metrics across the session. Inspired by +production observability systems with Prometheus-compatible export. + +**Key features:** +- Per-experiment tracking: val_bpb, VRAM, MFU, step count, loss curves +- Session-level aggregates: keep rate, improvement velocity (BPB/hour) +- Real-time alerting: loss spikes, VRAM pressure, crash streaks, plateaus +- Export: JSON for dashboards, Prometheus text exposition format +- Terminal dashboard: `format_dashboard(tracker)` for quick-glance status + +**Usage in the experiment loop:** + +```python +from monitor import ExperimentTracker, format_dashboard + +tracker = ExperimentTracker() + +# At the start of each experiment: +tracker.start_experiment(commit, description, change_tags=["learning_rate"]) + +# During training (optional — for per-step loss curves): +tracker.record_step(step, train_loss, lr, vram_mb, tokens_per_sec) + +# After each experiment: +tracker.end_experiment(val_bpb=0.995, status="keep", peak_vram_mb=44000) + +# Periodic dashboard output: +print(format_dashboard(tracker)) + +# Export for external dashboards: +tracker.export_json() +``` + +**Environment variables for alert thresholds:** +- `AUTORESEARCH_ALERT_LOSS_SPIKE_RATIO` — loss spike detection multiplier (default: 3.0) +- `AUTORESEARCH_ALERT_VRAM_WARNING_MB` — VRAM warning threshold (default: 75000) +- `AUTORESEARCH_ALERT_CONSECUTIVE_CRASH_LIMIT` — crashes before alert (default: 3) +- `AUTORESEARCH_ALERT_PLATEAU_WINDOW` — experiments without improvement (default: 15) + +### memory.py — Cross-Session Experiment Memory + +Persists experiment knowledge across research sessions using Hebbian +association learning. Remembers what types of changes tend to improve BPB +and which are dead ends. + +**Key features:** +- Hebbian associations: strengthens category-outcome connections over time +- Temporal decay: older experiments contribute less (keeps system responsive) +- Surprise-weighted storage: unexpected results are remembered longer +- Auto-tagging: automatically categorizes experiments by description keywords +- Pattern extraction: identifies promising directions and dead ends + +**Change categories** (the taxonomy for experiment classification): +`architecture`, `attention`, `activation`, `optimizer`, `learning_rate`, +`schedule`, `batch_size`, `initialization`, `regularization`, `normalization`, +`embedding`, `numerical`, `simplification`, `combination`, `radical` + +**Usage in the experiment loop:** + +```python +from memory import ExperimentMemory + +memory = ExperimentMemory() + +# After each experiment: +memory.store_experiment( + commit="a1b2c3d", + description="increase matrix LR to 0.06", + val_bpb=0.993, delta_bpb=-0.004, status="keep", + change_tags=["learning_rate", "optimizer"], +) + +# Before choosing the next experiment: +promising = memory.get_promising_directions(top_k=5) +dead_ends = memory.get_dead_ends() +plateau = memory.get_plateaus() + +# Periodically apply temporal decay (e.g., every 20 experiments): +memory.decay() +``` + +### resilience.py — Circuit Breakers & Anomaly Detection + +Prevents wasting GPU time on repeated failures and detects problematic +patterns across experiments. + +**Key features:** +- Circuit breaker: blocks experiments after N consecutive crashes, with + half-open probing and exponential backoff recovery +- Anomaly detection: plateaus, VRAM creep, systematic regression, crash clusters +- Backpressure monitoring: tracks VRAM trends, warns before OOM +- Unified ExperimentGuard: single pre/post experiment interface + +**Usage in the experiment loop:** + +```python +from resilience import ExperimentGuard + +guard = ExperimentGuard(gpu_vram_mb=81920) # Set to your GPU's VRAM + +# Before each experiment: +verdict = guard.pre_experiment(description="double model width") +if verdict.blocked: + print(f"BLOCKED: {verdict.reason}") + # Skip this experiment, try something else +for warning in verdict.warnings: + print(f"WARNING: {warning}") + +# After each experiment: +guard.post_experiment(val_bpb=0.995, status="keep", peak_vram_mb=44000) +``` + +### guidance.py — Proactive Experiment Suggestions + +Synthesizes signals from memory, monitoring, and resilience to suggest +the most promising next experiment. Detects contradictions and assesses +overall strategy effectiveness. + +**Key features:** +- Ranked experiment suggestions based on Hebbian memory associations +- Plateau breaker: suggests radical changes when stuck +- Contradiction detection: finds conflicting results that need investigation +- Strategy assessment: evaluates phase (exploring/exploiting/plateaued) +- Formatted guidance: ready-to-inject context for agent decisions + +**Usage in the experiment loop:** + +```python +from guidance import ExperimentAdvisor + +advisor = ExperimentAdvisor(memory=memory, tracker=tracker, guard=guard) + +# Before choosing the next experiment: +guidance = advisor.get_guidance() +print(guidance["formatted"]) # Human-readable guidance +# Use guidance["suggestions"] to inform your next experiment choice +``` + +### Recommended Integration Pattern + +Here's the full recommended pattern for the experiment loop with all modules: + +```python +from monitor import ExperimentTracker, format_dashboard +from memory import ExperimentMemory +from resilience import ExperimentGuard +from guidance import ExperimentAdvisor + +tracker = ExperimentTracker() +memory = ExperimentMemory() +guard = ExperimentGuard() +advisor = ExperimentAdvisor(memory=memory, tracker=tracker, guard=guard) + +# LOOP FOREVER: +while True: + # 1. Get guidance for next experiment + guidance = advisor.get_guidance() + # (Use suggestions to pick your next experiment) + + # 2. Pre-experiment safety check + verdict = guard.pre_experiment(description=description) + if verdict.blocked: + # Wait or try a different approach + continue + + # 3. Modify train.py and commit + # ... + + # 4. Run experiment + tracker.start_experiment(commit, description, change_tags) + # uv run train.py > run.log 2>&1 + tracker.end_experiment(val_bpb, status, peak_vram_mb) + + # 5. Record results + memory.store_experiment(commit, description, val_bpb, delta_bpb, status) + guard.post_experiment(val_bpb, status, peak_vram_mb) + + # 6. Periodic maintenance + if tracker.session.total_experiments % 20 == 0: + memory.decay() + tracker.export_json() + print(format_dashboard(tracker)) +``` + +### State directories + +All modules persist state to `.autoresearch/` (gitignored): +- `.autoresearch/metrics/` — monitor session data and JSON exports +- `.autoresearch/memory/` — experiment memory and Hebbian associations +- `.autoresearch/resilience/` — circuit breaker and anomaly state diff --git a/resilience.py b/resilience.py new file mode 100644 index 000000000..ad3f8f8c5 --- /dev/null +++ b/resilience.py @@ -0,0 +1,571 @@ +""" +Circuit breakers and anomaly detection for autoresearch. + +Inspired by mdemg's resilience infrastructure: +- internal/circuitbreaker/: Per-endpoint circuit breakers with half-open + recovery and exponential backoff +- internal/anomaly/: Multi-dimensional anomaly detection (empty-resume, + empty-recall, stale-node patterns) +- internal/backpressure/: Memory pressure handling with graceful degradation +- internal/ratelimit/: Provider-specific rate limiting + +This module adapts those patterns for autonomous experiment management: + +1. CircuitBreaker: Prevents wasting GPU time on repeated failures. + After N consecutive crashes, enters OPEN state (experiments blocked). + Periodically allows a single HALF_OPEN probe to test recovery. + +2. AnomalyDetector: Detects problematic patterns across experiments: + - Loss plateaus (no improvement in N experiments) + - VRAM creep (monotonically increasing memory usage) + - Systematic regression (each experiment worse than the last) + - Crash clustering (crashes concentrated in recent experiments) + +3. BackpressureMonitor: Tracks VRAM usage trends and warns when + the system is approaching GPU memory limits. + +4. ExperimentGuard: Wraps experiment execution with all safety checks, + providing a single pre/post-experiment interface. + +Usage: + guard = ExperimentGuard() + + # Before running an experiment + verdict = guard.pre_experiment(description="double model width") + if verdict.blocked: + print(f"Blocked: {verdict.reason}") + # Skip this experiment + + # After running an experiment + guard.post_experiment( + val_bpb=0.995, status="keep", peak_vram_mb=44000 + ) +""" + +import json +import time +from dataclasses import dataclass, field, asdict +from enum import Enum +from pathlib import Path +from typing import Optional + + +# --------------------------------------------------------------------------- +# Circuit Breaker (inspired by mdemg's internal/circuitbreaker/) +# --------------------------------------------------------------------------- + +class CircuitState(Enum): + """Circuit breaker states. + + CLOSED: Normal operation, experiments proceed freely. + OPEN: Too many failures, experiments are blocked. + HALF_OPEN: Probing — allow one experiment to test recovery. + + State transitions: + CLOSED -> OPEN: after N consecutive failures + OPEN -> HALF_OPEN: after cooldown period + HALF_OPEN -> CLOSED: if probe experiment succeeds + HALF_OPEN -> OPEN: if probe experiment fails + """ + CLOSED = "closed" + OPEN = "open" + HALF_OPEN = "half_open" + + +@dataclass +class CircuitBreakerConfig: + """Circuit breaker configuration. + + Mirrors mdemg's per-endpoint circuit breaker configuration which + allows different thresholds for different failure modes. + """ + # Number of consecutive failures to trip the breaker + failure_threshold: int = 5 + # Seconds to wait before allowing a probe (half-open) + cooldown_seconds: float = 120.0 + # Max consecutive probes that can fail before extending cooldown + max_probe_failures: int = 2 + # Cooldown multiplier after probe failure (exponential backoff) + backoff_multiplier: float = 2.0 + # Maximum cooldown duration (cap for exponential backoff) + max_cooldown_seconds: float = 600.0 + + +class CircuitBreaker: + """Prevents wasting GPU time on repeated experiment failures. + + When experiments crash repeatedly, the circuit breaker OPENS to + prevent further wasted runs. After a cooldown period, it enters + HALF_OPEN state and allows a single probe experiment. If the probe + succeeds, the breaker closes. If it fails, the cooldown is extended + with exponential backoff. + + Adapted from mdemg's circuit breaker pattern which protects embedding + provider endpoints from cascading failures with half-open recovery. + """ + + def __init__(self, config: Optional[CircuitBreakerConfig] = None): + self.config = config or CircuitBreakerConfig() + self.state = CircuitState.CLOSED + self.consecutive_failures = 0 + self.last_failure_time = 0.0 + self.current_cooldown = self.config.cooldown_seconds + self.probe_failures = 0 + self.total_trips = 0 + + def record_success(self): + """Record a successful experiment (keep or discard with valid BPB).""" + self.consecutive_failures = 0 + self.probe_failures = 0 + self.current_cooldown = self.config.cooldown_seconds + if self.state != CircuitState.CLOSED: + self.state = CircuitState.CLOSED + + def record_failure(self): + """Record a failed experiment (crash, OOM, timeout).""" + self.consecutive_failures += 1 + self.last_failure_time = time.time() + + if self.state == CircuitState.HALF_OPEN: + # Probe failed — re-open with extended cooldown + self.probe_failures += 1 + self.current_cooldown = min( + self.current_cooldown * self.config.backoff_multiplier, + self.config.max_cooldown_seconds + ) + self.state = CircuitState.OPEN + return + + if self.consecutive_failures >= self.config.failure_threshold: + self.state = CircuitState.OPEN + self.total_trips += 1 + + def allow_experiment(self) -> tuple[bool, str]: + """Check if an experiment is allowed to run. + + Returns (allowed, reason) tuple. If the circuit is OPEN and the + cooldown has elapsed, transitions to HALF_OPEN and allows one probe. + """ + if self.state == CircuitState.CLOSED: + return True, "circuit closed — normal operation" + + if self.state == CircuitState.OPEN: + elapsed = time.time() - self.last_failure_time + if elapsed >= self.current_cooldown: + self.state = CircuitState.HALF_OPEN + return True, (f"circuit half-open — probe experiment " + f"(cooldown was {self.current_cooldown:.0f}s)") + remaining = self.current_cooldown - elapsed + return False, (f"circuit OPEN — {self.consecutive_failures} consecutive " + f"failures, {remaining:.0f}s until probe") + + if self.state == CircuitState.HALF_OPEN: + return True, "circuit half-open — probe in progress" + + return True, "unknown state — allowing" + + def get_status(self) -> dict: + """Return circuit breaker status.""" + return { + "state": self.state.value, + "consecutive_failures": self.consecutive_failures, + "total_trips": self.total_trips, + "current_cooldown": self.current_cooldown, + "probe_failures": self.probe_failures, + } + + +# --------------------------------------------------------------------------- +# Anomaly Detector (inspired by mdemg's internal/anomaly/) +# --------------------------------------------------------------------------- + +@dataclass +class Anomaly: + """A detected anomaly in the experiment stream.""" + anomaly_type: str # plateau | vram_creep | systematic_regression | crash_cluster + severity: str # info | warning | critical + message: str + timestamp: float = field(default_factory=time.time) + data: dict = field(default_factory=dict) + + +class AnomalyDetector: + """Detects problematic patterns across experiments. + + Inspired by mdemg's anomaly detection system which watches for: + - Empty resume events (no prior memory found) + - Empty recall results (retrieval returning nothing) + - Stale nodes (nodes not accessed in a long time) + + Here we adapt those patterns for experiment research: + - Plateau: no improvements in a window of experiments + - VRAM creep: memory usage increasing monotonically + - Systematic regression: each experiment worse than the last + - Crash clustering: high crash rate in recent experiments + """ + + def __init__(self, plateau_window: int = 15, + vram_creep_window: int = 5, + regression_window: int = 5, + crash_rate_threshold: float = 0.5): + self.plateau_window = plateau_window + self.vram_creep_window = vram_creep_window + self.regression_window = regression_window + self.crash_rate_threshold = crash_rate_threshold + self.anomalies: list[Anomaly] = [] + + def analyze(self, experiments: list[dict]) -> list[Anomaly]: + """Run all anomaly detectors on the experiment history. + + Args: + experiments: List of experiment dicts with keys: + val_bpb, status, peak_vram_mb, description + + Returns: + List of newly detected anomalies. + """ + new_anomalies = [] + + if len(experiments) >= self.plateau_window: + anomaly = self._check_plateau(experiments) + if anomaly: + new_anomalies.append(anomaly) + + if len(experiments) >= self.vram_creep_window: + anomaly = self._check_vram_creep(experiments) + if anomaly: + new_anomalies.append(anomaly) + + if len(experiments) >= self.regression_window: + anomaly = self._check_regression(experiments) + if anomaly: + new_anomalies.append(anomaly) + + if len(experiments) >= 5: + anomaly = self._check_crash_cluster(experiments) + if anomaly: + new_anomalies.append(anomaly) + + self.anomalies.extend(new_anomalies) + return new_anomalies + + def _check_plateau(self, experiments: list[dict]) -> Optional[Anomaly]: + """Detect improvement plateaus.""" + recent = experiments[-self.plateau_window:] + kept = [e for e in recent if e.get("status") == "keep"] + if len(kept) == 0: + return Anomaly( + anomaly_type="plateau", + severity="warning", + message=f"No improvements in last {self.plateau_window} experiments", + data={"window": self.plateau_window, "experiments_checked": len(recent)}, + ) + return None + + def _check_vram_creep(self, experiments: list[dict]) -> Optional[Anomaly]: + """Detect monotonically increasing VRAM usage. + + Inspired by mdemg's backpressure monitoring which tracks memory + pressure and triggers graceful degradation. + """ + recent = [e for e in experiments[-self.vram_creep_window:] + if e.get("peak_vram_mb", 0) > 0] + if len(recent) < self.vram_creep_window: + return None + + vram_values = [e["peak_vram_mb"] for e in recent] + # Check if strictly increasing + if all(vram_values[i] < vram_values[i + 1] + for i in range(len(vram_values) - 1)): + increase_pct = ((vram_values[-1] - vram_values[0]) / + max(vram_values[0], 1)) * 100 + return Anomaly( + anomaly_type="vram_creep", + severity="warning" if increase_pct < 20 else "critical", + message=(f"VRAM increasing monotonically: " + f"{vram_values[0]:.0f} -> {vram_values[-1]:.0f} MB " + f"(+{increase_pct:.1f}%)"), + data={"vram_values": vram_values, "increase_pct": increase_pct}, + ) + return None + + def _check_regression(self, experiments: list[dict]) -> Optional[Anomaly]: + """Detect systematic regression (each result worse than the last).""" + recent = [e for e in experiments[-self.regression_window:] + if e.get("status") != "crash" and e.get("val_bpb", 0) > 0] + if len(recent) < self.regression_window: + return None + + bpb_values = [e["val_bpb"] for e in recent] + # Check if strictly increasing (worse BPB) + if all(bpb_values[i] < bpb_values[i + 1] + for i in range(len(bpb_values) - 1)): + return Anomaly( + anomaly_type="systematic_regression", + severity="warning", + message=(f"BPB worsening over last {self.regression_window} " + f"experiments: {bpb_values[0]:.6f} -> {bpb_values[-1]:.6f}"), + data={"bpb_values": bpb_values}, + ) + return None + + def _check_crash_cluster(self, experiments: list[dict]) -> Optional[Anomaly]: + """Detect high crash rate in recent experiments.""" + recent = experiments[-10:] + crash_count = sum(1 for e in recent if e.get("status") == "crash") + crash_rate = crash_count / len(recent) + if crash_rate >= self.crash_rate_threshold: + return Anomaly( + anomaly_type="crash_cluster", + severity="critical", + message=f"High crash rate: {crash_count}/{len(recent)} ({crash_rate:.0%}) in recent experiments", + data={"crash_count": crash_count, "window": len(recent), + "crash_rate": crash_rate}, + ) + return None + + def get_all_anomalies(self) -> list[dict]: + """Return all detected anomalies.""" + return [asdict(a) for a in self.anomalies] + + +# --------------------------------------------------------------------------- +# Backpressure Monitor (inspired by mdemg's internal/backpressure/) +# --------------------------------------------------------------------------- + +class BackpressureMonitor: + """Monitors VRAM pressure and suggests adjustments. + + Tracks VRAM usage across experiments and provides warnings when + approaching GPU memory limits. Can suggest batch size or model + size reductions to stay within safe limits. + + Inspired by mdemg's backpressure system which monitors Go heap + memory and triggers graceful degradation (reducing cache sizes, + deferring background tasks) when pressure exceeds thresholds. + """ + + def __init__(self, gpu_vram_mb: float = 81_920.0, # H100 80GB + warning_threshold: float = 0.85, + critical_threshold: float = 0.95): + self.gpu_vram_mb = gpu_vram_mb + self.warning_threshold = warning_threshold + self.critical_threshold = critical_threshold + self.vram_history: list[float] = [] + + def record_vram(self, peak_vram_mb: float): + """Record peak VRAM from an experiment.""" + self.vram_history.append(peak_vram_mb) + + def get_pressure(self) -> dict: + """Calculate current VRAM pressure level. + + Returns pressure analysis with: + - level: ok | warning | critical + - utilization: fraction of GPU VRAM used + - trend: increasing | stable | decreasing + - suggestion: actionable recommendation + """ + if not self.vram_history: + return {"level": "ok", "utilization": 0.0, + "trend": "unknown", "suggestion": None} + + latest = self.vram_history[-1] + utilization = latest / self.gpu_vram_mb + + # Determine trend from last 5 measurements + trend = "stable" + if len(self.vram_history) >= 3: + recent = self.vram_history[-3:] + if all(recent[i] < recent[i + 1] for i in range(len(recent) - 1)): + trend = "increasing" + elif all(recent[i] > recent[i + 1] for i in range(len(recent) - 1)): + trend = "decreasing" + + # Determine level and suggestion + level = "ok" + suggestion = None + if utilization >= self.critical_threshold: + level = "critical" + suggestion = ("VRAM critically high — reduce DEVICE_BATCH_SIZE " + "or model depth immediately") + elif utilization >= self.warning_threshold: + level = "warning" + if trend == "increasing": + suggestion = ("VRAM approaching limit with increasing trend — " + "avoid changes that increase model size") + else: + suggestion = "VRAM high but stable — monitor closely" + + return { + "level": level, + "utilization": round(utilization, 3), + "peak_vram_mb": latest, + "gpu_vram_mb": self.gpu_vram_mb, + "trend": trend, + "suggestion": suggestion, + } + + +# --------------------------------------------------------------------------- +# ExperimentGuard - unified pre/post experiment safety +# --------------------------------------------------------------------------- + +@dataclass +class PreExperimentVerdict: + """Result of pre-experiment safety checks.""" + allowed: bool + blocked: bool # True if experiment should be skipped + reason: str + warnings: list[str] = field(default_factory=list) + suggestions: list[str] = field(default_factory=list) + + +class ExperimentGuard: + """Unified pre/post experiment safety wrapper. + + Combines circuit breaker, anomaly detection, and backpressure + monitoring into a single interface. Call pre_experiment() before + running and post_experiment() after. + + This is the primary integration point for resilience features. + Inspired by mdemg's guardrail system which validates MCP constraints + and provides server-side enforcement. + + Usage: + guard = ExperimentGuard() + + # Before each experiment: + verdict = guard.pre_experiment("try larger model") + if verdict.blocked: + # Skip this experiment, log the reason + ... + + # After each experiment: + guard.post_experiment(val_bpb=0.995, status="keep", peak_vram_mb=44000) + """ + + def __init__(self, state_dir: str = ".autoresearch/resilience", + gpu_vram_mb: float = 81_920.0): + self.state_dir = Path(state_dir) + self.state_dir.mkdir(parents=True, exist_ok=True) + + self.circuit_breaker = CircuitBreaker() + self.anomaly_detector = AnomalyDetector() + self.backpressure = BackpressureMonitor(gpu_vram_mb=gpu_vram_mb) + + self._experiment_history: list[dict] = [] + self._load_state() + + def pre_experiment(self, description: str = "") -> PreExperimentVerdict: + """Run all pre-experiment safety checks. + + Returns a verdict indicating whether the experiment should proceed, + along with any warnings or suggestions. + """ + warnings = [] + suggestions = [] + + # Check circuit breaker + cb_allowed, cb_reason = self.circuit_breaker.allow_experiment() + if not cb_allowed: + return PreExperimentVerdict( + allowed=False, blocked=True, + reason=f"Circuit breaker: {cb_reason}", + warnings=[cb_reason], + suggestions=["Wait for cooldown or fix the root cause of crashes"], + ) + + # Check VRAM pressure + pressure = self.backpressure.get_pressure() + if pressure["level"] == "critical": + warnings.append(f"VRAM critical: {pressure['utilization']:.0%} utilized") + suggestions.append(pressure["suggestion"] or "Reduce model size") + elif pressure["level"] == "warning": + warnings.append(f"VRAM warning: {pressure['utilization']:.0%} utilized") + if pressure["suggestion"]: + suggestions.append(pressure["suggestion"]) + + # Run anomaly detection + new_anomalies = self.anomaly_detector.analyze(self._experiment_history) + for anomaly in new_anomalies: + if anomaly.severity == "critical": + warnings.append(f"[CRITICAL] {anomaly.message}") + elif anomaly.severity == "warning": + warnings.append(anomaly.message) + + return PreExperimentVerdict( + allowed=True, blocked=False, + reason=cb_reason, + warnings=warnings, + suggestions=suggestions, + ) + + def post_experiment(self, val_bpb: float = 0.0, status: str = "discard", + peak_vram_mb: float = 0.0, description: str = ""): + """Record experiment outcome and update all safety systems.""" + # Record in history + self._experiment_history.append({ + "val_bpb": val_bpb, + "status": status, + "peak_vram_mb": peak_vram_mb, + "description": description, + "timestamp": time.time(), + }) + + # Update circuit breaker + if status == "crash": + self.circuit_breaker.record_failure() + else: + self.circuit_breaker.record_success() + + # Update backpressure + if peak_vram_mb > 0: + self.backpressure.record_vram(peak_vram_mb) + + self._save_state() + + def get_status(self) -> dict: + """Return comprehensive resilience status.""" + return { + "circuit_breaker": self.circuit_breaker.get_status(), + "backpressure": self.backpressure.get_pressure(), + "anomalies": len(self.anomaly_detector.anomalies), + "recent_anomalies": [ + asdict(a) for a in self.anomaly_detector.anomalies[-3:] + ], + "total_experiments_tracked": len(self._experiment_history), + } + + def _save_state(self): + """Persist resilience state to disk.""" + path = self.state_dir / "state.json" + data = { + "circuit_breaker": self.circuit_breaker.get_status(), + "experiment_history": self._experiment_history[-100:], # Keep last 100 + "anomalies": [asdict(a) for a in self.anomaly_detector.anomalies], + "vram_history": self.backpressure.vram_history[-50:], + "saved_at": time.time(), + } + with open(path, "w") as f: + json.dump(data, f, indent=2, default=str) + + def _load_state(self): + """Restore resilience state from disk.""" + path = self.state_dir / "state.json" + if not path.exists(): + return + try: + with open(path) as f: + data = json.load(f) + self._experiment_history = data.get("experiment_history", []) + self.backpressure.vram_history = data.get("vram_history", []) + + # Restore circuit breaker state + cb = data.get("circuit_breaker", {}) + self.circuit_breaker.consecutive_failures = cb.get("consecutive_failures", 0) + self.circuit_breaker.total_trips = cb.get("total_trips", 0) + state_str = cb.get("state", "closed") + self.circuit_breaker.state = CircuitState(state_str) + except (json.JSONDecodeError, TypeError, KeyError, ValueError): + pass # Corrupted state — start fresh