diff --git a/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json b/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json deleted file mode 100644 index 9b32f31..0000000 --- a/.sqlx/query-69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4.json +++ /dev/null @@ -1,14 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "DELETE FROM job_executions WHERE id = $1 AND state = 'pending'", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Uuid" - ] - }, - "nullable": [] - }, - "hash": "69e1491c5c93cd474a3712590f2af3445834d73fe652b8d9b5812142100544f4" -} diff --git a/.sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json b/.sqlx/query-fd73fbaeda7e48215355e712c45d345e9b71acd33aff23c23775b924cedcd6ee.json similarity index 66% rename from .sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json rename to .sqlx/query-fd73fbaeda7e48215355e712c45d345e9b71acd33aff23c23775b924cedcd6ee.json index 0069d97..e8e8c86 100644 --- a/.sqlx/query-da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be.json +++ b/.sqlx/query-fd73fbaeda7e48215355e712c45d345e9b71acd33aff23c23775b924cedcd6ee.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n UPDATE job_executions\n SET state = 'pending', execute_at = $1, attempt_index = attempt_index + 1, poller_instance_id = NULL\n WHERE state = 'running' AND alive_at < $1::timestamptz\n RETURNING id as id\n ", + "query": "\n UPDATE job_executions\n SET state = 'pending', execute_at = $1, attempt_index = attempt_index + 1, poller_instance_id = NULL\n WHERE state = 'running' AND alive_at < $1::timestamptz\n AND job_type = ANY($2)\n RETURNING id as id\n ", "describe": { "columns": [ { @@ -11,12 +11,13 @@ ], "parameters": { "Left": [ - "Timestamptz" + "Timestamptz", + "TextArray" ] }, "nullable": [ false ] }, - "hash": "da1731957184bb32791a503e9a0de36225e01db0c9998b356239809a9bf227be" + "hash": "fd73fbaeda7e48215355e712c45d345e9b71acd33aff23c23775b924cedcd6ee" } diff --git a/Cargo.lock b/Cargo.lock index 13494e4..9b33ddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -421,9 +421,8 @@ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" [[package]] name = "es-entity" -version = "0.10.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aa815a1b5e711330415883ee1f0646189ef9d461a58d0333cbe60c6a13faae2b" +version = "0.10.31-dev" +source = "git+https://github.com/GaloyMoney/es-entity?rev=06e3dd0#06e3dd04e97f6b96ef9e5688cdcded67a3989647" dependencies = [ "chrono", "derive_builder", @@ -446,9 +445,8 @@ dependencies = [ [[package]] name = "es-entity-macros" -version = "0.10.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b1f6383e8ee2f7fe5d49f7808221521984534beb08015fe50163f5d21c132d6" +version = "0.10.31-dev" +source = "git+https://github.com/GaloyMoney/es-entity?rev=06e3dd0#06e3dd04e97f6b96ef9e5688cdcded67a3989647" dependencies = [ "convert_case", "darling 0.23.0", diff --git a/Cargo.toml b/Cargo.toml index e4f0eaa..8d33fcf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -42,7 +42,7 @@ anyhow = { workspace = true } [workspace.dependencies] -es-entity = "0.10.30" +es-entity = { git = "https://github.com/GaloyMoney/es-entity", rev = "06e3dd0" } anyhow = "1.0" async-trait = "0.1" diff --git a/src/poller.rs b/src/poller.rs index 1e6706c..5aa61b5 100644 --- a/src/poller.rs +++ b/src/poller.rs @@ -211,9 +211,10 @@ impl JobPoller { let job_lost_interval = self.config.job_lost_interval; let pool = self.repo.pool().clone(); let clock = self.clock.clone(); + let supported_job_types = self.registry.registered_job_types(); OwnedTaskHandle::new(spawn_named_task!("job-poller-lost-handler", async move { loop { - clock.sleep(job_lost_interval / 2).await; + clock.sleep_coalesce(job_lost_interval / 2).await; let now = clock.now(); let check_time = now - job_lost_interval; @@ -230,9 +231,11 @@ impl JobPoller { UPDATE job_executions SET state = 'pending', execute_at = $1, attempt_index = attempt_index + 1, poller_instance_id = NULL WHERE state = 'running' AND alive_at < $1::timestamptz + AND job_type = ANY($2) RETURNING id as id "#, check_time, + &supported_job_types as _, ) .fetch_all(&pool) .await @@ -292,7 +295,7 @@ impl JobPoller { } }; drop(_guard); - clock.sleep(timeout).await; + clock.sleep_coalesce(timeout).await; } } )) diff --git a/tests/job.rs b/tests/job.rs index 4348c75..785b69d 100644 --- a/tests/job.rs +++ b/tests/job.rs @@ -3,11 +3,11 @@ mod helpers; use async_trait::async_trait; use chrono::{DateTime, Utc}; use job::{ - ClockHandle, CurrentJob, Job, JobCompletion, JobCompletionResult, JobId, JobInitializer, - JobRunner, JobSpawner, JobSpec, JobSvcConfig, JobTerminalState, JobType, Jobs, RetrySettings, - error::JobError, + ClockHandle, CurrentJob, Job, JobCompletion, JobId, JobInitializer, JobRunner, JobSpawner, + JobSpec, JobSvcConfig, JobTerminalState, JobType, Jobs, RetrySettings, error::JobError, }; use serde::{Deserialize, Serialize}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use tokio::sync::{Mutex, Notify}; @@ -1222,3 +1222,289 @@ async fn test_await_completion_timeout() -> anyhow::Result<()> { Ok(()) } + +// -- Multi-day scheduling tests -- + +#[derive(Debug, Serialize, Deserialize)] +struct MultiDayJobConfig { + label: String, +} + +struct MultiDayJobInitializer { + execution_times: Arc>>>, +} + +impl JobInitializer for MultiDayJobInitializer { + type Config = MultiDayJobConfig; + + fn job_type(&self) -> JobType { + JobType::new("multi-day-job") + } + + fn init( + &self, + job: &Job, + _: JobSpawner, + ) -> Result, Box> { + Ok(Box::new(MultiDayJobRunner { + job_id: job.id, + execution_times: Arc::clone(&self.execution_times), + })) + } +} + +struct MultiDayJobRunner { + job_id: JobId, + execution_times: Arc>>>, +} + +#[async_trait] +impl JobRunner for MultiDayJobRunner { + async fn run( + &self, + current_job: CurrentJob, + ) -> Result> { + let now = current_job.clock().now(); + self.execution_times.lock().await.insert(self.job_id, now); + Ok(JobCompletion::Complete) + } +} + +/// Polls until all specified jobs are marked completed in the database. +async fn wait_for_jobs_completed(jobs: &Jobs, ids: &[JobId], max_attempts: usize) { + let mut attempts = 0; + loop { + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + let mut all_done = true; + for id in ids { + let job = jobs.find(*id).await.expect("job should exist"); + if !job.completed() { + all_done = false; + break; + } + } + if all_done { + return; + } + attempts += 1; + if attempts >= max_attempts { + panic!( + "Jobs {:?} did not all complete within {} attempts ({}ms)", + ids, + max_attempts, + max_attempts * 100, + ); + } + } +} + +/// Test that jobs scheduled across multiple days all fire correctly when the +/// manual clock is advanced one day at a time. +/// +/// # What this test verifies +/// +/// When `controller.advance(1 day)` is called, the manual clock jumps forward. +/// Housekeeping loops (keep-alive, lost-handler) use `sleep_coalesce()` so they +/// wake once at the final time instead of at every intermediate interval. This +/// means the polling loop can dispatch jobs promptly without being starved by +/// ~1700 housekeeping wake-ups per day-advance. +/// +/// # Cross-test isolation +/// +/// The lost-handler SQL is scoped to `job_type = ANY(registered_types)`, so a +/// poller with a far-future manual clock only resets its own job types. This +/// prevents cross-test interference when multiple pollers share the same DB. +#[tokio::test] +async fn test_multi_day_scheduling_with_artificial_clock() -> anyhow::Result<()> { + let pool = helpers::init_pool().await?; + + let (clock, controller) = ClockHandle::manual(); + let initial_time = clock.now(); + + let config = JobSvcConfig::builder() + .pool(pool) + .clock(clock.clone()) + .build() + .expect("Failed to build JobsConfig"); + + let mut jobs = Jobs::init(config).await?; + + let execution_times: Arc>>> = + Arc::new(Mutex::new(HashMap::new())); + let spawner = jobs.add_initializer(MultiDayJobInitializer { + execution_times: Arc::clone(&execution_times), + }); + + jobs.start_poll() + .await + .expect("Failed to start job polling"); + + // Schedule 5 jobs at various future times + let job_2h_a = JobId::new(); + let job_2h_b = JobId::new(); + let job_2d = JobId::new(); + let job_4d = JobId::new(); + let job_7d = JobId::new(); + + let at_2h = initial_time + chrono::Duration::hours(2); + let at_2d = initial_time + chrono::Duration::days(2); + let at_4d = initial_time + chrono::Duration::days(4); + let at_7d = initial_time + chrono::Duration::days(7); + + spawner + .spawn_at( + job_2h_a, + MultiDayJobConfig { + label: "2h-a".into(), + }, + at_2h, + ) + .await?; + spawner + .spawn_at( + job_2h_b, + MultiDayJobConfig { + label: "2h-b".into(), + }, + at_2h, + ) + .await?; + spawner + .spawn_at(job_2d, MultiDayJobConfig { label: "2d".into() }, at_2d) + .await?; + spawner + .spawn_at(job_4d, MultiDayJobConfig { label: "4d".into() }, at_4d) + .await?; + spawner + .spawn_at(job_7d, MultiDayJobConfig { label: "7d".into() }, at_7d) + .await?; + + // No jobs should have run yet + tokio::time::sleep(tokio::time::Duration::from_millis(200)).await; + assert!( + execution_times.lock().await.is_empty(), + "No jobs should run before clock advances" + ); + + let one_day = std::time::Duration::from_secs(86_400); + let wait_attempts = 50; // 5 seconds — plenty now that housekeeping coalesces + + // --- Day 1: the two 2-hour jobs should fire --- + controller.advance(one_day).await; + wait_for_jobs_completed(&jobs, &[job_2h_a, job_2h_b], wait_attempts).await; + { + let times = execution_times.lock().await; + assert!( + times.contains_key(&job_2h_a), + "2h-a should have run after day 1" + ); + assert!( + times.contains_key(&job_2h_b), + "2h-b should have run after day 1" + ); + assert!( + !times.contains_key(&job_2d), + "2d should NOT have run after day 1" + ); + assert!( + !times.contains_key(&job_4d), + "4d should NOT have run after day 1" + ); + assert!( + !times.contains_key(&job_7d), + "7d should NOT have run after day 1" + ); + } + + // --- Day 2: the 2-day job should fire --- + controller.advance(one_day).await; + wait_for_jobs_completed(&jobs, &[job_2d], wait_attempts).await; + { + let times = execution_times.lock().await; + assert!( + times.contains_key(&job_2d), + "2d should have run after day 2" + ); + assert!( + !times.contains_key(&job_4d), + "4d should NOT have run after day 2" + ); + assert!( + !times.contains_key(&job_7d), + "7d should NOT have run after day 2" + ); + } + + // --- Day 3: no new jobs --- + controller.advance(one_day).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + { + let times = execution_times.lock().await; + assert_eq!(times.len(), 3, "Only 3 jobs should have run by day 3"); + } + + // --- Day 4: the 4-day job should fire --- + controller.advance(one_day).await; + wait_for_jobs_completed(&jobs, &[job_4d], wait_attempts).await; + { + let times = execution_times.lock().await; + assert!( + times.contains_key(&job_4d), + "4d should have run after day 4" + ); + assert!( + !times.contains_key(&job_7d), + "7d should NOT have run after day 4" + ); + } + + // --- Days 5 and 6: no new jobs --- + controller.advance(one_day).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + controller.advance(one_day).await; + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + { + let times = execution_times.lock().await; + assert_eq!(times.len(), 4, "Only 4 jobs should have run by day 6"); + } + + // --- Day 7: the 7-day job should fire --- + controller.advance(one_day).await; + wait_for_jobs_completed(&jobs, &[job_7d], wait_attempts).await; + + // All 5 jobs should now be recorded + { + let times = execution_times.lock().await; + assert_eq!(times.len(), 5, "All 5 jobs should have run by day 7"); + } + + // Verify every job is completed in the database + for id in [job_2h_a, job_2h_b, job_2d, job_4d, job_7d] { + let job = jobs.find(id).await?; + assert!(job.completed(), "Job {id} should be completed"); + } + + // Verify execution times are at or after their scheduled times + { + let times = execution_times.lock().await; + for (label, id, scheduled) in [ + ("2h-a", job_2h_a, at_2h), + ("2h-b", job_2h_b, at_2h), + ("2d", job_2d, at_2d), + ("4d", job_4d, at_4d), + ("7d", job_7d, at_7d), + ] { + let exec_time = times[&id]; + assert!( + exec_time >= scheduled, + "Job {label} executed at {exec_time} but was scheduled for {scheduled}", + ); + } + } + + // Explicit shutdown prevents the lost-handler (which uses our far-future + // manual clock) from resetting other tests' running jobs to 'pending'. + jobs.shutdown().await?; + + Ok(()) +}