Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion api/queries/metrics_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ async def get_queue_history(pool: Any, range_value: str) -> dict[str, Any]:
"deliver_rate": _round_or_int(row["deliver_rate"], is_raw=is_raw),
}

target = dlq_summary if name.endswith("-dlq") else queues
target = dlq_summary if name.endswith(".dlq") else queues
if name not in target:
target[name] = {"history": [], "current": {}}
target[name]["history"].append(point)
Expand Down
2 changes: 1 addition & 1 deletion brainztableinator/brainztableinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ async def _insert_external_link(
"INSERT INTO musicbrainz.external_links "
"(mbid, entity_type, url, service_name) "
"VALUES (%s, %s, %s, %s) "
"ON CONFLICT (mbid, entity_type, service_name) DO UPDATE SET url = EXCLUDED.url",
"ON CONFLICT (mbid, entity_type, service_name, url) DO UPDATE SET url = EXCLUDED.url",
(
mbid,
entity_type,
Expand Down
7 changes: 3 additions & 4 deletions dashboard/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -403,10 +403,9 @@ async def _get_database_info_locked(self) -> list[DatabaseInfo]:
)
finally:
# The resilient connection manages the single connection internally;
# do NOT close it here — just ensure any in-progress transaction is rolled back.
if conn is not None:
with contextlib.suppress(Exception):
await conn.rollback()
# do NOT close it here. Autocommit is always True on pool connections,
# so rollback() is a no-op — omit it per the autocommit contract.
pass

# Check Neo4j
try:
Expand Down
6 changes: 5 additions & 1 deletion extractor/src/extractor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,13 @@ pub async fn process_discogs_data(
state_marker.save(&marker_path).await?;

// Send extraction_complete with actual record counts from state marker
// Use data type names (e.g., "artists") as keys — consistent with the normal
// completion path (lines 288-292) so consumers can look up counts reliably.
let mut record_counts = HashMap::new();
for (file_name, file_state) in &state_marker.processing_phase.progress_by_file {
record_counts.insert(file_name.clone(), file_state.records_extracted);
if let Some(dt) = extract_data_type(file_name) {
record_counts.insert(dt.to_string(), file_state.records_extracted);
}
}
match mq_factory.create(&config.amqp_connection, &config.discogs_exchange_prefix).await {
Ok(mq) => {
Expand Down
17 changes: 14 additions & 3 deletions graphinator/graphinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1072,6 +1072,16 @@ async def handler(message: AbstractIncomingMessage) -> None:
return

record = normalize_record(data_type, record)

# Validate required 'id' field — nack with requeue=False to avoid
# infinite requeue loop for malformed messages (matches tableinator).
# Check both missing key and None value since normalize_record
# sets id=None when the raw message lacks an id field.
if not record.get("id"):
logger.error("❌ Message missing 'id' field", data_type=data_type)
await message.nack(requeue=False)
return

record_id = record.get("id", "unknown")
record_name = record.get(name_field, default_name)

Expand All @@ -1091,7 +1101,10 @@ def tx_fn(tx: Any) -> bool:

updated = await session.execute_write(tx_fn)

# Increment counters only after successful write
# Ack first, then update counters — if the bookkeeping below raises, the
# message is already acked, so the exception handler's nack cannot trigger
# redelivery and double-processing of a record that was successfully written.
await message.ack()

message_counts[data_type] += 1
last_message_time[data_type] = time.time()
if message_counts[data_type] % progress_interval == 0:
Expand All @@ -1110,8 +1123,6 @@ def tx_fn(tx: Any) -> bool:
f"🔄 Skipped {data_type[:-1]} (no changes needed)",
record_id=record_id,
)

await message.ack()
except (ServiceUnavailable, SessionExpired) as e:
logger.warning(
f"⚠️ Neo4j unavailable, will retry {data_type[:-1]} message",
Expand Down
2 changes: 1 addition & 1 deletion schema-init/schema_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
NEO4J_PASSWORD = get_secret("NEO4J_PASSWORD", "discogsography")

POSTGRES_HOST = os.environ.get("POSTGRES_HOST", "localhost")
POSTGRES_USERNAME = os.environ.get("POSTGRES_USERNAME", "discogsography")
POSTGRES_USERNAME = get_secret("POSTGRES_USERNAME", "discogsography")
POSTGRES_PASSWORD = get_secret("POSTGRES_PASSWORD", "discogsography")
POSTGRES_DATABASE = os.environ.get("POSTGRES_DATABASE", "discogsography")

Expand Down
12 changes: 1 addition & 11 deletions scripts/test-database-resilience.sh
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,6 @@ monitor_logs() {
# Initial health check
echo -e "\n${BLUE}🏥 Initial Health Check${NC}"
echo "========================"
check_health "Extractor" 8000
check_health "Graphinator" 8001
check_health "Tableinator" 8002
check_health "Dashboard" 8003
check_health "API" 8004

Expand Down Expand Up @@ -153,16 +150,9 @@ sleep 30
# Check recovery
echo -e "\n${BLUE}🔍 Checking RabbitMQ Recovery${NC}"
sleep 10
check_health "Extractor" 8000
check_health "Graphinator" 8001
check_health "Tableinator" 8002

# Final health check
echo -e "\n${BLUE}🏥 Final Health Check${NC}"
echo "====================="
check_health "Extractor" 8000
check_health "Graphinator" 8001
check_health "Tableinator" 8002
check_health "Dashboard" 8003
check_health "API" 8004

Expand Down Expand Up @@ -190,4 +180,4 @@ echo ""
echo "For more detailed analysis, check individual service logs:"
echo " docker compose logs graphinator | grep -i circuit"
echo " docker compose logs tableinator | grep -i retry"
echo " docker compose logs extractor | grep -i connection"
echo " docker compose logs extractor-discogs extractor-musicbrainz | grep -i connection"
4 changes: 2 additions & 2 deletions scripts/update-project.sh
Original file line number Diff line number Diff line change
Expand Up @@ -1133,8 +1133,8 @@ show_verification_steps() {
echo " curl -f http://localhost:8003/health"
echo " # Check explore service"
echo " curl -f http://localhost:8007/health"
echo " # Check insights service"
echo " curl -f http://localhost:8009/health"
echo " # Check insights service (internal only — proxied via API)"
echo " curl -f http://localhost:8004/api/insights/health"
echo ""
echo "4. 📊 Review dependency changes:"
echo " # Check for security advisories"
Expand Down
8 changes: 4 additions & 4 deletions tests/api/test_metrics_queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ async def test_dlq_separation(self):
"deliver_rate": 4.0,
},
{
"queue_name": "graphinator-artists-dlq",
"queue_name": "graphinator-artists.dlq",
"ts": "2026-03-25T10:00:00",
"ready": 3,
"unacked": 0,
Expand All @@ -170,9 +170,9 @@ async def test_dlq_separation(self):
result = await get_queue_history(pool, "1h")

assert "graphinator-artists" in result["queues"]
assert "graphinator-artists-dlq" not in result["queues"]
assert "graphinator-artists-dlq" in result["dlq_summary"]
assert result["dlq_summary"]["graphinator-artists-dlq"]["current"]["ready"] == 3
assert "graphinator-artists.dlq" not in result["queues"]
assert "graphinator-artists.dlq" in result["dlq_summary"]
assert result["dlq_summary"]["graphinator-artists.dlq"]["current"]["ready"] == 3

@pytest.mark.asyncio
async def test_empty_data(self):
Expand Down
2 changes: 1 addition & 1 deletion tests/brainztableinator/test_brainztableinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,7 @@ async def test_insert_external_link(self):
mock_cursor.execute.assert_called_once()
sql = mock_cursor.execute.call_args[0][0]
assert "INSERT INTO musicbrainz.external_links" in sql
assert "ON CONFLICT (mbid, entity_type, service_name) DO UPDATE SET url = EXCLUDED.url" in sql
assert "ON CONFLICT (mbid, entity_type, service_name, url) DO UPDATE SET url = EXCLUDED.url" in sql

@pytest.mark.asyncio
async def test_insert_external_link_no_service_skips(self):
Expand Down
12 changes: 12 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,9 @@ def reset_global_state() -> Iterator[None]:
graphinator.graphinator.current_task = None
graphinator.graphinator.current_progress = 0.0
graphinator.graphinator.consumer_tags = {}
graphinator.graphinator.completed_files = set()
graphinator.graphinator.queues = {}
graphinator.graphinator.idle_mode = False
except (ImportError, AttributeError):
pass

Expand All @@ -205,6 +208,9 @@ def reset_global_state() -> Iterator[None]:
"releases": 0.0,
}
tableinator.tableinator.consumer_tags = {}
tableinator.tableinator.completed_files = set()
tableinator.tableinator.queues = {}
tableinator.tableinator.idle_mode = False
except (ImportError, AttributeError):
pass

Expand Down Expand Up @@ -239,6 +245,9 @@ def reset_global_state() -> Iterator[None]:
graphinator.graphinator.current_task = None
graphinator.graphinator.current_progress = 0.0
graphinator.graphinator.consumer_tags = {}
graphinator.graphinator.completed_files = set()
graphinator.graphinator.queues = {}
graphinator.graphinator.idle_mode = False
except (ImportError, AttributeError):
pass

Expand All @@ -264,6 +273,9 @@ def reset_global_state() -> Iterator[None]:
"releases": 0.0,
}
tableinator.tableinator.consumer_tags = {}
tableinator.tableinator.completed_files = set()
tableinator.tableinator.queues = {}
tableinator.tableinator.idle_mode = False
except (ImportError, AttributeError):
pass

Expand Down
13 changes: 13 additions & 0 deletions tests/graphinator/test_graphinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,19 @@ async def test_reject_on_shutdown(self) -> None:
mock_message.nack.assert_called_once_with(requeue=True)
mock_message.ack.assert_not_called()

@pytest.mark.asyncio
@patch("graphinator.graphinator.shutdown_requested", False)
async def test_nack_message_missing_id_field(self) -> None:
"""Test that messages without 'id' are nacked with requeue=False."""
mock_message = AsyncMock(spec=AbstractIncomingMessage)
mock_message.body = json.dumps({"name": "No ID Artist"}).encode()

with patch("graphinator.graphinator.graph", MagicMock()):
await on_artist_message(mock_message)

mock_message.nack.assert_called_once_with(requeue=False)
mock_message.ack.assert_not_called()

@pytest.mark.asyncio
@patch("graphinator.graphinator.shutdown_requested", False)
async def test_handle_processing_error(self, sample_artist_data: dict[str, Any]) -> None:
Expand Down
4 changes: 2 additions & 2 deletions utilities/debug_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

import pika

from common.config import get_secret
from common.config import AMQP_QUEUE_PREFIX_GRAPHINATOR, get_secret


def get_message_from_queue(
Expand Down Expand Up @@ -141,7 +141,7 @@ def main() -> None:
print(f"Invalid queue type: {queue_type}")
sys.exit(1)

queue_name = f"discogsography-discogs-graphinator-{queue_type}"
queue_name = f"{AMQP_QUEUE_PREFIX_GRAPHINATOR}-{queue_type}"

print(f"🔍 Debugging Queue: {queue_name}")

Expand Down
2 changes: 1 addition & 1 deletion utilities/system_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ def monitor_system() -> None:
total_messages = 0
for queue in queues:
if "discogsography" in queue["name"] or "musicbrainz" in queue["name"]:
name = queue["name"].replace("discogsography-", "").replace("musicbrainz-", "mb-")
name = queue["name"].replace("discogsography-discogs-", "").replace("discogsography-musicbrainz-", "mb-")
ready = queue.get("messages_ready", 0)
unacked = queue.get("messages_unacknowledged", 0)
total = queue.get("messages", 0)
Expand Down
Loading