diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 4eed3400..ff6676bf 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -84,7 +84,7 @@ jobs: run: uv sync --dev - name: Run pytest with coverage - run: uv run pytest + run: uv run pytest --ignore=tests/benchmarks - name: Upload coverage to Codecov if: matrix.python-version == '3.12' diff --git a/.gitignore b/.gitignore index 978a333b..42034a60 100644 --- a/.gitignore +++ b/.gitignore @@ -52,5 +52,11 @@ AGENTS.md tests/examples/* -# Integration test files (contain credentials/connection strings) -tests/llm/clients/oss/openai/async_integration.py +# Benchmarking results +tests/benchmarks/results/ +results/ +*.json +*.csv +!pyproject.toml +!package.json +!composer.json diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 45bcead6..00e3fbf7 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -16,17 +16,17 @@ repos: - repo: local hooks: - #- id: ty - #name: ty type checker - #entry: uvx ty check --exclude 'tests/llm/clients/**/*.py' - #language: system - #types: [python] - #pass_filenames: false - #always_run: true + - id: ty + name: ty type checker + entry: uvx ty check + language: system + types: [python] + pass_filenames: false + always_run: true - id: pytest name: pytest - entry: uv run pytest + entry: uv run pytest --ignore=tests/benchmarks language: system pass_filenames: false always_run: true diff --git a/pyproject.toml b/pyproject.toml index fbf0ed65..0ee2c04d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -87,11 +87,14 @@ python_classes = ["Test*"] python_functions = ["test_*"] markers = [ "asyncio: marks tests as async (deselect with '-m \"not asyncio\"')", + "benchmark: marks tests as performance benchmarks", ] asyncio_mode = "auto" addopts = [ "-v", "--strict-markers", + "-m", + "not benchmark", "--cov=memori", "--cov-report=term-missing", "--cov-report=html", @@ -116,7 +119,15 @@ exclude_lines = [ "if TYPE_CHECKING:", ] -[tool.ty] +[tool.ty.src] +exclude = [ + "tests/llm/clients/**/*.py", + "**/__pycache__/**", +] + +[tool.ty.environment] +python-version = "3.12" + [dependency-groups] dev = [ @@ -139,8 +150,10 @@ dev = [ "pymysql>=1.1.2", "pytest>=8.4.2", "pytest-asyncio>=0.24.0", + "pytest-benchmark>=4.0.0", "pytest-cov>=6.0.0", "pytest-mock>=3.15.1", + "psutil>=5.9.0", "requests>=2.32.5", "ruff>=0.8.0", "sqlalchemy>=2.0.44", diff --git a/tests/benchmarks/README.md b/tests/benchmarks/README.md new file mode 100644 index 00000000..d9c6c1a3 --- /dev/null +++ b/tests/benchmarks/README.md @@ -0,0 +1,58 @@ +# AWS EC2 Benchmark Guide + +This guide explains how to run Memori benchmarks on an EC2 instance in the same VPC as your AWS database (RDS Postgres or MySQL). + +## Setup on EC2 + +1. **SSH into EC2**: + ```bash + ssh ec2-user@your-ec2-ip + ``` + +2. **Run Setup**: + Copy `tests/benchmarks/setup_ec2_benchmarks.sh` to your EC2 or clone the repo and run it: + ```bash + chmod +x tests/benchmarks/setup_ec2_benchmarks.sh + ./tests/benchmarks/setup_ec2_benchmarks.sh + ``` + +## Running Benchmarks + +The `run_benchmarks_ec2.sh` script is flexible and handles automatic CSV generation. 
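+
+If you already have a pytest-benchmark JSON file and only need the report, you can also call the converter directly (the results filename below is illustrative of the timestamped naming scheme):
+
+```bash
+# Print a p50/p95/p99 table to stdout
+uv run python tests/benchmarks/generate_percentile_report.py \
+    results/results_postgres_all_20250101_120000.json table
+
+# Write a CSV report, keeping only tests with N <= 1000
+uv run python tests/benchmarks/generate_percentile_report.py \
+    results/results_postgres_all_20250101_120000.json csv results/report_manual.csv 1000
+```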
+ +### Environment Variables + +- `DB_TYPE`: `postgres` (default) or `mysql` +- `TEST_TYPE`: `all` (default), `end_to_end`, `db_retrieval`, `semantic_search`, `embedding` +- `BENCHMARK_POSTGRES_URL`: Connection string for Postgres +- `BENCHMARK_MYSQL_URL`: Connection string for MySQL + +### Examples + +#### Run all Postgres benchmarks +```bash +export BENCHMARK_POSTGRES_URL="CHANGEME" +DB_TYPE=postgres TEST_TYPE=all ./tests/benchmarks/run_benchmarks_ec2.sh +``` + +#### Run only End-to-End MySQL benchmarks +```bash +export BENCHMARK_MYSQL_URL="CHANGEME" +DB_TYPE=mysql TEST_TYPE=end_to_end ./tests/benchmarks/run_benchmarks_ec2.sh +``` + +## Results + +All results are automatically saved to the `./results` directory with a timestamp to prevent overwriting: +- JSON output: `results_{db}_{type}_{timestamp}.json` +- **CSV Report**: `report_{db}_{type}_{timestamp}.csv` + +To download the CSV reports to your local machine: +```bash +scp ec2-user@your-ec2-ip:~/Memori/results/report_*.csv ./local_results/ +``` + +## Database Connection Requirements + +Ensure the EC2 Security Group allows outbound traffic to the database on ports 5432 (Postgres) or 3306 (MySQL). +The database must be in the same VPC or accessible via VPC Peering/Transit Gateway. diff --git a/tests/benchmarks/conftest.py b/tests/benchmarks/conftest.py new file mode 100644 index 00000000..a2d56d72 --- /dev/null +++ b/tests/benchmarks/conftest.py @@ -0,0 +1,175 @@ +"""Pytest fixtures for performance benchmarks.""" + +import os + +import pytest +from sqlalchemy import create_engine +from sqlalchemy.orm import sessionmaker + +from memori import Memori +from memori.llm._embeddings import embed_texts +from tests.benchmarks.fixtures.sample_data import ( + generate_facts_with_size, + generate_sample_queries, +) + + +@pytest.fixture +def postgres_db_connection(): + """Create a PostgreSQL database connection factory for benchmarking (via AWS/Docker).""" + postgres_uri = os.environ.get( + "BENCHMARK_POSTGRES_URL", + # Matches docker-compose.yml default DB name + "postgresql://memori:memori@localhost:5432/memori_test", + ) + + from sqlalchemy import text + + # Support SSL root certificate via environment variable (for AWS RDS) + connect_args = {} + sslrootcert = os.environ.get("BENCHMARK_POSTGRES_SSLROOTCERT") + if sslrootcert: + connect_args["sslrootcert"] = sslrootcert + # Ensure sslmode is set if using SSL cert + if "sslmode" not in postgres_uri: + # Add sslmode=require if not already in URI + separator = "&" if "?" in postgres_uri else "?" + postgres_uri = f"{postgres_uri}{separator}sslmode=require" + + engine = create_engine( + postgres_uri, + pool_pre_ping=True, + pool_recycle=300, + connect_args=connect_args if connect_args else None, + ) + + try: + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + except Exception as e: + pytest.skip( + f"PostgreSQL not available at {postgres_uri}: {e}. " + "Set BENCHMARK_POSTGRES_URL to a database that exists." 
+ ) + + Session = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + yield Session + engine.dispose() + + +@pytest.fixture +def mysql_db_connection(): + """Create a MySQL database connection factory for benchmarking (via AWS/Docker).""" + mysql_uri = os.environ.get( + "BENCHMARK_MYSQL_URL", + "mysql+pymysql://memori:memori@localhost:3306/memori_test", + ) + + from sqlalchemy import text + + engine = create_engine( + mysql_uri, + pool_pre_ping=True, + pool_recycle=300, + ) + + try: + with engine.connect() as conn: + conn.execute(text("SELECT 1")) + except Exception as e: + pytest.skip(f"MySQL not available at {mysql_uri}: {e}") + + Session = sessionmaker(autocommit=False, autoflush=False, bind=engine) + + yield Session + engine.dispose() + + +@pytest.fixture( + params=["postgres", "mysql"], + ids=["postgres", "mysql"], +) +def db_connection(request): + """Parameterized fixture for realistic database types (no SQLite).""" + db_type = request.param + + if db_type == "postgres": + return request.getfixturevalue("postgres_db_connection") + elif db_type == "mysql": + return request.getfixturevalue("mysql_db_connection") + + pytest.skip(f"Unsupported benchmark database type: {db_type}") + + +@pytest.fixture +def memori_instance(db_connection, request): + """Create a Memori instance with the specified database for benchmarking.""" + mem = Memori(conn=db_connection) + mem.config.storage.build() + + db_type_param = None + for marker in request.node.iter_markers("parametrize"): + if "db_connection" in marker.args[0]: + db_type_param = marker.args[1][0] if marker.args[1] else None + break + + # Try to infer from connection + if not db_type_param: + try: + # SQLAlchemy sessionmaker is callable, so detect it first by presence of a bind. + bind = getattr(db_connection, "kw", {}).get("bind", None) + if bind is not None: + db_type_param = bind.dialect.name + else: + db_type_param = "unknown" + except Exception: + db_type_param = "unknown" + + mem._benchmark_db_type = db_type_param # ty: ignore[unresolved-attribute] + return mem + + +@pytest.fixture +def sample_queries(): + """Provide sample queries of varying lengths.""" + return generate_sample_queries() + + +@pytest.fixture +def fact_content_size(): + """Fixture for fact content size. + + Note: Embeddings are always 768 dimensions (3072 bytes binary) regardless of text size. 
+ """ + return "small" + + +@pytest.fixture( + params=[5, 50, 100, 300, 600, 1000], + ids=lambda x: f"n{x}", +) +def entity_with_n_facts(memori_instance, fact_content_size, request): + """Create an entity with N facts for benchmarking database retrieval.""" + fact_count = request.param + entity_id = f"benchmark-entity-{fact_count}-{fact_content_size}" + memori_instance.attribution(entity_id=entity_id, process_id="benchmark-process") + + facts = generate_facts_with_size(fact_count, fact_content_size) + fact_embeddings = embed_texts(facts) + + entity_db_id = memori_instance.config.storage.driver.entity.create(entity_id) + memori_instance.config.storage.driver.entity_fact.create( + entity_db_id, facts, fact_embeddings + ) + + db_type = getattr(memori_instance, "_benchmark_db_type", "unknown") + + return { + "entity_id": entity_id, + "entity_db_id": entity_db_id, + "fact_count": fact_count, + "content_size": fact_content_size, + "db_type": db_type, + "facts": facts, + } diff --git a/tests/benchmarks/fixtures/sample_data.py b/tests/benchmarks/fixtures/sample_data.py new file mode 100644 index 00000000..09b968f7 --- /dev/null +++ b/tests/benchmarks/fixtures/sample_data.py @@ -0,0 +1,113 @@ +"""Helper functions for generating sample test data for benchmarks.""" + +import random +import string + +random.seed(42) + + +def generate_random_string(length: int = 10) -> str: + """Generate a random string of specified length.""" + return "".join(random.choices(string.ascii_letters + string.digits, k=length)) + + +def generate_sample_fact() -> str: + """Generate a realistic sample fact for testing.""" + templates = [ + "User likes {item}", + "User lives in {location}", + "User works at {company}", + "User's favorite color is {color}", + "User prefers {preference}", + "User has {count} {item}", + "User enjoys {activity}", + "User's birthday is {date}", + ] + + items = ["pizza", "coffee", "books", "music", "movies", "travel", "coding"] + locations = ["New York", "San Francisco", "London", "Tokyo", "Paris"] + companies = ["Tech Corp", "Startup Inc", "Big Company", "Small Business"] + colors = ["blue", "red", "green", "purple", "yellow"] + preferences = ["dark mode", "light mode", "minimalist design", "detailed UI"] + activities = ["reading", "hiking", "cooking", "gaming", "photography"] + dates = ["January 1st", "March 15th", "June 30th", "December 25th"] + + template = random.choice(templates) + fact = template.format( + item=random.choice(items), + location=random.choice(locations), + company=random.choice(companies), + color=random.choice(colors), + preference=random.choice(preferences), + count=random.randint(1, 10), + activity=random.choice(activities), + date=random.choice(dates), + ) + + return fact + + +def generate_sample_queries() -> dict[str, list[str]]: + """Generate sample queries of varying lengths for benchmarking.""" + return { + "short": [ + "What do I like?", + "Where do I live?", + "My preferences?", + "Favorite color?", + "Birthday?", + ], + "medium": [ + "What are my favorite things?", + "Tell me about where I live", + "What are my preferences for software?", + "What is my favorite color and why?", + "When is my birthday and how do I celebrate?", + ], + "long": [ + "Can you tell me about all the things I like and enjoy doing?", + "I want to know more about where I currently live and work", + "What are all my preferences when it comes to software and design?", + "Please provide details about my favorite color and any related memories", + "I'd like to know when my birthday is and how I 
typically celebrate it", + ], + } + + +def generate_facts(count: int) -> list[str]: + """Generate a list of unique sample facts.""" + facts = [] + seen = set() + + while len(facts) < count: + fact = generate_sample_fact() + # Add unique identifier to ensure no duplicates + unique_fact = f"{fact} (id: {len(facts)})" + + # Double-check uniqueness (shouldn't be needed with id, but safe) + if unique_fact not in seen: + facts.append(unique_fact) + seen.add(unique_fact) + + return facts + + +def generate_facts_with_size(count: int, size: str = "small") -> list[str]: + """Generate facts for benchmarking. + + Args: + count: Number of facts to generate + size: Content size + + Returns: + List of unique facts + """ + base_facts = generate_facts(count) + + def with_id_suffix(text: str, idx: int, max_len: int) -> str: + suffix = f" (id: {idx})" + if max_len <= len(suffix): + return suffix[-max_len:] + return text[: max_len - len(suffix)] + suffix + + return [with_id_suffix(fact, i, 60) for i, fact in enumerate(base_facts)] diff --git a/tests/benchmarks/generate_percentile_report.py b/tests/benchmarks/generate_percentile_report.py new file mode 100644 index 00000000..6a07e0b3 --- /dev/null +++ b/tests/benchmarks/generate_percentile_report.py @@ -0,0 +1,279 @@ +"""Generate percentile report (p50/p95/p99) from benchmark JSON results.""" + +import json +import sys +from pathlib import Path + + +def calculate_percentile(data, percentile): + """Calculate percentile from sorted data.""" + if not data: + return None + sorted_data = sorted(data) + index = (len(sorted_data) - 1) * percentile / 100 + lower = int(index) + upper = lower + 1 + weight = index - lower + + if upper >= len(sorted_data): + return sorted_data[lower] + + return sorted_data[lower] * (1 - weight) + sorted_data[upper] * weight + + +def extract_n_from_test_name(test_name): + """Extract N (number of records) from test name.""" + import re + + # Our parametrized ids use "n{N}" (e.g. "[n1000-postgres-small]") + match = re.search(r"n(\d+)", test_name) + if match: + return int(match.group(1)) + + # Backwards-compatible: plain numeric parameter (e.g. "[1000]") + match = re.search(r"\[(\d+)\]", test_name) + return int(match.group(1)) if match else None + + +def extract_db_type_from_test_name(test_name): + """Extract database type from test name.""" + import re + + # Look for database type in test name (postgres, mysql) + match = re.search(r"\[(postgres|mysql)[-\]]", test_name) + if not match: + match = re.search(r"-(postgres|mysql)[-\]]", test_name) + if not match: + match = re.search(r"(postgres|mysql)", test_name) + return match.group(1) if match else "unknown" + + +def extract_content_size_from_test_name(test_name): + """Extract content size from test name.""" + import re + + match = re.search(r"\[small[-\]]", test_name) + if not match: + match = re.search(r"-small[-\]]", test_name) + if not match: + match = re.search(r"small", test_name) + return match.group(0) if match else "small" + + +def extract_benchmark_id_from_test_name(test_name): + """ + Extract a stable benchmark identifier from pytest-benchmark's name field. + Examples: + "test_benchmark_end_to_end_recall[n1000-sqlite-small]" -> "test_benchmark_end_to_end_recall" + "test_benchmark_query_embedding_short" -> "test_benchmark_query_embedding_short" + """ + import re + + match = re.match(r"([^\[]+)", test_name) + return match.group(1) if match else test_name + + +def generate_percentile_report(json_file_path, max_n=None): + """Generate p50/p95/p99 report from benchmark JSON. 
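+    Results are keyed by (benchmark_id, N, db_type, content_size) so different
+    benchmark types with the same parameters don't overwrite each other.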
+ + Args: + json_file_path: Path to pytest-benchmark JSON output + max_n: Optional maximum N value to include (filters out tests with N > max_n) + """ + with open(json_file_path) as f: + data = json.load(f) + + benchmarks = {} + + for benchmark in data.get("benchmarks", []): + test_name = benchmark.get("name", "") + benchmark_id = extract_benchmark_id_from_test_name(test_name) + n = extract_n_from_test_name(test_name) + + # Skip if N is None (couldn't extract) or exceeds max_n filter + if n is None: + continue + if max_n is not None and n > max_n: + continue + + db_type = extract_db_type_from_test_name(test_name) + content_size = extract_content_size_from_test_name(test_name) + extra_info = benchmark.get("extra_info", {}) or {} + + stats = benchmark.get("stats", {}) + times = stats.get("data", []) + + if not times: + continue + + peak_rss_bytes = extra_info.get("peak_rss_bytes") + + p50 = calculate_percentile(times, 50) + p95 = calculate_percentile(times, 95) + p99 = calculate_percentile(times, 99) + + # Create composite key. We include benchmark_id so different benchmark types + # don't overwrite each other (e.g. end-to-end vs db fetch for same N/db/size). + key = (benchmark_id, n, db_type, content_size) + + benchmarks[key] = { + "benchmark_id": benchmark_id, + "n": n, + "db_type": db_type, + "content_size": content_size, + "p50": p50, + "p95": p95, + "p99": p99, + "mean": stats.get("mean", 0), + "min": stats.get("min", 0), + "max": stats.get("max", 0), + "peak_rss_bytes": peak_rss_bytes, + } + + return benchmarks + + +def generate_report(benchmarks, output_format="table"): + """Generate percentile report in specified format as string.""" + lines = [] + + if output_format == "table": + lines.append("\n" + "=" * 100) + lines.append( + "PERCENTILE REPORT (p50/p95/p99) per N, Database Type, and Content Size" + ) + lines.append("=" * 100) + lines.append( + f"{'Benchmark':<34} {'N':<8} {'DB':<12} {'Size':<8} {'p50 (ms)':<12} {'p95 (ms)':<12} " + f"{'p99 (ms)':<12} {'Mean (ms)':<12} {'Peak RSS (MB)':<14}" + ) + lines.append("-" * 100) + + for key in sorted(benchmarks.keys()): + stats = benchmarks[key] + peak_rss_mb = ( + (stats["peak_rss_bytes"] / (1024 * 1024)) + if stats.get("peak_rss_bytes") is not None + else None + ) + lines.append( + f"{stats['benchmark_id']:<34} " + f"{stats['n']:<8} " + f"{stats['db_type']:<12} " + f"{stats['content_size']:<8} " + f"{stats['p50'] * 1000:<12.4f} " + f"{stats['p95'] * 1000:<12.4f} " + f"{stats['p99'] * 1000:<12.4f} " + f"{stats['mean'] * 1000:<12.4f} " + f"{(f'{peak_rss_mb:.1f}' if peak_rss_mb is not None else ''):<14}" + ) + lines.append("=" * 100) + return "\n".join(lines) + + elif output_format == "csv": + lines.append( + "benchmark_id,N,db_type,content_size,p50_ms,p95_ms,p99_ms,mean_ms,min_ms,max_ms,peak_rss_mb" + ) + for key in sorted(benchmarks.keys()): + stats = benchmarks[key] + peak_rss_mb = ( + (stats["peak_rss_bytes"] / (1024 * 1024)) + if stats.get("peak_rss_bytes") is not None + else None + ) + lines.append( + f"{stats['benchmark_id']}," + f"{stats['n']}," + f"{stats['db_type']}," + f"{stats['content_size']}," + f"{stats['p50'] * 1000:.4f}," + f"{stats['p95'] * 1000:.4f}," + f"{stats['p99'] * 1000:.4f}," + f"{stats['mean'] * 1000:.4f}," + f"{stats['min'] * 1000:.4f}," + f"{stats['max'] * 1000:.4f}," + f"{(f'{peak_rss_mb:.4f}' if peak_rss_mb is not None else '')}" + ) + return "\n".join(lines) + + elif output_format == "json": + output = {} + for key in sorted(benchmarks.keys()): + stats = benchmarks[key] + output_key = 
f"{stats['benchmark_id']}_{stats['n']}_{stats['db_type']}_{stats['content_size']}" + peak_rss_mb = ( + (stats["peak_rss_bytes"] / (1024 * 1024)) + if stats.get("peak_rss_bytes") is not None + else None + ) + output[output_key] = { + "benchmark_id": stats["benchmark_id"], + "n": stats["n"], + "db_type": stats["db_type"], + "content_size": stats["content_size"], + "p50_ms": stats["p50"] * 1000, + "p95_ms": stats["p95"] * 1000, + "p99_ms": stats["p99"] * 1000, + "mean_ms": stats["mean"] * 1000, + "min_ms": stats["min"] * 1000, + "max_ms": stats["max"] * 1000, + "peak_rss_mb": peak_rss_mb, + } + return json.dumps(output, indent=2) + + return "" + + +def print_report(benchmarks, output_format="table"): + """Print percentile report to stdout.""" + print(generate_report(benchmarks, output_format)) + + +def main(): + if len(sys.argv) < 2: + print( + "Usage: python generate_percentile_report.py [format] [output_file] [max_n]" + ) + print(" format: table (default), csv, or json") + print(" output_file: optional file path to write report (default: stdout)") + print( + " max_n: optional maximum N value to include (filters out tests with N > max_n)" + ) + sys.exit(1) + + json_file = Path(sys.argv[1]) + if not json_file.exists(): + print(f"Error: File not found: {json_file}") + sys.exit(1) + + output_format = sys.argv[2] if len(sys.argv) > 2 else "table" + + if output_format not in ["table", "csv", "json"]: + print(f"Error: Invalid format '{output_format}'. Use: table, csv, or json") + sys.exit(1) + + output_file = sys.argv[3] if len(sys.argv) > 3 else None + max_n = int(sys.argv[4]) if len(sys.argv) > 4 else None + + benchmarks = generate_percentile_report(json_file, max_n=max_n) + + if not benchmarks: + print("No benchmark data found with N values.") + if max_n: + print(f"(Filtered to N <= {max_n})") + sys.exit(1) + + report = generate_report(benchmarks, output_format) + + if output_file: + output_path = Path(output_file) + output_path.write_text(report) + print(f"Report written to: {output_path}") + if max_n: + print(f"(Filtered to N <= {max_n})") + else: + print(report) + + +if __name__ == "__main__": + main() diff --git a/tests/benchmarks/memory_utils.py b/tests/benchmarks/memory_utils.py new file mode 100644 index 00000000..a12af84a --- /dev/null +++ b/tests/benchmarks/memory_utils.py @@ -0,0 +1,44 @@ +import threading +import time +from collections.abc import Callable +from typing import TypeVar + +T = TypeVar("T") + + +def measure_peak_rss_bytes( + fn: Callable[[], T], *, sample_interval_seconds: float = 0.005 +) -> tuple[T, int | None]: + """ + Measure approximate peak RSS (process resident set size) while running fn. + + Returns (result, peak_rss_bytes). If psutil isn't available, peak is None. 
+ """ + try: + import psutil # type: ignore[import-not-found] + except Exception: + return fn(), None + + proc = psutil.Process() + peak = {"rss": proc.memory_info().rss} + stop = threading.Event() + + def _sampler() -> None: + while not stop.is_set(): + try: + rss = proc.memory_info().rss + if rss > peak["rss"]: + peak["rss"] = rss + except Exception: + pass + time.sleep(sample_interval_seconds) + + t = threading.Thread(target=_sampler, daemon=True) + t.start() + try: + result = fn() + finally: + stop.set() + t.join(timeout=1.0) + + return result, int(peak["rss"]) diff --git a/tests/benchmarks/run_benchmarks_ec2.sh b/tests/benchmarks/run_benchmarks_ec2.sh new file mode 100755 index 00000000..ed3350ca --- /dev/null +++ b/tests/benchmarks/run_benchmarks_ec2.sh @@ -0,0 +1,85 @@ +#!/bin/bash +# Shared benchmark execution functions for AWS EC2 environment + +set -e + +# Get script location to handle relative paths correctly +BENCHMARK_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$BENCHMARK_DIR/../.." && pwd)" + +# Default settings +DB_TYPE=${DB_TYPE:-"postgres"} +TEST_TYPE=${TEST_TYPE:-"all"} # options: all, end_to_end, db_retrieval, semantic_search, embedding +OUTPUT_DIR=${OUTPUT_DIR:-"$REPO_ROOT/results"} + +mkdir -p "$OUTPUT_DIR" + +run_benchmarks() { + local db=$1 + local type=$2 + local timestamp=$(date +%Y%m%d_%H%M%S) + local output_json="$OUTPUT_DIR/results_${db}_${type}_${timestamp}.json" + local output_csv="$OUTPUT_DIR/report_${db}_${type}_${timestamp}.csv" + + echo "====================================================" + echo "Running benchmarks for: DB=$db, Type=$type" + echo "Output JSON: $output_json" + echo "Output CSV: $output_csv" + echo "====================================================" + + # Determine pytest filter (-k) based on test type + local filter="" + case $type in + "end_to_end") filter="TestEndToEndRecallBenchmarks" ;; + "db_retrieval") filter="DatabaseEmbeddingRetrievalBenchmarks or DatabaseFactContentRetrievalBenchmarks" ;; + "semantic_search") filter="TestSemanticSearchBenchmarks" ;; + "embedding") filter="TestQueryEmbeddingBenchmarks" ;; + "all") filter="" ;; + *) echo "Unknown test type: $type"; exit 1 ;; + esac + + # Add database filter + if [[ -n "$filter" ]]; then + filter="($filter) and $db" + else + filter="$db" + fi + + # Run benchmarks from repo root + ( + cd "$REPO_ROOT" + uv run pytest -m benchmark \ + --benchmark-only \ + tests/benchmarks/test_recall_benchmarks.py \ + -k "$filter" \ + -v \ + --benchmark-json="$output_json" + + # Automatically convert to CSV + if [[ -f "$output_json" ]]; then + echo "Converting results to CSV..." + uv run python tests/benchmarks/generate_percentile_report.py \ + "$output_json" \ + csv \ + "$output_csv" + echo "CSV Report generated: $output_csv" + else + echo "Warning: JSON results file not found, skipping CSV generation." 
+ fi + ) +} + +# If script is executed directly (not sourced), run based on env vars +if [[ "${BASH_SOURCE[0]}" == "${0}" ]]; then + # Print usage info if help requested + if [[ "$1" == "--help" || "$1" == "-h" ]]; then + echo "Usage: DB_TYPE=[postgres|mysql] TEST_TYPE=[all|end_to_end|db_retrieval|semantic_search|embedding] $0" + exit 0 + fi + + run_benchmarks "$DB_TYPE" "$TEST_TYPE" + + echo "====================================================" + echo "Benchmark Run Complete" + echo "====================================================" +fi diff --git a/tests/benchmarks/semantic_accuracy_dataset.py b/tests/benchmarks/semantic_accuracy_dataset.py new file mode 100644 index 00000000..ab15f3eb --- /dev/null +++ b/tests/benchmarks/semantic_accuracy_dataset.py @@ -0,0 +1,148 @@ +""" +Curated semantic-accuracy dataset for recall evaluation. + +Each query maps to one or more acceptable facts (strings) that should be returned by recall. +More relevant facts per query = finer-grained recall metrics. +""" + +DATASET = { + "facts": [ + # Location facts (0-4) + "User lives in Paris (id: 0)", + "User's apartment is in the 11th arrondissement (id: 1)", + "User moved to France 3 years ago (id: 2)", + "User previously lived in London (id: 3)", + "User's office is near the Eiffel Tower (id: 4)", + # Color/preference facts (5-9) + "User's favorite color is blue (id: 5)", + "User prefers navy blue over light blue (id: 6)", + "User likes wearing blue shirts (id: 7)", + "User painted their room blue (id: 8)", + "User's car is blue (id: 9)", + # Food facts (10-19) + "User likes pizza (id: 10)", + "User enjoys Italian cuisine (id: 11)", + "User's favorite pizza topping is pepperoni (id: 12)", + "User likes coffee (id: 13)", + "User drinks espresso every morning (id: 14)", + "User prefers dark roast coffee (id: 15)", + "User enjoys sushi (id: 16)", + "User is vegetarian on weekdays (id: 17)", + "User loves Thai food (id: 18)", + "User dislikes cilantro (id: 19)", + # Work facts (20-24) + "User works at Tech Corp (id: 20)", + "User is a software engineer (id: 21)", + "User has been at Tech Corp for 2 years (id: 22)", + "User works remotely on Fridays (id: 23)", + "User's manager is named Sarah (id: 24)", + # Hobby facts (25-34) + "User enjoys hiking (id: 25)", + "User hiked Mont Blanc last summer (id: 26)", + "User goes hiking every weekend (id: 27)", + "User enjoys cooking (id: 28)", + "User took a cooking class in Italy (id: 29)", + "User specializes in French cuisine (id: 30)", + "User plays guitar (id: 31)", + "User reads science fiction (id: 32)", + "User practices yoga (id: 33)", + "User runs marathons (id: 34)", + # Settings/preferences (35-39) + "User prefers dark mode (id: 35)", + "User uses vim keybindings (id: 36)", + "User prefers metric units (id: 37)", + "User's timezone is CET (id: 38)", + "User speaks French and English (id: 39)", + # Personal facts (40-49) + "User's birthday is March 15th (id: 40)", + "User was born in 1990 (id: 41)", + "User is 34 years old (id: 42)", + "User has 2 cats (id: 43)", + "User's cats are named Luna and Shadow (id: 44)", + "User adopted the cats in 2021 (id: 45)", + "User is married (id: 46)", + "User's spouse works in finance (id: 47)", + "User has a brother named Tom (id: 48)", + "User's mother lives in Spain (id: 49)", + ], + # query -> list of acceptable facts (relevant set) + # Each query has 5-10 relevant facts for finer-grained recall + "queries": { + "Where do I live?": [ + "User lives in Paris (id: 0)", + "User's apartment is in the 11th arrondissement (id: 
1)", + "User moved to France 3 years ago (id: 2)", + "User previously lived in London (id: 3)", + "User's office is near the Eiffel Tower (id: 4)", + "User's timezone is CET (id: 38)", + ], + "What's my favorite color?": [ + "User's favorite color is blue (id: 5)", + "User prefers navy blue over light blue (id: 6)", + "User likes wearing blue shirts (id: 7)", + "User painted their room blue (id: 8)", + "User's car is blue (id: 9)", + ], + "What food do I like?": [ + "User likes pizza (id: 10)", + "User enjoys Italian cuisine (id: 11)", + "User's favorite pizza topping is pepperoni (id: 12)", + "User likes coffee (id: 13)", + "User enjoys sushi (id: 16)", + "User loves Thai food (id: 18)", + ], + "Tell me about my coffee preferences": [ + "User likes coffee (id: 13)", + "User drinks espresso every morning (id: 14)", + "User prefers dark roast coffee (id: 15)", + ], + "Where do I work?": [ + "User works at Tech Corp (id: 20)", + "User is a software engineer (id: 21)", + "User has been at Tech Corp for 2 years (id: 22)", + "User works remotely on Fridays (id: 23)", + "User's manager is named Sarah (id: 24)", + "User's office is near the Eiffel Tower (id: 4)", + ], + "What do I enjoy doing outdoors?": [ + "User enjoys hiking (id: 25)", + "User hiked Mont Blanc last summer (id: 26)", + "User goes hiking every weekend (id: 27)", + "User runs marathons (id: 34)", + ], + "What are my cooking skills?": [ + "User enjoys cooking (id: 28)", + "User took a cooking class in Italy (id: 29)", + "User specializes in French cuisine (id: 30)", + ], + "What hobbies do I have?": [ + "User enjoys hiking (id: 25)", + "User goes hiking every weekend (id: 27)", + "User enjoys cooking (id: 28)", + "User plays guitar (id: 31)", + "User reads science fiction (id: 32)", + "User practices yoga (id: 33)", + "User runs marathons (id: 34)", + ], + "Do I prefer dark mode?": [ + "User prefers dark mode (id: 35)", + "User uses vim keybindings (id: 36)", + ], + "When is my birthday?": [ + "User's birthday is March 15th (id: 40)", + "User was born in 1990 (id: 41)", + "User is 34 years old (id: 42)", + ], + "Do I have any pets?": [ + "User has 2 cats (id: 43)", + "User's cats are named Luna and Shadow (id: 44)", + "User adopted the cats in 2021 (id: 45)", + ], + "Tell me about my family": [ + "User is married (id: 46)", + "User's spouse works in finance (id: 47)", + "User has a brother named Tom (id: 48)", + "User's mother lives in Spain (id: 49)", + ], + }, +} diff --git a/tests/benchmarks/semantic_accuracy_metrics.py b/tests/benchmarks/semantic_accuracy_metrics.py new file mode 100644 index 00000000..46d41351 --- /dev/null +++ b/tests/benchmarks/semantic_accuracy_metrics.py @@ -0,0 +1,47 @@ +import math + + +def recall_at_k(relevant: set[str], retrieved: list[str], k: int) -> float: + if not relevant: + return 0.0 + topk = retrieved[:k] + hit = any(item in relevant for item in topk) + return 1.0 if hit else 0.0 + + +def precision_at_k(relevant: set[str], retrieved: list[str], k: int) -> float: + if k <= 0: + return 0.0 + topk = retrieved[:k] + if not topk: + return 0.0 + hits = sum(1 for item in topk if item in relevant) + return hits / min(k, len(topk)) + + +def mrr(relevant: set[str], retrieved: list[str]) -> float: + for i, item in enumerate(retrieved, start=1): + if item in relevant: + return 1.0 / i + return 0.0 + + +def ndcg_at_k(relevant: set[str], retrieved: list[str], k: int) -> float: + if k <= 0: + return 0.0 + topk = retrieved[:k] + if not topk: + return 0.0 + + def dcg(items: list[str]) -> float: + score = 0.0 + 
for i, item in enumerate(items, start=1): + rel = 1.0 if item in relevant else 0.0 + score += rel / math.log2(i + 1) + return score + + ideal = list(relevant)[:k] + ideal_dcg = dcg(ideal) + if ideal_dcg == 0.0: + return 0.0 + return dcg(topk) / ideal_dcg diff --git a/tests/benchmarks/setup_ec2_benchmarks.sh b/tests/benchmarks/setup_ec2_benchmarks.sh new file mode 100755 index 00000000..ed4fad5e --- /dev/null +++ b/tests/benchmarks/setup_ec2_benchmarks.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Setup script for running benchmarks on AWS EC2 + +set -e + +echo "Setting up Memori benchmarks on EC2..." + +# Install system dependencies +sudo apt-get update +sudo apt-get install -y \ + python3.12 \ + python3.12-venv \ + git \ + curl \ + build-essential \ + postgresql-client \ + default-mysql-client + +# Install uv +curl -LsSf https://astral.sh/uv/install.sh | sh +source $HOME/.cargo/env + +# Clone repository (replace with your repo URL if not already cloned) +# git clone https://github.com/MemoriLabs/Memori.git +# cd Memori + +# Sync dependencies +uv sync --all-extras + +# Source the runner to get run_benchmarks function +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +source "$SCRIPT_DIR/run_benchmarks_ec2.sh" + +echo "Setup complete!" +echo "" +echo "To run all benchmarks for Postgres:" +echo " DB_TYPE=postgres TEST_TYPE=all ./tests/benchmarks/run_benchmarks_ec2.sh" +echo "" +echo "To run end-to-end benchmarks for MySQL:" +echo " DB_TYPE=mysql TEST_TYPE=end_to_end ./tests/benchmarks/run_benchmarks_ec2.sh" +echo "" +echo "Results will be automatically saved to the ./results directory as CSV." diff --git a/tests/benchmarks/test_recall_accuracy.py b/tests/benchmarks/test_recall_accuracy.py new file mode 100644 index 00000000..cdf38a09 --- /dev/null +++ b/tests/benchmarks/test_recall_accuracy.py @@ -0,0 +1,45 @@ +import random + +from memori.memory.recall import Recall + + +def test_recall_accuracy_topk(memori_instance, entity_with_n_facts): + """ + Accuracy proxy: for a sample of stored facts, querying with the exact fact text + should retrieve that fact in top-k (ideally top-1). + + This validates the end-to-end recall pipeline returns the correct row given an + exact-match query (embedding + DB pull + FAISS + content fetch). + """ + entity_db_id = entity_with_n_facts["entity_db_id"] + facts = entity_with_n_facts["facts"] + + rng = random.Random(42) + sample_size = min(10, len(facts)) + sampled = rng.sample(facts, k=sample_size) + + recall = Recall(memori_instance.config) + + top1_hits = 0 + top5_hits = 0 + + for fact in sampled: + results = recall.search_facts(query=fact, limit=5, entity_id=entity_db_id) + contents = [r.get("content") for r in results] + + if contents and contents[0] == fact: + top1_hits += 1 + if fact in contents: + top5_hits += 1 + + # Print a small summary if running with -s + db_type = entity_with_n_facts["db_type"] + n = entity_with_n_facts["fact_count"] + size = entity_with_n_facts["content_size"] + print( + f"[recall-accuracy] db={db_type} n={n} size={size} " + f"top1={top1_hits}/{sample_size} top5={top5_hits}/{sample_size}" + ) + + # Hard assertions: exact-match should always be in top-5 for this pipeline. 
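+    # Top-1 is reported above but not asserted, since near-duplicate facts may
+    # legitimately outrank the exact-match row.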
+ assert top5_hits == sample_size diff --git a/tests/benchmarks/test_recall_benchmarks.py b/tests/benchmarks/test_recall_benchmarks.py new file mode 100644 index 00000000..c455dfd6 --- /dev/null +++ b/tests/benchmarks/test_recall_benchmarks.py @@ -0,0 +1,182 @@ +"""Performance benchmarks for Memori recall functionality.""" + +import pytest + +from memori._search import find_similar_embeddings +from memori.llm._embeddings import embed_texts +from memori.memory.recall import Recall +from tests.benchmarks.memory_utils import measure_peak_rss_bytes + + +@pytest.mark.benchmark +class TestQueryEmbeddingBenchmarks: + """Benchmarks for query embedding generation.""" + + def test_benchmark_query_embedding_short(self, benchmark, sample_queries): + """Benchmark embedding generation for short queries.""" + query = sample_queries["short"][0] + + def _embed(): + return embed_texts(query) + + result = benchmark(_embed) + assert len(result) > 0 + assert len(result[0]) > 0 + + def test_benchmark_query_embedding_medium(self, benchmark, sample_queries): + """Benchmark embedding generation for medium-length queries.""" + query = sample_queries["medium"][0] + + def _embed(): + return embed_texts(query) + + result = benchmark(_embed) + assert len(result) > 0 + assert len(result[0]) > 0 + + def test_benchmark_query_embedding_long(self, benchmark, sample_queries): + """Benchmark embedding generation for long queries.""" + query = sample_queries["long"][0] + + def _embed(): + return embed_texts(query) + + result = benchmark(_embed) + assert len(result) > 0 + assert len(result[0]) > 0 + + def test_benchmark_query_embedding_batch(self, benchmark, sample_queries): + """Benchmark embedding generation for multiple queries at once.""" + queries = sample_queries["short"][:5] + + def _embed(): + return embed_texts(queries) + + result = benchmark(_embed) + assert len(result) == len(queries) + assert all(len(emb) > 0 for emb in result) + + +@pytest.mark.benchmark +class TestDatabaseEmbeddingRetrievalBenchmarks: + """Benchmarks for database embedding retrieval.""" + + def test_benchmark_db_embedding_retrieval( + self, benchmark, memori_instance, entity_with_n_facts + ): + """Benchmark retrieving embeddings from database for different fact counts.""" + entity_db_id = entity_with_n_facts["entity_db_id"] + fact_count = entity_with_n_facts["fact_count"] + entity_fact_driver = memori_instance.config.storage.driver.entity_fact + + def _retrieve(): + return entity_fact_driver.get_embeddings(entity_db_id, limit=fact_count) + + _, peak_rss = measure_peak_rss_bytes(_retrieve) + if peak_rss is not None: + benchmark.extra_info["peak_rss_bytes"] = peak_rss + + result = benchmark(_retrieve) + assert len(result) == fact_count + assert all("id" in row and "content_embedding" in row for row in result) + + +@pytest.mark.benchmark +class TestDatabaseFactContentRetrievalBenchmarks: + """Benchmarks for fetching fact content by ids (final recall DB step). + + This benchmarks the final step after semantic search has already identified + the top-k most similar embeddings. We only retrieve content for those top results + (typically 5-10 facts), not all facts in the database. + """ + + @pytest.mark.parametrize("retrieval_limit", [5, 10], ids=["limit5", "limit10"]) + def test_benchmark_db_fact_content_retrieval( + self, benchmark, memori_instance, entity_with_n_facts, retrieval_limit + ): + """Benchmark retrieving content for top-k facts after semantic search. 
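+        Only the get_facts_by_ids() call is timed; the candidate IDs are fetched
+        outside the benchmarked callable.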
+ + Args: + retrieval_limit: Number of fact IDs to retrieve content for (after semantic + search has already filtered to top results). This should be small (5-10). + """ + entity_db_id = entity_with_n_facts["entity_db_id"] + entity_fact_driver = memori_instance.config.storage.driver.entity_fact + + # Simulate semantic search returning top-k IDs (outside benchmark timing) + # In real flow: get_embeddings(embeddings_limit=1000) -> FAISS search -> top-k IDs + seed_rows = entity_fact_driver.get_embeddings( + entity_db_id, limit=retrieval_limit + ) + fact_ids = [row["id"] for row in seed_rows] + + def _retrieve(): + return entity_fact_driver.get_facts_by_ids(fact_ids) + + _, peak_rss = measure_peak_rss_bytes(_retrieve) + if peak_rss is not None: + benchmark.extra_info["peak_rss_bytes"] = peak_rss + + result = benchmark(_retrieve) + assert len(result) == len(fact_ids) + assert all("id" in row and "content" in row for row in result) + + +@pytest.mark.benchmark +class TestSemanticSearchBenchmarks: + """Benchmarks for semantic search (FAISS similarity search).""" + + def test_benchmark_semantic_search( + self, benchmark, memori_instance, entity_with_n_facts, sample_queries + ): + """Benchmark FAISS similarity search for different embedding counts.""" + entity_db_id = entity_with_n_facts["entity_db_id"] + fact_count = entity_with_n_facts["fact_count"] + entity_fact_driver = memori_instance.config.storage.driver.entity_fact + + # Pre-fetch embeddings (not part of benchmark) + db_results = entity_fact_driver.get_embeddings(entity_db_id, limit=fact_count) + embeddings = [(row["id"], row["content_embedding"]) for row in db_results] + + # Pre-generate query embedding (not part of benchmark) + query = sample_queries["short"][0] + query_embedding = embed_texts(query)[0] + + # Benchmark only the similarity search + def _search(): + return find_similar_embeddings(embeddings, query_embedding, limit=5) + + _, peak_rss = measure_peak_rss_bytes(_search) + if peak_rss is not None: + benchmark.extra_info["peak_rss_bytes"] = peak_rss + + result = benchmark(_search) + assert len(result) > 0 + assert all(isinstance(item, tuple) and len(item) == 2 for item in result) + assert all( + isinstance(item[0], int) and isinstance(item[1], float) for item in result + ) + + +@pytest.mark.benchmark +class TestEndToEndRecallBenchmarks: + """Benchmarks for end-to-end recall (embed query + DB + FAISS + content fetch).""" + + def test_benchmark_end_to_end_recall( + self, benchmark, memori_instance, entity_with_n_facts, sample_queries + ): + entity_db_id = entity_with_n_facts["entity_db_id"] + query = sample_queries["short"][0] + + recall = Recall(memori_instance.config) + + def _recall(): + return recall.search_facts(query=query, limit=5, entity_id=entity_db_id) + + _, peak_rss = measure_peak_rss_bytes(_recall) + if peak_rss is not None: + benchmark.extra_info["peak_rss_bytes"] = peak_rss + + result = benchmark(_recall) + assert isinstance(result, list) + assert len(result) <= 5 diff --git a/tests/benchmarks/test_recall_semantic_accuracy.py b/tests/benchmarks/test_recall_semantic_accuracy.py new file mode 100644 index 00000000..ad89e0d6 --- /dev/null +++ b/tests/benchmarks/test_recall_semantic_accuracy.py @@ -0,0 +1,144 @@ +import random + +import pytest + +from memori.llm._embeddings import embed_texts +from memori.memory.recall import Recall +from tests.benchmarks.semantic_accuracy_dataset import DATASET +from tests.benchmarks.semantic_accuracy_metrics import ( + mrr, + ndcg_at_k, + precision_at_k, + recall_at_k, +) + + +def 
_embeddings_available() -> bool: + # If the embedding model can't load, Memori falls back to all-zeros embeddings. + # That makes semantic accuracy meaningless, so we skip instead of failing. + vec = embed_texts("sanity check")[0] + return any(v != 0.0 for v in vec) + + +def _generate_hard_distractors( + count: int, *, rng: random.Random, forbidden: set[str] +) -> list[str]: + cities = ["London", "Berlin", "Rome", "Madrid", "Lisbon", "Dublin", "Vienna"] + colors = ["red", "green", "yellow", "purple", "orange", "black", "white"] + foods = ["sushi", "tacos", "ramen", "burgers", "pasta", "salad", "ice cream"] + drinks = ["tea", "sparkling water", "matcha", "hot chocolate", "juice"] + companies = ["Acme Corp", "Globex", "Initech", "Hooli", "Soylent", "Umbrella"] + activities = ["running", "swimming", "reading", "gaming", "cycling", "yoga"] + themes = ["light mode", "system theme", "high contrast mode"] + birthdays = ["April 1st", "May 20th", "June 7th", "July 30th", "Oct 12th"] + pets = ["1 cat", "3 cats", "2 dogs", "a dog", "a cat", "no pets"] + + templates = [ + lambda v: f"User lives in {v}", + lambda v: f"User's favorite color is {v}", + lambda v: f"User likes {v}", + lambda v: f"User works at {v}", + lambda v: f"User enjoys {v}", + lambda v: f"User prefers {v}", + lambda v: f"User's birthday is {v}", + lambda v: f"User has {v}", + ] + values = [ + cities, + colors, + foods + drinks, + companies, + activities, + themes, + birthdays, + pets, + ] + + distractors: list[str] = [] + for i in range(count): + idx = i % len(templates) + base = templates[idx](rng.choice(values[idx])) + candidate = f"{base} (id: d{i})" + if candidate in forbidden: + candidate = f"{base} (note: alt) (id: d{i})" + distractors.append(candidate) + + return distractors + + +@pytest.mark.skipif(not _embeddings_available(), reason="Embedding model unavailable") +@pytest.mark.parametrize( + "total_records", [10, 100, 500, 1000, 5000], ids=lambda n: f"n{n}" +) +def test_semantic_recall_accuracy(memori_instance, total_records): + """ + Semantic accuracy evaluation (the "right way"): + - seed a labeled dataset of facts + - run a labeled set of queries + - compute standard IR metrics (Recall@k, Precision@k, MRR, nDCG@k) + """ + # Seed dataset facts + distractors into a fresh entity + facts = list(DATASET["facts"]) + queries = DATASET["queries"] + + # Expand to the requested total size by adding distractors. + # This lets us evaluate how accuracy changes as the number of stored records grows. + if total_records < len(facts): + pytest.skip( + f"total_records={total_records} is smaller than labeled fact count={len(facts)}" + ) + + distractor_count = total_records - len(facts) + rng = random.Random(123) + forbidden = set(facts) + distractors = _generate_hard_distractors( + distractor_count, rng=rng, forbidden=forbidden + ) + rng.shuffle(distractors) + facts.extend(distractors) + + entity_id = f"semantic-accuracy-entity-{total_records}" + memori_instance.attribution(entity_id=entity_id, process_id="semantic-accuracy") + entity_db_id = memori_instance.config.storage.driver.entity.create(entity_id) + + fact_embeddings = embed_texts(facts) + memori_instance.config.storage.driver.entity_fact.create( + entity_db_id, facts, fact_embeddings + ) + + # Make the evaluation honest: search across the full corpus for this N. + # Otherwise recall will only consider the first `recall_embeddings_limit` rows (default 1000). 
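+    # Raising the limit to N means the DB pull feeds every stored embedding for this
+    # entity into the similarity search rather than a truncated subset.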
+ memori_instance.config.recall_embeddings_limit = total_records + + recall = Recall(memori_instance.config) + + k = 5 + scores = { + "recall@5": [], + "precision@5": [], + "mrr": [], + "ndcg@5": [], + } + + for query, expected in queries.items(): + relevant = set(expected) + results = recall.search_facts(query=query, limit=k, entity_id=entity_db_id) + retrieved = [r.get("content", "") for r in results] + + scores["recall@5"].append(recall_at_k(relevant, retrieved, k)) + scores["precision@5"].append(precision_at_k(relevant, retrieved, k)) + scores["mrr"].append(mrr(relevant, retrieved)) + scores["ndcg@5"].append(ndcg_at_k(relevant, retrieved, k)) + + # Aggregate (mean) metrics + mean_scores = {k: sum(v) / len(v) for k, v in scores.items()} + + db_type = getattr(memori_instance, "_benchmark_db_type", "unknown") + print( + f"[semantic-accuracy] db={db_type} total={total_records} " + f"labeled={len(DATASET['facts'])} distractors={distractor_count} " + f"embeddings_limit={memori_instance.config.recall_embeddings_limit} {mean_scores}" + ) + + # We intentionally don't hard-fail on aggressive thresholds here because the goal + # is to *benchmark* accuracy as N grows. The printed metrics are the artifact.