Skip to content

Commit e66b5bd

Browse files
feat: prevent concurrent Discogs and MusicBrainz extraction (#271)
* feat(extractor): add discogs_health_url to ExtractorConfig Add `discogs_health_url` field to `ExtractorConfig` with default value `http://extractor-discogs:8000/health`, readable via `DISCOGS_HEALTH_URL` env var. Used by the MusicBrainz extractor to poll the Discogs extractor's health endpoint before starting extraction (mutual exclusion). Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * feat(extractor): add wait_for_discogs_idle() health polling functions Add functions that poll the Discogs extractor's health endpoint and block while extraction_status is "running", enabling mutual exclusion between Discogs and MusicBrainz extraction. Falls back after 10 unreachable attempts. Includes 6 tests covering idle/completed/failed status, running-then-idle transition, max retries, and shutdown signal. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * feat(extractor): integrate wait_for_discogs_idle into MusicBrainz loop Call wait_for_discogs_idle() before each process_musicbrainz_data() invocation in run_musicbrainz_loop() to ensure Discogs and MusicBrainz extractions never run concurrently. Initial extraction propagates errors; periodic and triggered extractions log and continue on health check failure. Add DISCOGS_HEALTH_URL env var to extractor-musicbrainz in docker-compose.yml. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * style(extractor): apply rustfmt to changed files Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(extractor): increase trigger test shutdown delay for CI reliability The trigger tests spawn a background task that sends shutdown after a delay. With the added wait_for_discogs_idle() HTTP call, the trigger arm takes longer to complete. In CI's slower environment, the 2000ms delay caused the shutdown to fire during the trigger arm (not during the select!), leading to a race condition in tokio's single-threaded test runtime. Increased to 5000ms. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix(extractor): increase trigger test initial delay for CI reliability The background task that removes the state marker and sets the trigger was only waiting 50ms. With the new wait_for_discogs_idle() HTTP call before initial processing, 50ms is not enough in CI — the marker gets deleted before initial processing reads it, causing unexpected failures. Increased to 2000ms to ensure initial processing (including health check) completes before the marker is removed and the trigger fires. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent ec45b69 commit e66b5bd

8 files changed

Lines changed: 326 additions & 142 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -342,6 +342,7 @@ services:
342342
command: ["--source", "musicbrainz"]
343343
user: "1000:1000"
344344
environment:
345+
DISCOGS_HEALTH_URL: "http://extractor-discogs:8000/health"
345346
LOG_LEVEL: info
346347
MUSICBRAINZ_ROOT: /musicbrainz-data
347348
PERIODIC_CHECK_DAYS: "3"

extractor/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ anyhow = "1.0"
5252
thiserror = "2.0"
5353

5454
# HTTP client for downloads (replaces AWS S3 SDK)
55-
reqwest = { version = "0.13", features = ["stream", "rustls"] }
55+
reqwest = { version = "0.13", features = ["json", "stream", "rustls"] }
5656
bytes = "1.11"
5757
regex = "1.12"
5858
urlencoding = "2.1"

extractor/src/config.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ pub struct ExtractorConfig {
2121
pub discogs_exchange_prefix: String,
2222
pub musicbrainz_exchange_prefix: String,
2323
pub musicbrainz_dump_url: String,
24+
pub discogs_health_url: String,
2425
}
2526

2627
impl Default for ExtractorConfig {
@@ -41,6 +42,7 @@ impl Default for ExtractorConfig {
4142
discogs_exchange_prefix: "discogsography-discogs".to_string(),
4243
musicbrainz_exchange_prefix: "discogsography-musicbrainz".to_string(),
4344
musicbrainz_dump_url: "https://data.metabrainz.org/pub/musicbrainz/data/json-dumps/".to_string(),
45+
discogs_health_url: "http://extractor-discogs:8000/health".to_string(),
4446
}
4547
}
4648
}
@@ -97,12 +99,13 @@ impl ExtractorConfig {
9799
let musicbrainz_dump_url =
98100
std::env::var("MUSICBRAINZ_DUMP_URL").unwrap_or_else(|_| "https://data.metabrainz.org/pub/musicbrainz/data/json-dumps/".to_string());
99101

102+
let discogs_health_url = std::env::var("DISCOGS_HEALTH_URL").unwrap_or_else(|_| "http://extractor-discogs:8000/health".to_string());
103+
100104
let health_port = std::env::var("HEALTH_PORT").unwrap_or_else(|_| "8000".to_string()).parse::<u16>().unwrap_or(8000);
101105
let queue_size = std::env::var("QUEUE_SIZE").unwrap_or_else(|_| "5000".to_string()).parse::<usize>().unwrap_or(5000).max(1);
102106
let progress_log_interval =
103107
std::env::var("PROGRESS_LOG_INTERVAL").unwrap_or_else(|_| "1000".to_string()).parse::<usize>().unwrap_or(1000).max(1);
104-
let state_save_interval =
105-
std::env::var("STATE_SAVE_INTERVAL").unwrap_or_else(|_| "5000".to_string()).parse::<usize>().unwrap_or(5000).max(1);
108+
let state_save_interval = std::env::var("STATE_SAVE_INTERVAL").unwrap_or_else(|_| "5000".to_string()).parse::<usize>().unwrap_or(5000).max(1);
106109

107110
Ok(Self {
108111
amqp_connection,
@@ -120,6 +123,7 @@ impl ExtractorConfig {
120123
discogs_exchange_prefix,
121124
musicbrainz_exchange_prefix,
122125
musicbrainz_dump_url,
126+
discogs_health_url,
123127
})
124128
}
125129
}

extractor/src/extractor.rs

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -797,6 +797,67 @@ pub async fn run_extraction_loop(
797797
Ok(())
798798
}
799799

800+
pub(crate) const DISCOGS_POLL_INTERVAL: Duration = Duration::from_secs(60);
801+
pub(crate) const DISCOGS_HEALTH_TIMEOUT: Duration = Duration::from_secs(5);
802+
pub(crate) const DISCOGS_MAX_UNREACHABLE_RETRIES: u32 = 10;
803+
804+
/// Wait until the Discogs extractor is not actively extracting.
805+
pub async fn wait_for_discogs_idle(url: &str, shutdown_flag: &AtomicBool) -> Result<()> {
806+
wait_for_discogs_idle_with_interval(url, shutdown_flag, DISCOGS_POLL_INTERVAL).await
807+
}
808+
809+
/// Internal implementation with configurable poll interval (for testing).
810+
pub async fn wait_for_discogs_idle_with_interval(url: &str, shutdown_flag: &AtomicBool, poll_interval: Duration) -> Result<()> {
811+
let client = reqwest::Client::builder().timeout(DISCOGS_HEALTH_TIMEOUT).build()?;
812+
813+
let mut unreachable_count: u32 = 0;
814+
815+
loop {
816+
if shutdown_flag.load(Ordering::SeqCst) {
817+
info!("🛑 Shutdown requested, stopping Discogs health check wait");
818+
return Ok(());
819+
}
820+
821+
match client.get(url).send().await {
822+
Ok(response) => {
823+
unreachable_count = 0;
824+
match response.json::<serde_json::Value>().await {
825+
Ok(body) => {
826+
let status = body.get("extraction_status").and_then(|v| v.as_str()).unwrap_or("unknown");
827+
828+
if status == "running" {
829+
info!("⏳ Discogs extraction in progress, waiting before starting MusicBrainz extraction...");
830+
} else {
831+
info!("✅ Discogs extractor idle (status: {}), proceeding with MusicBrainz extraction", status);
832+
return Ok(());
833+
}
834+
}
835+
Err(e) => {
836+
warn!("⚠️ Failed to parse Discogs health response: {}, proceeding", e);
837+
return Ok(());
838+
}
839+
}
840+
}
841+
Err(_) => {
842+
unreachable_count += 1;
843+
if unreachable_count >= DISCOGS_MAX_UNREACHABLE_RETRIES {
844+
warn!(
845+
"⚠️ Discogs health endpoint unreachable after {} attempts, proceeding with MusicBrainz extraction",
846+
DISCOGS_MAX_UNREACHABLE_RETRIES
847+
);
848+
return Ok(());
849+
}
850+
warn!(
851+
"⚠️ Discogs health endpoint unreachable (attempt {}/{}), retrying in {:?}...",
852+
unreachable_count, DISCOGS_MAX_UNREACHABLE_RETRIES, poll_interval
853+
);
854+
}
855+
}
856+
857+
tokio::time::sleep(poll_interval).await;
858+
}
859+
}
860+
800861
/// Main MusicBrainz extraction loop with periodic checks for new dumps.
801862
pub async fn run_musicbrainz_loop(
802863
config: Arc<ExtractorConfig>,
@@ -821,6 +882,9 @@ pub async fn run_musicbrainz_loop(
821882

822883
info!("🎵 Starting MusicBrainz extraction...");
823884

885+
// Wait for Discogs extractor to finish before starting MusicBrainz
886+
wait_for_discogs_idle(&config.discogs_health_url, &shutdown_flag).await?;
887+
824888
let success =
825889
process_musicbrainz_data(config.clone(), state.clone(), shutdown_flag.clone(), force_reprocess, mq_factory.clone(), compiled_rules.clone())
826890
.await?;
@@ -846,6 +910,10 @@ pub async fn run_musicbrainz_loop(
846910
tokio::select! {
847911
_ = sleep(check_interval) => {
848912
info!("🔄 Starting periodic check for new MusicBrainz dumps...");
913+
// Wait for Discogs extractor to finish before starting MusicBrainz
914+
if let Err(e) = wait_for_discogs_idle(&config.discogs_health_url, &shutdown_flag).await {
915+
error!("❌ Failed to check Discogs health: {}", e);
916+
}
849917
let start = Instant::now();
850918
match process_musicbrainz_data(config.clone(), state.clone(), shutdown_flag.clone(), false, mq_factory.clone(), compiled_rules.clone()).await {
851919
Ok(true) => {
@@ -861,6 +929,10 @@ pub async fn run_musicbrainz_loop(
861929
}
862930
trigger_force_reprocess = wait_for_trigger(&trigger) => {
863931
info!("🔄 MusicBrainz extraction triggered via API (force_reprocess={})...", trigger_force_reprocess);
932+
// Wait for Discogs extractor to finish before starting MusicBrainz
933+
if let Err(e) = wait_for_discogs_idle(&config.discogs_health_url, &shutdown_flag).await {
934+
error!("❌ Failed to check Discogs health: {}", e);
935+
}
864936
let start = Instant::now();
865937
match process_musicbrainz_data(config.clone(), state.clone(), shutdown_flag.clone(), trigger_force_reprocess, mq_factory.clone(), compiled_rules.clone()).await {
866938
Ok(true) => info!("✅ Triggered MusicBrainz extraction completed in {:?}", start.elapsed()),

extractor/src/tests/config_tests.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,25 @@ fn test_musicbrainz_dump_url_from_env() {
217217
}
218218
}
219219

220+
#[test]
221+
fn test_discogs_health_url_default() {
222+
let config = ExtractorConfig::default();
223+
assert_eq!(config.discogs_health_url, "http://extractor-discogs:8000/health");
224+
}
225+
226+
#[test]
227+
#[serial]
228+
fn test_discogs_health_url_from_env() {
229+
unsafe {
230+
std::env::set_var("DISCOGS_HEALTH_URL", "http://custom-host:9000/health");
231+
}
232+
let config = ExtractorConfig::from_env().unwrap();
233+
assert_eq!(config.discogs_health_url, "http://custom-host:9000/health");
234+
unsafe {
235+
std::env::remove_var("DISCOGS_HEALTH_URL");
236+
}
237+
}
238+
220239
#[test]
221240
fn test_build_amqp_url_special_characters() {
222241
let url = build_amqp_url("user@host", "p@ss:w/rd#1", "localhost", "5672");

extractor/src/tests/extractor_tests.rs

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1050,3 +1050,113 @@ async fn test_extraction_status_set_failed_on_error() {
10501050
let s = state.read().await;
10511051
assert_eq!(s.extraction_status, ExtractionStatus::Failed);
10521052
}
1053+
1054+
mod wait_for_discogs_idle_tests {
1055+
use crate::extractor::{wait_for_discogs_idle, wait_for_discogs_idle_with_interval};
1056+
use std::sync::atomic::AtomicBool;
1057+
use tokio::time::Duration;
1058+
1059+
#[tokio::test]
1060+
async fn test_proceeds_when_idle() {
1061+
let mut server = mockito::Server::new_async().await;
1062+
let mock = server
1063+
.mock("GET", "/health")
1064+
.with_status(200)
1065+
.with_header("content-type", "application/json")
1066+
.with_body(r#"{"extraction_status": "idle"}"#)
1067+
.create_async()
1068+
.await;
1069+
1070+
let shutdown = AtomicBool::new(false);
1071+
let url = format!("{}/health", server.url());
1072+
let result = wait_for_discogs_idle(&url, &shutdown).await;
1073+
1074+
assert!(result.is_ok());
1075+
mock.assert_async().await;
1076+
}
1077+
1078+
#[tokio::test]
1079+
async fn test_proceeds_when_completed() {
1080+
let mut server = mockito::Server::new_async().await;
1081+
let mock = server
1082+
.mock("GET", "/health")
1083+
.with_status(200)
1084+
.with_header("content-type", "application/json")
1085+
.with_body(r#"{"extraction_status": "completed"}"#)
1086+
.create_async()
1087+
.await;
1088+
1089+
let shutdown = AtomicBool::new(false);
1090+
let url = format!("{}/health", server.url());
1091+
let result = wait_for_discogs_idle(&url, &shutdown).await;
1092+
1093+
assert!(result.is_ok());
1094+
mock.assert_async().await;
1095+
}
1096+
1097+
#[tokio::test]
1098+
async fn test_proceeds_when_failed() {
1099+
let mut server = mockito::Server::new_async().await;
1100+
let mock = server
1101+
.mock("GET", "/health")
1102+
.with_status(200)
1103+
.with_header("content-type", "application/json")
1104+
.with_body(r#"{"extraction_status": "failed"}"#)
1105+
.create_async()
1106+
.await;
1107+
1108+
let shutdown = AtomicBool::new(false);
1109+
let url = format!("{}/health", server.url());
1110+
let result = wait_for_discogs_idle(&url, &shutdown).await;
1111+
1112+
assert!(result.is_ok());
1113+
mock.assert_async().await;
1114+
}
1115+
1116+
#[tokio::test]
1117+
async fn test_waits_then_proceeds_when_running_then_idle() {
1118+
let mut server = mockito::Server::new_async().await;
1119+
let _mock_running = server
1120+
.mock("GET", "/health")
1121+
.with_status(200)
1122+
.with_header("content-type", "application/json")
1123+
.with_body(r#"{"extraction_status": "running"}"#)
1124+
.expect(1)
1125+
.create_async()
1126+
.await;
1127+
let _mock_idle = server
1128+
.mock("GET", "/health")
1129+
.with_status(200)
1130+
.with_header("content-type", "application/json")
1131+
.with_body(r#"{"extraction_status": "idle"}"#)
1132+
.expect(1)
1133+
.create_async()
1134+
.await;
1135+
1136+
let shutdown = AtomicBool::new(false);
1137+
let url = format!("{}/health", server.url());
1138+
let result = wait_for_discogs_idle_with_interval(&url, &shutdown, Duration::from_millis(10)).await;
1139+
1140+
assert!(result.is_ok());
1141+
}
1142+
1143+
#[tokio::test]
1144+
async fn test_proceeds_after_max_unreachable_retries() {
1145+
// Use a port that nothing listens on
1146+
let url = "http://127.0.0.1:19999/health";
1147+
let shutdown = AtomicBool::new(false);
1148+
let result = wait_for_discogs_idle_with_interval(url, &shutdown, Duration::from_millis(10)).await;
1149+
1150+
assert!(result.is_ok());
1151+
}
1152+
1153+
#[tokio::test]
1154+
async fn test_respects_shutdown_signal() {
1155+
let shutdown = AtomicBool::new(true);
1156+
// Use unreachable port — should return immediately due to shutdown flag
1157+
let url = "http://127.0.0.1:19999/health";
1158+
let result = wait_for_discogs_idle_with_interval(url, &shutdown, Duration::from_millis(10)).await;
1159+
1160+
assert!(result.is_ok());
1161+
}
1162+
}

0 commit comments

Comments
 (0)