diff --git a/Cargo.lock b/Cargo.lock index e26e152..64379bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -382,6 +382,41 @@ dependencies = [ "typenum", ] +[[package]] +name = "darling" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn", +] + +[[package]] +name = "darling_macro" +version = "0.21.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" +dependencies = [ + "darling_core", + "quote", + "syn", +] + [[package]] name = "der" version = "0.7.10" @@ -393,6 +428,16 @@ dependencies = [ "zeroize", ] +[[package]] +name = "deranged" +version = "0.5.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ececcb659e7ba858fb4f10388c250a7252eb0a27373f1a72b8748afdd248e587" +dependencies = [ + "powerfmt", + "serde_core", +] + [[package]] name = "digest" version = "0.10.7" @@ -441,6 +486,7 @@ dependencies = [ "rand 0.9.2", "serde", "serde_json", + "serde_with", "sqlx", "thiserror 2.0.17", "tokio", @@ -451,6 +497,12 @@ dependencies = [ "uuid", ] +[[package]] +name = "dyn-clone" +version = "1.0.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d0881ea181b1df73ff77ffaaf9c7544ecc11e82fba9b5f27b262a3c73a332555" + [[package]] name = "either" version = "1.15.0" @@ -511,6 +563,12 @@ dependencies = [ "spin", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + [[package]] name = "foldhash" version = "0.1.5" @@ -682,6 +740,12 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "hashbrown" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a9ee70c43aaf417c914396645a0fa852624801b24ebb7ae78fe8272889ac888" + [[package]] name = "hashbrown" version = "0.15.5" @@ -872,6 +936,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "1.1.0" @@ -893,6 +963,17 @@ dependencies = [ "icu_properties", ] +[[package]] +name = "indexmap" +version = "1.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bd070e393353796e801d209ad339e89596eb4c8d430d18ede6a1cced8fafbd99" +dependencies = [ + "autocfg", + "hashbrown 0.12.3", + "serde", +] + [[package]] name = "indexmap" version = "2.12.1" @@ -901,6 +982,8 @@ checksum = "0ad4bb2b565bca0645f4d68c5c9af97fba094e9791da685bf83cb5f3ce74acf2" dependencies = [ "equivalent", "hashbrown 0.16.1", + "serde", + "serde_core", ] [[package]] @@ -1038,7 +1121,7 @@ dependencies = [ "crossbeam-epoch", "crossbeam-utils", "hashbrown 0.15.5", - "indexmap", + "indexmap 2.12.1", "metrics", "ordered-float", "quanta", @@ -1091,6 +1174,12 @@ dependencies = [ "zeroize", ] +[[package]] +name = "num-conv" +version = "0.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9" + [[package]] name = "num-integer" version = "0.1.46" @@ -1301,6 +1390,12 @@ dependencies = [ "zerovec", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -1456,6 +1551,26 @@ dependencies = [ "bitflags", ] +[[package]] +name = "ref-cast" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f354300ae66f76f1c85c5f84693f0ce81d747e2c3f21a45fef496d89c960bf7d" +dependencies = [ + "ref-cast-impl", +] + +[[package]] +name = "ref-cast-impl" +version = "1.0.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7186006dcb21920990093f30e3dea63b7d6e977bf1256be20c3563a5db070da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "regex" version = "1.12.2" @@ -1574,6 +1689,30 @@ dependencies = [ "winapi-util", ] +[[package]] +name = "schemars" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd191f9397d57d581cddd31014772520aa448f65ef991055d7f61582c65165f" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + +[[package]] +name = "schemars" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9558e172d4e8533736ba97870c4b2cd63f84b382a3d6eb063da41b91cce17289" +dependencies = [ + "dyn-clone", + "ref-cast", + "serde", + "serde_json", +] + [[package]] name = "scopeguard" version = "1.2.0" @@ -1644,6 +1783,37 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_with" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fa237f2807440d238e0364a218270b98f767a00d3dada77b1c53ae88940e2e7" +dependencies = [ + "base64", + "chrono", + "hex", + "indexmap 1.9.3", + "indexmap 2.12.1", + "schemars 0.9.0", + "schemars 1.1.0", + "serde_core", + "serde_json", + "serde_with_macros", + "time", +] + +[[package]] +name = "serde_with_macros" +version = "3.16.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52a8e3ca0ca629121f70ab50f95249e5a6f925cc0f6ffe8256c45b728875706c" +dependencies = [ + "darling", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "sha1" version = "0.10.6" @@ -1783,7 +1953,7 @@ dependencies = [ "futures-util", "hashbrown 0.16.1", "hashlink", - "indexmap", + "indexmap 2.12.1", "log", "memchr", "percent-encoding", @@ -1965,6 +2135,12 @@ dependencies = [ "unicode-properties", ] +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "subtle" version = "2.6.1" @@ -2042,6 +2218,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.44" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e7d9e3bb61134e77bde20dd4825b97c010155709965fedf0f49bb138e52a9d" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40868e7c1d2f0b8d73e4a8c7f0ff63af4f6d19be117e90bd73eb1d62cf831c6b" + 
+[[package]] +name = "time-macros" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30cfb0125f12d9c277f35663a0a33f8c30190f4e4574868a330595412d34ebf3" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinystr" version = "0.8.2" @@ -2143,7 +2350,7 @@ version = "0.22.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "41fe8c660ae4257887cf66394862d21dbca4a6ddd26f04a3560410406a2f819a" dependencies = [ - "indexmap", + "indexmap 2.12.1", "serde", "serde_spanned", "toml_datetime", diff --git a/Cargo.toml b/Cargo.toml index 4895fe8..18d7585 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -19,6 +19,7 @@ tokio = { version = "1", features = ["full"] } sqlx = { version = "0.9.0-alpha.1", features = ["sqlx-toml", "postgres", "runtime-tokio", "chrono", "tls-rustls", "uuid", "migrate"] } serde = { version = "1", features = ["derive"] } serde_json = "1" +serde_with = "3" anyhow = "1" thiserror = "2" async-trait = "0.1" diff --git a/benches/checkpoint.rs b/benches/checkpoint.rs index 84852e9..6591512 100644 --- a/benches/checkpoint.rs +++ b/benches/checkpoint.rs @@ -39,8 +39,11 @@ fn bench_step_cache_miss(c: &mut Criterion) { .unwrap(); let start = std::time::Instant::now(); - let worker = - ctx.client.start_worker(bench_worker_options(1, 120)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; total_time += start.elapsed(); @@ -91,8 +94,9 @@ fn bench_step_cache_hit(c: &mut Criterion) { let worker = ctx .client - .start_worker(bench_worker_options(1, 120)) - .await; + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; worker.shutdown().await; @@ -123,8 +127,9 @@ fn bench_step_cache_hit(c: &mut Criterion) { let start = std::time::Instant::now(); let worker = ctx .client - .start_worker(bench_worker_options(1, 120)) - .await; + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 60).await; total_time += start.elapsed(); @@ -177,8 +182,11 @@ fn bench_large_payload_checkpoint(c: &mut Criterion) { .unwrap(); let start = std::time::Instant::now(); - let worker = - ctx.client.start_worker(bench_worker_options(1, 120)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(120))) + .await + .unwrap(); wait_for_tasks_complete(&ctx.pool, &ctx.queue_name, 1, 120).await; total_time += start.elapsed(); diff --git a/benches/common/setup.rs b/benches/common/setup.rs index f1ebbaf..7130e87 100644 --- a/benches/common/setup.rs +++ b/benches/common/setup.rs @@ -120,11 +120,11 @@ pub async fn clear_completed_tasks(pool: &PgPool, queue: &str) { } /// Default worker options optimized for benchmarking -pub fn bench_worker_options(concurrency: usize, claim_timeout: u64) -> WorkerOptions { +pub fn bench_worker_options(concurrency: usize, claim_timeout: Duration) -> WorkerOptions { WorkerOptions { worker_id: None, concurrency, - poll_interval: 0.001, // Very fast polling for accurate timing + poll_interval: Duration::from_millis(1), // Very fast polling for accurate timing claim_timeout, batch_size: None, // Use default (= concurrency) fatal_on_lease_timeout: false, diff --git a/benches/concurrency.rs b/benches/concurrency.rs index 263af3b..2637605 100644 --- 
a/benches/concurrency.rs +++ b/benches/concurrency.rs @@ -57,8 +57,13 @@ fn bench_concurrent_claims(c: &mut Criterion) { // Sync all workers to start together barrier.wait().await; - let worker = - client.start_worker(bench_worker_options(1, 60)).await; + let worker = client + .start_worker(bench_worker_options( + 1, + Duration::from_secs(60), + )) + .await + .unwrap(); // Wait a bit then shutdown tokio::time::sleep(Duration::from_secs(15)).await; @@ -142,8 +147,13 @@ fn bench_claim_latency_distribution(c: &mut Criterion) { let handle = tokio::spawn(async move { barrier.wait().await; - let worker = - client.start_worker(bench_worker_options(4, 60)).await; + let worker = client + .start_worker(bench_worker_options( + 4, + Duration::from_secs(60), + )) + .await + .unwrap(); tokio::time::sleep(Duration::from_secs(20)).await; worker.shutdown().await; diff --git a/benches/throughput.rs b/benches/throughput.rs index 25fbeca..ef7d9a3 100644 --- a/benches/throughput.rs +++ b/benches/throughput.rs @@ -72,8 +72,12 @@ fn bench_task_throughput(c: &mut Criterion) { let start = std::time::Instant::now(); let worker = ctx .client - .start_worker(bench_worker_options(concurrency, 60)) - .await; + .start_worker(bench_worker_options( + concurrency, + Duration::from_secs(60), + )) + .await + .unwrap(); wait_for_tasks_complete( &ctx.pool, @@ -112,7 +116,11 @@ fn bench_e2e_completion(c: &mut Criterion) { let ctx = BenchContext::new().await; ctx.client.register::().await.unwrap(); - let worker = ctx.client.start_worker(bench_worker_options(1, 60)).await; + let worker = ctx + .client + .start_worker(bench_worker_options(1, Duration::from_secs(60))) + .await + .unwrap(); let start = std::time::Instant::now(); for i in 0..iters { diff --git a/src/client.rs b/src/client.rs index b685087..84042b9 100644 --- a/src/client.rs +++ b/src/client.rs @@ -4,6 +4,7 @@ use sqlx::{Executor, PgPool, Postgres}; use std::collections::HashMap; use std::marker::PhantomData; use std::sync::Arc; +use std::time::Duration; use tokio::sync::RwLock; use uuid::Uuid; @@ -36,12 +37,12 @@ struct CancellationPolicyDb { impl CancellationPolicyDb { fn from_policy(policy: &CancellationPolicy) -> Option { - if policy.max_delay.is_none() && policy.max_duration.is_none() { + if policy.max_pending_time.is_none() && policy.max_running_time.is_none() { None } else { Some(Self { - max_delay: policy.max_delay, - max_duration: policy.max_duration, + max_delay: policy.max_pending_time.map(|d| d.as_secs()), + max_duration: policy.max_running_time.map(|d| d.as_secs()), }) } } @@ -599,15 +600,25 @@ where } /// Start a worker that processes tasks from the queue - pub async fn start_worker(&self, options: WorkerOptions) -> Worker { - Worker::start( + /// + /// # Errors + /// + /// Returns [`DurableError::InvalidConfiguration`] if `claim_timeout` is less than 1 second. + pub async fn start_worker(&self, options: WorkerOptions) -> DurableResult { + if options.claim_timeout < Duration::from_secs(1) { + return Err(DurableError::InvalidConfiguration { + reason: "claim_timeout must be at least 1 second".to_string(), + }); + } + + Ok(Worker::start( self.pool.clone(), self.queue_name.clone(), self.registry.clone(), options, self.state.clone(), ) - .await + .await) } /// Close the client. Closes the pool if owned. 
diff --git a/src/context.rs b/src/context.rs index 80edad6..3071119 100644 --- a/src/context.rs +++ b/src/context.rs @@ -56,7 +56,7 @@ where queue_name: String, #[allow(dead_code)] task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, state: State, @@ -94,7 +94,7 @@ where pool: PgPool, queue_name: String, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, lease_extender: LeaseExtender, registry: Arc>>, state: State, @@ -274,15 +274,14 @@ where .bind(name) .bind(&state_json) .bind(self.run_id) - .bind(self.claim_timeout as i32) + .bind(self.claim_timeout.as_secs() as i32) .execute(&self.pool) .await?; self.checkpoint_cache.insert(name.to_string(), state_json); // Notify worker that lease was extended so it can reset timers - self.lease_extender - .notify(Duration::from_secs(self.claim_timeout)); + self.lease_extender.notify(self.claim_timeout); Ok(()) } @@ -443,9 +442,11 @@ where /// /// # Arguments /// * `duration` - Extension duration. Defaults to original claim_timeout. + /// Must be at least 1 second. /// /// # Errors - /// Returns `TaskError::Control(Cancelled)` if the task was cancelled. + /// - Returns `TaskError::Validation` if duration is less than 1 second. + /// - Returns `TaskError::Control(Cancelled)` if the task was cancelled. #[cfg_attr( feature = "telemetry", tracing::instrument( @@ -455,21 +456,24 @@ where ) )] pub async fn heartbeat(&self, duration: Option) -> TaskResult<()> { - let extend_by = duration - .map(|d| d.as_secs() as i32) - .unwrap_or(self.claim_timeout as i32); + let extend_by = duration.unwrap_or(self.claim_timeout); + + if extend_by < std::time::Duration::from_secs(1) { + return Err(TaskError::Validation { + message: "heartbeat duration must be at least 1 second".to_string(), + }); + } let query = "SELECT durable.extend_claim($1, $2, $3)"; sqlx::query(query) .bind(&self.queue_name) .bind(self.run_id) - .bind(extend_by) + .bind(extend_by.as_secs() as i32) .execute(&self.pool) .await?; // Notify worker that lease was extended so it can reset timers - self.lease_extender - .notify(Duration::from_secs(extend_by as u64)); + self.lease_extender.notify(extend_by); Ok(()) } @@ -834,7 +838,7 @@ mod tests { headers: None, wake_event: None, }, - 10, + Duration::from_secs(10), LeaseExtender::dummy_for_tests(), Arc::new(RwLock::new(TaskRegistry::new())), (), diff --git a/src/error.rs b/src/error.rs index af57c9d..e7d7df4 100644 --- a/src/error.rs +++ b/src/error.rs @@ -271,6 +271,15 @@ pub enum DurableError { /// The reason the event name was invalid. reason: String, }, + + /// Configuration validation failed. + /// + /// Returned when worker options contain invalid values. + #[error("invalid configuration: {reason}")] + InvalidConfiguration { + /// The reason the configuration was invalid. + reason: String, + }, } /// Result type alias for Client API operations. 
diff --git a/src/types.rs b/src/types.rs index 8954cd4..5861750 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,19 +1,21 @@ use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use serde_json::Value as JsonValue; +use serde_with::{DurationSeconds, serde_as}; use std::collections::HashMap; use std::marker::PhantomData; +use std::time::Duration; use uuid::Uuid; // Default value functions for RetryStrategy -fn default_base_seconds() -> u64 { - 5 +fn default_base_delay() -> Duration { + Duration::from_secs(5) } fn default_factor() -> f64 { 2.0 } -fn default_max_seconds() -> u64 { - 300 +fn default_max_backoff() -> Duration { + Duration::from_secs(300) } /// Retry strategy for failed tasks. @@ -24,17 +26,19 @@ fn default_max_seconds() -> u64 { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::{RetryStrategy, SpawnOptions}; /// /// let options = SpawnOptions { /// retry_strategy: Some(RetryStrategy::Exponential { -/// base_seconds: 1, +/// base_delay: Duration::from_secs(1), /// factor: 2.0, -/// max_seconds: 60, +/// max_backoff: Duration::from_secs(60), /// }), /// ..Default::default() /// }; /// ``` +#[serde_as] #[derive(Debug, Clone, Serialize, Deserialize)] #[serde(tag = "kind", rename_all = "snake_case")] pub enum RetryStrategy { @@ -43,29 +47,32 @@ pub enum RetryStrategy { /// Fixed delay between retries Fixed { - /// Delay in seconds between retry attempts (default: 5) - #[serde(default = "default_base_seconds")] - base_seconds: u64, + /// Delay between retry attempts (default: 5 seconds) + #[serde(default = "default_base_delay", rename = "base_seconds")] + #[serde_as(as = "DurationSeconds")] + base_delay: Duration, }, - /// Exponential backoff: delay = base_seconds * (factor ^ (attempt - 1)) + /// Exponential backoff: delay = base_delay * (factor ^ (attempt - 1)) Exponential { - /// Initial delay in seconds (default: 5) - #[serde(default = "default_base_seconds")] - base_seconds: u64, + /// Initial delay (default: 5 seconds) + #[serde(default = "default_base_delay", rename = "base_seconds")] + #[serde_as(as = "DurationSeconds")] + base_delay: Duration, /// Multiplier for each subsequent attempt (default: 2.0) #[serde(default = "default_factor")] factor: f64, - /// Maximum delay cap in seconds (default: 300) - #[serde(default = "default_max_seconds")] - max_seconds: u64, + /// Maximum delay cap (default: 300 seconds) + #[serde(default = "default_max_backoff", rename = "max_seconds")] + #[serde_as(as = "DurationSeconds")] + max_backoff: Duration, }, } impl Default for RetryStrategy { fn default() -> Self { Self::Fixed { - base_seconds: default_base_seconds(), + base_delay: default_base_delay(), } } } @@ -74,17 +81,20 @@ impl Default for RetryStrategy { /// /// Allows tasks to be automatically cancelled based on how long they've been /// waiting or running. Useful for preventing stale tasks from consuming resources. +#[serde_as] #[derive(Debug, Clone, Default, Serialize, Deserialize)] pub struct CancellationPolicy { - /// Cancel if task has been pending for more than this many seconds. + /// Cancel if task has been pending for more than this duration. /// Checked when the task would be claimed. 
- #[serde(skip_serializing_if = "Option::is_none")] - pub max_delay: Option, + #[serde(skip_serializing_if = "Option::is_none", rename = "max_delay")] + #[serde_as(as = "Option>")] + pub max_pending_time: Option, - /// Cancel if task has been running for more than this many seconds total + /// Cancel if task has been running for more than this duration total /// (across all attempts). Checked on retry. - #[serde(skip_serializing_if = "Option::is_none")] - pub max_duration: Option, + #[serde(skip_serializing_if = "Option::is_none", rename = "max_duration")] + #[serde_as(as = "Option>")] + pub max_running_time: Option, } /// Options for spawning a task. @@ -94,14 +104,15 @@ pub struct CancellationPolicy { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::{SpawnOptions, RetryStrategy}; /// /// let options = SpawnOptions { /// max_attempts: Some(3), /// retry_strategy: Some(RetryStrategy::Exponential { -/// base_seconds: 5, +/// base_delay: Duration::from_secs(5), /// factor: 2.0, -/// max_seconds: 300, +/// max_backoff: Duration::from_secs(300), /// }), /// ..Default::default() /// }; @@ -130,12 +141,13 @@ pub struct SpawnOptions { /// # Example /// /// ``` +/// use std::time::Duration; /// use durable::WorkerOptions; /// /// let options = WorkerOptions { /// concurrency: 4, -/// claim_timeout: 120, -/// poll_interval: 0.5, +/// claim_timeout: Duration::from_secs(120), +/// poll_interval: Duration::from_millis(500), /// ..Default::default() /// }; /// ``` @@ -144,9 +156,9 @@ pub struct WorkerOptions { /// Unique worker identifier (default: hostname:pid) pub worker_id: Option, - /// Task lease duration in seconds (default: 120). + /// Task lease duration (default: 120 seconds). /// Tasks must complete or checkpoint within this time. - pub claim_timeout: u64, + pub claim_timeout: Duration, /// Maximum tasks to claim per poll (default: same as concurrency) pub batch_size: Option, @@ -154,8 +166,8 @@ pub struct WorkerOptions { /// Maximum parallel task executions (default: 1) pub concurrency: usize, - /// Seconds between polls when queue is empty (default: 0.25) - pub poll_interval: f64, + /// Interval between polls when queue is empty (default: 250ms) + pub poll_interval: Duration, /// Terminate process if task exceeds 2x claim_timeout (default: false). /// When false, the task is aborted but other tasks continue running. 
@@ -167,10 +179,10 @@ impl Default for WorkerOptions { fn default() -> Self { Self { worker_id: None, - claim_timeout: 120, + claim_timeout: Duration::from_secs(120), batch_size: None, concurrency: 1, - poll_interval: 0.25, + poll_interval: Duration::from_millis(250), fatal_on_lease_timeout: false, } } diff --git a/src/worker.rs b/src/worker.rs index ce6ead9..c93619b 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -123,7 +123,7 @@ impl Worker { let concurrency = options.concurrency; let batch_size = options.batch_size.unwrap_or(concurrency); let claim_timeout = options.claim_timeout; - let poll_interval = std::time::Duration::from_secs_f64(options.poll_interval); + let poll_interval = options.poll_interval; let fatal_on_lease_timeout = options.fatal_on_lease_timeout; // Mark worker as active @@ -223,7 +223,7 @@ impl Worker { pool: &PgPool, queue_name: &str, worker_id: &str, - claim_timeout: u64, + claim_timeout: Duration, count: usize, ) -> anyhow::Result> { #[cfg(feature = "telemetry")] @@ -236,7 +236,7 @@ impl Worker { let rows: Vec = sqlx::query_as(query) .bind(queue_name) .bind(worker_id) - .bind(claim_timeout as i32) + .bind(claim_timeout.as_secs() as i32) .bind(count as i32) .fetch_all(pool) .await?; @@ -263,7 +263,7 @@ impl Worker { queue_name: String, registry: Arc>>, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, fatal_on_lease_timeout: bool, state: State, ) where @@ -305,7 +305,7 @@ impl Worker { queue_name: String, registry: Arc>>, task: ClaimedTask, - claim_timeout: u64, + claim_timeout: Duration, fatal_on_lease_timeout: bool, state: State, ) where @@ -377,7 +377,7 @@ impl Worker { let timer_handle = tokio::spawn({ let task_label = task_label.clone(); async move { - let mut warn_duration = Duration::from_secs(claim_timeout); + let mut warn_duration = claim_timeout; let mut fatal_duration = warn_duration * 2; let mut warn_fired = false; let mut deadline = Instant::now(); @@ -414,7 +414,7 @@ impl Worker { tracing::warn!( "Task {} exceeded claim timeout of {}s (no heartbeat/step since last extension)", task_label, - claim_timeout + claim_timeout.as_secs() ); warn_fired = true; } @@ -449,7 +449,7 @@ impl Worker { task_id = %task_id, run_id = %run_id, elapsed_secs = elapsed.as_secs(), - claim_timeout_secs = claim_timeout, + claim_timeout_secs = claim_timeout.as_secs(), "Task {} exceeded 2x claim timeout without heartbeat; terminating process", task_label ); @@ -459,7 +459,7 @@ impl Worker { task_id = %task_id, run_id = %run_id, elapsed_secs = elapsed.as_secs(), - claim_timeout_secs = claim_timeout, + claim_timeout_secs = claim_timeout.as_secs(), "Task {} exceeded 2x claim timeout without heartbeat; aborting task", task_label ); diff --git a/tests/checkpoint_test.rs b/tests/checkpoint_test.rs index 7ecc1bb..661902a 100644 --- a/tests/checkpoint_test.rs +++ b/tests/checkpoint_test.rs @@ -42,7 +42,9 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result fail_after_step2: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -53,11 +55,12 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + 
.unwrap(); // Wait for task to fail (will hit max attempts since fail_after_step2 is always true) let terminal = wait_for_task_terminal( @@ -94,11 +97,12 @@ async fn test_checkpoint_prevents_step_reexecution(pool: PgPool) -> sqlx::Result let worker2 = client2 .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal2 = wait_for_task_terminal( &pool, @@ -138,7 +142,9 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -148,11 +154,12 @@ async fn test_deterministic_rand_preserved_on_retry(pool: PgPool) -> sqlx::Resul let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -199,7 +206,9 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -209,11 +218,12 @@ async fn test_deterministic_now_preserved_on_retry(pool: PgPool) -> sqlx::Result let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -257,7 +267,9 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu fail_on_first_attempt: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -267,11 +279,12 @@ async fn test_deterministic_uuid7_preserved_on_retry(pool: PgPool) -> sqlx::Resu let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -316,11 +329,12 @@ async fn test_long_workflow_many_steps(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -368,11 +382,12 @@ async fn test_large_payload_checkpoint(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, diff --git a/tests/concurrency_test.rs 
b/tests/concurrency_test.rs index 22f9a92..5c430bf 100644 --- a/tests/concurrency_test.rs +++ b/tests/concurrency_test.rs @@ -57,12 +57,13 @@ async fn test_task_claimed_by_exactly_one_worker(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, // Fast polling - claim_timeout: 30, + poll_interval: Duration::from_millis(10), // Fast polling + claim_timeout: Duration::from_secs(30), concurrency: 1, ..Default::default() }) - .await; + .await + .unwrap(); // Let workers run for a bit tokio::time::sleep(Duration::from_millis(500)).await; @@ -156,12 +157,13 @@ async fn test_concurrent_claims_with_skip_locked(pool: PgPool) -> sqlx::Result<( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, // Fast polling to maximize contention - claim_timeout: 30, + poll_interval: Duration::from_millis(10), // Fast polling to maximize contention + claim_timeout: Duration::from_secs(30), concurrency: 5, // Each worker handles multiple tasks ..Default::default() }) - .await; + .await + .unwrap(); // Let workers process tasks tokio::time::sleep(Duration::from_secs(3)).await; diff --git a/tests/crash_test.rs b/tests/crash_test.rs index ab1fde3..bedca5d 100644 --- a/tests/crash_test.rs +++ b/tests/crash_test.rs @@ -42,7 +42,9 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu fail_after_step2: true, }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -54,11 +56,12 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for some progress tokio::time::sleep(Duration::from_millis(500)).await; @@ -77,11 +80,12 @@ async fn test_crash_mid_step_resumes_from_checkpoint(pool: PgPool) -> sqlx::Resu // Second worker picks up the task let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 5, // Short timeout to reclaim quickly + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(5), // Short timeout to reclaim quickly ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to reach terminal state let terminal = wait_for_task_terminal( @@ -113,7 +117,7 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a slow task that will outlive the lease let spawn_result = client @@ -127,11 +131,12 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(500)).await; @@ -145,16 +150,17 @@ async fn test_worker_drop_without_shutdown(pool: PgPool) -> sqlx::Result<()> { } // Wait for real time to pass the lease timeout - tokio::time::sleep(Duration::from_secs(claim_timeout + 1)).await; + tokio::time::sleep(claim_timeout + 
Duration::from_secs(1)).await; // Second worker should reclaim the task let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, // Longer timeout for second worker + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), // Longer timeout for second worker ..Default::default() }) - .await; + .await + .unwrap(); // Give time for reclaim and some progress tokio::time::sleep(Duration::from_secs(2)).await; @@ -184,7 +190,7 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a long-running task that heartbeats (but we'll let the lease expire) let spawn_result = client @@ -198,11 +204,12 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> // First worker claims the task let worker1 = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(200)).await; @@ -214,16 +221,17 @@ async fn test_lease_expiration_allows_reclaim(pool: PgPool) -> sqlx::Result<()> worker1.shutdown().await; // Advance time past the lease timeout - advance_time(&pool, claim_timeout as i64 + 1).await?; + advance_time(&pool, claim_timeout.as_secs() as i64 + 1).await?; // Second worker should be able to reclaim let worker2 = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 60, // Longer timeout this time + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(60), // Longer timeout this time ..Default::default() }) - .await; + .await + .unwrap(); // Give second worker time to reclaim tokio::time::sleep(Duration::from_millis(500)).await; @@ -248,7 +256,7 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a task that runs for 5 seconds with frequent heartbeats let spawn_result = client @@ -261,11 +269,12 @@ async fn test_heartbeat_prevents_lease_expiration(pool: PgPool) -> sqlx::Result< let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to complete let terminal = wait_for_task_terminal( @@ -309,12 +318,13 @@ async fn test_spawn_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Handle parent and child ..Default::default() }) - .await; + .await + .unwrap(); // Wait for parent to complete let terminal = wait_for_task_terminal( @@ -372,7 +382,9 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { fail_after_step2: false, // Don't fail, just complete }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -382,11 
+394,12 @@ async fn test_step_idempotency_after_retry(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -428,7 +441,7 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let start_time = chrono::Utc::now(); set_fake_time(&pool, start_time).await?; - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a CPU-bound task that runs for 10 seconds (way longer than lease) let spawn_result = client @@ -437,7 +450,9 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { duration_ms: 10000, // 10 seconds }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(3), ..Default::default() }, @@ -447,17 +462,18 @@ async fn test_cpu_bound_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to be claimed and start tokio::time::sleep(Duration::from_millis(200)).await; // Advance time past the lease timeout - advance_time(&pool, claim_timeout as i64 + 1).await?; + advance_time(&pool, claim_timeout.as_secs() as i64 + 1).await?; // Give time for reclaim to happen tokio::time::sleep(Duration::from_millis(1000)).await; @@ -485,7 +501,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { client.create_queue(None).await.unwrap(); client.register::().await.unwrap(); - let claim_timeout = 2; // 2 second lease + let claim_timeout = Duration::from_secs(2); // 2 second lease // Spawn a slow task that sleeps for 30 seconds without heartbeat let spawn_result = client @@ -494,7 +510,9 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { sleep_ms: 30000, // 30 seconds - much longer than lease }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(5), ..Default::default() }, @@ -504,11 +522,12 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, + poll_interval: Duration::from_millis(50), claim_timeout, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to be claimed and start sleeping tokio::time::sleep(Duration::from_millis(500)).await; @@ -517,7 +536,7 @@ async fn test_slow_task_outlives_lease(pool: PgPool) -> sqlx::Result<()> { assert_eq!(state, Some("running".to_string())); // Wait for real time to pass the lease timeout - tokio::time::sleep(Duration::from_secs(claim_timeout + 2)).await; + tokio::time::sleep(claim_timeout + Duration::from_secs(2)).await; // Verify a new run was created (reclaim happened) let run_count = count_runs_for_task(&pool, "crash_slow", spawn_result.task_id).await?; diff --git a/tests/event_test.rs b/tests/event_test.rs index 0a4ca1e..6eb9e20 100644 --- a/tests/event_test.rs +++ b/tests/event_test.rs @@ -40,11 +40,12 @@ async fn test_emit_event_wakes_waiter(pool: PgPool) -> 
sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to start waiting tokio::time::sleep(Duration::from_millis(300)).await; @@ -113,11 +114,12 @@ async fn test_event_already_emitted_returns_immediately(pool: PgPool) -> sqlx::R let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Task should complete quickly since event exists let terminal = wait_for_task_terminal( @@ -170,11 +172,12 @@ async fn test_event_timeout_triggers(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( @@ -226,12 +229,13 @@ async fn test_multiple_waiters_same_event(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 3, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for all tasks to start waiting tokio::time::sleep(Duration::from_millis(500)).await; @@ -279,7 +283,9 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> event_name: "retry_event".to_string(), }, SpawnOptions { - retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }), + retry_strategy: Some(RetryStrategy::Fixed { + base_delay: Duration::from_secs(0), + }), max_attempts: Some(2), ..Default::default() }, @@ -289,11 +295,12 @@ async fn test_event_payload_preserved_on_retry(pool: PgPool) -> sqlx::Result<()> let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -360,11 +367,12 @@ async fn test_event_first_writer_wins(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); let terminal = wait_for_task_terminal( &pool, @@ -411,11 +419,12 @@ async fn test_multiple_distinct_events(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -481,11 +490,12 @@ async fn test_event_write_does_not_propagate_after_wake(pool: PgPool) -> sqlx::R let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); 
// Wait for task to start waiting for event tokio::time::sleep(Duration::from_millis(300)).await; @@ -551,12 +561,13 @@ async fn test_emit_from_different_task(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for waiter to start tokio::time::sleep(Duration::from_millis(300)).await; @@ -726,11 +737,12 @@ async fn test_event_timeout_error_payload(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to fail due to timeout let terminal = wait_for_task_terminal( @@ -853,12 +865,13 @@ async fn test_event_race_stress(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.01, - claim_timeout: 60, + poll_interval: Duration::from_millis(10), + claim_timeout: Duration::from_secs(60), concurrency: 32, ..Default::default() }) - .await; + .await + .unwrap(); for round in 0..rounds { let mut task_ids = Vec::with_capacity(tasks_per_round); diff --git a/tests/execution_test.rs b/tests/execution_test.rs index e32c45a..876b2e5 100644 --- a/tests/execution_test.rs +++ b/tests/execution_test.rs @@ -83,11 +83,12 @@ async fn test_simple_task_executes_and_completes(pool: PgPool) -> sqlx::Result<( // Start worker with short poll interval let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -129,11 +130,12 @@ async fn test_task_state_transitions(pool: PgPool) -> sqlx::Result<()> { // Start worker let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Wait for task to complete tokio::time::sleep(Duration::from_millis(500)).await; @@ -159,11 +161,12 @@ async fn test_empty_params_task_executes(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -196,11 +199,12 @@ async fn test_multi_step_task_completes_all_steps(pool: PgPool) -> sqlx::Result< let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -246,12 +250,13 @@ async fn test_multiple_tasks_execute_concurrently(pool: PgPool) -> sqlx::Result< // Start worker with concurrency > 1 let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: 
Duration::from_secs(30), concurrency: 5, ..Default::default() }) - .await; + .await + .unwrap(); // Wait for all tasks to complete tokio::time::sleep(Duration::from_millis(1000)).await; @@ -285,12 +290,13 @@ async fn test_worker_concurrency_limit_respected(pool: PgPool) -> sqlx::Result<( // Start worker with low concurrency let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), concurrency: 2, // Only 2 at a time ..Default::default() }) - .await; + .await + .unwrap(); // Give some time for processing tokio::time::sleep(Duration::from_millis(2000)).await; @@ -323,11 +329,12 @@ async fn test_worker_graceful_shutdown_waits(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); // Very short wait, then shutdown tokio::time::sleep(Duration::from_millis(200)).await; @@ -394,11 +401,12 @@ async fn test_task_result_stored_correctly(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -430,11 +438,12 @@ async fn test_research_task_readme_example(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -478,11 +487,12 @@ async fn test_convenience_methods_execute(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -529,11 +539,12 @@ async fn test_multiple_convenience_calls_produce_different_values( let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -572,11 +583,12 @@ async fn test_reserved_prefix_rejected(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); tokio::time::sleep(Duration::from_millis(500)).await; worker.shutdown().await; @@ -611,11 +623,12 @@ async fn test_reserved_prefix_error_payload(pool: PgPool) -> sqlx::Result<()> { let worker = client .start_worker(WorkerOptions { - poll_interval: 0.05, - claim_timeout: 30, + poll_interval: Duration::from_millis(50), + claim_timeout: Duration::from_secs(30), ..Default::default() }) - .await; + .await + .unwrap(); 
     tokio::time::sleep(Duration::from_millis(500)).await;
     worker.shutdown().await;
@@ -672,11 +685,12 @@ async fn test_long_running_task_with_heartbeat_completes(pool: PgPool) -> sqlx::

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 1, // 1 second claim timeout - task runs for 3x this duration
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(1), // 1 second claim timeout - task runs for 3x this duration
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to complete (3 seconds + buffer)
     tokio::time::sleep(Duration::from_millis(4000)).await;
@@ -791,11 +805,12 @@ async fn test_task_uses_application_state(pool: PgPool) -> sqlx::Result<()> {
     // Start worker
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to complete
     tokio::time::sleep(Duration::from_millis(500)).await;
diff --git a/tests/fanout_test.rs b/tests/fanout_test.rs
index ec039f9..d5e7ddf 100644
--- a/tests/fanout_test.rs
+++ b/tests/fanout_test.rs
@@ -104,12 +104,13 @@ async fn test_spawn_single_child_and_join(pool: PgPool) -> sqlx::Result<()> {
     // Start worker with concurrency to handle both parent and child
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 2,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for tasks to complete
     tokio::time::sleep(Duration::from_millis(2000)).await;
@@ -152,12 +153,13 @@ async fn test_spawn_multiple_children_and_join(pool: PgPool) -> sqlx::Result<()>
     // Start worker with high concurrency
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 10,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for tasks to complete
     tokio::time::sleep(Duration::from_millis(3000)).await;
@@ -202,12 +204,13 @@ async fn test_child_has_parent_task_id(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 2,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     tokio::time::sleep(Duration::from_millis(2000)).await;
     worker.shutdown().await;
@@ -265,12 +268,13 @@ async fn test_child_failure_propagates_to_parent(pool: PgPool) -> sqlx::Result<(

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 4,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for tasks to complete - longer since child needs to fail first, then parent
     tokio::time::sleep(Duration::from_millis(5000)).await;
@@ -305,12 +309,13 @@ async fn test_cascade_cancel_when_parent_cancelled(pool: PgPool) -> sqlx::Result
     // Start worker to let parent spawn child
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 2, // Process both parent and child
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Give time for parent to spawn child and child to start
     tokio::time::sleep(Duration::from_millis(500)).await;
@@ -370,12 +375,13 @@ async fn test_spawn_by_name_from_task_context(pool: PgPool) -> sqlx::Result<()>
     // Start worker with concurrency to handle both parent and child
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 2,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for tasks to complete
     tokio::time::sleep(Duration::from_millis(2000)).await;
@@ -431,12 +437,13 @@ async fn test_join_cancelled_child_returns_child_cancelled_error(pool: PgPool) -

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 2, // Need concurrency for both parent and child
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for child to be spawned and start
     tokio::time::sleep(Duration::from_millis(500)).await;
@@ -519,12 +526,13 @@ async fn test_child_failed_error_contains_message(pool: PgPool) -> sqlx::Result<

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             concurrency: 4,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for parent to fail
     let terminal = wait_for_task_terminal(
@@ -599,12 +607,13 @@ async fn test_join_timeout_when_parent_claim_expires(pool: PgPool) -> sqlx::Resu
     // When the claim expires while suspended, the task should eventually timeout
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 2, // Very short - 2 seconds
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(2), // Very short - 2 seconds
             concurrency: 2,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for parent to fail (should timeout when claim expires)
     let terminal = wait_for_task_terminal(
diff --git a/tests/lease_test.rs b/tests/lease_test.rs
index 9456429..3cbd81b 100644
--- a/tests/lease_test.rs
+++ b/tests/lease_test.rs
@@ -33,7 +33,7 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> {
     let start_time = chrono::Utc::now();
     set_fake_time(&pool, start_time).await?;

-    let claim_timeout = 30; // 30 seconds
+    let claim_timeout = Duration::from_secs(30); // 30 seconds

     // Spawn a task that will take a while (uses heartbeats)
     let spawn_result = client
@@ -46,11 +46,12 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
+            poll_interval: Duration::from_millis(50),
             claim_timeout,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for the task to be claimed
     tokio::time::sleep(Duration::from_millis(200)).await;
@@ -66,13 +67,13 @@ async fn test_claim_sets_correct_expiry(pool: PgPool) -> sqlx::Result<()>
         .expect("claim_expires_at should be set");

     // Should be approximately start_time + claim_timeout seconds
-    let expected_expiry = start_time + chrono::Duration::seconds(claim_timeout as i64);
+    let expected_expiry = start_time + chrono::Duration::seconds(claim_timeout.as_secs() as i64);
     let diff = (claim_expires - expected_expiry).num_seconds().abs();

     assert!(
         diff <= 2,
         "claim_expires_at should be ~{} seconds from start, got {} seconds diff",
-        claim_timeout,
+        claim_timeout.as_secs(),
         diff
     );
@@ -91,7 +92,7 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> {
     let start_time = chrono::Utc::now();
     set_fake_time(&pool, start_time).await?;

-    let claim_timeout = 10; // 10 seconds
+    let claim_timeout = Duration::from_secs(10); // 10 seconds

     // Spawn task that heartbeats frequently
     let spawn_result = client
@@ -104,11 +105,12 @@ async fn test_heartbeat_extends_lease(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
+            poll_interval: Duration::from_millis(50),
             claim_timeout,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to be claimed
     tokio::time::sleep(Duration::from_millis(200)).await;
@@ -165,7 +167,7 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> {
     let start_time = chrono::Utc::now();
     set_fake_time(&pool, start_time).await?;

-    let claim_timeout = 30;
+    let claim_timeout = Duration::from_secs(30);
     let num_steps = 20; // Enough steps to observe lease extension

     // Spawn task that creates many checkpoints
@@ -176,11 +178,12 @@ async fn test_checkpoint_extends_lease(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
+            poll_interval: Duration::from_millis(50),
             claim_timeout,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to start running
     tokio::time::sleep(Duration::from_millis(200)).await;
@@ -243,11 +246,12 @@ async fn test_heartbeat_detects_cancellation(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to start executing
     tokio::time::sleep(Duration::from_millis(500)).await;
diff --git a/tests/partition_test.rs b/tests/partition_test.rs
index 4003940..e02abfc 100644
--- a/tests/partition_test.rs
+++ b/tests/partition_test.rs
@@ -36,7 +36,9 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result
                 fail_after_step2: true,
             },
             SpawnOptions {
-                retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }),
+                retry_strategy: Some(RetryStrategy::Fixed {
+                    base_delay: Duration::from_secs(0),
+                }),
                 max_attempts: Some(3),
                 ..Default::default()
             },
@@ -46,11 +48,12 @@ async fn test_db_connection_lost_during_checkpoint(pool: PgPool) -> sqlx::Result

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to fail (will retry but always fail after step 2)
     let terminal = wait_for_task_terminal(
@@ -89,7 +92,7 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()>
     client.create_queue(None).await.unwrap();
     client.register::().await.unwrap();

-    let claim_timeout = 2; // Short lease
+    let claim_timeout = Duration::from_secs(2); // Short lease

     // Spawn a slow task
     let spawn_result = client
@@ -98,7 +101,9 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()>
                 sleep_ms: 30000, // 30 seconds
             },
             SpawnOptions {
-                retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }),
+                retry_strategy: Some(RetryStrategy::Fixed {
+                    base_delay: Duration::from_secs(0),
+                }),
                 max_attempts: Some(5),
                 ..Default::default()
             },
@@ -109,11 +114,12 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()>
     // First worker claims the task
     let worker1 = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
+            poll_interval: Duration::from_millis(50),
             claim_timeout,
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to be claimed
     tokio::time::sleep(Duration::from_millis(500)).await;
@@ -122,16 +128,17 @@ async fn test_stale_worker_checkpoint_rejected(pool: PgPool) -> sqlx::Result<()>
     worker1.shutdown().await;

     // Wait for lease to expire
-    tokio::time::sleep(Duration::from_secs(claim_timeout + 1)).await;
+    tokio::time::sleep(claim_timeout + Duration::from_secs(1)).await;

     // Second worker reclaims
     let worker2 = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 60, // Longer timeout
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(60), // Longer timeout
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for reclaim to happen
     tokio::time::sleep(Duration::from_secs(2)).await;
diff --git a/tests/retry_test.rs b/tests/retry_test.rs
index 4c6decc..fbc2ca8 100644
--- a/tests/retry_test.rs
+++ b/tests/retry_test.rs
@@ -45,11 +45,12 @@ async fn test_retry_strategy_none_no_retry(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     let terminal = wait_for_task_terminal(
         &pool,
@@ -87,7 +88,9 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> {
                 error_message: "intentional failure".to_string(),
             },
             SpawnOptions {
-                retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 5 }),
+                retry_strategy: Some(RetryStrategy::Fixed {
+                    base_delay: Duration::from_secs(5),
+                }),
                 max_attempts: Some(2),
                 ..Default::default()
             },
@@ -97,11 +100,12 @@ async fn test_retry_strategy_fixed_delay(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for first attempt to fail
     tokio::time::sleep(Duration::from_millis(300)).await;
@@ -164,9 +168,9 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<(
             },
             SpawnOptions {
                 retry_strategy: Some(RetryStrategy::Exponential {
-                    base_seconds: 2,
+                    base_delay: Duration::from_secs(2),
                     factor: 2.0,
-                    max_seconds: 100,
+                    max_backoff: Duration::from_secs(100),
                 }),
                 max_attempts: Some(3),
                 ..Default::default()
             },
@@ -177,11 +181,12 @@ async fn test_retry_strategy_exponential_backoff(pool: PgPool) -> sqlx::Result<(

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for first attempt to fail
     tokio::time::sleep(Duration::from_millis(300)).await;
@@ -245,7 +250,9 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> {
                 error_message: "intentional failure".to_string(),
             },
             SpawnOptions {
-                retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 0 }),
+                retry_strategy: Some(RetryStrategy::Fixed {
+                    base_delay: Duration::from_secs(0),
+                }),
                 max_attempts: Some(3),
                 ..Default::default()
             },
@@ -255,11 +262,12 @@ async fn test_max_attempts_honored(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for all attempts to complete
     let terminal = wait_for_task_terminal(
diff --git a/tests/spawn_test.rs b/tests/spawn_test.rs
index e261179..5a79dd2 100644
--- a/tests/spawn_test.rs
+++ b/tests/spawn_test.rs
@@ -6,6 +6,7 @@ use common::tasks::{EchoParams, EchoTask, FailingParams, FailingTask};
 use durable::{CancellationPolicy, Durable, MIGRATOR, RetryStrategy, SpawnOptions};
 use sqlx::PgPool;
 use std::collections::HashMap;
+use std::time::Duration;

 /// Helper to create a Durable client from the test pool.
 async fn create_client(pool: PgPool, queue_name: &str) -> Durable {
@@ -135,7 +136,9 @@ async fn test_spawn_with_retry_strategy_fixed(pool: PgPool) -> sqlx::Result<()>
     client.register::().await.unwrap();

     let options = SpawnOptions {
-        retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 10 }),
+        retry_strategy: Some(RetryStrategy::Fixed {
+            base_delay: Duration::from_secs(10),
+        }),
         ..Default::default()
     };
@@ -162,9 +165,9 @@ async fn test_spawn_with_retry_strategy_exponential(pool: PgPool) -> sqlx::Resul

     let options = SpawnOptions {
         retry_strategy: Some(RetryStrategy::Exponential {
-            base_seconds: 5,
+            base_delay: Duration::from_secs(5),
             factor: 2.0,
-            max_seconds: 300,
+            max_backoff: Duration::from_secs(300),
         }),
         ..Default::default()
     };
@@ -222,8 +225,8 @@ async fn test_spawn_with_cancellation_policy(pool: PgPool) -> sqlx::Result<()> {

     let options = SpawnOptions {
         cancellation: Some(CancellationPolicy {
-            max_delay: Some(60),
-            max_duration: Some(300),
+            max_pending_time: Some(Duration::from_secs(60)),
+            max_running_time: Some(Duration::from_secs(300)),
         }),
         ..Default::default()
     };
@@ -279,7 +282,9 @@ async fn test_spawn_by_name_with_options(pool: PgPool) -> sqlx::Result<()> {

     let options = SpawnOptions {
         max_attempts: Some(3),
-        retry_strategy: Some(RetryStrategy::Fixed { base_seconds: 5 }),
+        retry_strategy: Some(RetryStrategy::Fixed {
+            base_delay: Duration::from_secs(5),
+        }),
         ..Default::default()
     };
diff --git a/tests/telemetry_test.rs b/tests/telemetry_test.rs
index f5a48ba..9971624 100644
--- a/tests/telemetry_test.rs
+++ b/tests/telemetry_test.rs
@@ -155,11 +155,12 @@ async fn test_task_lifecycle_metrics(pool: PgPool) -> sqlx::Result<()> {
     // Start worker
     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to complete
     wait_for_task_state(
         &pool,
@@ -240,11 +241,12 @@ async fn test_task_failure_metrics(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Wait for task to fail
     wait_for_task_state(
         &pool,
@@ -284,11 +286,12 @@ async fn test_worker_gauge_metrics(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     // Give the worker time to set its active gauge
     tokio::time::sleep(Duration::from_millis(200)).await;
@@ -352,11 +355,12 @@ async fn test_checkpoint_metrics(pool: PgPool) -> sqlx::Result<()> {

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     wait_for_task_state(
         &pool,
@@ -412,11 +416,12 @@ async fn test_task_execution_duration_metrics(pool: PgPool) -> sqlx::Result<()>

     let worker = client
         .start_worker(WorkerOptions {
-            poll_interval: 0.05,
-            claim_timeout: 30,
+            poll_interval: Duration::from_millis(50),
+            claim_timeout: Duration::from_secs(30),
             ..Default::default()
         })
-        .await;
+        .await
+        .unwrap();

     wait_for_task_state(
         &pool,