Commit da24ec2
fix: comprehensive bug fixes across all services (#233)
* fix: comprehensive bug fixes across all services

Critical fixes:
- brainzgraphinator: fix wrong keys for mbid, mb_type, begin_date/end_date (always null/wrong)
- brainztableinator: fix begin_date/end_date reading from wrong location
- insights: fix infinite loop in cache invalidate_all (string/int cursor comparison)
- insights: include metric param in top-artists cache key to prevent wrong cached responses
- api: fix KeyError on missing 'total' column in queue history endpoint
- extractor: gate state marker completion behind success check (prevent silent data loss)
- extractor: use DataType::discogs() instead of DataType::all() to avoid publishing to non-existent exchange
- docker-compose.prod: add missing production secret overrides for 3 MusicBrainz services
- scripts: fix SQL/command injection in reset-password.sh
- dashboard: fix PostgreSQL connection leak in get_database_info (called every 2s)
- schema-init: set autocommit for DDL to prevent cascading InFailedSqlTransaction

Important fixes:
- explore: fix _doExplore -> _loadExplore (credits profile broken)
- explore: fix window.graphController -> window.exploreApp (NLQ entity links broken)
- explore: fix SSE stream eventType not persisting across chunks, flush buffer on done
- api/sync: release Redis lock after task creation, add password-change token check
- common: fix active_connections counter growing monotonically in PostgreSQL pool
- common: lazy-init asyncio.Lock in CircuitBreaker (RuntimeError on Python 3.12+)
- common: fix CircuitBreaker HALF_OPEN race condition
- graphinator: fix DETACH DELETE count always returning 0, use summary.counters
- graphinator: use MERGE instead of MATCH for Master node in process_release
- graphinator: add None guard before graph.close()
- brainz services: reset active_connection/active_channel to None after recovery failure
- dashboard: handle JSONDecodeError in admin proxy handlers (return 400 not 500)
- dashboard: consume Neo4j result before issuing second query on same session
- docker-compose.prod: remove unnecessary Neo4j credentials from explore service
- mcp-server: add error handling for API failures in all tool handlers
- insights: wrap _log_computation in error handlers to prevent exception masking
- tableinator: parse started_at as datetime with UTC timezone for purge_stale_rows
- utilities: fix connection leak in debug_message.py
- utilities: replace hardcoded Neo4j password in system_monitor.py
- tests: fix test names, mock cursor types, and assertions to match bug fixes

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: fix JS test and add coverage for new bug fix code

- Fix credits.test.js: update mock from _doExplore to _loadExplore and fix assertion order to match the corrected credits.js code
- Add CircuitBreaker tests: lazy asyncio.Lock init, HALF_OPEN race fix for both sync and async paths
- Add insights computation tests: verify original exception propagates when _log_computation also fails
- Add MCP server tests: HTTP error and network error return error dicts
- Add admin proxy tests: malformed JSON body returns 400 for all proxy endpoints
- Add sync password_changed test: token issued before password change returns 401
- Add tableinator purge test: naive datetime (no timezone) gets UTC

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* test: cover remaining uncovered lines from bug fixes

- insights/computations.py: add tests for label_longevity, anniversaries, data_completeness, and rarity exception masking prevention
- common/db_resilience.py: add tests for lazy lock init in _on_success_async and _on_failure_async
- explore/app.js: add test for NLQ entity click calling _switchPane and _loadExplore on exploreApp
- explore/api-client.js: add tests for SSE buffer flush on stream completion and eventType persisting across chunk boundaries

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 5073575 commit da24ec2

38 files changed

Lines changed: 916 additions & 144 deletions

api/queries/metrics_queries.py

Lines changed: 1 addition & 1 deletion
@@ -99,7 +99,7 @@ async def get_queue_history(pool: Any, range_value: str) -> dict[str, Any]:
                 "ts": row["ts"],
                 "ready": _round_or_int(row["ready"], is_raw=is_raw),
                 "unacked": _round_or_int(row["unacked"], is_raw=is_raw),
-                "total": _round_or_int(row["total"], is_raw=is_raw),
+                "total": _round_or_int(row.get("ready", 0), is_raw=is_raw) + _round_or_int(row.get("unacked", 0), is_raw=is_raw),
                 "publish_rate": _round_or_int(row["publish_rate"], is_raw=is_raw),
                 "deliver_rate": _round_or_int(row["deliver_rate"], is_raw=is_raw),
             }
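The fix above derives "total" from the columns that always exist instead of indexing a possibly-missing "total" column. A minimal sketch of that pattern, with a simplified stand-in for the repo's `_round_or_int` helper (the real one may behave differently):

```python
# Simplified stand-in for _round_or_int: rounds unless raw values requested.
def _round_or_int(value, is_raw=False):
    if value is None:
        return 0
    return value if is_raw else round(value)

def queue_point(row, is_raw=False):
    # row.get(...) never raises KeyError, unlike row["total"]
    ready = _round_or_int(row.get("ready", 0), is_raw=is_raw)
    unacked = _round_or_int(row.get("unacked", 0), is_raw=is_raw)
    return {"ready": ready, "unacked": unacked, "total": ready + unacked}

# A row without a "total" column no longer crashes the endpoint:
print(queue_point({"ready": 3.4, "unacked": 1.2}))
```

Computing the total as ready + unacked also keeps the three fields mutually consistent after rounding.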

api/routers/sync.py

Lines changed: 12 additions & 0 deletions
@@ -64,6 +64,17 @@ async def _get_current_user(
                 detail="Token has been revoked",
                 headers={"WWW-Authenticate": "Bearer"},
             )
+        # Check if password was changed after token was issued
+        if user_id and _redis:
+            pw_changed = await _redis.get(f"password_changed:{user_id}")
+            if pw_changed:
+                iat = payload.get("iat")
+                if iat and int(iat) < int(pw_changed):
+                    raise HTTPException(
+                        status_code=status.HTTP_401_UNAUTHORIZED,
+                        detail="Token has been revoked",
+                        headers={"WWW-Authenticate": "Bearer"},
+                    )
         return payload
     except ValueError as exc:
         raise HTTPException(
@@ -142,6 +153,7 @@ async def trigger_sync(
     # Set per-user cooldown to prevent rapid re-triggers
     if _redis:
         await _redis.setex(f"sync:cooldown:{user_id}", 600, "1")
+        await _redis.delete(f"sync:lock:{user_id}")

     logger.info("🔄 Sync triggered", user_id=user_id, sync_id=sync_id)
     return JSONResponse(content={"sync_id": sync_id, "status": "started"}, status_code=status.HTTP_202_ACCEPTED)
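The revocation logic above rejects any token whose issued-at claim predates the recorded password change. A minimal sketch of that comparison, with a plain dict standing in for the Redis `password_changed:*` keys:

```python
# Hypothetical in-memory stand-in for Redis "password_changed:<user_id>" keys,
# mapping user id -> Unix timestamp of the last password change.
password_changed = {"user-1": 1_700_000_100}

def token_revoked(user_id, payload):
    """True when the token was issued before the user's last password change."""
    pw_changed = password_changed.get(user_id)
    if not pw_changed:
        return False  # no recorded change: token stands
    iat = payload.get("iat")
    return bool(iat) and int(iat) < int(pw_changed)

assert token_revoked("user-1", {"iat": 1_700_000_000})      # issued before change
assert not token_revoked("user-1", {"iat": 1_700_000_200})  # issued after change
assert not token_revoked("user-2", {"iat": 1})              # no change recorded
```

A token without an `iat` claim is not revoked by this check, matching the `if iat and ...` guard in the hunk.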

brainzgraphinator/brainzgraphinator.py

Lines changed: 17 additions & 11 deletions
@@ -292,11 +292,13 @@ async def enrich_artist(tx: Any, record: dict[str, Any]) -> bool:
         " a.mb_updated_at = $mb_updated_at "
         "RETURN a.id AS matched_id",
         discogs_id=discogs_id,
-        mbid=record.get("mbid"),
-        mb_type=record.get("type"),
+        mbid=record.get("mbid", record.get("id")),
+        mb_type=record.get("mb_type", record.get("type")),
         mb_gender=record.get("gender"),
-        mb_begin_date=record.get("begin_date"),
-        mb_end_date=record.get("end_date"),
+        mb_begin_date=record.get(
+            "begin_date", (record.get("life_span") or {}).get("begin")
+        ),
+        mb_end_date=record.get("end_date", (record.get("life_span") or {}).get("end")),
         mb_area=record.get("area"),
         mb_begin_area=record.get("begin_area"),
         mb_end_area=record.get("end_area"),
@@ -338,11 +340,13 @@ async def enrich_label(tx: Any, record: dict[str, Any]) -> bool:
         " l.mb_updated_at = $mb_updated_at "
         "RETURN l.id AS matched_id",
         discogs_id=discogs_id,
-        mbid=record.get("mbid"),
-        mb_type=record.get("type"),
+        mbid=record.get("mbid", record.get("id")),
+        mb_type=record.get("mb_type", record.get("type")),
         mb_label_code=record.get("label_code"),
-        mb_begin_date=record.get("begin_date"),
-        mb_end_date=record.get("end_date"),
+        mb_begin_date=record.get(
+            "begin_date", (record.get("life_span") or {}).get("begin")
+        ),
+        mb_end_date=record.get("end_date", (record.get("life_span") or {}).get("end")),
         mb_area=record.get("area"),
         mb_updated_at=datetime.now(UTC).isoformat(),
     )
@@ -373,7 +377,7 @@ async def enrich_release(tx: Any, record: dict[str, Any]) -> bool:
         " r.mb_updated_at = $mb_updated_at "
         "RETURN r.id AS matched_id",
         discogs_id=discogs_id,
-        mbid=record.get("mbid"),
+        mbid=record.get("mbid", record.get("id")),
         mb_barcode=record.get("barcode"),
         mb_status=record.get("status"),
         mb_updated_at=datetime.now(UTC).isoformat(),
@@ -448,8 +452,8 @@ async def enrich_release_group(tx: Any, record: dict[str, Any]) -> bool:
         " m.mb_updated_at = $mb_updated_at "
         "RETURN m.id AS matched_id",
         discogs_id=discogs_id,
-        mbid=record.get("mbid"),
-        mb_type=record.get("type"),
+        mbid=record.get("mbid", record.get("id")),
+        mb_type=record.get("mb_type", record.get("type")),
         mb_secondary_types=record.get("secondary_types", []),
         mb_first_release_date=record.get("first_release_date"),
         mb_disambiguation=record.get("disambiguation"),
@@ -763,6 +767,8 @@ async def _recover_consumers() -> None:
             await temp_connection.close()
         except Exception:  # nosec: B110
             pass
+    active_connection = None
+    active_channel = None


 async def main() -> None:

brainztableinator/brainztableinator.py

Lines changed: 12 additions & 6 deletions
@@ -427,6 +427,8 @@ async def _recover_consumers() -> None:
             await temp_connection.close()
         except Exception:  # nosec: B110
             pass
+    active_connection = None
+    active_channel = None


 async def _insert_relationship(
@@ -498,9 +500,11 @@ async def process_artist(conn: Any, record: dict[str, Any]) -> None:
         record.get("sort_name", ""),
         record.get("mb_type", ""),
         record.get("gender", ""),
-        record.get("begin_date"),
-        record.get("end_date"),
-        record.get("ended", False),
+        record.get("begin_date", (record.get("life_span") or {}).get("begin")),
+        record.get("end_date", (record.get("life_span") or {}).get("end")),
+        record.get(
+            "ended", (record.get("life_span") or {}).get("ended", False)
+        ),
         record.get("area", ""),
         record.get("begin_area", ""),
         record.get("end_area", ""),
@@ -543,9 +547,11 @@ async def process_label(conn: Any, record: dict[str, Any]) -> None:
         record.get("name", ""),
         record.get("mb_type", ""),
         record.get("label_code"),
-        record.get("begin_date"),
-        record.get("end_date"),
-        record.get("ended", False),
+        record.get("begin_date", (record.get("life_span") or {}).get("begin")),
+        record.get("end_date", (record.get("life_span") or {}).get("end")),
+        record.get(
+            "ended", (record.get("life_span") or {}).get("ended", False)
+        ),
         record.get("area", ""),
         record.get("disambiguation", ""),
         record.get("discogs_label_id"),
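Both brainz services above use the same fallback pattern: prefer a flat top-level key, then fall back to the nested MusicBrainz life-span object. A minimal sketch of that lookup in isolation:

```python
# Sketch of the life_span fallback used in the hunks above. The (... or {})
# guard handles a record where "life_span" is present but explicitly null.
def begin_date(record):
    return record.get("begin_date", (record.get("life_span") or {}).get("begin"))

assert begin_date({"begin_date": "1965"}) == "1965"
assert begin_date({"life_span": {"begin": "1965"}}) == "1965"
assert begin_date({"life_span": None}) is None
assert begin_date({}) is None
```

One caveat worth noting: `dict.get(key, default)` evaluates the fallback only when the key is absent, so a record carrying an explicit `"begin_date": None` still yields `None` even if `life_span.begin` is populated.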

common/db_resilience.py

Lines changed: 52 additions & 1 deletion
@@ -45,17 +45,38 @@ def __init__(self, config: CircuitBreakerConfig):
         self.last_failure_time: datetime | None = None
         self.state = CircuitState.CLOSED
         self._lock = Lock()
-        self._async_lock = asyncio.Lock()
+        self._async_lock: asyncio.Lock | None = None

     def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
         """Execute function with circuit breaker protection."""
+        execute_under_lock = False
         with self._lock:
             if self.state == CircuitState.OPEN:
                 if self._should_attempt_reset():
                     self.state = CircuitState.HALF_OPEN
                     logger.info(f"🔄 {self.config.name}: Circuit breaker entering HALF_OPEN state")
+                    execute_under_lock = True
                 else:
                     raise Exception(f"{self.config.name}: Circuit breaker is OPEN")
+            elif self.state == CircuitState.HALF_OPEN:
+                execute_under_lock = True
+
+            if execute_under_lock:
+                # Execute trial call under lock to prevent HALF_OPEN race
+                try:
+                    result = func(*args, **kwargs)
+                    self.failure_count = 0
+                    if self.state != CircuitState.CLOSED:
+                        logger.info(f"✅ {self.config.name}: Circuit breaker reset to CLOSED")
+                    self.state = CircuitState.CLOSED
+                    return result
+                except self.config.expected_exception:
+                    self.failure_count += 1
+                    self.last_failure_time = datetime.now(UTC)
+                    if self.failure_count >= self.config.failure_threshold and self.state != CircuitState.OPEN:
+                        logger.error(f"❌ {self.config.name}: Circuit breaker OPEN after {self.failure_count} failures")
+                        self.state = CircuitState.OPEN
+                    raise

         try:
             result = func(*args, **kwargs)
@@ -67,13 +88,39 @@ def call(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:

     async def call_async(self, func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
         """Execute async function with circuit breaker protection."""
+        if self._async_lock is None:
+            self._async_lock = asyncio.Lock()
+        execute_under_lock = False
         async with self._async_lock:
             if self.state == CircuitState.OPEN:
                 if self._should_attempt_reset():
                     self.state = CircuitState.HALF_OPEN
                     logger.info(f"🔄 {self.config.name}: Circuit breaker entering HALF_OPEN state")
+                    execute_under_lock = True
                 else:
                     raise Exception(f"{self.config.name}: Circuit breaker is OPEN")
+            elif self.state == CircuitState.HALF_OPEN:
+                execute_under_lock = True
+
+            if execute_under_lock:
+                # Execute trial call under lock to prevent HALF_OPEN race
+                try:
+                    if asyncio.iscoroutinefunction(func):
+                        result = await func(*args, **kwargs)
+                    else:
+                        result = func(*args, **kwargs)
+                    self.failure_count = 0
+                    if self.state != CircuitState.CLOSED:
+                        logger.info(f"✅ {self.config.name}: Circuit breaker reset to CLOSED")
+                    self.state = CircuitState.CLOSED
+                    return result
+                except self.config.expected_exception:
+                    self.failure_count += 1
+                    self.last_failure_time = datetime.now(UTC)
+                    if self.failure_count >= self.config.failure_threshold and self.state != CircuitState.OPEN:
+                        logger.error(f"❌ {self.config.name}: Circuit breaker OPEN after {self.failure_count} failures")
+                        self.state = CircuitState.OPEN
+                    raise

         try:
             if asyncio.iscoroutinefunction(func):
@@ -110,6 +157,8 @@ def _on_failure(self) -> None:

     async def _on_success_async(self) -> None:
         """Handle successful async call."""
+        if self._async_lock is None:
+            self._async_lock = asyncio.Lock()
         async with self._async_lock:
             self.failure_count = 0
             if self.state != CircuitState.CLOSED:
@@ -118,6 +167,8 @@ async def _on_success_async(self) -> None:

     async def _on_failure_async(self) -> None:
         """Handle failed async call."""
+        if self._async_lock is None:
+            self._async_lock = asyncio.Lock()
         async with self._async_lock:
             self.failure_count += 1
             self.last_failure_time = datetime.now(UTC)
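The lazy-lock part of the fix above defers `asyncio.Lock()` creation until the first call that runs inside an event loop, so constructing the breaker outside any loop stays safe. A minimal sketch of that pattern, reduced to just the lock handling (the circuit-state machinery is omitted):

```python
import asyncio

class LazyLockHolder:
    """Sketch: create the asyncio.Lock on first use inside a running loop."""

    def __init__(self):
        # Deferred: __init__ may run before any event loop exists.
        self._async_lock = None

    async def guarded(self):
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()  # created inside the loop
        async with self._async_lock:
            return "ran under lock"

holder = LazyLockHolder()          # safe with no event loop running
assert holder._async_lock is None  # nothing created yet
result = asyncio.run(holder.guarded())
print(result)
```

Note the lock is bound to the loop it is first used in, so a given holder should stay on one event loop for its lifetime.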

common/postgres_resilient.py

Lines changed: 2 additions & 1 deletion
@@ -212,6 +212,8 @@ def connection(self) -> Generator[psycopg.Connection[Any]]:
         if conn and not conn.closed and not self._closed:
             try:
                 self.connections.put_nowait(conn)
+                with self._lock:
+                    self.active_connections = max(0, self.active_connections - 1)
             except Full:
                 # Pool is full, close connection
                 with contextlib.suppress(Exception):
@@ -338,7 +340,6 @@ async def initialize(self) -> None:
                 conn = await self._create_connection()
                 if conn:
                     await self.connections.put(conn)
-                    self.active_connections += 1
             except asyncio.QueueFull:
                 break
             except Exception as e:
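The counter fix above decrements `active_connections` under the lock when a connection is returned and clamps at zero, so the gauge can no longer grow monotonically. A simplified stand-in for the pool's bookkeeping:

```python
import threading

class PoolCounter:
    """Sketch of the active-connection gauge from the hunk above."""

    def __init__(self):
        self._lock = threading.Lock()
        self.active_connections = 0

    def checkout(self):
        with self._lock:
            self.active_connections += 1

    def checkin(self):
        with self._lock:
            # max(0, ...) keeps a stray double check-in from going negative
            self.active_connections = max(0, self.active_connections - 1)

pool = PoolCounter()
pool.checkout()
pool.checkin()
pool.checkin()  # double check-in is absorbed harmlessly
print(pool.active_connections)  # → 0
```

Without the decrement on check-in, every checkout would raise the gauge permanently, which is the monotonic growth the commit message describes.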

dashboard/admin_proxy.py

Lines changed: 13 additions & 3 deletions
@@ -13,6 +13,7 @@

 from fastapi import APIRouter, Query, Request, Response
 import httpx
+from starlette.responses import JSONResponse
 import structlog


@@ -91,7 +92,10 @@ async def proxy_login(request: Request) -> Response:
     """Proxy login requests to the API service."""
     url = _build_url("/api/admin/auth/login")
     headers = _auth_headers(request)
-    sanitised_body = await _validated_json_body(request)
+    try:
+        sanitised_body = await _validated_json_body(request)
+    except json.JSONDecodeError:
+        return JSONResponse(content={"detail": "Malformed JSON in request body"}, status_code=400)
     try:
         async with httpx.AsyncClient(timeout=30.0) as client:
             if sanitised_body:
@@ -154,7 +158,10 @@ async def proxy_trigger(request: Request) -> Response:
     """Proxy extraction trigger requests to the API service."""
     url = _build_url("/api/admin/extractions/trigger")
     headers = _auth_headers(request)
-    sanitised_body = await _validated_json_body(request)
+    try:
+        sanitised_body = await _validated_json_body(request)
+    except json.JSONDecodeError:
+        return JSONResponse(content={"detail": "Malformed JSON in request body"}, status_code=400)
     try:
         async with httpx.AsyncClient(timeout=30.0) as client:
             if sanitised_body:
@@ -173,7 +180,10 @@ async def proxy_trigger_musicbrainz(request: Request) -> Response:
     """Proxy MusicBrainz extraction trigger requests to the API service."""
     url = _build_url("/api/admin/extractions/trigger")
     headers = _auth_headers(request)
-    sanitised_body = await _validated_json_body(request)
+    try:
+        sanitised_body = await _validated_json_body(request)
+    except json.JSONDecodeError:
+        return JSONResponse(content={"detail": "Malformed JSON in request body"}, status_code=400)
     body_dict: dict = json.loads(sanitised_body) if sanitised_body else {}
     body_dict["source"] = "musicbrainz"
     payload = json.dumps(body_dict, separators=(",", ":")).encode()
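Each proxy handler above now catches `json.JSONDecodeError` and returns a 400 instead of letting it surface as a 500. The core of that guard, sketched with plain dicts standing in for Starlette responses:

```python
import json

def handle_body(raw: str):
    """Sketch of the malformed-JSON guard from the handlers above."""
    try:
        body = json.loads(raw) if raw else {}
    except json.JSONDecodeError:
        # Client error, not a server fault: report 400, not 500
        return {"status_code": 400, "detail": "Malformed JSON in request body"}
    return {"status_code": 200, "body": body}

assert handle_body('{"user": "a"}')["status_code"] == 200
assert handle_body("{not json")["status_code"] == 400
assert handle_body("")["status_code"] == 200  # empty body is allowed
```

`json.JSONDecodeError` subclasses `ValueError`, so catching the specific type keeps unrelated `ValueError`s flowing to the generic error path.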

dashboard/dashboard.py

Lines changed: 7 additions & 1 deletion
@@ -348,6 +348,7 @@ async def get_database_info(self) -> list[DatabaseInfo]:
         databases = []

         # Check PostgreSQL
+        conn = None
         try:
             if self.postgres_conn is None:
                 raise RuntimeError("PostgreSQL resilient connection not initialized")
@@ -392,13 +393,18 @@ async def get_database_info(self) -> list[DatabaseInfo]:
                         error=str(e),
                     )
                 )
+        finally:
+            if conn is not None:
+                with contextlib.suppress(Exception):
+                    await conn.rollback()

         # Check Neo4j
         try:
             if self.neo4j_driver:
                 async with self.neo4j_driver.session() as session:
                     result = await session.run("CALL dbms.components() YIELD name, versions")
-                    await result.single()  # Consume result
+                    await result.single()
+                    await result.consume()

                     # Get database size
                     result = await session.run("CALL apoc.meta.stats() YIELD nodeCount, relCount")
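The leak fix above binds `conn = None` before the `try` so the `finally` clause can always release whatever was acquired, with errors during cleanup suppressed. A minimal sketch of that shape, with a hypothetical `FakeConn` in place of the real PostgreSQL connection:

```python
import asyncio
import contextlib

class FakeConn:
    """Hypothetical stand-in for an async DB connection."""
    rolled_back = False

    async def rollback(self):
        self.rolled_back = True

async def get_info(acquire):
    # Bind conn before the try so finally can see it even if acquire() fails.
    conn = None
    try:
        conn = acquire()
        return "ok"
    finally:
        if conn is not None:
            with contextlib.suppress(Exception):
                await conn.rollback()  # best-effort cleanup, never raises out

conn = FakeConn()
result = asyncio.run(get_info(lambda: conn))
print(result, conn.rolled_back)
```

Since the original method runs every 2 seconds, skipping this cleanup is what let connections accumulate.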

0 commit comments