From 29323227fae3768d46efb8dd7170f6edac2c4ea6 Mon Sep 17 00:00:00 2001 From: Robert Wlodarczyk Date: Thu, 2 Apr 2026 09:36:25 -0700 Subject: [PATCH 1/2] =?UTF-8?q?fix:=20comprehensive=20bug=20fixes=20across?= =?UTF-8?q?=20all=20services=20=E2=80=94=20round=2019?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fix ON CONFLICT column mismatch in brainztableinator external_links (3 cols vs 4-col unique constraint — caused runtime PostgreSQL error) - Fix DLQ suffix check in metrics_queries ("-dlq" → ".dlq") - Fix schema-init POSTGRES_USERNAME to use get_secret() for Docker secrets - Fix extractor resume path record_counts using filenames instead of data type names as keys (inconsistent with normal completion path) - Add missing 'id' field validation in graphinator non-batch path (prevented infinite requeue loop for malformed messages) - Remove incorrect conn.rollback() on autocommit connection in dashboard - Use AMQP_QUEUE_PREFIX_GRAPHINATOR constant in debug_message utility - Fix system_monitor display name replacement for MusicBrainz queues - Fix test-database-resilience.sh: remove health checks for unmapped ports, fix extractor service name reference - Fix update-project.sh insights health check suggestion - Move graphinator ack before counter increment to prevent double-ack - Add completed_files, queues, idle_mode to test fixture global reset Co-Authored-By: Claude Opus 4.6 (1M context) --- api/queries/metrics_queries.py | 2 +- brainztableinator/brainztableinator.py | 2 +- dashboard/dashboard.py | 7 +++---- extractor/src/extractor.rs | 6 +++++- graphinator/graphinator.py | 15 ++++++++++++--- schema-init/schema_init.py | 2 +- scripts/test-database-resilience.sh | 12 +----------- scripts/update-project.sh | 4 ++-- tests/api/test_metrics_queries.py | 8 ++++---- tests/brainztableinator/test_brainztableinator.py | 2 +- tests/conftest.py | 12 ++++++++++++ utilities/debug_message.py | 4 ++-- utilities/system_monitor.py | 2 +- 13 files changed, 46 insertions(+), 32 deletions(-) diff --git a/api/queries/metrics_queries.py b/api/queries/metrics_queries.py index 409d1a2e..5e829080 100644 --- a/api/queries/metrics_queries.py +++ b/api/queries/metrics_queries.py @@ -112,7 +112,7 @@ async def get_queue_history(pool: Any, range_value: str) -> dict[str, Any]: "deliver_rate": _round_or_int(row["deliver_rate"], is_raw=is_raw), } - target = dlq_summary if name.endswith("-dlq") else queues + target = dlq_summary if name.endswith(".dlq") else queues if name not in target: target[name] = {"history": [], "current": {}} target[name]["history"].append(point) diff --git a/brainztableinator/brainztableinator.py b/brainztableinator/brainztableinator.py index b676a7e5..3944ff6e 100644 --- a/brainztableinator/brainztableinator.py +++ b/brainztableinator/brainztableinator.py @@ -469,7 +469,7 @@ async def _insert_external_link( "INSERT INTO musicbrainz.external_links " "(mbid, entity_type, url, service_name) " "VALUES (%s, %s, %s, %s) " - "ON CONFLICT (mbid, entity_type, service_name) DO UPDATE SET url = EXCLUDED.url", + "ON CONFLICT (mbid, entity_type, service_name, url) DO UPDATE SET url = EXCLUDED.url", ( mbid, entity_type, diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index 52da0fcb..89cd0da6 100755 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -403,10 +403,9 @@ async def _get_database_info_locked(self) -> list[DatabaseInfo]: ) finally: # The resilient connection manages the single connection internally; - # do NOT close it here — just ensure any in-progress transaction is rolled back. - if conn is not None: - with contextlib.suppress(Exception): - await conn.rollback() + # do NOT close it here. Autocommit is always True on pool connections, + # so rollback() is a no-op — omit it per the autocommit contract. + pass # Check Neo4j try: diff --git a/extractor/src/extractor.rs b/extractor/src/extractor.rs index 32107118..766ad51d 100644 --- a/extractor/src/extractor.rs +++ b/extractor/src/extractor.rs @@ -179,9 +179,13 @@ pub async fn process_discogs_data( state_marker.save(&marker_path).await?; // Send extraction_complete with actual record counts from state marker + // Use data type names (e.g., "artists") as keys — consistent with the normal + // completion path (lines 288-292) so consumers can look up counts reliably. let mut record_counts = HashMap::new(); for (file_name, file_state) in &state_marker.processing_phase.progress_by_file { - record_counts.insert(file_name.clone(), file_state.records_extracted); + if let Some(dt) = extract_data_type(file_name) { + record_counts.insert(dt.to_string(), file_state.records_extracted); + } } match mq_factory.create(&config.amqp_connection, &config.discogs_exchange_prefix).await { Ok(mq) => { diff --git a/graphinator/graphinator.py b/graphinator/graphinator.py index ff72d53c..1454a0be 100644 --- a/graphinator/graphinator.py +++ b/graphinator/graphinator.py @@ -1072,6 +1072,14 @@ async def handler(message: AbstractIncomingMessage) -> None: return record = normalize_record(data_type, record) + + # Validate required 'id' field — nack with requeue=False to avoid + # infinite requeue loop for malformed messages (matches tableinator) + if "id" not in record: + logger.error("❌ Message missing 'id' field", data_type=data_type) + await message.nack(requeue=False) + return + record_id = record.get("id", "unknown") record_name = record.get(name_field, default_name) @@ -1091,7 +1099,10 @@ def tx_fn(tx: Any) -> bool: updated = await session.execute_write(tx_fn) - # Increment counters only after successful write + # Ack first, then increment counters — avoids double-ack-then-nack + # if ack raises (exception handler would attempt nack on already-acked msg) + await message.ack() + message_counts[data_type] += 1 last_message_time[data_type] = time.time() if message_counts[data_type] % progress_interval == 0: @@ -1110,8 +1121,6 @@ def tx_fn(tx: Any) -> bool: f"🔄 Skipped {data_type[:-1]} (no changes needed)", record_id=record_id, ) - - await message.ack() except (ServiceUnavailable, SessionExpired) as e: logger.warning( f"⚠️ Neo4j unavailable, will retry {data_type[:-1]} message", diff --git a/schema-init/schema_init.py b/schema-init/schema_init.py index 62a7816a..28884dda 100755 --- a/schema-init/schema_init.py +++ b/schema-init/schema_init.py @@ -40,7 +40,7 @@ NEO4J_PASSWORD = get_secret("NEO4J_PASSWORD", "discogsography") POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost") -POSTGRES_USERNAME = os.environ.get("POSTGRES_USERNAME", "discogsography") +POSTGRES_USERNAME = get_secret("POSTGRES_USERNAME", "discogsography") POSTGRES_PASSWORD = get_secret("POSTGRES_PASSWORD", "discogsography") POSTGRES_DATABASE = os.environ.get("POSTGRES_DATABASE", "discogsography") diff --git a/scripts/test-database-resilience.sh b/scripts/test-database-resilience.sh index aad3332d..d18e0848 100755 --- a/scripts/test-database-resilience.sh +++ b/scripts/test-database-resilience.sh @@ -74,9 +74,6 @@ monitor_logs() { # Initial health check echo -e "\n${BLUE}🏥 Initial Health Check${NC}" echo "========================" -check_health "Extractor" 8000 -check_health "Graphinator" 8001 -check_health "Tableinator" 8002 check_health "Dashboard" 8003 check_health "API" 8004 @@ -153,16 +150,9 @@ sleep 30 # Check recovery echo -e "\n${BLUE}🔍 Checking RabbitMQ Recovery${NC}" sleep 10 -check_health "Extractor" 8000 -check_health "Graphinator" 8001 -check_health "Tableinator" 8002 - # Final health check echo -e "\n${BLUE}🏥 Final Health Check${NC}" echo "=====================" -check_health "Extractor" 8000 -check_health "Graphinator" 8001 -check_health "Tableinator" 8002 check_health "Dashboard" 8003 check_health "API" 8004 @@ -190,4 +180,4 @@ echo "" echo "For more detailed analysis, check individual service logs:" echo " docker compose logs graphinator | grep -i circuit" echo " docker compose logs tableinator | grep -i retry" -echo " docker compose logs extractor | grep -i connection" +echo " docker compose logs extractor-discogs extractor-musicbrainz | grep -i connection" diff --git a/scripts/update-project.sh b/scripts/update-project.sh index 84444d81..98c72d4d 100755 --- a/scripts/update-project.sh +++ b/scripts/update-project.sh @@ -1133,8 +1133,8 @@ show_verification_steps() { echo " curl -f http://localhost:8003/health" echo " # Check explore service" echo " curl -f http://localhost:8007/health" - echo " # Check insights service" - echo " curl -f http://localhost:8009/health" + echo " # Check insights service (internal only — proxied via API)" + echo " curl -f http://localhost:8004/api/insights/health" echo "" echo "4. 📊 Review dependency changes:" echo " # Check for security advisories" diff --git a/tests/api/test_metrics_queries.py b/tests/api/test_metrics_queries.py index 67764baf..c503c3f6 100644 --- a/tests/api/test_metrics_queries.py +++ b/tests/api/test_metrics_queries.py @@ -155,7 +155,7 @@ async def test_dlq_separation(self): "deliver_rate": 4.0, }, { - "queue_name": "graphinator-artists-dlq", + "queue_name": "graphinator-artists.dlq", "ts": "2026-03-25T10:00:00", "ready": 3, "unacked": 0, @@ -170,9 +170,9 @@ async def test_dlq_separation(self): result = await get_queue_history(pool, "1h") assert "graphinator-artists" in result["queues"] - assert "graphinator-artists-dlq" not in result["queues"] - assert "graphinator-artists-dlq" in result["dlq_summary"] - assert result["dlq_summary"]["graphinator-artists-dlq"]["current"]["ready"] == 3 + assert "graphinator-artists.dlq" not in result["queues"] + assert "graphinator-artists.dlq" in result["dlq_summary"] + assert result["dlq_summary"]["graphinator-artists.dlq"]["current"]["ready"] == 3 @pytest.mark.asyncio async def test_empty_data(self): diff --git a/tests/brainztableinator/test_brainztableinator.py b/tests/brainztableinator/test_brainztableinator.py index 4667ece4..4103955c 100644 --- a/tests/brainztableinator/test_brainztableinator.py +++ b/tests/brainztableinator/test_brainztableinator.py @@ -353,7 +353,7 @@ async def test_insert_external_link(self): mock_cursor.execute.assert_called_once() sql = mock_cursor.execute.call_args[0][0] assert "INSERT INTO musicbrainz.external_links" in sql - assert "ON CONFLICT (mbid, entity_type, service_name) DO UPDATE SET url = EXCLUDED.url" in sql + assert "ON CONFLICT (mbid, entity_type, service_name, url) DO UPDATE SET url = EXCLUDED.url" in sql @pytest.mark.asyncio async def test_insert_external_link_no_service_skips(self): diff --git a/tests/conftest.py b/tests/conftest.py index ec9ee6d5..03af9126 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -179,6 +179,9 @@ def reset_global_state() -> Iterator[None]: graphinator.graphinator.current_task = None graphinator.graphinator.current_progress = 0.0 graphinator.graphinator.consumer_tags = {} + graphinator.graphinator.completed_files = set() + graphinator.graphinator.queues = {} + graphinator.graphinator.idle_mode = False except (ImportError, AttributeError): pass @@ -205,6 +208,9 @@ def reset_global_state() -> Iterator[None]: "releases": 0.0, } tableinator.tableinator.consumer_tags = {} + tableinator.tableinator.completed_files = set() + tableinator.tableinator.queues = {} + tableinator.tableinator.idle_mode = False except (ImportError, AttributeError): pass @@ -239,6 +245,9 @@ def reset_global_state() -> Iterator[None]: graphinator.graphinator.current_task = None graphinator.graphinator.current_progress = 0.0 graphinator.graphinator.consumer_tags = {} + graphinator.graphinator.completed_files = set() + graphinator.graphinator.queues = {} + graphinator.graphinator.idle_mode = False except (ImportError, AttributeError): pass @@ -264,6 +273,9 @@ def reset_global_state() -> Iterator[None]: "releases": 0.0, } tableinator.tableinator.consumer_tags = {} + tableinator.tableinator.completed_files = set() + tableinator.tableinator.queues = {} + tableinator.tableinator.idle_mode = False except (ImportError, AttributeError): pass diff --git a/utilities/debug_message.py b/utilities/debug_message.py index 088eba50..4492aa25 100755 --- a/utilities/debug_message.py +++ b/utilities/debug_message.py @@ -7,7 +7,7 @@ import pika -from common.config import get_secret +from common.config import AMQP_QUEUE_PREFIX_GRAPHINATOR, get_secret def get_message_from_queue( @@ -141,7 +141,7 @@ def main() -> None: print(f"Invalid queue type: {queue_type}") sys.exit(1) - queue_name = f"discogsography-discogs-graphinator-{queue_type}" + queue_name = f"{AMQP_QUEUE_PREFIX_GRAPHINATOR}-{queue_type}" print(f"🔍 Debugging Queue: {queue_name}") diff --git a/utilities/system_monitor.py b/utilities/system_monitor.py index 2c1f0a44..0c70e26f 100755 --- a/utilities/system_monitor.py +++ b/utilities/system_monitor.py @@ -146,7 +146,7 @@ def monitor_system() -> None: total_messages = 0 for queue in queues: if "discogsography" in queue["name"] or "musicbrainz" in queue["name"]: - name = queue["name"].replace("discogsography-", "").replace("musicbrainz-", "mb-") + name = queue["name"].replace("discogsography-discogs-", "").replace("discogsography-musicbrainz-", "mb-") ready = queue.get("messages_ready", 0) unacked = queue.get("messages_unacknowledged", 0) total = queue.get("messages", 0) From 328ea22b95e2c1a3e142179ec01ee71c5a08f631 Mon Sep 17 00:00:00 2001 From: Robert Wlodarczyk Date: Thu, 2 Apr 2026 09:59:14 -0700 Subject: [PATCH 2/2] test: add coverage for graphinator missing-id validation - Add test_nack_message_missing_id_field for non-batch handler - Fix id check to use `not record.get("id")` since normalize_record always adds the key with None value when id is missing Co-Authored-By: Claude Opus 4.6 (1M context) --- graphinator/graphinator.py | 6 ++++-- tests/graphinator/test_graphinator.py | 13 +++++++++++++ 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/graphinator/graphinator.py b/graphinator/graphinator.py index 1454a0be..dda9dd2c 100644 --- a/graphinator/graphinator.py +++ b/graphinator/graphinator.py @@ -1074,8 +1074,10 @@ async def handler(message: AbstractIncomingMessage) -> None: record = normalize_record(data_type, record) # Validate required 'id' field — nack with requeue=False to avoid - # infinite requeue loop for malformed messages (matches tableinator) - if "id" not in record: + # infinite requeue loop for malformed messages (matches tableinator). + # Check both missing key and None value since normalize_record + # sets id=None when the raw message lacks an id field. + if not record.get("id"): logger.error("❌ Message missing 'id' field", data_type=data_type) await message.nack(requeue=False) return diff --git a/tests/graphinator/test_graphinator.py b/tests/graphinator/test_graphinator.py index 0ddccee2..5fe606c7 100644 --- a/tests/graphinator/test_graphinator.py +++ b/tests/graphinator/test_graphinator.py @@ -96,6 +96,19 @@ async def test_reject_on_shutdown(self) -> None: mock_message.nack.assert_called_once_with(requeue=True) mock_message.ack.assert_not_called() + @pytest.mark.asyncio + @patch("graphinator.graphinator.shutdown_requested", False) + async def test_nack_message_missing_id_field(self) -> None: + """Test that messages without 'id' are nacked with requeue=False.""" + mock_message = AsyncMock(spec=AbstractIncomingMessage) + mock_message.body = json.dumps({"name": "No ID Artist"}).encode() + + with patch("graphinator.graphinator.graph", MagicMock()): + await on_artist_message(mock_message) + + mock_message.nack.assert_called_once_with(requeue=False) + mock_message.ack.assert_not_called() + @pytest.mark.asyncio @patch("graphinator.graphinator.shutdown_requested", False) async def test_handle_processing_error(self, sample_artist_data: dict[str, Any]) -> None: