Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
"""Fix per-bank vector indexes to match configured extension

Revision ID: a4b5c6d7e8f9
Revises: c2d3e4f5g6h7, c5d6e7f8a9b0
Create Date: 2026-04-01

Migration d5e6f7a8b9c0 hardcoded HNSW when creating per-bank partial vector
indexes, ignoring HINDSIGHT_API_VECTOR_EXTENSION. Banks that existed when that
migration ran got HNSW indexes even when pgvectorscale (DiskANN) or vchord
was configured.

This migration detects the mismatch and recreates the affected indexes with
the correct type. Skipped entirely when the configured extension is pgvector
(the default), since those indexes are already correct.
"""

import os
from collections.abc import Sequence

from alembic import context, op
from sqlalchemy import text

revision: str = "a4b5c6d7e8f9"
down_revision: str | Sequence[str] | None = "d6e7f8a9b0c1"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

_FACT_TYPES: dict[str, str] = {
"world": "worl",
"experience": "expr",
"observation": "obsv",
}


def _get_schema_prefix() -> str:
    """Return a quoted schema prefix ('"name".') or '' when no target_schema is configured."""
    target = context.config.get_main_option("target_schema")
    if not target:
        return ""
    return f'"{target}".'


def _target_index_type() -> str | None:
"""Return the target index type, or None if pgvector (no fix needed)."""
ext = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector").lower()
if ext == "pgvectorscale":
return "diskann"
elif ext == "vchord":
return "vchordrq"
return None


def _vector_index_using_clause() -> str:
"""Return the USING clause based on the configured vector extension."""
ext = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector").lower()
if ext == "pgvectorscale":
return "USING diskann (embedding vector_cosine_ops) WITH (num_neighbors = 50)"
elif ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
else:
return "USING hnsw (embedding vector_cosine_ops)"


def upgrade() -> None:
    """Recreate per-bank partial vector indexes with the configured index type.

    No-op when HINDSIGHT_API_VECTOR_EXTENSION resolves to pgvector: the
    indexes created by migration d5e6f7a8b9c0 are already HNSW, which is
    correct for that extension. Otherwise, every per-(bank, fact_type)
    index is checked against pg_indexes and dropped/recreated if its
    access method does not match the target.
    """
    target = _target_index_type()
    if target is None:
        # pgvector — indexes are already HNSW, nothing to fix
        return

    bind = op.get_bind()
    schema_name = context.config.get_main_option("target_schema")
    schema = _get_schema_prefix()
    table_ref = f'"{schema_name}".memory_units' if schema_name else "memory_units"
    banks_ref = f'"{schema_name}".banks' if schema_name else "banks"
    using_clause = _vector_index_using_clause()
    # pg_indexes.schemaname is never NULL; unqualified objects report "public"
    pg_schema = schema_name or "public"

    rows = bind.execute(text(f"SELECT bank_id, internal_id FROM {banks_ref}")).fetchall()  # noqa: S608
    for row in rows:
        bank_id = row[0]
        # Index names embed the first 16 hex chars of the bank's internal_id UUID
        internal_id = str(row[1]).replace("-", "")[:16]
        # bank_id is interpolated into DDL (CREATE INDEX predicates cannot take
        # bind parameters), so single quotes must be doubled for SQL escaping
        escaped_bank_id = bank_id.replace("'", "''")
        for ft, ft_short in _FACT_TYPES.items():
            idx_name = f"idx_mu_emb_{ft_short}_{internal_id}"

            # Check if this index exists and what type it is
            idx_info = bind.execute(
                text("SELECT indexdef FROM pg_indexes WHERE schemaname = :schema AND indexname = :idx"),
                {"schema": pg_schema, "idx": idx_name},
            ).fetchone()

            if idx_info is None:
                # Index doesn't exist — create it with the correct type
                bind.execute(
                    text(
                        f"CREATE INDEX IF NOT EXISTS {idx_name} "
                        f"ON {table_ref} {using_clause} "
                        f"WHERE fact_type = '{ft}' AND bank_id = '{escaped_bank_id}'"
                    )
                )
                continue

            # indexdef contains "USING <method>"; a substring match on the
            # lowercased definition detects the access method
            indexdef = idx_info[0].lower()
            if target in indexdef:
                # Already the correct type
                continue

            # Wrong type — drop and recreate with the configured access method
            bind.execute(text(f"DROP INDEX IF EXISTS {schema}{idx_name}"))
            bind.execute(
                text(
                    f"CREATE INDEX IF NOT EXISTS {idx_name} "
                    f"ON {table_ref} {using_clause} "
                    f"WHERE fact_type = '{ft}' AND bank_id = '{escaped_bank_id}'"
                )
            )


def downgrade() -> None:
    """Recreate all per-bank partial indexes as HNSW.

    Restores the original hardcoded behavior of migration d5e6f7a8b9c0.
    No-op when the configured extension is pgvector, mirroring upgrade():
    in that case the indexes were never touched.
    """
    # Downgrade recreates indexes as HNSW (the original hardcoded behavior)
    target = _target_index_type()
    if target is None:
        return

    bind = op.get_bind()
    schema_name = context.config.get_main_option("target_schema")
    schema = _get_schema_prefix()
    table_ref = f'"{schema_name}".memory_units' if schema_name else "memory_units"
    banks_ref = f'"{schema_name}".banks' if schema_name else "banks"

    rows = bind.execute(text(f"SELECT bank_id, internal_id FROM {banks_ref}")).fetchall()  # noqa: S608
    for row in rows:
        bank_id = row[0]
        # Same naming scheme as upgrade(): first 16 hex chars of internal_id
        internal_id = str(row[1]).replace("-", "")[:16]
        # Escape single quotes — bank_id is interpolated into the DDL predicate
        escaped_bank_id = bank_id.replace("'", "''")
        for ft, ft_short in _FACT_TYPES.items():
            idx_name = f"idx_mu_emb_{ft_short}_{internal_id}"
            # Unconditionally drop and rebuild as HNSW, regardless of current type
            bind.execute(text(f"DROP INDEX IF EXISTS {schema}{idx_name}"))
            bind.execute(
                text(
                    f"CREATE INDEX IF NOT EXISTS {idx_name} "
                    f"ON {table_ref} USING hnsw (embedding vector_cosine_ops) "
                    f"WHERE fact_type = '{ft}' AND bank_id = '{escaped_bank_id}'"
                )
            )
Original file line number Diff line number Diff line change
@@ -1,30 +1,25 @@
"""Add internal_id to banks and per-(bank, fact_type) partial HNSW indexes
"""Add internal_id to banks and per-(bank, fact_type) partial vector indexes

Revision ID: d5e6f7a8b9c0
Revises: a3b4c5d6e7f8
Create Date: 2026-03-11

This migration:
1. Adds internal_id UUID column to banks (stable identifier for index naming)
2. Drops the global HNSW index (competes with per-bank partial indexes)
3. Creates per-(bank_id, fact_type) partial HNSW indexes for all existing banks
(new banks get indexes created at bank-creation time via bank_utils.create_bank_hnsw_indexes)
2. Drops the global vector index (competes with per-bank partial indexes)
3. Creates per-(bank_id, fact_type) partial vector indexes for all existing banks
using the configured vector extension (HNSW for pgvector, DiskANN for
pgvectorscale, vchordrq for vchord).
(new banks get indexes created at bank-creation time via bank_utils.create_bank_vector_indexes)

Why per-(bank, fact_type) indexes:
- fact_type-only partial indexes are never chosen by the planner when bank_id is in the WHERE
clause, because the idx_memory_units_bank_id B-tree index always wins at planning time.
- Per-(bank, fact_type) partial indexes have both predicates matching → planner selects them.
- The global HNSW index competes for larger partitions (world, observation) and must be dropped.

For large deployments, create indexes CONCURRENTLY before running this migration:
SELECT internal_id, bank_id FROM banks;
-- for each bank and each fact_type in (world, experience, observation):
CREATE INDEX CONCURRENTLY IF NOT EXISTS idx_mu_emb_{ft}_{uid16}
ON memory_units USING hnsw (embedding vector_cosine_ops)
WHERE fact_type = '{ft}' AND bank_id = '{bank_id}';
DROP INDEX CONCURRENTLY IF EXISTS idx_memory_units_embedding;
- The global vector index competes for larger partitions (world, observation) and must be dropped.
"""

import os
from collections.abc import Sequence

from alembic import context, op
Expand All @@ -35,7 +30,7 @@
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None

_HNSW_FACT_TYPES: dict[str, str] = {
_FACT_TYPES: dict[str, str] = {
"world": "worl",
"experience": "expr",
"observation": "obsv",
Expand All @@ -47,6 +42,17 @@ def _get_schema_prefix() -> str:
return f'"{schema}".' if schema else ""


def _vector_index_using_clause() -> str:
"""Return the USING clause based on the configured vector extension."""
ext = os.getenv("HINDSIGHT_API_VECTOR_EXTENSION", "pgvector").lower()
if ext == "pgvectorscale":
return "USING diskann (embedding vector_cosine_ops) WITH (num_neighbors = 50)"
elif ext == "vchord":
return "USING vchordrq (embedding vector_l2_ops)"
else:
return "USING hnsw (embedding vector_cosine_ops)"


def upgrade() -> None:
schema = _get_schema_prefix()

Expand All @@ -56,33 +62,35 @@ def upgrade() -> None:
)
op.execute(f"ALTER TABLE {schema}banks ADD CONSTRAINT banks_internal_id_unique UNIQUE (internal_id)")

# 2. Drop any fact_type-only partial HNSW indexes that may exist from prior migrations
# 2. Drop any fact_type-only partial indexes that may exist from prior migrations
# (bank_id B-tree always wins over them when bank_id is in the WHERE clause)
op.execute(f"DROP INDEX IF EXISTS {schema}idx_mu_emb_world")
op.execute(f"DROP INDEX IF EXISTS {schema}idx_mu_emb_observation")
op.execute(f"DROP INDEX IF EXISTS {schema}idx_mu_emb_experience")

# 4. Drop global HNSW index (competes with per-bank partial indexes)
# 4. Drop global vector index (competes with per-bank partial indexes)
op.execute(f"DROP INDEX IF EXISTS {schema}idx_memory_units_embedding")

# 5. Create per-(bank, fact_type) partial HNSW indexes for all existing banks
# 5. Create per-(bank, fact_type) partial vector indexes for all existing banks
# using the configured extension (HNSW / DiskANN / vchordrq)
bind = op.get_bind()
schema_name = context.config.get_main_option("target_schema")
table_ref = f'"{schema_name}".memory_units' if schema_name else "memory_units"
banks_ref = f'"{schema_name}".banks' if schema_name else "banks"
using_clause = _vector_index_using_clause()

rows = bind.execute(text(f"SELECT bank_id, internal_id FROM {banks_ref}")).fetchall() # noqa: S608
for row in rows:
bank_id = row[0]
internal_id = str(row[1]).replace("-", "")[:16]
escaped_bank_id = bank_id.replace("'", "''")
for ft, ft_short in _HNSW_FACT_TYPES.items():
for ft, ft_short in _FACT_TYPES.items():
idx_name = f"idx_mu_emb_{ft_short}_{internal_id}"
# Index name is schema-unqualified (indexes live in the schema of their table)
bind.execute(
text(
f"CREATE INDEX IF NOT EXISTS {idx_name} "
f"ON {table_ref} USING hnsw (embedding vector_cosine_ops) "
f"ON {table_ref} {using_clause} "
f"WHERE fact_type = '{ft}' AND bank_id = '{escaped_bank_id}'"
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ async def get_bank_profile(pool, bank_id: str) -> BankProfile:

# Bank doesn't exist, create with defaults.
# Generate internal_id here so we control the value and can use it
# immediately for HNSW index creation without a RETURNING round-trip.
# immediately for vector index creation without a RETURNING round-trip.
internal_id = uuid.uuid4()
inserted = await conn.fetchval(
f"""
Expand Down
2 changes: 1 addition & 1 deletion skills/hindsight-docs/references/openapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"name": "Apache 2.0",
"url": "https://www.apache.org/licenses/LICENSE-2.0.html"
},
"version": "0.4.21"
"version": "0.4.22"
},
"paths": {
"/health": {
Expand Down
Loading